Kafka Streams

So far, you’ve seen that you can produce to and consume from Kafka topics, but sometimes you want to do more advanced things: joining records from different topics, grouping and aggregating records, or processing data on the fly and producing the result into another topic.

What is Kafka Streams?

Kafka Streams is a Java API that implements all of these more advanced features while processing records in a fault-tolerant and scalable way. One important thing to understand is that a Kafka Streams application does not run inside a broker. It runs in a separate JVM instance, either in the same cluster or in a different one, but always as its own process.

A Kafka Streams application can be scaled horizontally: multiple instances can run in parallel on different machines, and they automatically collaborate on the data processing. This is what makes Kafka Streams applications fault-tolerant and scalable.

Kafka Streams Concepts

The following is important terminology for Kafka Streams:

Processor Topology

A topology specifies the data processing logic that needs to be executed by the Kafka Streams application.

Stream Processor

A stream processor is one particular compute step of the topology. It represents a specific operation to be executed on a stream. Some examples of built-in operations are filter, map, join, and aggregate. Usually, a Kafka Streams application is built from one or more such operations, which together define the topology.

KStream

A KStream is a logical abstraction of an unbounded stream of records. Each record is interpreted as a separate "insert" event.

KTable

A KTable is a logical abstraction of a changelog stream of records. Each record is interpreted as an "update" event if a record with the same key existed before; otherwise it is interpreted as an "insert" event. This results in table semantics where key-value pairs always represent the latest value for any given key. There are two kinds of tables:

  • KTable: represents only the records from a subset of the topic partitions when you run multiple application instances

  • GlobalKTable: always represents records from all partitions of a given topic for every application instance you run
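
The difference between the two kinds of tables can be sketched in plain Java. This is only a model of the idea and does not use the Kafka Streams API: the class name, the two-partition setup, and the modulo partitioner are assumptions made for illustration (Kafka's default partitioner hashes the serialized key instead).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API) of which keys each table type sees.
class TableScopeSketch {

    // Hypothetical partitioner: a simple modulo keeps the example readable.
    static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    // KTable-like view: only records from the partitions assigned to this instance.
    static Map<Integer, String> partitionedView(Map<Integer, String> topic,
                                                List<Integer> assignedPartitions,
                                                int numPartitions) {
        Map<Integer, String> view = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : topic.entrySet()) {
            if (assignedPartitions.contains(partitionFor(e.getKey(), numPartitions))) {
                view.put(e.getKey(), e.getValue());
            }
        }
        return view;
    }

    // GlobalKTable-like view: every instance holds the records of all partitions.
    static Map<Integer, String> globalView(Map<Integer, String> topic) {
        return new LinkedHashMap<>(topic);
    }

    public static void main(String[] args) {
        Map<Integer, String> songs = new LinkedHashMap<>();
        songs.put(3, "Time");
        songs.put(4, "Friend Like Me");
        songs.put(5, "The Imperial March");

        // Two partitions and two instances: A owns partition 0, B owns partition 1.
        System.out.println(partitionedView(songs, List.of(0), 2)); // {4=Friend Like Me}
        System.out.println(partitionedView(songs, List.of(1), 2)); // {3=Time, 5=The Imperial March}
        System.out.println(globalView(songs)); // all three songs on every instance
    }
}
```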

Aggregation Operation

Takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums.
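
As a rough illustration, a count aggregation can be modelled in plain Java as folding a list of record keys into a map. This is a sketch of the semantics only, not Kafka Streams code; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a "count per key" aggregation: a stream of records in,
// a table with one row per key out.
class CountAggregationSketch {

    static Map<String, Long> countByKey(List<String> recordKeys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : recordKeys) {
            // Combine the new record with the current aggregate for its key.
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> plays = List.of("alex", "burr", "alex", "kamesh", "alex");
        System.out.println(countByKey(plays)); // {alex=3, burr=1, kamesh=1}
    }
}
```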

Join Operation

Merges two input streams and/or tables based on the keys of their data records, and yields a new stream/table.
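
A key-based join can be pictured with the following plain-Java sketch of an inner stream-table join. It is a model under assumptions, not the Kafka Streams API: the record layout (song id as key, user name as value) and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of an inner join between a stream and a table on the record key.
class StreamTableJoinSketch {

    // For each stream record, look up the table by the record key and emit a
    // combined record only when the key is present in the table.
    static List<String> join(List<Map.Entry<Integer, String>> plays,
                             Map<Integer, String> songNames) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Integer, String> play : plays) {
            String song = songNames.get(play.getKey());
            if (song != null) {
                joined.add(play.getValue() + " played " + song);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> songNames = Map.of(3, "Time", 4, "Friend Like Me");
        List<Map.Entry<Integer, String>> plays = List.of(
                Map.entry(3, "alex"),
                Map.entry(4, "burr"),
                Map.entry(9, "sebi")); // no song with id 9, so this record is dropped
        System.out.println(join(plays, songNames)); // [alex played Time, burr played Friend Like Me]
    }
}
```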

Windowing

Lets you control how records that have the same key are grouped into different types of time windows. For example, you can group records that occurred during a certain period of time, say 1 hour.

Interactive Queries

Let you treat the stream processing layer as a lightweight embedded key-value store, so that you can directly query the latest state of your stream processing application, for example by record key or by key range.
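
To make the idea of key and key-range lookups concrete, here is a plain-Java sketch that models the queryable state as a sorted map. It is not the Kafka Streams state store API; the class and its methods are hypothetical stand-ins.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model of a queryable key-value state: point lookups and range scans.
class StateStoreQuerySketch {

    private final NavigableMap<String, Long> store = new TreeMap<>();

    // Called by the processing side whenever the aggregate for a key changes.
    void put(String key, long value) {
        store.put(key, value);
    }

    // Query the latest value for a single key (null if the key is unknown).
    Long get(String key) {
        return store.get(key);
    }

    // Query all entries whose keys lie between from and to, both inclusive.
    NavigableMap<String, Long> range(String from, String to) {
        return store.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        StateStoreQuerySketch counts = new StateStoreQuerySketch();
        counts.put("alex", 4);
        counts.put("burr", 3);
        counts.put("kamesh", 2);
        counts.put("sebi", 1);

        System.out.println(counts.get("burr"));      // 3
        System.out.println(counts.range("a", "c"));  // {alex=4, burr=3}
    }
}
```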

Kafka Streams Operations

Let’s see some of the Kafka Streams concepts in action.

Remember that you need both ZooKeeper and Apache Kafka correctly configured and up and running. If they are not, you can follow the [Setup Kafka] section to learn how to do it using docker compose.

Tables

You need only one terminal window to run this section.

Create a new file named update_songs with some new songs in it.

update_songs
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Then insert each of these lines as a new message. The number before the : is used as the message key, which here is the song id.

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: update_songs
docker run --rm -it --network=kafka-tutorial -v $(pwd)/documentation/modules/ROOT/examples/update_songs.json:/home/update_songs edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -P -l -K: /home/update_songs

Then inspect the inserted songs using kcat:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run --rm -it --network=kafka-tutorial edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Stop the process by typing Ctrl+C.

Create a GlobalKTable from the songs topic:

java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3:  {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4:  {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5:  {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Stop the process by typing Ctrl+C.

Notice that the output of kcat and kstreamscat is nearly identical. You have probably noticed that The Imperial March was not written by Alan Silvestri but by John Williams. So let’s fix this.

Create a file containing a new event that fixes the problem.

update2_songs
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

And add it to the songs topic:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: update2_songs
docker run --rm -it --network=kafka-tutorial -v $(pwd)/documentation/modules/ROOT/examples/update2_songs.json:/home/update2_songs edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -P -l -K: /home/update2_songs

Then inspect the inserted songs using kcat:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run --rm -it --network=kafka-tutorial edenhill/kcat:1.7.1 kcat -b kafka:9092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

Stop the process by typing Ctrl+C.

Create a GlobalKTable from the songs topic again:

java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3:  {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4:  {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5:  {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

Stop the process by typing Ctrl+C.

Notice that kcat shows the content of all the events that have been added into the songs topic, so both events referring to the song The Imperial March (key=5/"id":5) are shown.

However, the GlobalKTable just shows the latest values for each key in the topic. For this reason, only the update event of The Imperial March (key=5/"id":5) is shown, and not the initial insertion.
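
The difference between the two views can be reproduced with a few lines of plain Java. This is only a model of the semantics, not what kcat or kstreamscat do internally; the class and method names are hypothetical, and only the author field of each event is kept for brevity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model: the topic keeps every event, the table keeps the latest value per key.
class ChangelogReplaySketch {

    // Replays the events in order; a later event with the same key overwrites the earlier one.
    static Map<Integer, String> latestByKey(List<Map.Entry<Integer, String>> events) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> event : events) {
            table.put(event.getKey(), event.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Key = song id, value = author, in the order the events were produced.
        List<Map.Entry<Integer, String>> events = List.of(
                Map.entry(3, "Hans Zimmer"),
                Map.entry(4, "Alan Menken"),
                Map.entry(5, "Alan Silvestri"),
                Map.entry(5, "John Williams"));

        System.out.println(events.size());        // 4 events in the topic
        System.out.println(latestByKey(events));  // {3=Hans Zimmer, 4=Alan Menken, 5=John Williams}
    }
}
```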

Clean Up

To restart Kafka (and ZooKeeper) go to the docker compose terminal and stop the process by typing Ctrl+C.

Then run:

docker compose rm

? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
 ⠿ Container zookeeper  Removed
 ⠿ Container kafka      Removed
docker compose up --remove-orphans

kafka      | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka      | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka      | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

Windowing

Windowing allows you to control how to group records that have the same key based on a time window. For example, you can answer questions like how many songs has each user played within the last 30 minutes or how many songs has each user played per day.

There are 4 types of windows:

Tumbling time window

Fixed-size, non-overlapping, and gap-less.

Hopping time window

Fixed-size, overlapping windows.
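
Because hopping windows overlap, a single record can belong to several windows at once. The following plain-Java sketch computes which windows contain a given timestamp. It is a model under assumptions: the names are hypothetical, the window size and advance are example values, and windows are taken to be aligned to multiples of the advance with an inclusive start and an exclusive end.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of hopping window assignment.
class HoppingWindowSketch {

    // Returns the start times of all windows [start, start + sizeMs)
    // that contain the given timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 60-second windows that advance every 30 seconds.
        System.out.println(windowStartsFor(70_000, 60_000, 30_000)); // [30000, 60000]
        // When the advance equals the size, the windows no longer overlap
        // and each timestamp falls into exactly one window (a tumbling window).
        System.out.println(windowStartsFor(70_000, 60_000, 60_000)); // [60000]
    }
}
```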

Sliding time window

Fixed-size, overlapping windows that work on differences between record timestamps. In the case of Kafka Streams, it defines a maximum time difference for a join over two streams on the same key.

Session window

Dynamically-sized, non-overlapping, data-driven windows. Sessions represent a period of activity separated by a defined gap of inactivity. This kind of window is commonly used for user behaviour analysis.

About Retention

Data records can arrive out of order or late for a given window. This means that these events could be processed in the wrong window. Retention configures how long a window remains open to process events that arrive out of order.
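
One way to picture retention is as a check performed for every incoming record. The plain-Java sketch below uses a deliberately simplified rule that is an assumption for illustration and not the exact Kafka Streams behaviour: a window still accepts records while the newest timestamp seen so far is earlier than the window end plus the retention period. All names and values are hypothetical.

```java
// Plain-Java model of deciding whether a late record can still be added to its window.
class LateRecordSketch {

    // Start of the fixed-size window that a record timestamp belongs to.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Simplified rule (assumption): the window of the record stays open until
    // the newest observed timestamp reaches window end + retention.
    static boolean stillAccepted(long recordTsMs, long newestSeenTsMs,
                                 long sizeMs, long retentionMs) {
        long windowEnd = windowStart(recordTsMs, sizeMs) + sizeMs;
        return newestSeenTsMs < windowEnd + retentionMs;
    }

    public static void main(String[] args) {
        long size = 60_000;      // 60-second windows
        long retention = 30_000; // windows stay open 30 seconds longer

        // A record stamped 50s belongs to the window [0s, 60s).
        System.out.println(stillAccepted(50_000, 80_000, size, retention)); // true: 80s < 90s
        System.out.println(stillAccepted(50_000, 95_000, size, retention)); // false: 95s >= 90s
    }
}
```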

For this section, you need to open two terminals on the same screen.

Let’s see how different window strategies affect the processing of a topic that contains the history of the songs that each user has played.

Tumbling time window

Let’s create a tumbling window of 60 seconds. What we are doing here is creating fixed buckets of 60 seconds each, so that every 60 seconds, a new window is created to process the events that fall into one such tumbling window.

For example, let’s count the number of played songs per user in a 60-second tumbling window.

With the Kafka cluster started, prepare but do not yet execute the next command in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60

In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch

And run the command that you’ve prepared in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Window: 1673267640000 -> Key: alex = 4
Window: 1673267640000 -> Key: burr = 3
Window: 1673267640000 -> Key: kamesh = 2
Window: 1673267640000 -> Key: edson = 1
Window: 1673267640000 -> Key: sebi = 1

Then, in terminal 1, run the kcat command again to insert the same played song events a second time:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch

And if you inspect the kstreamscat output you’ll see:

...
Window: 1673267640000 -> Key: alex = 8
Window: 1673267640000 -> Key: burr = 6
Window: 1673267640000 -> Key: kamesh = 4
Window: 1673267640000 -> Key: edson = 2
Window: 1673267640000 -> Key: sebi = 2

Notice that the played song counts have been doubled. This is because the same events were produced and the same time window received the same batch of played songs again.

Now wait for at least one minute, and then run the kcat command again to insert the played songs once more:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch

And if you inspect the kstreamscat output you’ll now see:

...
Window: 1673267820000 -> Key: alex = 4
Window: 1673267820000 -> Key: burr = 3
Window: 1673267820000 -> Key: kamesh = 2
Window: 1673267820000 -> Key: edson = 1
Window: 1673267820000 -> Key: sebi = 1

After 1 minute, the first window has ended, so the played song events have fallen into a new window, for which all counters start from 0 again.
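
The window timestamps in the output above are multiples of 60000 ms, which is consistent with tumbling windows aligned to the epoch. The following plain-Java sketch models that bucketing and the per-window count. It is not the kstreamscat implementation: the names and the event timestamps are hypothetical, chosen so that they fall into the two windows shown above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of counting events per key in tumbling windows.
class TumblingCountSketch {

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts events per "windowStart/key"; each event is a (timestamp, user) pair.
    static Map<String, Long> countPerWindow(List<Map.Entry<Long, String>> events, long sizeMs) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<Long, String> event : events) {
            String bucket = windowStart(event.getKey(), sizeMs) + "/" + event.getValue();
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> events = List.of(
                Map.entry(1673267645000L, "alex"),   // first window
                Map.entry(1673267650000L, "alex"),   // same window, counter goes to 2
                Map.entry(1673267825000L, "alex"));  // later window, counter restarts at 1
        System.out.println(countPerWindow(events, 60_000));
        // {1673267640000/alex=2, 1673267820000/alex=1}
    }
}
```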

Clean Up

Stop the kstreamscat process by typing Ctrl+C in terminal 2.

To restart Kafka (and ZooKeeper) go to the docker compose terminal and stop the process by typing Ctrl+C.

Then run:

docker compose rm

? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
 ⠿ Container zookeeper  Removed
 ⠿ Container kafka      Removed
docker compose up --remove-orphans

kafka      | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka      | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka      | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

Session time window

Let’s explore the windowing semantics when, instead of a tumbling time window, we use a session window with an inactivity gap of 60 seconds.

With the Kafka cluster started, prepare but do not yet execute the next command in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60

In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run --rm -it --network=kafka-tutorial -v $(pwd)/apps/windowing/first-batch.json:/home/first-batch edenhill/kcat:1.7.1 kcat -b kafka:9092 -t played -P -l -K: /home/first-batch

And run the command that you’ve prepared in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Window: 1673268538354 -> Key: alex = 4
Window: 1673268538355 -> Key: burr = 3
Window: 1673268538355 -> Key: kamesh = 2
Window: 1673268538355 -> Key: edson = 1
Window: 1673268538356 -> Key: sebi = 1

Then in terminal 1 run the following command:

./apps/windowing/session-window.sh
Sleeping 30s
Insert new batch
Sleeping 40s
Insert new batch
Notice that since the beginning it has passed more than 60s but still in the same time window
Sleeping 65s
Insert new batch
Now new session window

Monitor the output of terminal 2, where you should see something similar to the following:

...
Window: 1673268538355 -> Key: alex = 4
Window: 1673268538355 -> Key: burr = 3
Window: 1673268538355 -> Key: kamesh = 2
Window: 1673268538355 -> Key: edson = 1
Window: 1673268538355 -> Key: sebi = 1
Window: 1673268538355 -> Key: alex = 8
Window: 1673268538355 -> Key: burr = 6
Window: 1673268538355 -> Key: kamesh = 4
Window: 1673268538355 -> Key: edson = 2
Window: 1673268538355 -> Key: sebi = 2
Window: 1673268538355 -> Key: alex = 12
Window: 1673268538355 -> Key: burr = 9
Window: 1673268538355 -> Key: kamesh = 6
Window: 1673268538355 -> Key: edson = 3
Window: 1673268538355 -> Key: sebi = 3
Window: 1673268538355 -> Key: alex = 16
Window: 1673268538355 -> Key: burr = 12
Window: 1673268538355 -> Key: kamesh = 8
Window: 1673268538355 -> Key: edson = 4
Window: 1673268538355 -> Key: sebi = 4
Window: 1673268716848 -> Key: alex = 4
Window: 1673268716848 -> Key: burr = 3
Window: 1673268716848 -> Key: kamesh = 2
Window: 1673268716848 -> Key: edson = 1
Window: 1673268716848 -> Key: sebi = 1

Notice that we configured the session window to 60 seconds with --session-window=60. Until the last sleep (Sleeping 65s), we kept producing events without ever pausing for one minute or longer, so all of these events were processed in the same session window. A new session window is created only for the last batch of song play events, because it follows an inactivity period longer than 60 seconds.
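
The session behaviour can be modelled in plain Java by splitting a sorted list of event times whenever the pause between two consecutive events exceeds the inactivity gap. This is a sketch of the semantics only, not Kafka Streams code; the names are hypothetical, and the times are the approximate offsets in seconds implied by the sleeps of the script above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windows for a single key.
class SessionWindowSketch {

    // Groups sorted event times into sessions; a new session starts when the
    // pause since the previous event is longer than the inactivity gap.
    static List<List<Long>> sessions(List<Long> sortedTimes, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0;
        for (long time : sortedTimes) {
            if (current == null || time - previous > gap) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(time);
            previous = time;
        }
        return result;
    }

    public static void main(String[] args) {
        // Batches at 0s, 30s, 70s (30 + 40) and 135s (70 + 65), with a 60-second gap.
        System.out.println(sessions(List.of(0L, 30L, 70L, 135L), 60));
        // [[0, 30, 70], [135]]
    }
}
```

The first three batches span more than 60 seconds in total, but no single pause between them exceeds the gap, so they stay in one session.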

Clean Up

Stop the kstreamscat process by typing Ctrl+C in terminal 2.

To restart Kafka (and ZooKeeper) go to the docker compose terminal and stop the process by typing Ctrl+C.

Then run:

docker compose rm

? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
 ⠿ Container zookeeper  Removed
 ⠿ Container kafka      Removed
docker compose up --remove-orphans

kafka      | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka      | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka      | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)