Kafka Streams
So far, you’ve seen that you can produce to and consume from Kafka topics, but sometimes you want to do more advanced things: joining records from different topics, grouping and aggregating records, or processing data on the fly and producing the result into another topic.
What is Kafka Streams?
Kafka Streams is a Java API that implements all these more advanced features while processing records in a fault-tolerant and scalable way. One important thing to understand is that a Kafka Streams application does not run inside a broker. It runs in a separate JVM instance, in the same or in a different cluster, but always as a separate process.
A Kafka Streams application can be scaled horizontally: multiple instances can run in parallel on different machines, and they automatically collaborate on the data processing. This is what makes Kafka Streams applications fault-tolerant and scalable.
Kafka Streams Concepts
The following is important terminology for Kafka Streams:
- Processor Topology
  A topology specifies the data processing logic that needs to be executed by the Kafka Streams application.
- Stream Processor
  A stream processor is one particular compute step of the topology. It represents a specific operation to be executed on a stream. Some examples of built-in operations are filter, map, join, and aggregate. Usually, a Kafka Streams application combines one or more such operations into its topology.
- KStream
  A KStream is a logical abstraction of an unbounded stream of records. Each record is interpreted as a separate "insert" event.
- KTable
  A KTable is a logical abstraction of a changelog stream of records. Each record is interpreted as an "update" event if a record with the same key existed before; otherwise it is interpreted as an "insert" event. This results in table semantics where key-value pairs always represent the latest value for any given key. There are two kinds of tables:
  - KTable: represents only the records from a subset of the partitions when you run multiple application instances
  - GlobalKTable: always represents the records from all partitions of a given topic for every application instance you run
- Aggregation Operation
  Takes one input stream or table and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums.
- Join Operation
  Merges two input streams and/or tables based on the keys of their data records, and yields a new stream or table.
- Windowing
  Lets you control how records that have the same key are grouped into different types of time windows. For example, you can group records that occurred during a certain period of time, say 1 hour.
- Interactive Queries
  Treat the stream processing layer as a lightweight embedded key-value store, so that you can directly query the latest state of your stream processing application, for example by record key or key range.
![kstreams](_images/kstreams.png)
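To make the aggregation and join concepts more concrete, here is a minimal sketch in plain Java. It deliberately uses only `java.util` collections and no Kafka classes, so it illustrates the semantics rather than the Kafka Streams API. The class name `ConceptsSketch`, its method names, and the sample data are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: no Kafka classes are used here.
final class ConceptsSketch {

    // Aggregation: fold a stream of {key, value} records into a table of counts per key.
    static Map<String, Long> countByKey(List<String[]> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String[] record : records) {
            counts.merge(record[0], 1L, Long::sum);
        }
        return counts;
    }

    // Join: enrich each stream record with the table value stored under the same key.
    // Stream records without a matching table entry are dropped (inner-join behaviour).
    static List<String> joinOnKey(List<String[]> stream, Map<String, String> table) {
        List<String> joined = new ArrayList<>();
        for (String[] record : stream) {
            String tableValue = table.get(record[0]);
            if (tableValue != null) {
                joined.add(record[1] + " played " + tableValue);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical "played" records keyed by user: {user, songId}.
        List<String[]> playsByUser = Arrays.asList(
            new String[] {"alex", "3"},
            new String[] {"burr", "4"},
            new String[] {"alex", "4"});
        System.out.println(countByKey(playsByUser)); // {alex=2, burr=1}

        // The same kind of records keyed by song id: {songId, user}.
        List<String[]> playsBySong = Arrays.asList(
            new String[] {"3", "alex"},
            new String[] {"9", "burr"});
        Map<String, String> songs = new LinkedHashMap<>();
        songs.put("3", "Time");
        songs.put("4", "Friend Like Me");
        System.out.println(joinOnKey(playsBySong, songs)); // [alex played Time]
    }
}
```

In a real Kafka Streams application, the same ideas are expressed through the DSL on KStream and KTable objects, and the resulting state is kept fault-tolerant by the library rather than held in a local map.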
Kafka Streams Operations
Let’s see some of the Kafka Streams concepts in action.
Remember that both ZooKeeper and Apache Kafka need to be correctly configured and up and running. If they are not, follow the [Setup Kafka] section to learn how to start them using docker compose.
Tables
You need only one terminal window to run this section.
Create a new file named update_songs with some new songs in it:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Then insert each of these lines as a new message. The number before the : is used as the message key.
kcat -b localhost:29092 -t songs -P -l -K: update_songs
docker run --rm -it --network=kafka-tutorial -v $(pwd)/documentation/modules/ROOT/examples/update_songs.json:/home/update_songs edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -P -l -K: /home/update_songs
Then inspect the inserted songs using kcat:
kcat -b localhost:29092 -t songs -C -K:
docker run --rm -it --network=kafka-tutorial edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Stop the process by typing Ctrl+C.
Create a GlobalKTable from the songs topic:
java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
Stop the process by typing Ctrl+C.
Notice that the output of kcat and kstreamscat is essentially the same.
As you have probably noticed, The Imperial March was not written by Alan Silvestri but by John Williams. So let’s fix this.
Create a file named update2_songs containing a new event that fixes the problem:
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
And add it to the songs topic:
kcat -b localhost:29092 -t songs -P -l -K: update2_songs
docker run --rm -it --network=kafka-tutorial -v $(pwd)/documentation/modules/ROOT/examples/update2_songs.json:/home/update2_songs edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -P -l -K: /home/update2_songs
Then inspect the inserted songs using kcat:
kcat -b localhost:29092 -t songs -C -K:
docker run --rm -it --network=kafka-tutorial edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
Stop the process by typing Ctrl+C.
Create a GlobalKTable from the songs topic again:
java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}
Stop the process by typing Ctrl+C.
Notice that kcat shows the content of all the events that have been added to the songs topic, so both events referring to the song The Imperial March (key=5/"id":5) are shown.
However, the GlobalKTable just shows the latest values for each key in the topic. For this reason, only the update event of The Imperial March (key=5/"id":5) is shown, and not the initial insertion.
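The difference between the two outputs can be sketched in plain Java. This is only an illustration of the stream and table semantics using `java.util` collections; it does not use kcat, kstreamscat, or any Kafka class, and the class name `StreamVsTableSketch` is made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; Kafka itself is not involved.
final class StreamVsTableSketch {

    // Stream view: every record is kept as its own event, in arrival order.
    static List<String> streamView(String[][] records) {
        List<String> events = new ArrayList<>();
        for (String[] record : records) {
            events.add(record[0] + ": " + record[1]);
        }
        return events;
    }

    // Table view: a record replaces any earlier record that has the same key.
    static Map<String, String> tableView(String[][] records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            latest.put(record[0], record[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Key and author of the four song events produced in this section.
        String[][] records = {
            {"3", "Hans Zimmer"},
            {"4", "Alan Menken"},
            {"5", "Alan Silvestri"},
            {"5", "John Williams"}
        };
        System.out.println(streamView(records).size()); // 4
        System.out.println(tableView(records));         // {3=Hans Zimmer, 4=Alan Menken, 5=John Williams}
    }
}
```

The list keeps both events for key 5, which matches what kcat prints, while the map keeps only the most recent value per key, which matches the GlobalKTable output.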
Clean Up
To restart Kafka (and ZooKeeper), go to the docker compose terminal and stop the process by typing Ctrl+C.
Then run:
docker compose rm
? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
⠿ Container zookeeper Removed
⠿ Container kafka Removed
docker compose up --remove-orphans
kafka | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Windowing
Windowing allows you to control how records that have the same key are grouped based on a time window. For example, you can answer questions such as how many songs each user has played within the last 30 minutes, or how many songs each user has played per day.
There are 4 types of windows:
- Tumbling time window
  Fixed-size, non-overlapping, and gap-less.
![win1](_images/win1.png)
- Hopping time window
  Fixed-size, overlapping windows.
![win2](_images/win2.png)
- Sliding time window
  Fixed-size, overlapping windows that work on differences between record timestamps. In the case of Kafka Streams, it defines a maximum time difference for a join over two streams on the same key.
- Session window
  Dynamically-sized, non-overlapping, data-driven windows. Sessions represent a period of activity separated by a defined gap of inactivity. This window type is typically used for user behaviour analysis.
![win3](_images/win3.png)
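The difference between tumbling and hopping windows can be sketched as a small calculation: given a record timestamp, which windows does it fall into? The following plain-Java sketch assumes windows aligned to the epoch, with an inclusive start and an exclusive end. The class name `HoppingWindowSketch` and the method `windowStartsFor` are hypothetical helpers for this illustration, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only: no Kafka classes are used here.
final class HoppingWindowSketch {

    // Returns the start of every window [start, start + sizeMs) that contains the timestamp,
    // where window starts are multiples of advanceMs.
    // With advanceMs == sizeMs this degenerates to a tumbling window (exactly one result).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latestStart = timestampMs - (timestampMs % advanceMs);
        for (long start = latestStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        Collections.reverse(starts); // oldest window first
        return starts;
    }

    public static void main(String[] args) {
        long timestamp = 70_000L; // hypothetical record timestamp, 70 s after the epoch

        // Hopping: 60 s windows that advance every 30 s overlap, so the record is in two windows.
        System.out.println(windowStartsFor(timestamp, 60_000L, 30_000L)); // [30000, 60000]

        // Tumbling: 60 s windows that advance every 60 s do not overlap.
        System.out.println(windowStartsFor(timestamp, 60_000L, 60_000L)); // [60000]
    }
}
```

Because hopping windows overlap, one record can contribute to several window results, whereas in a tumbling window each record is counted exactly once.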
For this section, you need to open two terminals on the same screen.
Let’s see how different window strategies affect a topic that contains the history of the songs that each user has played.
Tumbling time window
Let’s create a tumbling window of 60 seconds. This creates fixed 60-second buckets: every 60 seconds a new window starts, and each event is processed in the window that its timestamp falls into.
For example, let’s count the number of songs played per user in a 60-second tumbling window.
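Conceptually, this count can be sketched in plain Java as follows. The sketch assumes epoch-aligned windows and uses made-up timestamps; it is not how kstreamscat is implemented, and the class name `TumblingCountSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only: no Kafka classes are used here.
final class TumblingCountSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts records per (window start, user) pair.
    static Map<String, Long> countPerWindow(long[] timestampsMs, String[] users, long sizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            String bucket = windowStart(timestampsMs[i], sizeMs) + " -> " + users[i];
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical play events: two within the same minute, one three minutes later.
        long[] timestamps = {1673267655123L, 1673267656000L, 1673267825000L};
        String[] users = {"alex", "alex", "alex"};
        System.out.println(countPerWindow(timestamps, users, 60_000L));
        // {1673267640000 -> alex=2, 1673267820000 -> alex=1}
    }
}
```

The first two events share a window start and are counted together, while the third event lands in a later window whose count starts again from zero.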
With the Kafka cluster started, prepare but do not yet execute the next command in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch
And run the command that you’ve prepared in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Window: 1673267640000 -> Key: alex = 4
Window: 1673267640000 -> Key: burr = 3
Window: 1673267640000 -> Key: kamesh = 2
Window: 1673267640000 -> Key: edson = 1
Window: 1673267640000 -> Key: sebi = 1
Then, in terminal 1, run the kcat command again to insert the same played song events once more:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch
If you inspect the kstreamscat output, you’ll see:
...
Window: 1673267640000 -> Key: alex = 8
Window: 1673267640000 -> Key: burr = 6
Window: 1673267640000 -> Key: kamesh = 4
Window: 1673267640000 -> Key: edson = 2
Window: 1673267640000 -> Key: sebi = 2
Notice that the played song counts have been doubled. This is because the same events were produced and the same time window received the same batch of played songs again.
Now wait for at least one minute and then run the kcat command again to insert the played songs once more:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch
If you inspect the kstreamscat output, you’ll now see:
...
Window: 1673267820000 -> Key: alex = 4
Window: 1673267820000 -> Key: burr = 3
Window: 1673267820000 -> Key: kamesh = 2
Window: 1673267820000 -> Key: edson = 1
Window: 1673267820000 -> Key: sebi = 1
After one minute, a new window has started. The played song events fall into this new window, whose counters start from 0 again.
Clean Up
Stop the kstreamscat process by typing Ctrl+C in terminal 2.
To restart Kafka (and ZooKeeper), go to the docker compose terminal and stop the process by typing Ctrl+C.
Then run:
docker compose rm
? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
⠿ Container zookeeper Removed
⠿ Container kafka Removed
docker compose up --remove-orphans
kafka | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Session time window
Let’s explore the windowing semantics when, instead of a tumbling time window, we use a session window with an inactivity gap of 60 seconds.
With the Kafka cluster started, prepare but do not yet execute the next command in terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:
kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch
And run the command that you’ve prepared in the terminal 2:
java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Window: 1673268538354 -> Key: alex = 4
Window: 1673268538355 -> Key: burr = 3
Window: 1673268538355 -> Key: kamesh = 2
Window: 1673268538355 -> Key: edson = 1
Window: 1673268538356 -> Key: sebi = 1
Then in terminal 1 run the following command:
./apps/windowing/session-window.sh
Sleeping 30s
Insert new batch
Sleeping 40s
Insert new batch
Notice that since the beginning it has passed more than 60s but still in the same time window
Sleeping 65s
Insert new batch
Now new session window
Monitor the output of terminal 2, where you should see something similar to the following:
...
Window: 1673268538355 -> Key: alex = 4
Window: 1673268538355 -> Key: burr = 3
Window: 1673268538355 -> Key: kamesh = 2
Window: 1673268538355 -> Key: edson = 1
Window: 1673268538355 -> Key: sebi = 1
Window: 1673268538355 -> Key: alex = 8
Window: 1673268538355 -> Key: burr = 6
Window: 1673268538355 -> Key: kamesh = 4
Window: 1673268538355 -> Key: edson = 2
Window: 1673268538355 -> Key: sebi = 2
Window: 1673268538355 -> Key: alex = 12
Window: 1673268538355 -> Key: burr = 9
Window: 1673268538355 -> Key: kamesh = 6
Window: 1673268538355 -> Key: edson = 3
Window: 1673268538355 -> Key: sebi = 3
Window: 1673268538355 -> Key: alex = 16
Window: 1673268538355 -> Key: burr = 12
Window: 1673268538355 -> Key: kamesh = 8
Window: 1673268538355 -> Key: edson = 4
Window: 1673268538355 -> Key: sebi = 4
Window: 1673268716848 -> Key: alex = 4
Window: 1673268716848 -> Key: burr = 3
Window: 1673268716848 -> Key: kamesh = 2
Window: 1673268716848 -> Key: edson = 1
Window: 1673268716848 -> Key: sebi = 1
Notice that we configured the session window to 60 seconds (--session-window=60). Until the last sleep (Sleeping 65s), content was produced without ever pausing for one minute or longer, so all of these events were processed in the same session window.
A new session window is created only for the last batch of song play events, because it follows an inactivity period greater than 60 seconds.
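The grouping rule can be sketched in plain Java: a record starts a new session when it arrives more than the inactivity gap after the previous record. The sketch below handles a single key and assumes timestamps are already sorted; the class name `SessionWindowSketch` and the relative timestamps are assumptions for illustration, loosely following the sleeps of the script above, and no Kafka class is used.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: no Kafka classes are used here.
final class SessionWindowSketch {

    // Splits sorted timestamps (of one key) into sessions separated by more than gapMs of inactivity.
    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long timestamp : sortedTimestampsMs) {
            if (current == null || timestamp - previous > gapMs) {
                current = new ArrayList<>(); // inactivity exceeded the gap: open a new session
                result.add(current);
            }
            current.add(timestamp);
            previous = timestamp;
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical batches at 0 s, then after pauses of 30 s, 40 s and 65 s.
        long[] batches = {0L, 30_000L, 70_000L, 135_000L};
        System.out.println(sessions(batches, 60_000L)); // [[0, 30000, 70000], [135000]]
    }
}
```

Note that the third batch arrives 70 seconds after the first one and still belongs to the first session: what matters is the pause since the previous record, not the time elapsed since the session began.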
Clean Up
Stop the kstreamscat process by typing Ctrl+C in terminal 2.
To restart Kafka (and ZooKeeper), go to the docker compose terminal and stop the process by typing Ctrl+C.
Then run:
docker compose rm
? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
⠿ Container zookeeper Removed
⠿ Container kafka Removed
docker compose up --remove-orphans
kafka | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)