Implementing Kafka in Java: A Practical Guide
Hi everyone! In today’s blog, we’ll dive into implementing Apache Kafka in a Spring Boot application and explore what makes Kafka an essential tool in modern data processing.
But before we get into the implementation details, let’s first understand why Kafka is such a critical technology today.
Why Kafka
Unlike traditional messaging systems such as RabbitMQ, Kafka is designed to process and manage vast amounts of data with high throughput and low latency.
Another key advantage of Kafka is its flexibility in integration. Kafka can easily integrate with various data processing frameworks, such as Apache Flink, Apache Storm, and Spark, enabling sophisticated stream processing and analytics.
Kafka also offers features such as zero-copy data transfer that make it even faster.
Aim
Our aim today is to develop an application that pushes a string message to a Kafka topic and consumes it. Additionally, our goal is to understand each line of code that we write, and not just the code: we also want to understand every single command we run in the terminal. By understanding what we are coding, we make ourselves better developers. Let’s begin…
Step 1
Download Kafka from the official website: Apache Kafka
Step 2
Start ZooKeeper with the commands below.
cd /opt/homebrew/opt/kafka/libexec/bin
./zookeeper-server-start.sh /opt/homebrew/etc/kafka/zookeeper.properties
First, we change to the directory where the Kafka scripts live. Then we start ZooKeeper by invoking the shell script zookeeper-server-start.sh, passing the location of the zookeeper.properties file as a parameter.
Step 3
Start Kafka
Here, I am using the Homebrew installation on my macOS. You can find multiple references online for starting Kafka if you are working on Windows.
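Assuming the same Homebrew layout as in Step 2, starting the Kafka broker typically looks like this (the server.properties path may differ on your setup):
cd /opt/homebrew/opt/kafka/libexec/bin
./kafka-server-start.sh /opt/homebrew/etc/kafka/server.properties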
Now, there is a reason why we started ZooKeeper first and then Kafka: Kafka relies on ZooKeeper for managing cluster coordination, metadata, and leader elections. ZooKeeper keeps track of Kafka brokers, topics, and partitions, ensuring they are synchronized and available. Starting ZooKeeper first allows Kafka to connect to it and perform the necessary initialization tasks.
Step 4
Initialize the project from Spring Initializr. Make sure you select the proper Java version. You can type java -version in your terminal to check the Java version installed on your machine.
Further, we are adding the Web and Kafka dependencies here. I have added the Web dependency so that I can add a controller and create a REST endpoint if needed in the future.
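For reference, this is roughly what the two starters look like in pom.xml (a minimal sketch; the versions are managed by the Spring Boot parent, so they are omitted):
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>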
Step 5
Do a Maven clean and a Maven install.
If you face any Java version-related issues, make sure the Java version is the same in all of these places: the pom.xml, IntelliJ (File → Project Structure → Project → SDK), and the version active on your machine (java -version).
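If you prefer the terminal over the IDE buttons, the equivalent command is shown below (projects generated from Spring Initializr ship with the Maven wrapper, so mvnw works even without a global Maven installation):
./mvnw clean install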
Step 6
Add the below properties to the application.properties file.
spring.application.name=kafka
spring.kafka.topic.name=demo_1
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=any_unique_id_1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.application.name: Sets the name of the Spring Boot application.
- spring.kafka.topic.name: Specifies the Kafka topic name as "demo_1".
- spring.kafka.producer.bootstrap-servers: Defines the Kafka broker address for the producer as localhost:9092.
- spring.kafka.producer.key-serializer: Configures the key serializer for the Kafka producer to serialize keys as strings.
- spring.kafka.producer.value-serializer: Configures the value serializer for the Kafka producer to serialize values as strings.
- spring.kafka.consumer.bootstrap-servers: Defines the Kafka broker address for the consumer as localhost:9092.
- spring.kafka.consumer.group-id: Sets the consumer group ID to "any_unique_id_1".
- spring.kafka.consumer.auto-offset-reset: Configures the consumer to start reading from the earliest available offset if no previous offset is found.
- spring.kafka.consumer.key-deserializer: Configures the key deserializer for the Kafka consumer to deserialize keys as strings.
- spring.kafka.consumer.value-deserializer: Configures the value deserializer for the Kafka consumer to deserialize values as strings.
Step 7
Create a new package config
and inside that, create a new file KafkaTopicConfig.java
.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.topic.name}") // to get the topic name defined in application.properties
    private String topicName;

    @Bean
    public NewTopic kafkaTopic() {
        // Spring picks up this bean and creates the topic on startup if it does not already exist
        return TopicBuilder.name(topicName)
                .build();
    }
}
Here, we create a Kafka topic when the application starts. The @Bean method kafkaTopic() builds a new topic based on the name provided in the application properties, ensuring the topic exists with the desired configuration when the Spring Boot application starts.
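To double-check that the topic was actually created, you can use the CLI tools that ship with Kafka (same bin directory as in Step 2); this is an optional sanity check, not part of the application:
cd /opt/homebrew/opt/kafka/libexec/bin
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic demo_1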
Step 8
Create a producer
package and add the KafkaProducer.java
file to it.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        System.out.println("Kafka producer is sending message : " + message + " to the topic : " + topicName);
        kafkaTemplate.send(topicName, message);
    }
}
Now, we want to produce or publish a message on a Kafka Topic.
- We declare private final KafkaTemplate<String, String> kafkaTemplate. This KafkaTemplate instance is used for sending messages to Kafka. The String type parameters indicate that both the key and the value of Kafka messages are strings.
- The constructor initializes it: this.kafkaTemplate = kafkaTemplate;
- The sendMessage function is designed to send a string message to a Kafka topic. Whenever we want to publish a message to the topic, we call sendMessage (an asynchronous variant is sketched right after this list).
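As a side note on sendMessage: kafkaTemplate.send() is asynchronous and returns a future (a CompletableFuture<SendResult<...>> in Spring Kafka 3.x; older versions return a ListenableFuture instead). A minimal sketch of logging the outcome, assuming Spring Kafka 3.x:
public void sendMessage(String message) {
    kafkaTemplate.send(topicName, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // RecordMetadata tells us where the broker stored the message
                    System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                            + " at offset " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send message: " + ex.getMessage());
                }
            });
}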
Step 9
Create a consumer
package and add KafkaConsumer.java
file.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(String message) {
        System.out.println("Kafka consumer belonging to the group " + groupId + " has received message : " + message + " from the topic : " + topicName);
    }
}
The KafkaConsumer class is designed to receive and process messages (applying any logic we want) from a Kafka topic. It uses Spring's Kafka support to listen for messages published on a specified topic and handle them accordingly.
@KafkaListener: the annotation that marks the method as a Kafka message listener. It tells Spring to invoke this method whenever a message is received from the specified Kafka topic.
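You can also exercise the consumer without our producer by typing messages into Kafka's console producer (an optional check; note that older Kafka versions used the flag --broker-list instead of --bootstrap-server):
cd /opt/homebrew/opt/kafka/libexec/bin
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo_1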
Step 10
Modify the KafkaApplication.java file to publish a message, and run your application.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaProducer kafkaProducer;

    public static void main(String[] args) {
        // Start the application context, fetch this class as a bean, then publish the messages
        SpringApplication.run(KafkaApplication.class, args).getBean(KafkaApplication.class).publishMessage();
    }

    private void publishMessage() {
        for (int i = 0; i < 5; i++) {
            kafkaProducer.sendMessage("message " + i);
        }
    }
}
Here, we publish simple messages in a for loop and check whether they get consumed.
One thing to notice here is that all the messages get published first, and only then are they all consumed together. But why? Why don't the producer and the consumer work in parallel?
This is where we observe that Kafka producers and consumers operate asynchronously. When you publish messages, the producer may not wait for the messages to be acknowledged before sending the next one.
Try adding a sleep of one second after producing each message and check the results.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaProducer kafkaProducer;

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(KafkaApplication.class, args).getBean(KafkaApplication.class).publishMessage();
    }

    private void publishMessage() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            kafkaProducer.sendMessage("message " + i);
            Thread.sleep(1000); // wait one second between messages
        }
    }
}
Complete code here: https://github.com/deeppatel23/Kafka-with-Spring-Boot.git
Practical Challenge
Let’s dive into a practical challenge to solidify your understanding of Kafka. You will create two separate Spring Boot applications, a producer and a consumer, running on different ports. The ProducerApp will publish messages to a Kafka topic, and the ConsumerApp will consume those messages from the same topic. This exercise will demonstrate how Kafka topics serve as a bridge for communication between producer and consumer applications, and it will prove that producers and consumers are decoupled applications.
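A small hint to get you started: the Kafka-related properties stay the same in both applications (same topic, same broker address); only the HTTP port needs to differ. For example, in one app's application.properties (the port number here is just an illustration):
server.port=8081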
I hope you found this guide on implementing Kafka in Java useful and informative! I’d love to hear about your experiences with Kafka or any questions you might have about the implementation process. Feel free to share your thoughts and queries in the comments section below — I’ll do my best to respond to each one.
Don’t forget to subscribe to my blog for more articles on Java development, project-building tips, and other tech insights. Stay tuned for more content and keep coding!