Apache Kafka is a powerful tool for building real-time data pipelines and streaming applications. One of the key components of Kafka is the Producer, which is responsible for sending messages to Kafka topics. In this blog, we’ll walk through the steps to build a Kafka Producer application using Java.
What is a Kafka Producer?
A Kafka Producer is an application that sends records (messages) to Kafka topics. Producers are responsible for:
- Creating records or messages to be sent to Kafka.
- Determining the appropriate partition of the topic to send the message to.
- Sending the message to the right partition of the intended topic on Kafka.
- Creating and maintaining a communication channel/connection with Kafka.
- Batching the messages together, so as to reduce network roundtrips and enable higher throughput.
- Retrying failed sends so that messages are not lost.
- Applying compression to messages to allow higher throughput.
- Providing delivery acknowledgments to confirm successful message transmission in order to prevent data loss.
Thankfully, Kafka provides a producer client library that handles most of these complexities behind the scenes, so developers don’t have to implement them from scratch.
Additionally, some of the producer features mentioned above require considerable configuration. In this post, we will focus on a basic producer that sends data to Kafka using the default settings, without explicitly tuning batching, compression, or acknowledgments.
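For orientation only, the features we are skipping are controlled by producer configuration properties like the ones below. These are real Kafka producer settings, but the basic example in this post does not set any of them and relies on the defaults:

```properties
# Optional producer tuning (NOT used in this post's basic example)

# maximum bytes per record batch
batch.size=16384
# wait up to 5 ms to fill a batch before sending
linger.ms=5
# compress record batches: none, gzip, snappy, lz4, or zstd
compression.type=snappy
# wait for all in-sync replicas to acknowledge each write
acks=all
```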
Step-by-Step Guide to Implement Kafka Producer
1. Install Apache Kafka
Before creating a producer, you need to have Apache Kafka installed and running.
One of the easiest ways to install Apache Kafka is by setting up a single-node Kafka cluster and deploying it using Docker. You can find the installation steps for a single-node Kafka cluster in my other blog post here – Setting Up a Single-Node Kafka Cluster using KRaft Mode
2. Set up your Development Environment
You will need –
- A Java development kit (JDK) installed (Java 8 or above).
- A development environment like IntelliJ IDEA, Eclipse, or Visual Studio Code.
3. Add Kafka Dependencies
Include Kafka client dependencies in your project –
Maven –
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version> <!-- Use the latest version -->
</dependency>
Gradle –
implementation 'org.apache.kafka:kafka-clients:3.5.0'
4. Write the Kafka Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleKafkaProducer {
    public static void main(String[] args) {
        // Define the bootstrap server and message details
        String bootstrapServers = "localhost:9092";
        String topicName = "my_topic";
        String messageKey = "mykey";
        String messageBody = "Hello, Kafka!";

        // Set producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Create a message
        ProducerRecord<String, String> message = new ProducerRecord<>(topicName, messageKey, messageBody);

        // Send the message - asynchronous
        Future<RecordMetadata> metadataFuture = producer.send(message);

        // Block until the message is sent
        try {
            // The following get() call is blocking in nature
            RecordMetadata metadata = metadataFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            // Handle the failure (log it, retry, etc.)
            e.printStackTrace();
        }

        // Close the producer
        producer.close();
    }
}
Explanation
- The SimpleKafkaProducer class above uses the Kafka client library to create a Kafka producer and submit message records to it.
- The producer requires some minimum configuration, supplied through a Properties object:
  - BOOTSTRAP_SERVERS_CONFIG – the address of the target Kafka broker in host:port format. If there are multiple brokers in the target Kafka cluster, all of them can be provided here, although that is not mandatory.
  - KEY_SERIALIZER_CLASS_CONFIG – the name of the class used to serialize the key of each message (messages can optionally include a key). Serialization is the process of converting data into a binary format before it is transmitted over the network. In the example above, we used the StringSerializer class, which is designed for serializing plain text.
  - VALUE_SERIALIZER_CLASS_CONFIG – the name of the class used to serialize the message value itself.
- The next step is to create the producer by instantiating the KafkaProducer class from the Kafka client library. This step hides all the complexities of establishing a communication channel with the target Kafka broker/cluster.
- Next, create a ProducerRecord object for each message to be sent. Creating a ProducerRecord takes the following three arguments:
  - Name of the target topic – It’s important to note that since the topic name is specified when creating a ProducerRecord, the same KafkaProducer instance can be used to send messages to multiple topics. In other words, there is no need to create a separate KafkaProducer object for each topic.
  - Message key – This is optional. Keys are used to group messages, ensuring that all messages with the same key are written to the same partition in Kafka.
  - Message value – The actual payload/content of the message.
- Finally, submit the message to the producer via the send() method. Again, the underlying producer takes care of all the complexities of delivering the message to the specified topic on the target Kafka broker.
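To make the key-to-partition relationship concrete, here is a simplified, self-contained sketch. Note that this is only an illustration of the principle: Kafka’s default partitioner actually applies a murmur2 hash to the serialized key, not Java’s hashCode as used below.

```java
public class KeyPartitioningSketch {
    // Simplified stand-in for Kafka's default partitioner.
    // Kafka really computes murmur2(serializedKey) % numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // The same key always maps to the same partition,
        // which is what preserves per-key message ordering.
        System.out.println(partitionFor("user-42", numPartitions)
                == partitionFor("user-42", numPartitions));
    }
}
```

Because equal keys always hash to the same partition, Kafka can guarantee ordering for all messages that share a key.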
A Note about the Send Method of the KafkaProducer Class
The send method of the KafkaProducer class is asynchronous by design. It accepts a ProducerRecord and returns a Future<RecordMetadata>. The method queues the record for sending to the Kafka broker, allowing the producer to continue processing without waiting for the network operation to complete.
Asynchronous behavior – The method immediately returns a Future object, meaning that it does not block the caller. The record is processed and sent in the background by the producer’s internal threads.
// send the message - asynchronous
Future<RecordMetadata> metadataFuture = producer.send(message);
Blocking with get() – If you need to wait for the send operation to complete, you can call the get() method on the returned Future. This will block until the request is completed and the metadata is returned, which includes –
- The partition to which the message was assigned.
- The offset assigned to the message.
- The topic to which the message was sent.
// following get() call is blocking in nature
RecordMetadata metadata = metadataFuture.get();
Error handling – If an error occurs while sending the record, calling get() will throw an ExecutionException that needs to be handled –
// block until the message is sent
try {
    // the following get() call is blocking in nature
    RecordMetadata metadata = metadataFuture.get();
} catch (InterruptedException | ExecutionException e) {
    // handle exception
}
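A more robust handler unwraps the ExecutionException to surface the real cause and restores the interrupt flag on InterruptedException. The sketch below illustrates this general Java pattern using a stdlib CompletableFuture (Java 9+) as a stand-in for the Future returned by send(), so it runs without a Kafka broker:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureErrorHandling {
    // Waits for an asynchronous result, mirroring metadataFuture.get()
    public static String awaitSend(Future<String> sendResult) {
        try {
            return sendResult.get(); // blocks until the send completes
        } catch (ExecutionException e) {
            // The real failure is the wrapped cause, not the ExecutionException itself
            throw new RuntimeException("Send failed: " + e.getCause().getMessage(), e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException("Interrupted while waiting for send", e);
        }
    }

    public static void main(String[] args) {
        // Simulate a successful send
        System.out.println(awaitSend(CompletableFuture.completedFuture("ok")));

        // Simulate a failed send
        Future<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("broker unreachable"));
        try {
            awaitSend(failed);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same try/catch shape applies unchanged when the Future comes from producer.send().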
5. Run the Producer and Verify using Console-Consumer
Run the SimpleKafkaProducer class and check if the message appears in your Kafka topic.
Use the Kafka console consumer using Docker to verify –
docker run --rm -it \
--network host \
apache/kafka:3.7.1 \
/opt/kafka/bin/kafka-console-consumer.sh --topic my_topic \
--bootstrap-server localhost:9092 --from-beginning
If you don’t have Docker or prefer running the kafka-console-consumer.sh shell script directly from the Kafka package, use the following command:
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my_topic \
--from-beginning
That’s it! If you were able to follow the above steps without hitting any errors, you should see your messages streamed to Kafka.
Conclusion
Setting up a Kafka producer is simple and powerful. By sending messages to Kafka topics, you can build the foundation for real-time data pipelines and event-driven applications. Experiment with different configurations and explore advanced features like partitioning and message batching to optimize your producer.