Kafka Consumer Example - Spring Boot
In this blog post, we’ll walk through creating a Kafka consumer using Spring Boot. Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications. Integrating Kafka with Spring Boot makes it easy to produce and consume messages.
Prerequisites
- Java Development Kit (JDK) 8 or higher (Spring Boot 3.x requires JDK 17 or higher)
- Apache Kafka
- Spring Boot
- Maven
Project Setup
First, create a new Spring Boot project using Spring Initializr (https://start.spring.io/) with the following dependencies:
- Spring Web
- Spring for Apache Kafka
Once the project is generated, open it in your favorite IDE.
Configuration
Create a configuration class to set up Kafka consumer properties.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    // Consumer settings: broker address, consumer group, and String deserializers for key and value.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory that @KafkaListener methods use to create their listener containers.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
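The constants in ConsumerConfig resolve to plain string keys, so the same settings can be assembled and inspected without a broker. The following is a minimal sketch, assuming a hypothetical environment variable KAFKA_BOOTSTRAP_SERVERS that overrides the hard-coded broker address. Neither that variable nor the ConsumerProps class is part of the original setup.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper, not part of Spring or Kafka: builds the same settings as
// consumerFactory(), using the string keys behind the ConsumerConfig constants.
class ConsumerProps {

    // Hypothetical variable name, assumed for this sketch.
    static final String BOOTSTRAP_ENV = "KAFKA_BOOTSTRAP_SERVERS";

    // Takes the environment as a parameter so the lookup can be tested with a plain map.
    static Map<String, Object> build(Map<String, String> env) {
        Map<String, Object> props = new HashMap<>();
        // Fall back to the address used in the example when the variable is not set.
        props.put("bootstrap.servers", env.getOrDefault(BOOTSTRAP_ENV, "localhost:9092"));
        props.put("group.id", "group_id");
        // Kafka accepts deserializers as fully qualified class names.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Passing ConsumerProps.build(System.getenv()) to the DefaultKafkaConsumerFactory constructor would keep the broker address out of the source code.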
Creating a Kafka Listener
Now, create a Kafka listener to consume messages from the Kafka topic.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Invoked once for each message received from my_topic as part of consumer group group_id.
    @KafkaListener(topics = "my_topic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
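The listener only prints the payload. One way to keep message handling testable without a broker is to move it into a plain class that the consume method delegates to. The sketch below assumes a hypothetical MessageProcessor class and a trivial rule (trim the payload, skip blank messages). Neither comes from the original example.

```java
import java.util.Optional;

// Hypothetical helper: holds the handling logic so it can be unit-tested
// without Spring or a running Kafka broker.
class MessageProcessor {

    // Returns the trimmed payload, or an empty Optional if the message is null or blank.
    Optional<String> process(String message) {
        if (message == null) {
            return Optional.empty();
        }
        String trimmed = message.trim();
        return trimmed.isEmpty() ? Optional.empty() : Optional.of(trimmed);
    }
}
```

Inside KafkaConsumerService.consume, the call could then look like processor.process(message).ifPresent(m -> System.out.println("Consumed message: " + m)), with processor held as a field of the service.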
Running the Application
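To run the application, ensure that your Kafka server is up and running. Assuming you have Kafka installed, you can start it with the following commands, run from the Kafka installation directory, each in its own terminal. These commands apply to ZooKeeper-based installations; Kafka 4.0 and later run in KRaft mode without ZooKeeper, so the first command does not apply there.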
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka server
bin/kafka-server-start.sh config/server.properties
Once Kafka is up and running, start the Spring Boot application, for example with mvn spring-boot:run from the project root. The application will then begin consuming messages from the my_topic topic.
Diagram
Here’s a simple text-based diagram to illustrate the flow:
Kafka Server
|
V
Kafka Topic (my_topic)
|
V
Kafka Consumer (Spring Boot Application)
|
V
Process and Consume Messages
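Behind @KafkaListener, Spring's listener container repeatedly polls the topic and passes each record to the annotated method. The following broker-free sketch imitates that flow with an in-memory queue standing in for my_topic. It illustrates the pattern under that assumption and is not how Spring Kafka is implemented; the names PollLoopSketch and drain are made up for this example.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Illustration only: an in-memory queue plays the role of the topic,
// and the listener plays the role of the @KafkaListener method.
class PollLoopSketch {

    // Hands every currently queued message to the listener, in order,
    // and returns how many messages were handled.
    static int drain(Queue<String> topic, Consumer<String> listener) {
        int handled = 0;
        String message;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((message = topic.poll()) != null) {
            listener.accept(message);
            handled++;
        }
        return handled;
    }
}
```

A real consumer keeps polling for as long as the application runs and also tracks offsets, which this sketch leaves out.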
Conclusion
In this blog post, we’ve demonstrated how to create a Kafka Consumer using Spring Boot. This is a basic example to get you started with consuming messages from a Kafka topic. You can extend this setup to handle more complex use cases and integrate it into your Spring Boot applications.
Happy coding!