Spring Boot Kafka Consumer Example

Kafka Consumer Example - Spring Boot 

In this blog post, we’ll walk through the process of creating a Kafka Consumer using Spring Boot. Apache Kafka is a distributed streaming platform that can be used for building real-time streaming data pipelines and applications. Integrating Kafka with Spring Boot makes it easy to produce and consume messages. 

Kafka Consumer Example - Spring Boot
Kafka Consumer Example - Spring Boot 


Prerequisites

  • Java Development Kit (JDK) 8 or higher
  • Apache Kafka
  • Spring Boot
  • Maven

Project Setup

First, create a new Spring Boot project using Spring Initializr (https://start.spring.io/) with the following dependencies:

  • Spring Web
  • Spring for Apache Kafka

Once the project is generated, open it in your favorite IDE.

Configuration

Create a configuration class to set up Kafka consumer properties.


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Creating a Kafka Listener

Now, create a Kafka listener to consume messages from the Kafka topic.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

Running the Application

To run the application, ensure that your Kafka server is up and running. You can start the Kafka server using the following commands (assuming you have Kafka installed):


# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka server
bin/kafka-server-start.sh config/server.properties

Once Kafka is up and running, you can start the Spring Boot application. The application will start consuming messages from the specified Kafka topic.

Diagram

Here’s a simple text-based diagram to illustrate the flow:


Kafka Server
    |
    V
Kafka Topic (my_topic)
    |
    V
Kafka Consumer (Spring Boot Application)
    |
    V
Process and Consume Messages

Conclusion

In this blog post, we’ve demonstrated how to create a Kafka Consumer using Spring Boot. This is a basic example to get you started with consuming messages from a Kafka topic. You can extend this setup to handle more complex use cases and integrate it into your Spring Boot applications.

Happy coding!

Post a Comment

Previous Post Next Post