Implementing Kafka in Java: A Practical Guide

Deep Patel
7 min read · Aug 31, 2024


Hi everyone! In today’s blog, we’ll dive into implementing Apache Kafka in a Spring Boot application and explore what makes Kafka an essential tool in modern data processing.

But before we get into the implementation details, let’s first understand why Kafka is such a critical technology today.


Why Kafka

Unlike traditional messaging systems such as RabbitMQ, Kafka is designed to process and manage vast amounts of data with high throughput and low latency.

Another key advantage of Kafka is its flexibility in integration. Kafka can easily integrate with various data processing frameworks, such as Apache Flink, Apache Storm, and Spark, enabling sophisticated stream processing and analytics.

Kafka also employs low-level optimizations such as zero-copy data transfer, which further contribute to its speed.

Aim

Today, our aim is to develop an application that pushes a string message to a Kafka topic and consumes it. Beyond getting the code to work, the goal is to understand every line we write and every command we run in the terminal. By understanding what we are coding, we make ourselves better developers. Let’s begin…

Step 1

Download Kafka from the official website: Apache Kafka

Step 2

Start ZooKeeper with the command below.

cd /opt/homebrew/opt/kafka/libexec/bin
./zookeeper-server-start.sh /opt/homebrew/etc/kafka/zookeeper.properties

First, we change into the directory where the Kafka bin scripts live. Then we start ZooKeeper by invoking the ./zookeeper-server-start.sh shell script, passing the location of the zookeeper.properties file as a parameter.

Step 3

Start Kafka

Here, I am using a Homebrew installation on macOS. If you are working on Windows, you can find plenty of references online for starting Kafka there.
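With the Homebrew layout used above, the broker can typically be started from the same bin directory; the exact path to server.properties may differ depending on your installation:

./kafka-server-start.sh /opt/homebrew/etc/kafka/server.properties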

Now, there is a reason why we started ZooKeeper first and Kafka second: Kafka relies on ZooKeeper for cluster coordination, metadata management, and leader elections. ZooKeeper keeps track of Kafka brokers, topics, and partitions, ensuring they stay synchronized and available. Starting ZooKeeper first allows Kafka to connect to it and perform the necessary initialization tasks.

Step 4

Initialize the project from Spring Initializr. Make sure you select the proper Java version. You can type java -version in your terminal to check the Java version installed on your machine.

Next, we add the Web and Kafka dependencies. I have included the Web dependency so that I can add a controller and expose a REST endpoint if needed in the future.

Step 5

Run Maven clean and Maven install.

If you face any Java version-related issues, make sure the Java version in pom.xml, the one in IntelliJ (File → Project Structure → Project → SDK), and the one active on your machine (java -version) are all the same.
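If you prefer the terminal over the IDE, the equivalent is a single Maven command (assuming Maven, or the project's mvnw wrapper, is available on your PATH):

mvn clean install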


Step 6

Add the below properties to the application.properties file.

spring.application.name=kafka

spring.kafka.topic.name=demo_1

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=any_unique_id_1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • spring.application.name: Sets the name of the Spring Boot application.
  • spring.kafka.topic.name: Specifies the Kafka topic name as demo_1.
  • spring.kafka.producer.bootstrap-servers: Defines the Kafka broker address for the producer as localhost:9092.
  • spring.kafka.producer.key-serializer: Configures the key serializer for the Kafka producer to serialize keys as strings.
  • spring.kafka.producer.value-serializer: Configures the value serializer for the Kafka producer to serialize values as strings.
  • spring.kafka.consumer.bootstrap-servers: Defines the Kafka broker address for the consumer as localhost:9092.
  • spring.kafka.consumer.group-id: Sets the consumer group ID to any_unique_id_1.
  • spring.kafka.consumer.auto-offset-reset: Configures the consumer to start reading from the earliest available offset if no previous offset is found.
  • spring.kafka.consumer.key-deserializer: Configures the key deserializer for the Kafka consumer to deserialize keys as strings.
  • spring.kafka.consumer.value-deserializer: Configures the value deserializer for the Kafka consumer to deserialize values as strings.
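Before wiring up the application, it can help to confirm the broker is reachable on localhost:9092. With the broker running, the bundled CLI (from the same bin directory used earlier; recent Kafka versions accept the --bootstrap-server flag) should list existing topics:

./kafka-topics.sh --bootstrap-server localhost:9092 --list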

Step 7

Create a new package config and inside that, create a new file KafkaTopicConfig.java.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Topic name defined in the application.properties file
    @Value("${spring.kafka.topic.name}")
    private String topicName;

    // Registers the topic so it is created on application startup if it doesn't exist
    @Bean
    public NewTopic kafkaTopic() {
        return TopicBuilder.name(topicName)
                .build();
    }
}

Here, we create a Kafka topic when the application starts. The @Bean method kafkaTopic() builds a new topic from the name provided in the application properties, ensuring the topic exists with the desired configuration as soon as the Spring Boot application boots.
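As a side note, if you later need control over partitioning, TopicBuilder also accepts partition and replica counts. A minimal sketch (the counts here are illustrative, not part of our setup):

@Bean
public NewTopic kafkaTopicWithPartitions() {
    return TopicBuilder.name(topicName)
            .partitions(3)   // illustrative partition count
            .replicas(1)     // a single replica suits a local one-broker setup
            .build();
}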

Step 8

Create a producer package and add the KafkaProducer.java file to it.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection of the KafkaTemplate
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes a string message to the configured topic
    public void sendMessage(String message) {
        System.out.println("Kafka producer is sending message : " + message + " to the topic : " + topicName);
        kafkaTemplate.send(topicName, message);
    }
}

Now, we want to produce or publish a message on a Kafka Topic.

  • We declare private final KafkaTemplate<String, String> kafkaTemplate. This is the KafkaTemplate instance used to send messages to Kafka; the <String, String> type parameters indicate that both the key and the value of each Kafka message are strings.
  • The constructor initialises it via this.kafkaTemplate = kafkaTemplate; (constructor injection).
  • The sendMessage function sends a string message to the Kafka topic. Whenever we want to publish a message to the topic, we call sendMessage. A sketch of how to observe the delivery result follows this list.
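kafkaTemplate.send() is asynchronous and returns a future describing the outcome. As a sketch (assuming Spring Kafka 3.x, where send() returns a CompletableFuture; older versions return a ListenableFuture), you could log success or failure like this:

public void sendMessageWithCallback(String message) {
    kafkaTemplate.send(topicName, message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // RecordMetadata tells us where the record landed
                    System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                            + " at offset " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send message: " + ex.getMessage());
                }
            });
}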

Step 9

Create a consumer package and add the KafkaConsumer.java file to it.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Spring invokes this method for every message published on the topic
    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(String message) {
        System.out.println("Kafka consumer belonging to the group " + groupId + " has received message : " + message + " from the topic : " + topicName);
    }
}

The KafkaConsumer class is designed to receive and process messages (apply any logic we want) from a Kafka topic. It uses Spring's Kafka support to listen to messages published on a specified topic and handle them accordingly.

@KafkaListener: Annotation that marks the method as a Kafka message listener. It tells Spring to invoke this method whenever a message is received from the specified Kafka topic.
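If you need more than the payload, the listener method can instead accept a ConsumerRecord (import org.apache.kafka.clients.consumer.ConsumerRecord), which exposes the key, partition, and offset. A minimal sketch:

@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeRecord(ConsumerRecord<String, String> record) {
    // key() may be null, since our producer sends values without keys
    System.out.println("Received value '" + record.value()
            + "' (key=" + record.key()
            + ", partition=" + record.partition()
            + ", offset=" + record.offset() + ")");
}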

Step 10

Modify the application's main class, KafkaApplication.java, to publish a message, then run your application.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaProducer kafkaProducer;

    public static void main(String[] args) {
        // Start the application, fetch this class's bean, and publish from it
        SpringApplication.run(KafkaApplication.class, args).getBean(KafkaApplication.class).publishMessage();
    }

    private void publishMessage() {
        for (int i = 0; i < 5; i++) {
            kafkaProducer.sendMessage("message " + i);
        }
    }
}

Here, we publish a few simple messages in a for loop and check whether they get consumed.

One thing to notice here is that all the messages get published first, and only then are they consumed together. But why? Why don't the producer and consumer work in parallel?

This is where we observe that Kafka producers and consumers operate asynchronously. When you publish messages, the producer does not wait for each message to be acknowledged before sending the next one. (If you do want it to wait, you can block on the result, as sketched below.)
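A minimal sketch of a blocking send, again assuming Spring Kafka 3.x's CompletableFuture return type (the method name sendMessageSynchronously is illustrative); blocking per message sacrifices throughput and is shown purely for experimentation:

public void sendMessageSynchronously(String message) throws Exception {
    // get() blocks until the broker acknowledges the record
    kafkaTemplate.send(topicName, message).get();
}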

Alternatively, try adding a sleep of 1 second after producing each message and check the results.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    @Autowired
    private KafkaProducer kafkaProducer;

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(KafkaApplication.class, args).getBean(KafkaApplication.class).publishMessage();
    }

    private void publishMessage() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            kafkaProducer.sendMessage("message " + i);
            Thread.sleep(1000); // pause 1 second between sends
        }
    }
}

Complete code here: https://github.com/deeppatel23/Kafka-with-Spring-Boot.git

Practical Challenge

Let’s dive into a practical challenge to solidify your understanding of Kafka. You will create two separate Spring Boot applications, a producer and a consumer, running on different ports. The ProducerApp will publish messages to a Kafka topic, and the ConsumerApp will consume those messages from the same topic. This exercise will demonstrate how Kafka topics serve as a bridge for communication between producer and consumer applications, and will prove that producers and consumers are decoupled applications. A minimal sketch of the producer side follows.
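To get you started on the ProducerApp side, here is a minimal sketch of a REST controller that triggers the producer via HTTP (the /publish endpoint and the MessageController name are illustrative, not part of the repository above):

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final KafkaProducer kafkaProducer;

    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // e.g. curl -X POST "localhost:8080/publish?message=hello"
    @PostMapping("/publish")
    public String publish(@RequestParam String message) {
        kafkaProducer.sendMessage(message);
        return "Published: " + message;
    }
}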

I hope you found this guide on implementing Kafka in Java useful and informative! I’d love to hear about your experiences with Kafka or any questions you might have about the implementation process. Feel free to share your thoughts and queries in the comments section below — I’ll do my best to respond to each one.

Don’t forget to subscribe to my blog for more articles on Java development, project-building tips, and other tech insights. Stay tuned for more content and keep coding!

Written by Deep Patel

Hi everyone, I am a software engineer at American Express. I am here to share my experience in tech.