Kafka is a high-throughput streaming platform well suited for heavy data streaming. This article is a technical guide that takes you through the steps needed to distribute messages between Java microservices using Kafka. To conclude, we will also briefly touch on Kafka's performance.

Installing Kafka

Before even installing Kafka, you need to make sure that you have Java and Zookeeper installed on your system. Zookeeper manages Kafka brokers, i.e., systems that store and receive messages. It also informs producers and consumers if there is any failure or the presence of a new broker in the system. 

So, let’s take a look at getting Zookeeper up and running.

Zookeeper Setup

Download Zookeeper here and extract the tar file using the following command:

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Now create a configuration file named conf/zoo.cfg by opening it with the following command and adding the settings shown:

$ vi conf/zoo.cfg
tickTime=1000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Save this configuration. You have now installed Zookeeper successfully.

To start the Zookeeper server, open a new terminal and run the command:

$ bin/zkServer.sh start

Zookeeper will now start, and the following output will be displayed:

JMX enabled by default
Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

To connect to the Zookeeper server, run the command:

$ bin/zkCli.sh

After you execute the command, this response will appear:

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................

Now that you have Zookeeper successfully installed, Kafka can be installed on your machine.

Kafka Setup

Download Kafka here and extract the tar file by executing the following command:

$ cd opt/
$ tar -zxf kafka_2.11-0.9.0.0.tgz
$ cd kafka_2.11-0.9.0.0

Now start the server via the following command: 

$ bin/kafka-server-start.sh config/server.properties

After the server starts, the following response will be displayed on the screen:

$ bin/kafka-server-start.sh config/server.properties
[2019-08-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

Kafka Basics

With Kafka installed, you can publish and subscribe to streams of messages. Different components in Kafka are responsible for storing and processing these record streams as they occur. Here are some of these basic components:

Producer

This is an application responsible for sending messages to a Kafka topic. Its key configuration properties are:

  • BOOTSTRAP_SERVERS_CONFIG: Configures the address of the Kafka broker(s).
  • CLIENT_ID_CONFIG: The ID of the producer, required by the broker to determine the source of a request.
  • PARTITIONER_CLASS_CONFIG: Determines which partition a record will go to.
  • GROUP_ID_CONFIG: The consumer group ID (a consumer-side setting rather than a producer setting).

Brokers

These are the systems used to maintain published data, i.e., to receive and store messages sent by the producers. A broker may have zero to many partitions in each topic. After connecting to one or more brokers (bootstrap.servers), you will be connected to all the servers in the cluster. A broker is identified with its ID (broker.id) and can handle hundreds of thousands, or even millions, of messages per second.

Cluster

This is a Kafka system that has more than one Kafka broker. Running multiple brokers makes it possible to replicate messages across brokers for higher availability.

The main Kafka APIs are:

  • Producer API – enables an application to send messages or records to one or more Kafka topics.
  • Consumer API – allows an application to subscribe to one or many Kafka topics.
  • Streams API – enables an application to act as a stream processor, i.e., to consume an input stream from one or more topics and produce an output stream (a short sketch follows this list).
  • Connector API – enables the building and running of reusable producers or consumers that connect Kafka topics to existing applications or data systems.
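
To make the Streams API concrete, here is a minimal sketch of a stream processor that reads from one topic, upper-cases each value, and writes to another topic. It assumes a Kafka client version new enough to ship Kafka Streams (1.0 or later), and the topic names input-topic and output-topic are purely illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class StreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from an input topic, transform each value, and write to an output topic.
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues(value -> value.toUpperCase())
             .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}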

Record

A record is the message a producer sends: it carries a topic name, an optional partition number, an optional key, and a value. If no key or partition is specified, a round-robin algorithm is used to distribute records across partitions.
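
As a quick illustration, the ProducerRecord constructors mirror this structure; the topic name, key, and payload below are just placeholders:

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordExamples {
    public static void main(String[] args) {
        // Key provided: the partition is chosen by hashing the key.
        ProducerRecord<String, String> withKey =
                new ProducerRecord<>("my-example-topic", "user-42", "some payload");

        // No key: the partitioner distributes records across partitions (round-robin).
        ProducerRecord<String, String> noKey =
                new ProducerRecord<>("my-example-topic", "some payload");

        // Explicit partition: the record goes to partition 3 regardless of the key.
        ProducerRecord<String, String> explicitPartition =
                new ProducerRecord<>("my-example-topic", 3, "user-42", "some payload");
    }
}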

Topics

These are feed names where records are published. Topics are always multi-subscriber, meaning a topic can have zero to many consumers that subscribe to its data.

Note: A topic must have at least one partition, and multiple topics can exist in a single broker.

Partitions

Partitions enable a topic to handle a large amount of data through parallelism: a single topic's data is split into partitions that can be spread across multiple brokers. Within a consumer group, two or more consumers cannot consume messages from the same partition. However, multiple consumers can read from a single topic in parallel because each consumer is assigned its own partition(s), which can be placed on different machines, as sketched below.
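
As a rough sketch of that parallelism (the group ID and topic name here are illustrative), two consumers sharing a group ID split a topic's partitions between them:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class PartitionSharingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "partition-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Two consumers in the same group: after rebalancing, each one is
        // assigned a disjoint subset of the topic's partitions.
        KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(props);
        KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(props);
        consumerA.subscribe(Collections.singletonList("my-example-topic"));
        consumerB.subscribe(Collections.singletonList("my-example-topic"));
        // Each consumer would normally run its own poll loop on its own thread.
    }
}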

Offsets

This is a unique, sequential ID assigned to each message within a partition. It is an integer used to track a consumer's current position: the consumer's offset points to the next record it will read from the partition.
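
For illustration, the consumer API exposes this position directly. A small sketch, assuming a recent Kafka client; the topic and group names are hypothetical:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;

public class OffsetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("my-example-topic", 0);
            consumer.assign(Collections.singletonList(partition));

            // position() returns the offset of the next record the consumer will read.
            System.out.println("Current position: " + consumer.position(partition));

            // seekToBeginning() rewinds the consumer to the start of this partition.
            consumer.seekToBeginning(Collections.singletonList(partition));
        }
    }
}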


Kafka and Java

In this example, we will focus on creating producer and consumer applications using Java. Before sending and receiving messages in Kafka, though, you need to know how to create and manage topics.

Creating a Kafka Topic

To create a topic, we will use kafka-topics.sh. This tool is located at ~/kafka-training/kafka/bin/kafka-topics.sh. We will create a topic called my-example-topic with 10 partitions, meaning we could have up to 10 Kafka consumers reading from it in parallel.

Open a new terminal and create the following script, create-topic.sh, in ~/kafka-training/lab1:

# Create topics
kafka/bin/kafka-topics.sh --create \
    --replication-factor 2 \
    --partitions 10 \
    --topic my-example-topic \
    --zookeeper  localhost:2181
## List created topics
kafka/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181

Here, a topic named my-example-topic with 10 partitions and a replication factor of 2 is created. The following output is printed after running create-topic.sh:

~/kafka-training/lab1
$ ./create-topic.sh
Created topic "my-example-topic".
__consumer_offsets
my-example-topic
my-failsafe-topic

List of All Topics

To display the list of topics in your Kafka server, run the following command:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

Note: In this case, Zookeeper’s address is localhost:2181.

Modifying a Topic

A topic in the Kafka cluster can be modified later on if needed. To do so, run the command:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --partitions count

In our example, we have already created a topic (my-example-topic) with a partition count of 10 and a replication factor of 2. The --alter flag is used to increase the partition count (Kafka only allows increasing, not decreasing, the number of partitions).

Deleting a Topic

To delete a created topic in the Kafka cluster, run the following command:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-example-topic-kafka

After running this command, the following message will be displayed:

> Topic my-example-topic-kafka marked for deletion

Note: This command has no effect if delete.topic.enable is not set to true.
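
The shell scripts above are the standard way to manage topics, but if you would rather do it from Java, newer Kafka clients (0.11 and later) also ship an AdminClient. A minimal sketch, with illustrative names:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class TopicAdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 10 partitions and a replication factor of 2.
            admin.createTopics(Collections.singletonList(
                    new NewTopic("my-example-topic", 10, (short) 2))).all().get();

            // List all topics in the cluster.
            System.out.println(admin.listTopics().names().get());

            // Delete the topic again (requires delete.topic.enable=true on the brokers).
            admin.deleteTopics(Collections.singletonList("my-example-topic")).all().get();
        }
    }
}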

Creating a Kafka Producer to Send Records

To create a Kafka producer, you will use java.util.Properties to define certain properties that you then pass to the constructor of a KafkaProducer:

 public class KafkaProducerExample {
    ...
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

Above, KafkaProducerExample.createProducer sets the BOOTSTRAP_SERVERS_CONFIG property to the list of broker addresses held in the BOOTSTRAP_SERVERS constant, which is elided from the listing.
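
The TOPIC and BOOTSTRAP_SERVERS constants themselves are not shown above; presumably they mirror the consumer example below, something like:

public class KafkaProducerExample {
    // Assumed constants, mirroring the consumer example below.
    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    ...
}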

Creating a Kafka Consumer to Receive Records Using a Topic

Creating a Kafka consumer is similar to creating a Kafka producer; you create a Java Properties instance with the properties you want to assign to the consumer:

package com.cloudurable.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
   private final static String TOPIC = "my-example-topic";
   private final static String BOOTSTRAP_SERVERS =
           "localhost:9092,localhost:9093,localhost:9094";
   ...
}

Here, LongDeserializer and StringDeserializer are imported by KafkaConsumerExample. LongDeserializer is configured as the Kafka record key deserializer, and StringDeserializer is set up as the record value deserializer.

You can see that the constant BOOTSTRAP_SERVERS gets set to the servers localhost:9092, localhost:9093, and localhost:9094. And the constant TOPIC is set to the replicated Kafka topic.

After defining these constants and importing the Kafka classes, you can now create a consumer as shown below:

 public class KafkaConsumerExample {
 ...
 private static Consumer<Long, String> createConsumer() {
     final Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 BOOTSTRAP_SERVERS);
     props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                 "KafkaExampleConsumer");
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
             LongDeserializer.class.getName());
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
             StringDeserializer.class.getName());
     // Create the consumer using props.
     final Consumer<Long, String> consumer =
                                 new KafkaConsumer<>(props);
     // Subscribe to the topic.
     consumer.subscribe(Collections.singletonList(TOPIC));
     return consumer;
 }
 ...
}

Above, KafkaConsumerExample.createConsumer sets the BOOTSTRAP_SERVERS_CONFIG property to the list of broker addresses defined earlier. GROUP_ID_CONFIG identifies the consumer group this consumer belongs to, and consumer.subscribe(Collections.singletonList(TOPIC)) subscribes the consumer to the topic.


Sending Records With Kafka Producer

Kafka can send records in two ways: synchronous and asynchronous.

Synchronous Send Method

In this method, the producer waits for the broker's acknowledgment of each record before sending the next one. This allows you to stop sending messages if a fault occurs in the system. Below, the Kafka producer sends records using the synchronous method:

public class KafkaProducerExample {
  ...
  static void runProducer(final int sendMessageCount) throws Exception {
      final Producer<Long, String> producer = createProducer();
      long time = System.currentTimeMillis();
      try {
          for (long index = time; index < time + sendMessageCount; index++) {
              final ProducerRecord<Long, String> record =
                      new ProducerRecord<>(TOPIC, index,
                                  "Learning Kafka " + index);
              RecordMetadata metadata = producer.send(record).get();
              long elapsedTime = System.currentTimeMillis() - time;
              System.out.printf("sent record(key=%s value=%s) " +
                              "meta(partition=%d, offset=%d) time=%d\n",
                      record.key(), record.value(), metadata.partition(),
                      metadata.offset(), elapsedTime);
          }
      } finally {
          producer.flush();
          producer.close();
      }
  }
  ...

Here, a loop creates a ProducerRecord on each iteration, sending an example message ("Learning Kafka " + index) as the record value. The synchronous method is the slowest, but it is the best choice if you can't afford to lose any messages.
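
To actually run the producer, you could add a small main method that calls runProducer; a possible sketch, not part of the original listing:

public class KafkaProducerExample {
    ...
    public static void main(String... args) throws Exception {
        // Send 5 records by default, or as many as the first argument specifies.
        if (args.length == 0) {
            runProducer(5);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }
}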

Asynchronous Send Method

This approach has better throughput than the synchronous method but does not guarantee message ordering. Below, the producer sends messages using the asynchronous send method:

static void runProducer(final int sendMessageCount) throws InterruptedException {
    final Producer<Long, String> producer = createProducer();
    long time = System.currentTimeMillis();
    final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);
    try {
        for (long index = time; index < time + sendMessageCount; index++) {
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
            producer.send(record, (metadata, exception) -> {
                long elapsedTime = System.currentTimeMillis() - time;
                if (metadata != null) {
                    System.out.printf("sent record(key=%s value=%s) " +
                                    "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), metadata.partition(),
                            metadata.offset(), elapsedTime);
                } else {
                    exception.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await(25, TimeUnit.SECONDS);
    } finally {
        producer.flush();
        producer.close();
    }
}

Note: CountDownLatch is used so that we can fire off all the messages and then wait for every send callback to complete before closing the producer.

Processing Messages From Kafka With a Consumer

The Kafka messaging protocol is a TCP-based protocol that provides a fast, scalable, and durable method for exchanging data between applications. Messaging providers are typically used in IT architectures in order to decouple the processing of messages from the applications that produce them.

Below, you will process some messages using the Kafka consumer example you created:

public class KafkaConsumerExample {
 ...
   static void runConsumer() throws InterruptedException {
       final Consumer<Long, String> consumer = createConsumer();
       final int giveUp = 100;   int noRecordsCount = 0;
       while (true) {
           final ConsumerRecords<Long, String> consumerRecords =
                   consumer.poll(1000);
           if (consumerRecords.count()==0) {
               noRecordsCount++;
               if (noRecordsCount > giveUp) break;
               else continue;
           }
           consumerRecords.forEach(record -> {
               System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                       record.key(), record.value(),
                       record.partition(), record.offset());
           });
           consumer.commitAsync();
       }
       consumer.close();
       System.out.println("DONE");
   }
}

The ConsumerRecords class holds a list of ConsumerRecord(s) per partition for a particular topic.

Note: consumer.poll() returns one list of ConsumerRecord objects per topic partition.
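
If you need that per-partition grouping explicitly, ConsumerRecords exposes it. A short fragment that could sit inside the poll loop above (imports for TopicPartition and List assumed):

// Iterate the polled records partition by partition rather than as one flat stream.
for (TopicPartition partition : consumerRecords.partitions()) {
    List<ConsumerRecord<Long, String>> partitionRecords =
            consumerRecords.records(partition);
    System.out.printf("partition %d returned %d records%n",
            partition.partition(), partitionRecords.size());
}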

Conclusion

Kafka is a fast streaming messaging service equipped with a command-line client that reads standard input and sends each line out as a message. A consumer subscribing to the appropriate topic then starts receiving those messages, which can be validated and written out as results.

A single consumer may cause an application to fall further and further behind if it can't keep up with the rate of incoming messages. To beat this lag, multiple consumers should be allowed to read from the same topic. Kafka handles high-throughput workloads well by adding consumers to consumer groups, which ensures that messages are consumed in time and that no backlog builds up.

In comparison to JMS, RabbitMQ, and AMQP, Kafka is often preferred for its faster response times, better throughput, and built-in partitioning, replication, and fault tolerance. For large-scale message-processing applications, these factors make Kafka a solid solution. Its simplicity, durability, and excellent performance come with tunable consistency guarantees.
