How to Build Your First Kafka Consumer: A Step-by-Step Tutorial

Apache Kafka has become the backbone of modern data streaming architectures. While producers send data to Kafka topics, consumers are responsible for fetching and processing that data. If you’re just starting with Kafka consumers, this tutorial will walk you through the basics and provide a step-by-step guide to get you up and running.

What is a Kafka Consumer?

A Kafka consumer is a client application that reads data from Kafka topics. Consumers are part of consumer groups, which enable Kafka to distribute data across multiple consumers for scalability and fault tolerance. Each consumer in a group reads data from different partitions of a topic.

Kafka consumers are responsible for –

  • Polling messages – regularly polling the Kafka broker to retrieve messages from assigned partitions.
  • Deserialization – converting the byte-array messages received from Kafka into usable objects (e.g., strings, JSON, or custom objects) using deserializers.
  • Offset management – tracking and committing offsets to Kafka to keep a record of the last processed message.
    • Automatic Offset Commit: Periodically commits offsets as specified in the configuration.
    • Manual Offset Commit: Allows precise control over when offsets are committed.
  • Connection management – creating and maintaining the communication channel with the Kafka brokers.
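To make the automatic vs. manual commit distinction concrete, here is a hedged sketch of a consumer that disables auto-commit and commits offsets only after a batch has been processed. The broker address, group ID, and topic name are placeholders for illustration:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // Turn off automatic commits so we decide when offsets are saved
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Processing: " + record.value());
                }
                // Commit only after the whole batch is processed; if the app
                // crashes mid-batch, records are redelivered (at-least-once)
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
```

The trade-off: with auto-commit, a crash between the background commit and your processing can silently skip records, while committing manually after processing means a crash can cause some records to be delivered twice.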

Thankfully, Kafka provides a consumer client library that handles most of these complexities behind the scenes, so developers don’t have to deal with the low-level details themselves.

Step-by-Step Guide to Implement Kafka Consumer

1. Install Apache Kafka

Before creating a Kafka consumer, you need to have an Apache Kafka broker set up and running.

One of the easiest ways to set up an Apache Kafka broker is to run a single-node Kafka cluster in Docker. You can find the installation steps for a single-node Kafka cluster in my other blog post here – Setting Up a Single-Node Kafka Cluster using KRaft Mode

You will also need a Kafka producer to send messages to the topic your consumer will read from; the Running Your Kafka Consumer section below shows how to produce test messages with the Kafka Console Producer.

2. Set up your Development Environment

You will need –

  • A Java development kit (JDK) installed (Java 8 or above).
  • A development environment like IntelliJ IDEA, Eclipse, or Visual Studio Code.

3. Add Kafka Dependencies

Include Kafka client dependencies in your project –

Maven –

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version> <!-- Use the latest version -->
</dependency>

Gradle –

implementation 'org.apache.kafka:kafka-clients:3.5.0'

4. Write the Kafka Consumer

Here’s a step-by-step guide to writing your first Kafka consumer:

4.1. Import Required Libraries

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

4.2. Configure Consumer Properties

Set up the consumer configuration to connect to the Kafka cluster:

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Define bootstrap server
        String bootstrapServers = "localhost:9092";

        String topicName = "test-topic";
        String consumerGroupId = "test-group";

        // Set consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create KafkaConsumer using above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    }
}

  • bootstrap.servers: The address of the Kafka broker to connect to, in host:port format (multiple brokers can be given as a comma-separated list).
  • group.id: The consumer group ID. A Kafka consumer group is a collection of consumers that work together to consume messages from a topic in parallel.
  • key.deserializer & value.deserializer: Define how keys and values are deserialized. In the example above, we used the StringDeserializer class, which deserializes messages into strings.
  • auto.offset.reset: Specifies where the consumer should begin reading messages when no previously committed offset is found, for example when the consumer group reads from the topic for the first time. In this example, we set it to earliest, which means the consumer starts from the beginning of the topic.
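Instead of hard-coding the fully qualified deserializer class names as strings, an equivalent and common idiom is to reference the classes directly, so a typo becomes a compile error rather than a runtime failure. A brief sketch (same placeholder broker and group ID as above):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Class references instead of string literals
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        System.out.println(props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    }
}
```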

4.3. Subscribe to a Topic

Subscribe the consumer to a topic:

consumer.subscribe(Collections.singletonList(topicName));
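Besides a fixed list of topic names, the consumer client also supports subscribing by regular expression, so topics matching the pattern are picked up automatically. A hedged sketch, where consumer is the KafkaConsumer created above and the orders- prefix is a made-up example:

```java
import java.util.regex.Pattern;

// Subscribe to every topic whose name starts with "orders-"; matching
// topics created later are picked up on a subsequent metadata refresh.
consumer.subscribe(Pattern.compile("orders-.*"));
```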

4.4. Poll for Records

Fetch messages in a loop and process them. Here keepRunning is a volatile boolean flag that controls the loop; it is declared as a static field in the full example below:

while (keepRunning) {
    ConsumerRecords<String, String> records = consumer
                .poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Key: %s, Value: %s%n",
                          record.offset(), record.key(), record.value());
    }
}

4.5. Graceful Shutdown

Register a JVM shutdown hook that flips the keepRunning flag, so the polling loop can exit and the consumer can be closed cleanly:

Runtime.getRuntime().addShutdownHook(new Thread(
    () -> {
        keepRunning = false;
    }
));
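One caveat with the flag-only approach: on shutdown the JVM does not wait for the main thread, so the loop may not get a chance to finish and close the consumer. A more robust pattern, described in the Kafka client documentation, is to call consumer.wakeup() from the hook, which makes a blocked poll() throw WakeupException. A sketch, assuming it runs on the same thread that owns the consumer:

```java
// Capture the thread running the poll loop so the hook can wait for it
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();      // makes a blocked poll() throw WakeupException
    try {
        mainThread.join();  // wait for the loop below to close the consumer
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(r -> System.out.println(r.value()));
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown – fall through and close
} finally {
    consumer.close();
}
```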

Full Example

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    // Terminating condition for the consumer
    private static volatile boolean keepRunning = true;

    public static void main(String[] args) {
        // Define bootstrap server
        String bootstrapServers = "localhost:9092";

        String topicName = "test-topic";
        String consumerGroupId = "test-group";

        // Set consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create KafkaConsumer using above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // subscribe to topic
        consumer.subscribe(Collections.singletonList(topicName));
        
        // register a shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(
            () -> {
                keepRunning = false;    
            }
        ));
        
        // poll messages in a loop
        try {
            while (keepRunning) {
                ConsumerRecords<String, String> records = consumer
                            .poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d, Key: %s, Value: %s%n",
                                      record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Running Your Kafka Consumer

Make sure the test-topic exists, then execute the KafkaConsumerExample class to confirm that it is successfully consuming messages from the Kafka topic.
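If the topic does not exist yet, you can create it with the kafka-topics.sh tool that ships in the same Docker image (adjust the bootstrap address to match your broker setup):

```shell
docker run --rm -it \
--network host \
apache/kafka:3.7.1 \
/opt/kafka/bin/kafka-topics.sh --create --topic test-topic \
--partitions 1 --replication-factor 1 \
--bootstrap-server localhost:9092
```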

To produce messages to the test-topic, you can use the Kafka Console Producer with Docker by running the following command:

docker run --rm -it \
--network host \
apache/kafka:3.7.1 \
/opt/kafka/bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092

If you’d like a comprehensive producer application to generate messages, you can check out my other blog for a detailed step-by-step tutorial here – How to Build Your First Kafka Producer: A Step-by-Step Tutorial.

Conclusion

In this tutorial, you learned how to set up and run a Kafka consumer in Java. Kafka consumers are at the heart of processing streaming data, and mastering them is key to building robust, real-time data pipelines.
