Harnessing the Power of Apache Kafka in Spring Boot: A Developer's Guide

 

In one of our project we had a requirement to process events and run some business logic accordingly and then save it to a database along with creating a new event so that other applications can consume this event and do the needful.

For this purpose, we went ahead and started working with Apache Kafka which can be integrated with Spring boot and used for event streaming.

Introduction to Apache Kafka with Spring Boot

Apache Kafka has emerged as a powerful solution for building real-time streaming applications. Combined with the simplicity and flexibility of Spring Boot, developers can create robust and scalable systems to handle high-throughput data streams. In this tutorial, we will explore how to set up and integrate Kafka producers and consumers using Spring Boot.

Prerequisites

Before we begin, make sure you have the following installed:

  • Java Development Kit (JDK) 8 or higher
  • Apache Kafka (download from Apache Kafka website)
  • Maven or Gradle
  • IDE (IntelliJ IDEA, Eclipse, etc.)

Setting Up Apache Kafka

  1. Download and Extract Kafka:

    • Download the latest version of Kafka and extract it to a preferred location on your system.
  2. Start Zookeeper:

    • Kafka depends on Zookeeper for managing and coordinating distributed systems. Start Zookeeper using the terminal:
      bash
      bin/zookeeper-server-start.sh config/zookeeper.properties
  3. Start Kafka Server:

    • Open a new terminal window and start the Kafka server:
      bash
      bin/kafka-server-start.sh config/server.properties

Creating a Spring Boot Project

Let's create a new Spring Boot project using Spring Initializr. Include the following dependencies:

  • Spring Web
  • Spring for Apache Kafka

For Maven, your pom.xml should look like this:

xml
<dependencies> <!-- Spring Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Spring for Apache Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>

Kafka Producer Configuration

Application Configuration

First, configure your Kafka producer and consumer settings in src/main/resources/application.yml:

bootstrap-servers is a list of kafka server ip's along with port. Here you can provide either one or multiple kafka server ip's comma separated. It is recommended to have multiple server ip's to be given as in case one of the server goes down the application can connect to the other server listed in this bootstrap-servers configuration.

group-id is used in case of a consumer application. Any number of consumer application having the same group-id- will ready the message only once, which means that consumers with the same group-id make sures that atleast one of them will consume the message/event from kafka, thus helping us in case of scaling the application and bringing in more consumers. This is to make sure that the event is consumed only once and not multiple times or by multiple different consumers. In case you want multiple different microservices to consume the event, then change the group-id for each of the microservice.

Note: Please refer to Kafka configuration documentation to know more about the available configurations.

yaml
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

Kafka Producer Configuration

Create a configuration class for the Kafka producer (KafkaProducerConfig.java):

java
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

Kafka Producer Service

Next, create a service (KafkaProducerService.java) to send messages to a Kafka topic:

java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducerService { private static final String TOPIC = "my_topic"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send(TOPIC, message); } }

Kafka Consumer Configuration

Kafka Consumer Configuration

Create a configuration class for the Kafka consumer (KafkaConsumerConfig.java):

java
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }

Kafka Consumer Service

Next, create a service (KafkaConsumerService.java) to consume messages from the Kafka topic:

java
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = "my_topic", groupId = "my-group") public void listen(String message) { System.out.println("Received Message: " + message); } }

Testing the Kafka Producer

Create a REST controller (KafkaController.java) to test the Kafka producer:

java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { @Autowired private KafkaProducerService kafkaProducerService; @GetMapping("/send") public String sendMessage(@RequestParam("message") String message) { kafkaProducerService.sendMessage(message); return "Message sent to Kafka topic"; } }

Running the Application

  1. Start Kafka and Zookeeper:

    • Ensure Zookeeper and Kafka server are running.
  2. Run the Spring Boot Application:

    • Run your Spring Boot application from your IDE or using Maven:
      bash
      mvn spring-boot:run
  3. Test the Kafka Producer:

    • Use a tool like Postman or a web browser to send a GET request:

      bash
      http://localhost:8080/send?message=HelloKafka
    • Check the console output where the consumer service (KafkaConsumerService) is running to see the received message.


Note: There are multiple ways to create a producer and consumer, in the blog here we have covered one of the way to do it. Please do go through the other ways and use the one which is optimal for your solution.

Conclusion

In this tutorial, we covered the basics of integrating Apache Kafka with Spring Boot to create both producers and consumers. You learned how to configure Kafka settings, create producer and consumer services, and test the application. This setup provides a solid foundation for building real-time streaming applications with Kafka and Spring Boot, empowering you to handle high-throughput data streams efficiently.

Feel free to extend this example by handling different data types, exploring Kafka's advanced features, and scaling your application as needed. Happy coding!

Post a Comment

0 Comments