Producers and Consumers
What are Producers?
Producers publish messages to a topic, appending each message at the end of one of its partitions.
By default, if a message contains a key, the hashed value of the key is used to decide in which partition the message is stored.
If the key is null, then a round-robin algorithm is used to balance the messages across all partitions.
Kafka guarantees that, as long as no new partitions are added, messages with the same key will always be stored in the same partition.
You can also implement a custom partitioning strategy to store messages in a specific partition, following any kind of business rule.
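To make this concrete, here is a minimal sketch of a keyed producer using the plain Java client (org.apache.kafka:kafka-clients). The broker address and the songs topic match the rest of this tutorial; the class name, messages, and serializer choices are illustrative assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // A custom partitioning strategy would implement
        // org.apache.kafka.clients.producer.Partitioner and be registered via:
        // props.put("partitioner.class", "com.example.MyPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key always hash to the same partition.
            producer.send(new ProducerRecord<>("songs", "1", "first message with key 1"));
            // A null key falls back to the default balancing strategy.
            producer.send(new ProducerRecord<>("songs", null, "message with no key"));
        } // closing the producer flushes any pending records
    }
}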

What are Consumers?
Each message published to a topic is delivered to a consumer that is subscribed to that topic.
A consumer can read data from any position of the partition; internally, its position is stored as a pointer called the offset. In most cases, a consumer advances its offset linearly, but it can consume messages in any order, or start from any given position.
Each consumer belongs to a consumer group. A consumer group is a set of consumer instances that provides scalability and fault tolerance for message processing.
When a consumer group contains only one consumer, that consumer is responsible for processing all messages of all partitions.
As you add more consumers to the group, each consumer receives messages from only a subset of the partitions.

If you add more consumers to a consumer group than there are partitions for a topic, the extra consumers will stay idle, without getting any messages.
Note that it is the consumer group, not the individual consumer, that is registered to a topic; each consumer is "subscribed" to a subset of the partitions.
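As a sketch of how this looks from Java (assuming the plain kafka-clients library; the class and group names are made up for this example), a consumer joins a group simply by setting group.id, and Kafka assigns it a share of the partitions:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        // Every instance started with the same group.id shares the topic's
        // partitions; instances beyond the partition count stay idle.
        props.put("group.id", "songs-consumers");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("songs"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition %d, offset %d: %s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}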
Consume and Produce Messages
Before digging into how to produce and consume data from Java, let's see how to use kcat to do it. kcat is a great tool for these use cases, and it is the de facto tool to inspect data in Kafka.
Producing messages with Kafkacat
Open a new terminal, terminal 2, and create a new file named initial_songs with the following content:
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
Then insert each of these lines as a new message. Because of the -K: flag, the number before the : is used as the message key:
You can run kcat either locally or through Docker; the two commands below are equivalent:
kcat -b localhost:29092 -t songs -P -l -K: initial_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: initial_songs
Switch back to terminal 1, where you are running kcat as a consumer (you set this up in the Consuming messages with Kafkacat section). The output of the terminal has been updated to show the consumed messages:
1t {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2t {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2
Notice that the offset of this consumer has been updated to 2, as the first two records have been consumed from the topic.
Playing with Offsets
A producer always appends content at the end of a topic, while a consumer can consume from any offset.
Still on terminal 1, stop the kcat process by typing Ctrl+C. Then start kcat again with the same consumer command you used before:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2
Notice that the whole stream is consumed again. This happens because, by default, kcat reads all the content of a topic from the beginning of the stream. But you can change that.
Stop the kcat process again by typing Ctrl+C, and start it with the following command:
kcat -b localhost:29092 -t songs -o 1 -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -o 1 -C -K:
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2
Now the initial offset was not set to the beginning of the topic but to the second element (offset 1).
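The same repositioning can be done from Java. Here is a minimal sketch (assuming the plain kafka-clients library and the single-partition songs topic from this tutorial; class and group names are hypothetical) that mirrors kcat's -o 1 flag with consumer.seek():

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("group.id", "songs-seek-demo");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign partition 0 of "songs" explicitly and jump to offset 1.
            TopicPartition partition = new TopicPartition("songs", 0);
            consumer.assign(List.of(partition));
            consumer.seek(partition, 1);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s: %s (offset %d)%n",
                        record.key(), record.value(), record.offset());
            }
        }
    }
}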
Changing Retention
By default, Kafka retains data on a topic for 168 hours (7 days). But retention can be changed; let's change it to 10 seconds.
You can use the kafka-configs.sh tool to change the retention time of a topic.
Open a third terminal:
This tool is bundled inside the Kafka installation, so let's exec a bash shell inside the Kafka container.
docker exec -it $(docker ps -q --filter "label=com.docker.compose.service=kafka") /bin/bash
Inside the container run:
./bin/kafka-configs.sh --zookeeper zookeeper_1:2181 --alter --entity-type topics --entity-name songs --add-config 'retention.ms=10000'
Completed Updating config for entity: topic 'songs'.
And check that it has been configured correctly:
./bin/kafka-topics.sh --describe --bootstrap-server localhost:29092 --topic songs
Topic: songs PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=10000
Topic: songs Partition: 0 Leader: 0 Replicas: 0 Isr: 0
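The same change can be made programmatically. Below is a minimal sketch using the Java AdminClient (same kafka-clients library; the class name is illustrative) that is equivalent to the kafka-configs.sh call above:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionChanger {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "songs");
            // SET retention.ms=10000, i.e. keep data for 10 seconds.
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "10000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention)))
                    .all().get(); // block until the broker applies the change
        }
    }
}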
In one terminal produce data again:
kcat -b localhost:29092 -t songs -P -l -K: initial_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: initial_songs
In the other terminal, consume this data to verify that it has been inserted correctly:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
1: {"id": 1, "name": "The Ecstasy of Gold", "author":"Ennio Morricone", "op":"ADD"}
2: {"id": 2, "name": "Still Loving you", "author":"Scorpions", "op":"ADD"}
% Reached end of topic songs [0] at offset 2
Stop the kcat process by typing Ctrl+C and wait for ~10 seconds (for example, sleep 15).
Run kcat in consumer mode again:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
% Reached end of topic songs [0] at offset 2
The important part is the last line: notice that it returns no messages, but the offset is 2.
The reason is that the messages have expired and been deleted from the topic, but the offset has its own retention period.
By default, this retention period is 1 week, so although there is no data, the current offset for the consumer is still 2.
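To see that committed offsets are stored by the broker independently of the topic's data, you can inspect them from Java with AdminClient; a sketch, reusing the hypothetical songs-consumers group name from earlier:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetInspector {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets live in an internal topic with their own
            // retention, separate from the data of the "songs" topic itself.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("songs-consumers")
                            .partitionsToOffsetAndMetadata().get();
            offsets.forEach((tp, om) ->
                    System.out.printf("%s -> offset %d%n", tp, om.offset()));
        }
    }
}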
Restore the default retention time by running the following command in the third terminal (the one where you ran docker exec):
./bin/kafka-configs.sh --zookeeper zookeeper_1:2181 --alter --entity-type topics --entity-name songs --delete-config retention.ms
Completed Updating config for entity: topic 'songs'.
Now that the topic configuration is back to the default, you can run the exit command:
exit
Deleting Topic Content
You can also use the kafka-topics.sh tool to delete a topic, and with it all of its content, manually.
This tool is bundled inside the Kafka installation, so let's exec a bash shell inside the Kafka container.
docker exec -it $(docker ps -q --filter "label=com.docker.compose.service=kafka") /bin/bash
Inside the container run:
./bin/kafka-topics.sh --delete --bootstrap-server localhost:29092 --topic songs
Now that the topic is deleted, you can run the exit command:
exit
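If you prefer to delete the topic from Java instead of the CLI, a minimal sketch with AdminClient (the class name is illustrative) would be:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class TopicDeleter {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Deletes the topic and all of its content.
            admin.deleteTopics(List.of("songs")).all().get();
        }
    }
}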
If you run kcat again, you'll see that there is no data in the topic:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
Stop the kcat process by typing Ctrl+C.