Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. In this post, I will explain how to use Kafka in Java to publish and consume messages.
Setting up Kafka
Before we can start using Kafka, we need to set it up. Follow the instructions below to set up Kafka on your local machine:
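- Download a binary release of Apache Kafka from the official website. The steps below apply to releases that still ship ZooKeeper (the 3.x line and earlier); Kafka 4.0 removed ZooKeeper and runs in KRaft mode only.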
- Extract the contents of the downloaded file to a directory of your choice.
- Start the ZooKeeper server by running the following command from the Kafka directory:
bin/zookeeper-server-start.sh config/zookeeper.properties
- In a second terminal, start the Kafka server (the broker) by running the following command from the Kafka directory:
bin/kafka-server-start.sh config/server.properties
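Before moving on, it can help to confirm that both servers are actually listening. The following is a minimal sketch using only the Java standard library. `KafkaPortCheck` and `isReachable` are hypothetical names introduced here, and the ports assume the stock configuration files (2181 for ZooKeeper, 9092 for the broker).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: checks whether a TCP port accepts connections.
final class KafkaPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 (clientPort in config/zookeeper.properties)
        // and 9092 (the broker's default listener port).
        System.out.println("ZooKeeper (2181) reachable: " + isReachable("localhost", 2181, 1000));
        System.out.println("Kafka (9092) reachable: " + isReachable("localhost", 9092, 1000));
    }
}
```

An open port only shows that something is listening there, not that the broker is fully ready, so treat this as a quick sanity check rather than a health check.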
Publishing Messages to Kafka
To publish messages to Kafka, we need to create a producer. The producer is responsible for sending messages to the Kafka broker. Follow the instructions below to create a Kafka producer in Java:
- Add the following dependency to your project's Maven pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
- Create a Kafka producer instance with the following code:
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Set the producer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer; try-with-resources closes it on exit
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            // Send a message (send() is asynchronous; close() waits for pending sends)
            ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
            producer.send(record);
        }
    }
}
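- Run the producer to send a message to the Kafka broker. The code above sends a single message with the key "key" and the value "value" to the topic test_topic. With the broker's default configuration, the topic is created automatically the first time it is used.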
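Once there is more than one producer in a project, the property setup tends to move into a small helper. The sketch below shows one way to do that with plain `java.util.Properties`. The class and method names are hypothetical, and the `acks` and `client.id` settings are assumptions added for illustration; they are real producer configuration keys, but the values are not taken from the example above.

```java
import java.util.Properties;

// Hypothetical helper class; the names are illustrative and not part of the Kafka API.
final class ProducerConfigSketch {

    /** Builds producer properties for String keys and values against the given broker list. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: require acknowledgement from all in-sync replicas for each write.
        props.put("acks", "all");
        // Assumption: label this client so it is easier to spot in broker logs and metrics.
        props.put("client.id", "kafka-producer-example");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration; pass the same object to new KafkaProducer<>(...).
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the producer example, `new KafkaProducer<>(ProducerConfigSketch.producerProperties("localhost:9092"))` would then replace the inline property setup.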
Consuming Messages from Kafka
To consume messages from Kafka, we need to create a consumer. The consumer is responsible for receiving messages from the Kafka broker. Follow the instructions below to create a Kafka consumer in Java:
- Add the kafka-clients dependency to your project's Maven pom.xml (the same one the producer uses):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>
- Create a Kafka consumer instance with the following code:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Set the consumer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "test_group");

        // Create the Kafka consumer; try-with-resources closes it if the loop exits with an error
        try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList("test_topic"));

            // Poll for new messages until the process is stopped
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key = %s, value = %s%n", record.key(), record.value());
                }
            }
        }
    }
}
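- Run the consumer to receive messages from the Kafka broker. The code above subscribes to the topic test_topic and prints the key and value of each message received. A consumer group with no committed offsets starts at the end of the topic by default (auto.offset.reset is latest), so start the consumer before running the producer if you want to see the message.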
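One way to let a new consumer group read messages that were published before it started is to set `auto.offset.reset` to `earliest`. The sketch below builds the consumer properties with that choice exposed as a flag. The class name, method name, and `fromBeginning` parameter are hypothetical; `auto.offset.reset` is a real consumer configuration key whose documented values include `earliest` and `latest`.

```java
import java.util.Properties;

// Hypothetical helper class; the names are illustrative and not part of the Kafka API.
final class ConsumerConfigSketch {

    /**
     * Builds consumer properties for String keys and values.
     * When fromBeginning is true, a group without committed offsets starts at the
     * oldest available message instead of only seeing new ones.
     */
    static Properties consumerProperties(String bootstrapServers, String groupId, boolean fromBeginning) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only applies when the group has no committed offset for a partition.
        props.put("auto.offset.reset", fromBeginning ? "earliest" : "latest");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration; pass the same object to new KafkaConsumer<>(...).
        Properties props = consumerProperties("localhost:9092", "test_group", true);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Note that the setting has no effect once the group has committed offsets, so a consumer that has already run under `test_group` will resume from where it left off.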
Conclusion
In this post, we learned how to use Kafka in Java to publish and consume messages. We set up Kafka on our local machine, created a producer to send messages to Kafka, and created a consumer to receive messages from Kafka. With this knowledge, you can now start building your own real-time data pipelines and streaming applications using Kafka in Java.