Apache Kafka has become the backbone of modern data streaming architectures. While producers send data to Kafka topics, consumers are responsible for fetching and processing that data. If you’re just starting with Kafka consumers, this tutorial will walk you through the basics and provide a step-by-step guide to get you up and running.
What is a Kafka Consumer?
A Kafka consumer is a client application that reads data from Kafka topics. Consumers are part of consumer groups, which enable Kafka to distribute data across multiple consumers for scalability and fault tolerance. Each consumer in a group reads data from different partitions of a topic.
Kafka consumers are responsible for –
- Polling messages – Regularly polls the Kafka broker to retrieve messages from assigned partitions.
- Deserialization – Converts the raw byte arrays received from Kafka into usable objects (e.g., strings, JSON, or custom objects) using deserializers.
- Offset management – Tracks and commits offsets to Kafka to keep a record of the last processed message. Offsets can be committed in two ways:
  - Automatic offset commit – Periodically commits offsets at the interval specified in the configuration.
  - Manual offset commit – Gives the application precise control over when offsets are committed.
- Connection management – Creates and maintains communication channels with the Kafka brokers.
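To make the two offset-commit modes concrete, here is a minimal, broker-free sketch of the relevant configuration. It uses plain java.util.Properties with the string forms of the ConsumerConfig keys, so it runs without the Kafka client library; the class and method names here are illustrative, not part of the Kafka API.

```java
import java.util.Properties;

public class CommitModeConfig {

    // Builds consumer properties for the chosen offset-commit mode.
    // The keys are the plain string forms of the ConsumerConfig constants.
    static Properties commitProps(boolean autoCommit) {
        Properties props = new Properties();
        // true  -> Kafka commits offsets periodically in the background
        // false -> the application must call commitSync()/commitAsync() itself
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // How often auto-commit flushes offsets (Kafka's default is 5000 ms)
            props.setProperty("auto.commit.interval.ms", "5000");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(commitProps(false));
    }
}
```

With enable.auto.commit set to false, the poll loop would typically call consumer.commitSync() after processing each batch of records.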
Thankfully, Kafka provides a Consumer client library that handles most of these complexities behind the scenes, so developers don’t have to worry about the low-level details.
Step-by-Step Guide to Implement Kafka Consumer
1. Install Apache Kafka
Before creating a Kafka consumer, you need an Apache Kafka broker set up and running.
One of the easiest ways to do this is to run a single-node Kafka cluster in Docker. You can find the installation steps in my other blog post here – Setting Up a Single-Node Kafka Cluster using KRaft Mode
You will also need a Kafka producer to send test messages to the topic; a simple way to do this is shown later in this post.
2. Set up your Development Environment
You will need –
- A Java development kit (JDK) installed (Java 8 or above).
- A development environment like IntelliJ IDEA, Eclipse, or Visual Studio Code.
3. Add Kafka Dependencies
Include Kafka client dependencies in your project –
Maven –
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version> <!-- Use the latest version -->
</dependency>
Gradle –
implementation 'org.apache.kafka:kafka-clients:3.5.0'
4. Write the Kafka Consumer
Here’s a step-by-step guide to writing your first Kafka consumer:
4.1. Import Required Libraries
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
4.2. Configure Consumer Properties
Set up the consumer configuration to connect to the Kafka cluster:
public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Define connection details
        String bootstrapServers = "localhost:9092";
        String topicName = "test-topic";
        String consumerGroupId = "test-group";

        // Set consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create KafkaConsumer using the above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    }
}
- bootstrap.servers – The address of the target Kafka broker in host:port format.
- group.id – The consumer group ID. A Kafka consumer group is a collection of consumers that work together to consume messages from a topic in parallel.
- key.deserializer & value.deserializer – Define how keys and values are deserialized. In the example above, we used the StringDeserializer class, which deserializes message bytes into strings.
- auto.offset.reset – Specifies where the consumer should begin reading when no previously committed offset is found, for example when the consumer group reads from the topic for the first time. Here we set it to earliest, which means all messages from the beginning of the topic will be consumed.
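For intuition, StringDeserializer essentially decodes each record’s raw bytes as UTF-8. Below is a stdlib-only sketch of that behavior; the class name is illustrative, not the actual Kafka class.

```java
import java.nio.charset.StandardCharsets;

public class StringDecodeSketch {

    // Roughly what Kafka's StringDeserializer does with a record's payload:
    // turn the raw bytes delivered by the broker into a Java String (UTF-8).
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(raw));
    }
}
```

For JSON or custom objects, you would plug in a different deserializer class via the same two configuration properties.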
4.3. Subscribe to a Topic
Subscribe the consumer to a topic:
consumer.subscribe(Collections.singletonList(topicName));
4.4. Poll for Records
Fetch messages in a loop and process them. Here, keepRunning is a volatile boolean flag that controls the loop (declared in the full example below):
while (keepRunning) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Key: %s, Value: %s%n",
                record.offset(), record.key(), record.value());
    }
}
4.5. Graceful Shutdown
Register a shutdown hook that flips the keepRunning flag, so the poll loop exits cleanly and the consumer can be closed:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    keepRunning = false;
}));
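You can exercise this volatile-flag shutdown pattern without a broker. In the sketch below, a worker thread stands in for the poll loop, and flipping the flag (as the shutdown hook does) makes it exit; the names are illustrative only.

```java
public class ShutdownFlagSketch {

    // Same role as keepRunning in the consumer example
    static volatile boolean keepRunning = true;
    static volatile int iterations = 0;

    public static void main(String[] args) throws InterruptedException {
        // Worker thread stands in for the consumer poll loop
        Thread worker = new Thread(() -> {
            while (keepRunning) {
                iterations++; // stand-in for consumer.poll(...) and record processing
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        worker.start();

        Thread.sleep(50);     // let the "consumer" run for a moment
        keepRunning = false;  // what the shutdown hook does
        worker.join(1000);    // the loop exits on its next flag check
        System.out.println("loop stopped after " + iterations + " iterations");
    }
}
```

The volatile keyword matters here: without it, the worker thread might never observe the flag change made by another thread.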
Full Example
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {

    // Terminating condition for the consumer loop
    private static volatile boolean keepRunning = true;

    public static void main(String[] args) {
        // Define connection details
        String bootstrapServers = "localhost:9092";
        String topicName = "test-topic";
        String consumerGroupId = "test-group";

        // Set consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create KafkaConsumer using the above properties
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topicName));

        // Register a shutdown hook for graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            keepRunning = false;
        }));

        // Poll messages in a loop
        try {
            while (keepRunning) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d, Key: %s, Value: %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
Running Your Kafka Consumer
Make sure the test-topic topic exists, then execute the KafkaConsumerExample class to confirm that it successfully consumes messages from the Kafka topic.
To produce messages to test-topic, you can use the Kafka Console Producer with Docker by running the following command:
docker run --rm -it \
--network host \
apache/kafka:3.7.1 \
/opt/kafka/bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092
If you’d like a comprehensive producer application to generate messages, you can check out my other blog for a detailed step-by-step tutorial here – How to Build Your First Kafka Producer: A Step-by-Step Tutorial.
Conclusion
In this tutorial, you learned how to set up and run a Kafka consumer in Java. Kafka consumers are at the heart of processing streaming data, and mastering them is key to building robust, real-time data pipelines.