In one of our projects, we had a requirement to process events, run some business logic accordingly, and save the result to a database while publishing a new event so that other applications could consume it and act on it.
For this purpose, we chose Apache Kafka, which integrates well with Spring Boot and is widely used for event streaming.
Introduction to Apache Kafka with Spring Boot
Apache Kafka has emerged as a powerful solution for building real-time streaming applications. Combined with the simplicity and flexibility of Spring Boot, developers can create robust and scalable systems to handle high-throughput data streams. In this tutorial, we will explore how to set up and integrate Kafka producers and consumers using Spring Boot.
Prerequisites
Before we begin, make sure you have the following installed:
- Java Development Kit (JDK) 8 or higher
- Apache Kafka (download from Apache Kafka website)
- Maven or Gradle
- IDE (IntelliJ IDEA, Eclipse, etc.)
Setting Up Apache Kafka
Download and Extract Kafka:
- Download the latest version of Kafka and extract it to a preferred location on your system.
Start Zookeeper:
- Kafka depends on Zookeeper for managing and coordinating its brokers. Start Zookeeper from the Kafka directory:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka Server:
- Open a new terminal window and start the Kafka server:
bin/kafka-server-start.sh config/server.properties
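Create the Topic (optional):
- Recent Kafka versions auto-create topics on first use by default, but you can also create the my_topic topic used in this tutorial explicitly (on older releases this tool took --zookeeper instead of --bootstrap-server):
bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1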
Creating a Spring Boot Project
Let's create a new Spring Boot project using Spring Initializr. Include the following dependencies:
- Spring Web
- Spring for Apache Kafka
For Maven, your pom.xml should include the following dependencies:
<dependencies>
    <!-- Spring Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Application Configuration
First, configure your Kafka producer and consumer settings in src/main/resources/application.yml. A couple of the settings deserve a brief explanation before we look at the file:
bootstrap-servers is a comma-separated list of Kafka broker addresses (host:port). You can provide a single broker or several; listing multiple brokers is recommended so that if one broker goes down, the application can connect to another one in the list.
group-id applies to consumer applications. Consumers that share the same group-id form a consumer group, and each message is delivered to only one consumer within that group. This ensures an event is not processed multiple times by the same service and lets you scale by simply adding more consumer instances. If you want several different microservices to each consume every event, give each microservice its own group-id.
Note: Please refer to the Kafka configuration documentation to learn more about the available options.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
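For example, in production you might list several brokers, and a second microservice that must also receive every event would use its own group-id (the broker addresses and service name here are illustrative):
spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
      group-id: notification-service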
Kafka Producer Configuration
Create a configuration class for the Kafka producer (KafkaProducerConfig.java):
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Producer settings: broker address and serializers for keys and values
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // KafkaTemplate is the main entry point for sending messages
        return new KafkaTemplate<>(producerFactory());
    }
}
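As a side note, Spring Boot's auto-configuration can already build a ProducerFactory and KafkaTemplate from the spring.kafka.* properties in application.yml, so this class is optional. If you keep it, you can avoid hardcoding the broker address a second time by injecting the same property (a minimal sketch; the field goes inside KafkaProducerConfig):
import org.springframework.beans.factory.annotation.Value;

// Reuse the address from application.yml instead of hardcoding "localhost:9092"
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

// ...then in producerFactory():
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);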
Kafka Producer Service
Next, create a service (KafkaProducerService.java) to send messages to a Kafka topic:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "my_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Publishes the message to the topic; send() is asynchronous
    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
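Note that send() returns immediately, before the broker acknowledges the record. If you want to log whether the send actually succeeded, you can attach a callback to the returned future. A minimal sketch, assuming spring-kafka 3.x where send() returns a CompletableFuture (older versions return a ListenableFuture instead); this method could be added to KafkaProducerService:
public void sendMessageWithCallback(String message) {
    kafkaTemplate.send(TOPIC, message).whenComplete((result, ex) -> {
        if (ex == null) {
            // The broker assigned this partition and offset to the record
            System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        } else {
            System.err.println("Failed to send: " + ex.getMessage());
        }
    });
}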
Kafka Consumer Configuration
Create a configuration class for the Kafka consumer (KafkaConsumerConfig.java):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Consumer settings: broker address, consumer group, and deserializers
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // The container factory backs every @KafkaListener method
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
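If my_topic has more than one partition, you can process messages with several threads in a single application instance by setting the factory's concurrency (a small sketch; threads beyond the partition count would sit idle):
// In kafkaListenerContainerFactory(), before the return statement:
factory.setConcurrency(3); // run up to 3 listener threads, partitions are shared among them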
Kafka Consumer Service
Next, create a service (KafkaConsumerService.java) to consume messages from the Kafka topic:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // Invoked automatically for every message published to my_topic
    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received Message: " + message);
    }
}
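To see the group-id semantics from earlier in action: a second, hypothetical service with a different groupId receives its own copy of every message, independently of KafkaConsumerService:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical second consumer: a different groupId means this service
// also receives every event, rather than sharing events with my-group
@Service
public class AuditConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "audit-group")
    public void listen(String message) {
        System.out.println("Audit copy: " + message);
    }
}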
Testing the Kafka Producer
Create a REST controller (KafkaController.java) to test the Kafka producer:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    // GET /send?message=... publishes the given message to Kafka
    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        kafkaProducerService.sendMessage(message);
        return "Message sent to Kafka topic";
    }
}
Running the Application
Start Kafka and Zookeeper:
- Ensure Zookeeper and Kafka server are running.
Run the Spring Boot Application:
- Run your Spring Boot application from your IDE or using Maven:
mvn spring-boot:run
Test the Kafka Producer:
Use a tool like Postman or a web browser to send a GET request:
http://localhost:8080/send?message=HelloKafka
Check the console output where the consumer service (KafkaConsumerService) is running to see the received message.
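You can also confirm the message reached the topic without the Spring consumer by using the console consumer that ships with Kafka:
bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092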
Conclusion
In this tutorial, we covered the basics of integrating Apache Kafka with Spring Boot to create both producers and consumers. You learned how to configure Kafka settings, create producer and consumer services, and test the application. This setup provides a solid foundation for building real-time streaming applications with Kafka and Spring Boot, empowering you to handle high-throughput data streams efficiently.
Feel free to extend this example by handling different data types, exploring Kafka's advanced features, and scaling your application as needed. Happy coding!