Producers and Consumers

What are Producers?

Producers publish messages to a topic, appending them at the end of a partition. By default, if a message contains a key, the hash of the key is used to decide in which partition the message is stored. If the key is null, a round-robin algorithm is used to balance the messages across all partitions.

Kafka guarantees that, as long as no new partitions are added, elements with the same key will be stored in the same partition.
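
To make this concrete, here is a minimal sketch of producing keyed records from Java with the standard Kafka client, assuming the localhost:29092 broker and songs topic used later in this tutorial:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key always land in the same partition,
            // because the default partitioner hashes the key.
            producer.send(new ProducerRecord<>("songs", "1",
                    "{\"id\": 1, \"name\": \"The Ecstasy of Gold\"}"));
            // A record with a null key is balanced across partitions instead.
            producer.send(new ProducerRecord<>("songs", null, "unkeyed message"));
        } // close() flushes any buffered records
    }
}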

You can also implement a custom partitioning strategy to store messages in a specific partition, following any kind of business rule.
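
As a hedged illustration, here is what such a strategy could look like: a sketch of Kafka's Partitioner interface where a hypothetical business rule routes "premium" keys to partition 0 (the class name and the rule are made up for this example):

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class PremiumFirstPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Hypothetical rule: "premium" keys always go to partition 0.
        if (key != null && key.toString().startsWith("premium")) {
            return 0;
        }
        // Everything else is hashed across all partitions.
        return key == null ? 0 : Math.abs(key.hashCode() % numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

The producer picks it up via the partitioner.class property, e.g. props.put("partitioner.class", PremiumFirstPartitioner.class.getName());.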


What are Consumers?

Each message published to a topic is delivered to the consumers subscribed to that topic.

A consumer can read data from any position of a partition; the position is tracked internally as a pointer called the offset. In most cases a consumer advances its offset linearly, but it can consume messages in any order, or start from any given position.
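
In Java, moving that pointer looks like this: a minimal sketch using seek(), assuming the songs topic from this tutorial with a single partition 0 (the group name is made up for the example):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("group.id", "seek-example"); // hypothetical group name
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("songs", 0);
            consumer.assign(Collections.singletonList(partition));
            // Move the offset pointer to position 1: consumption resumes from
            // the second record instead of the beginning or the last commit.
            consumer.seek(partition, 1);
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}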

Each consumer belongs to a consumer group. A consumer group is a set of consumer instances that provides scalability for processing messages, as well as fault tolerance.

When a consumer group contains only one consumer, that consumer is responsible for processing the messages of all partitions.

As you add more consumers to a group, each consumer receives messages from only a subset of the partitions.

If you add more consumers to a consumer group than the number of partitions for a topic, the extra consumers will stay idle, without getting any messages.

As you can see, it is the consumer group, not the individual consumer, that is registered to a topic; each consumer is "subscribed" to a partition.
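
Joining a group from Java is just a matter of sharing a group.id; here is a minimal sketch (the group name songs-processors is made up for this example). Starting a second instance with the same group.id splits the partitions between the two, and any instance beyond the partition count stays idle:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        // The group, not the individual consumer, is what Kafka registers to the topic.
        props.put("group.id", "songs-processors");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("songs"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}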

Consume and Produce Messages

Before digging into how to produce and consume data from Java, let’s see how to use kcat to do it.

kcat is a great tool for these use cases, and it is the de facto tool for inspecting data in Kafka.

Consuming messages with kcat

Open a new terminal, terminal 1, and run the following command:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
% Reached end of topic songs [0] at offset 0

Producing messages with kcat

Open a new terminal, terminal 2, and create a new file with the following content:

initial_songs
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}

Then insert each of these lines as a new message. The number before the : is taken as the message key.

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: initial_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: initial_songs

Switch back to terminal 1, where you are running kcat as a consumer (you started it in the Consuming messages with kcat section).

The terminal output has been updated, showing the consumed messages:

1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2

Notice that the offset of this consumer has been updated to 2, as the first two records have been consumed from the topic.

Playing with Offsets

A producer always appends content at the end of a topic, while a consumer can consume from any offset. Still on terminal 1, stop the kcat process by typing Ctrl+C.

Then start kcat again with the same consumer command:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2

Notice that the whole stream is consumed again. This happens because, by default, kcat reads all the content of a topic from the beginning of the stream. But you can change that.

Stop the kcat process again by typing Ctrl+C.

Then start it again with the following command:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -o 1 -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -o 1 -C -K:
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2

This time consumption did not start from the beginning of the topic but from offset 1, the second element.

Changing Retention

By default, Kafka retains data on a topic for 168 hours (7 days). But retention can be changed; let's change it to 10 seconds.

You can use the kafka-configs.sh tool to change the retention time of a topic.

Open a third terminal:

This tool is bundled with the Kafka installation, so let's exec a Bash shell inside the Kafka container.

docker exec -it $(docker ps -q --filter "label=com.docker.compose.service=kafka") /bin/bash

Inside the container run:

./bin/kafka-configs.sh --zookeeper zookeeper_1:2181 --alter --entity-type topics --entity-name songs --add-config 'retention.ms=10000'
Completed Updating config for entity: topic 'songs'.

And check that it has been configured correctly:

./bin/kafka-topics.sh --describe --bootstrap-server localhost:29092 --topic songs
Topic: songs	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=10000
    Topic: songs	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
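
If you prefer to manage this configuration from Java instead of the CLI, here is a minimal sketch using Kafka's AdminClient against the same broker and topic:

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "songs");
            // Same change as the kafka-configs.sh command above: retention.ms=10000
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "10000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> configs =
                    Map.of(topic, List.of(setRetention));
            admin.incrementalAlterConfigs(configs).all().get(); // block until applied
        }
    }
}

Removing the override again (the equivalent of --delete-config below) would use AlterConfigOp.OpType.DELETE instead of SET.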

In one terminal produce data again:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: initial_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: initial_songs

In the other terminal, consume this data to verify that it has been inserted correctly:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2

Stop the kcat process by typing Ctrl+C and wait for the retention period to pass, around 10 seconds (e.g. sleep 15).

Run kcat in consumer mode again:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
% Reached end of topic songs [0] at offset 2

The important part is the last line: notice that no messages are returned, but the offset is still 2. The reason is that the messages have expired and been deleted from the topic, while the offset has its own retention period. By default, that retention period is one week, so although there is no data, the current offset for a consumer is still 2.

Restore the default retention time by running the following command in the third terminal (the one where you ran docker exec):

./bin/kafka-configs.sh --zookeeper zookeeper_1:2181 --alter --entity-type topics --entity-name songs --delete-config retention.ms
Completed Updating config for entity: topic 'songs'.

Now that the topic configuration is back to the default, you can run the exit command:

exit

Deleting Topic Content

You can also use the kafka-topics.sh tool to delete a topic, and with it all of its content, manually.

This tool is bundled with the Kafka installation, so let's exec a Bash shell inside the Kafka container.

docker exec -it $(docker ps -q --filter "label=com.docker.compose.service=kafka") /bin/bash

Inside the container run:

./bin/kafka-topics.sh --delete --bootstrap-server localhost:29092 --topic songs

Now that the topic is deleted, you can run the exit command:

exit
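
The same deletion can also be done programmatically; here is a minimal AdminClient sketch, under the same broker assumption:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;

public class DeleteTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Deletes the topic and all of its records.
            admin.deleteTopics(Collections.singletonList("songs")).all().get();
        }
    }
}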

If you run kcat again, you'll see that there is no data in the topic:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:

Stop the kcat process by typing Ctrl+C.

Clean Up

Stop the current kcat process by typing Ctrl+C on the terminal.