Kafka Streams

So far, you’ve seen that you can consume data from and produce data to a Kafka topic, but sometimes you want to do more advanced things, such as joining different topics, aggregating content, or processing records on the fly and redirecting the result to another topic.

What is Kafka Streams?

Kafka Streams is a Java API that implements all of these features in a fault-tolerant, scalable way. One important characteristic of a Kafka Streams application is that it doesn’t run inside a broker: it runs in a separate JVM instance, maybe in the same cluster or maybe in a different one, but always as a separate process.

Kafka Streams application instances can run in parallel on different machines, and they will automatically collaborate on the data processing. This is what makes Kafka Streams applications fault-tolerant and scalable.

Kafka Streams Concepts

Stream Processor

A stream processor represents an operation to execute on a stream; examples of built-in operations are filter, map, join, and aggregate. Usually, a Kafka Streams application is created from one or more of these operations.
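As a rough sketch of how such operations compose in the Java DSL (the songs-processed output topic and the songs-filter application id are made up for illustration):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class FilterMapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "songs-filter"); // any unique id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> songs = builder.stream("songs");
        songs
            .filter((key, value) -> value.contains("\"op\":\"ADD\"")) // stream processor: filter
            .mapValues(value -> value.toUpperCase())                  // stream processor: map
            .to("songs-processed");                                   // redirect the result to another topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}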

Table

A table is a collection of key-value pairs that represents the latest value for each record key. The big difference is that a stream contains the log of all events (e.g., an insertion of a song, an update of a song, …​) while a table contains only the last value (e.g., the last update done to a song). There are two kinds of tables: a KTable, which represents a table built from the partitions assigned to the application instance, and a GlobalKTable, which aggregates the content from all partitions of a given topic. We’ll take a look later.
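In DSL terms the difference is just which builder method you call. A fragment (assuming the same imports, builder, and default String serdes as in the previous sketch):

// KTable: the latest value per key, built only from the partitions assigned to this instance
KTable<String, String> songsTable = builder.table("songs");

// GlobalKTable: the latest value per key, built from ALL partitions of the topic
GlobalKTable<String, String> songsGlobal = builder.globalTable("songs");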

Aggregation Operation

Takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sums.
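For instance, counting how many events exist per key could be sketched like this (the play-counts output topic is an assumption for illustration; the played topic is the one used later in this section):

KTable<String, Long> playCounts = builder
    .stream("played", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(); // combines all records with the same key into a single count

// emit the table's changelog to another topic
playCounts.toStream().to("play-counts", Produced.with(Serdes.String(), Serdes.Long()));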

Join Operation

Merges two input streams and/or tables based on the keys of their data records, and yields a new stream/table.
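A minimal fragment of a stream-table join, enriching each played event with the latest song data for the same key (again assuming String serdes; the ValueJoiner output format is arbitrary):

KStream<String, String> played = builder.stream("played");
KTable<String, String> songsTable = builder.table("songs");

// for each played event, look up the latest value for the same key in the table
KStream<String, String> enriched = played.join(
    songsTable,
    (playedEvent, song) -> playedEvent + " | " + song // ValueJoiner merging both values
);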

Windowing

Groups records that have the same key by a time window. For example, you can group the events that have occurred within a given period of time.

Interactive Queries

Treats the stream processing layer as a lightweight embedded database, letting you directly query the latest state of your stream processing application.
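A sketch, assuming a count() materialized under a store name of our own choosing (the two-argument store() call shown here is deprecated in newer clients in favour of StoreQueryParameters):

// name the state store so it becomes queryable
builder.stream("played", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(Materialized.as("play-counts-store"));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// query the latest count for a key directly from the running application
// (real code should wait until the application reaches the RUNNING state)
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("play-counts-store", QueryableStoreTypes.keyValueStore());
Long count = store.get("alex");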

[Diagram: Kafka Streams]

Kafka Streams Operations

Let’s see some of the Kafka-Streams concepts in action.

Remember to have Zookeeper and Apache Kafka up and running. If not, you can follow the [Setup Kafka] section to learn how to do it using docker-compose.

Tables

You need only one terminal window to run this section.

Create a new file named update_songs with some new songs in it.

update_songs
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Then insert each of these lines as a new message. The number before the : is taken as the message key.

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: update_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: update_songs

Then inspect the inserted songs using kcat:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Stop the process by typing Ctrl+C.

Create a Global Table from the songs topic:

java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3:  {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4:  {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5:  {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}

Stop the process by typing Ctrl+C.

Notice that the output of kcat and kstreamscat is essentially the same. Probably most of you have noticed that The Imperial March was not written by Alan Silvestri but by John Williams, so let’s fix this.

Create a file with a new event that fixes the problem.

update2_songs
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

And add it to songs topic:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -P -l -K: update2_songs
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -P -l -K: update2_songs

Then inspect the inserted songs using kcat:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t songs -C -K:
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t songs -C -K:
3: {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4: {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"Alan Silvestri", "op":"ADD"}
5: {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

Stop the process by typing Ctrl+C.

Create a Global Table from the songs topic:

java -jar kstreamscat.jar --topic=songs --id=songs -b=localhost:29092 --GT
Starting Kafka Streams...
Kafka Stream Threads started
3:  {"id": 3, "name": "Time", "author":"Hans Zimmer", "op":"ADD"}
4:  {"id": 4, "name": "Friend Like Me", "author":"Alan Menken", "op":"ADD"}
5:  {"id": 5, "name": "The Imperial March", "author":"John Williams", "op":"MODIFY"}

Stop the process by typing Ctrl+C.

Notice that kcat shows all the events that have been added to the songs topic, so both events for The Imperial March are visible.

On the other hand, the Global Table shows only the latest value for each key. For this reason, only the update event of The Imperial March is shown, and not the initial insertion.

Clean Up

To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.

Then run:

docker-compose rm

Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka          ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans

kafka        | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka        | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Windowing

Windowing allows you to control how records that have the same key are grouped into time windows. For example, you can answer questions like how many songs each user has played in the last 30 minutes, or how many songs each user has played every day.

There are 4 types of windows (a sketch showing how to declare each one follows this list):

Tumbling time window

Fixed-size, non-overlapping, and gap-less.

[Diagram: tumbling time window]
Hopping time window

Fixed-size, overlapping windows.

[Diagram: hopping time window]
Sliding time window

Fixed-size, overlapping windows that work on differences between record timestamps. In the case of Kafka Streams, it defines a maximum time difference for a join over two streams on the same key.

Session window

Dynamically-sized, non-overlapping, data-driven windows. Sessions represent a period of activity separated by a defined gap of inactivity. This window is used for user behaviour analysis.

[Diagram: session window]
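Here is how each window type is declared in the Kafka Streams DSL; a sketch using the pre-2.8 factory names (newer clients prefer the ofSizeWithNoGrace/ofSizeAndGrace variants), with the 60s/10s durations chosen arbitrarily:

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling: 60s windows that advance by their full size (no overlap, no gaps)
TimeWindows tumbling = TimeWindows.of(Duration.ofSeconds(60));

// Hopping: 60s windows that advance every 10s (overlapping)
TimeWindows hopping = TimeWindows.of(Duration.ofSeconds(60)).advanceBy(Duration.ofSeconds(10));

// Sliding (for joins): pairs of records whose timestamps differ by at most 60s
JoinWindows sliding = JoinWindows.of(Duration.ofSeconds(60));

// Session: per-key windows that close after 60s of inactivity
SessionWindows session = SessionWindows.with(Duration.ofSeconds(60));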
About Retention

Events can arrive out-of-order or late for a given window, which means they could be assigned to the wrong window. Retention configures how long a window remains open to accept events that arrive out-of-order.

[Diagram: window retention]
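In the DSL this is expressed as a grace period on the window; a sketch (the 5-minute value is arbitrary):

// keep each 60s window open for 5 extra minutes to accept out-of-order events
TimeWindows windows = TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofMinutes(5));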

For this section, you need to open two terminals on the same screen.

Let’s see how different window strategies affect a topic that contains the history of the songs that each user has played.

Tumbling time window

Let’s create a periodic window of 60 seconds. What we are doing here is creating fixed buckets of 60 seconds, so every 60 seconds a new window is created to process the events that fall into that window.

For example, let’s count the number of songs played by each user in a bucket of 60 seconds.
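A minimal sketch of such a windowed count (an assumption about what kstreamscat does internally, not its actual code):

builder.stream("played", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(60))) // fixed 60s buckets
    .count()
    .toStream()
    .foreach((windowedUser, count) ->
        System.out.println("Key: " + windowedUser.key() + " : " + count));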

With the Kafka cluster started, prepare but do not execute the next command in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60

In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json

And run the command that you’ve prepared in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --time-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1

Then, in terminal 1, run the kcat command again to insert the played songs once more:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json

And if you inspect the kstreamscat output you’ll see:

Key: alex : 8
Key: burr : 6
Key: kamesh : 4
Key: edson : 2
Key: sebi : 2

Notice that each count has doubled: the second batch fell into the same 60-second window, so its played songs were counted together with the first batch.

Now wait for one minute and then run the kcat command again to insert the played songs:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json

And if you inspect the kstreamscat output you’ll see:

Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1

After one minute, a new window has started, so these newly played songs fall into it, and the count for this new window starts again from zero.

Clean Up

Stop the kstreamscat process by typing Ctrl+C in terminal 2.

To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.

Then run:

docker-compose rm

Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka          ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans

kafka        | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka        | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Session time window

Let’s see how it behaves when we use a Session time window instead of a Tumbling time window:

With the Kafka cluster started, prepare but do not execute the next command in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60

In terminal 1, run the next command, and then immediately run the command you’ve prepared in terminal 2:

  • kcat

  • kcat in Docker

kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json
docker run -it --network=host edenhill/kcat:1.7.0 kcat -b localhost:29092 -t played -P -l -K: apps/windowing/first-batch.json

And run the command that you’ve prepared in terminal 2:

java -jar kstreamscat.jar --topic=played --id=songs -b=localhost:29092 --session-window=60
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Starting Kafka Streams...
Kafka Stream Threads started
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1

Then in terminal 1 run the following command:

./apps/windowing/session-window.sh
Sleeping 30s
Insert new batch
Sleeping 40s
Insert new batch
Notice that since the beginning it has passed more than 60s but still in the same time window
Sleeping 65s
Insert new batch
Now new session window

Monitor the output of terminal 2; you should see something like:

Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1
Key: alex : 8
Key: burr : 6
Key: kamesh : 4
Key: edson : 2
Key: sebi : 2
Key: alex : 12
Key: burr : 9
Key: kamesh : 6
Key: edson : 3
Key: sebi : 3
Key: alex : 16
Key: burr : 12
Key: kamesh : 8
Key: edson : 4
Key: sebi : 4
Key: alex : 4
Key: burr : 3
Key: kamesh : 2
Key: edson : 1
Key: sebi : 1

Notice that we configured the session window to 60 seconds (--session-window=60), but since, up to the last sleep (Sleeping 65s), we were producing content continuously without ever stopping for a full minute, all of those events were processed in the same window. A new session window is created for the last batch because it is inserted after a gap of more than 60 seconds without producing any event.
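In DSL terms, the only change with respect to the tumbling count sketched earlier is the windowedBy clause (again an illustrative fragment, not kstreamscat’s actual code):

.groupByKey()
// per-key sessions that stay open while events keep arriving,
// and close after 60 seconds of inactivity
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)))
.count()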

Clean Up

Stop the kstreamscat process by typing Ctrl+C in terminal 2.

To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by typing Ctrl+C.

Then run:

docker-compose rm

Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka          ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans

kafka        | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka        | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)