Kafka Streams
So far, you’ve seen that you can consume data from and produce data to a Kafka topic, but sometimes you want to do more advanced things, like joining different topics, aggregating their content, or processing events on the fly and redirecting the result to another topic.
What is Kafka Streams?
Kafka Streams is a Java API that implements all these features in a fault-tolerant, scalable way. One important aspect of a Kafka Streams application is that it doesn’t run inside a broker: it runs in a separate JVM instance, maybe in the same cluster or maybe in a different one, but always as a separate process.
Kafka Streams application instances can run in parallel on different machines, and they will automatically collaborate on the data processing. This is what makes Kafka Streams applications fault-tolerant and scalable.
Kafka Streams Concepts
- Stream Processor: A stream processor represents an operation to execute on a stream; examples of built-in operations are filter, map, join, or aggregate. Usually, a Kafka Streams application is composed of one or more of these operations.
- Table: A table is a collection of key-value pairs that represents the latest value for each record key. The big difference is that a stream contains the log of all events (e.g., the insertion of a song, an update of that song, …) while a table contains only the last value (e.g., the last update done to a song). There are two kinds of tables: a KTable, which represents a table built from the partitions assigned to the application instance, and a GlobalKTable, which aggregates the content of all partitions of a given topic. We’ll take a look at both later.
- Aggregation Operation: Takes one input stream or table and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums (see the sketch after this list).
- Join Operation: Merges two input streams and/or tables based on the keys of their data records, and yields a new stream/table.
- Windowing: Groups records that have the same key into time windows. For example, you can group the events that occurred within a given period of time.
- Interactive Queries: Treats the stream processing layer as a lightweight embedded database, letting you directly query the latest state of your stream processing application.
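To make these concepts concrete, here is a minimal sketch of a Kafka Streams application that reads a topic and counts events per key, combining a stream processor with an aggregation. The topic name, application id, and bootstrap address are assumptions borrowed from the examples later in this section:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class SongsCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Instances sharing this application.id automatically split the work
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "songs-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Stream processor: read every event from the songs topic
        KStream<String, String> songs = builder.stream("songs");
        // Aggregation: combine all records with the same key into a count,
        // yielding a table with the latest count per key
        KTable<String, Long> counts = songs.groupByKey().count();
        counts.toStream().foreach((key, count) ->
                System.out.println("Key: " + key + " : " + count));

        // The topology runs in its own JVM, outside any broker
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Running several copies of this program with the same application.id is all it takes to get the parallel, collaborative processing described above.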

Kafka Streams Operations
Let’s see some of the Kafka-Streams concepts in action.
Remember to have Zookeeper and Apache Kafka up and running. If not, you can follow the [Setup Kafka] section to learn how to do it using docker-compose.
Tables
You need only one terminal window to run this section.
Create a new file named update_songs with some new songs in it:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Then insert each of these lines as a new message. The number before the : is taken as the message key (the id):
kcat -b localhost:29092 -t songs -P -l -K: update_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: update_songs
Then inspect the inserted songs using kcat:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Stop the process by typing Ctrl+C.
Create a Global Table from the songs topic:
java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Stop the process by typing Ctrl+C.
Notice that the output of kcat and kstreamscat is essentially the same.
You have probably noticed that The Imperial March was not written by Alan Silvestri but by John Williams, so let’s fix this.
Create a new file named update_songs2 containing an event that fixes the problem:
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
And add it to the songs topic:
kcat -b localhost:29092 -t songs -P -l -K: update_songs2
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: update_songs2
Then inspect the inserted songs using kcat:
kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
Stop the process by typing Ctrl+C.
Create a Global Table from the songs topic:
java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
Stop the process by typing Ctrl+C.
Notice that kcat shows all the events that have been appended to the songs topic, so both events for The Imperial March are visible. The Global Table, on the other hand, shows only the latest value for each key of the topic; for this reason, only the update event of The Imperial March is shown, and not the initial insertion.
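For reference, this is roughly how such a Global Table could be built and queried with the Kafka Streams API, which also illustrates the Interactive Queries concept introduced earlier. It is a minimal sketch, not kstreamscat’s actual implementation: the store name songs-store is hypothetical, and the StoreQueryParameters call shown requires Kafka Streams 2.5 or later.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SongsGlobalTable {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "songs");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // GlobalKTable: the latest value per key, aggregated from all
        // partitions of the topic and materialized in a local state store
        GlobalKTable<String, String> songs = builder.globalTable("songs",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("songs-store"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Thread.sleep(5_000); // crude wait; real code should wait for the RUNNING state

        // Interactive query: read the latest value for each key from the store
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType("songs-store",
                        QueryableStoreTypes.<String, String>keyValueStore()));
        try (KeyValueIterator<String, String> it = store.all()) {
            it.forEachRemaining(kv -> System.out.println(kv.key + ": " + kv.value));
        }
        streams.close();
    }
}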
Clean Up
To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.
Then run:
docker-compose rm
Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans
kafka | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Windowing
Windowing lets you control how records that have the same key are grouped into time-based windows. For example, you can answer questions like how many songs each user has played in the last 30 minutes, or how many songs each user has played every day.
There are 4 types of windows:
- Tumbling time window: Fixed-size, non-overlapping, and gap-less.
- Hopping time window: Fixed-size, overlapping windows.
- Sliding time window: Fixed-size, overlapping windows that work on differences between record timestamps. In the case of Kafka Streams, it defines a maximum time difference for a join over two streams on the same key.
- Session window: Dynamically-sized, non-overlapping, data-driven windows. Sessions represent a period of activity separated by a defined gap of inactivity. This kind of window is used for user behaviour analysis.

For this section, you need to open two terminals on the same screen.
Let’s see how different window strategies affect a topic that contains the history of the songs that each user has played.
Tumbling time window
Let’s create a tumbling window of 60 seconds. What we are doing here is creating fixed buckets of 60 seconds: every 60 seconds a new window starts, and it processes the events that fall into it.
For example, let’s count the number of songs played by each user in a bucket of 60 seconds.
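As a rough sketch of what --time-window=60 presumably does internally (an assumption about kstreamscat, whose actual code may differ), the counting topology could look like this in the Kafka Streams API:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class PlayedTumblingCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "songs");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("played")
               .groupByKey()
               // Tumbling window: fixed 60s buckets, non-overlapping, gap-less
               .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
               .count()
               .toStream()
               .foreach((windowedKey, count) ->
                       System.out.println("Key: " + windowedKey.key() + " : " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}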
With the Kafka cluster started, prepare (but do not execute yet) the next command in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
And run the command that you’ve prepared in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
Then, in terminal 1, run the kcat command again to insert the played songs once more:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
And if you inspect the kstreamscat output, you’ll see:
Key: alex : 8
Key: burr : 6
Key: kamesh : 4
Key: edson : 2
Key: sebi : 2
Notice that the counts of played songs have doubled, since both batches fell into the same 60-second window.
Now wait for one minute and then run the kcat command again to insert the played songs once more:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
And if you inspect the kstreamscat output, you’ll see:
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
After 1 minute, the window has rolled over to a new one, so the newly played songs fall into this new window and its count starts again from 0.
Clean Up
Stop the kstreamscat process by typing Ctrl+C in terminal 2.
To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.
Then run:
docker-compose rm
Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans
kafka | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Session time window
Let’s see how the count behaves when, instead of a Tumbling time window, we use a Session time window.
With the Kafka cluster started, prepare (but do not execute yet) the next command in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
And run the command that you’ve prepared in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
Then in terminal 1 run the following command:
./apps/windowing/session-window.sh
Sleeping 30s
Insert new batch
Sleeping 40s
Insert new batch
Notice that since the beginning it has passed more than 60s but still in the same time window
Sleeping 65s
Insert new batch
Now new session window
Monitor the output of terminal 2; you should see something like:
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
Key: alex : 8
Key: burr : 6
Key: kamesh : 4
Key: edson : 2
Key: sebi : 2
Key: alex : 12
Key: burr : 9
Key: kamesh : 6
Key: edson : 3
Key: sebi : 3
Key: alex : 16
Key: burr : 12
Key: kamesh : 8
Key: edson : 4
Key: sebi : 4
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
Notice that we configured the session window to 60 seconds (--session-window=60), but since up until the last sleep (Sleeping 65s) we were producing content continuously, never pausing for a full minute, all of those events were processed in the same session window.
A new window is created for the last batch because it is inserted after a gap of more than 60 seconds without producing any event.
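For comparison with the tumbling sketch earlier, the only structural change for a session window is the windowedBy call. Here is a minimal sketch of what --session-window=60 presumably does (again an assumption, not kstreamscat’s actual code):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.SessionWindows;

public class PlayedSessionCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "songs");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("played")
               .groupByKey()
               // Session window: the window keeps growing while events for a
               // key keep arriving; it closes only after 60s of inactivity
               .windowedBy(SessionWindows.with(Duration.ofSeconds(60)))
               .count()
               .toStream()
               .foreach((windowedKey, count) ->
                       System.out.println("Key: " + windowedKey.key() + " : " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}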
Clean Up
Stop the kstreamscat process by typing Ctrl+C in terminal 2.
To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.
Then run:
docker-compose rm
Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans
kafka | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)