Developing Kafka Streams in Java

In this example, we build a music chart that shows how many times each song has been played.

One service (player-app) periodically produces played songs to the played-songs topic. This service does not use Kafka Streams; it uses the Kafka producer API under the covers, through SmallRye Reactive Messaging for Kafka.
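To make the idea of a periodic producer concrete, here is a minimal plain-Java sketch. It is not the player-app code and it does not talk to Kafka: the class name PlayerSketch, the "songId:user" event format, and the 200 ms demo period are assumptions made for illustration, while the user names and the range of song ids are taken from the log output shown later in this section.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PlayerSketch {

    // Sample data for the sketch: 8 songs (ids 1..8) and the users seen in the logs.
    public static final int SONG_COUNT = 8;
    public static final List<String> USERS = List.of("Edson", "Burr", "Alex", "Kamesh", "Sebi");

    // Builds one event as "songId:user" (a made-up format for this sketch).
    // A real service would send a record to the played-songs topic instead.
    public static String nextPlayedSong(Random random) {
        int songId = 1 + random.nextInt(SONG_COUNT); // ids 1..8
        String user = USERS.get(random.nextInt(USERS.size()));
        return songId + ":" + user;
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // The tutorial's service plays a song every 2500 ms;
        // a shorter period is used here so the demo finishes quickly.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("played -> " + nextPlayedSong(random)),
                0, 200, TimeUnit.MILLISECONDS);
        Thread.sleep(1000);
        scheduler.shutdownNow();
    }
}
```

In the real service, the scheduling and the sending are handled by the messaging framework rather than by a hand-written executor.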

The second service is a Kafka Streams application that joins the songs and played-songs topics to get the song name. It also computes the number of times each song has been played by aggregating the joined records by song id.
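The join-then-count logic can be sketched in plain Java, without Kafka, to show what the result looks like. This is a simulation only, not the Music Chart Service code and not the Kafka Streams API: the class MusicChartSketch, the method onPlayed, and the output text are hypothetical, and the sketch assumes that play events with no matching song are dropped.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MusicChartSketch {

    // Reference data standing in for the songs topic (song id -> song name).
    public static final Map<Integer, String> SONGS = Map.of(
            3, "Still Loving You",
            5, "Sometimes",
            8, "Perfect");

    // Running play count per song id, standing in for the aggregation state.
    private final Map<Integer, Long> counts = new HashMap<>();

    // Handles one played-song event: looks up the song name by id (the join),
    // then increments the count for that id (the aggregation).
    // Returns the chart update as text, or null if the id matches no song.
    public String onPlayed(int songId) {
        String name = SONGS.get(songId);
        if (name == null) {
            return null; // assumption of this sketch: unmatched events are dropped
        }
        long count = counts.merge(songId, 1L, Long::sum);
        return "song id " + songId + " -> " + name + " played " + count + " time(s)";
    }

    public static void main(String[] args) {
        MusicChartSketch chart = new MusicChartSketch();
        // Each element simulates one record arriving on the played-songs topic.
        for (int songId : List.of(5, 8, 3, 8, 3, 3)) {
            System.out.println(chart.onPlayed(songId));
        }
    }
}
```

Running main prints one line per event, with the count for a song growing each time that song is played again, which mirrors the "music chart updated" lines in the service output.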

Remember that you need both ZooKeeper and Apache Kafka correctly configured and up and running. If they are not, follow the [Setup Kafka] section to learn how to do it using docker compose.

You need to open 2 terminal windows to run this example, one for each service.

Player Service

The producer code is at Player Service.

Deploying Player Service

In this case, the Quarkus service is deployed to produce played songs. You have two options: use the existing Docker image, or build the service and image yourself. Note that because the Quarkus extension quarkus-container-image-jib is configured, the container image is built automatically when you trigger the local build. In terminal 1, run one of these two options:

  • Building & Run

  • Docker

Building & Run:

cd $TUTORIAL_HOME/apps/player-app/quarkus/player-app
./mvnw clean package
docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-player-app-quarkus:v23.01
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2023-01-09 08:45:18,099 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-0) SRMSG18258: Kafka producer kafka-producer-songs, connected to Kafka brokers 'kafka:9092', is configured to write records to 'songs'
2023-01-09 08:45:18,471 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-1) SRMSG18258: Kafka producer kafka-producer-played-songs, connected to Kafka brokers 'kafka:9092', is configured to write records to 'played-songs'
2023-01-09 08:45:18,573 INFO  [org.acm.PlaySongsGenerator] (main) creating song reference data
2023-01-09 08:45:18,691 INFO  [org.acm.PlaySongsGenerator] (main) starting to play random songs every 2500 ms
2023-01-09 08:45:18,739 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=1, name=The Good The Bad And The Ugly, author=Ennio Morricone]
2023-01-09 08:45:19,675 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=2, name=Believe, author=Cher]
2023-01-09 08:45:19,686 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=3, name=Still Loving You, author=Scorpions]
2023-01-09 08:45:19,689 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=4, name=Bohemian Rhapsody, author=Queen]
2023-01-09 08:45:19,691 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=5, name=Sometimes, author=James]
2023-01-09 08:45:19,692 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=6, name=Into The Unknown, author=Frozen II]
2023-01-09 08:45:19,694 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=7, name=Fox On The Run, author=Sweet]
2023-01-09 08:45:19,696 INFO  [org.acm.PlaySongsGenerator] (main) producing -> Song[id=8, name=Perfect, author=Ed Sheeran]
2023-01-09 08:45:19,761 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 8 (Perfect) for user Edson
2023-01-09 08:45:19,785 INFO  [io.quarkus] (main) player-app 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.15.2.Final) started in 7.027s.
2023-01-09 08:45:19,787 INFO  [io.quarkus] (main) Profile prod activated.
2023-01-09 08:45:19,788 INFO  [io.quarkus] (main) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2023-01-09 08:45:20,102 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-played-songs) [Producer clientId=kafka-producer-played-songs] Error while fetching metadata with correlation id 1 : {played-songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,106 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-songs) [Producer clientId=kafka-producer-songs] Error while fetching metadata with correlation id 1 : {songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,213 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-songs) [Producer clientId=kafka-producer-songs] Error while fetching metadata with correlation id 3 : {songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,220 WARN  [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-played-songs) [Producer clientId=kafka-producer-played-songs] Error while fetching metadata with correlation id 3 : {played-songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:22,231 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 5 (Sometimes) for user Edson
2023-01-09 08:45:24,729 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 7 (Fox On The Run) for user Burr
2023-01-09 08:45:27,230 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Edson
2023-01-09 08:45:29,735 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 2 (Believe) for user Edson
2023-01-09 08:45:32,238 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 8 (Perfect) for user Alex
2023-01-09 08:45:34,743 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 7 (Fox On The Run) for user Kamesh
2023-01-09 08:45:37,230 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Kamesh
2023-01-09 08:45:39,728 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Sebi
2023-01-09 08:45:42,229 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 4 (Bohemian Rhapsody) for user Edson
2023-01-09 08:45:44,726 INFO  [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 2 (Believe) for user Burr
...

Docker:

docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-player-app-quarkus:v23.01

Kafka Streams with Java

The KStreams code is at Music Chart Service.

Deploying Music Chart Service

In this case, the Quarkus service is deployed to use Kafka Streams. You have two options: use the existing Docker image, or build the service and image yourself. Note that because the Quarkus extension quarkus-container-image-jib is configured, the container image is built automatically when you trigger the local build. In terminal 2, run one of these two options:

  • Building & Run

  • Docker

Building & Run:

cd $TUTORIAL_HOME/apps/music-chart-app/quarkus/music-chart-app
./mvnw clean package
docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-music-chart-app-quarkus:v23.01
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2023-01-09 08:45:54,755 WARN  [org.apa.kaf.cli.adm.AdminClientConfig] (main) These configurations '[ssl.endpoint.identification.algorithm]' were supplied but are not used yet.
2023-01-09 08:45:55,340 INFO  [org.apa.kaf.str.StreamsConfig] (main) StreamsConfig values:
        acceptable.recovery.lag = 10000
        application.id = music-chart
        application.server = localhost:9090
        bootstrap.servers = [kafka:9092]
...
2023-01-09 08:46:00,952 INFO  [org.apa.kaf.str.pro.int.StreamThread] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) stream-thread [music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2023-01-09 08:46:00,958 INFO  [org.apa.kaf.str.KafkaStreams] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) stream-client [music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a] State transition from REBALANCING to RUNNING
2023-01-09 08:46:02,098 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 5 -> PlayedSong [count=1, songName=Sometimes]
2023-01-09 08:46:02,128 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 8 -> PlayedSong [count=2, songName=Perfect]
2023-01-09 08:46:02,133 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 7 -> PlayedSong [count=2, songName=Fox On The Run]
2023-01-09 08:46:02,135 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 3 -> PlayedSong [count=3, songName=Still Loving You]
2023-01-09 08:46:02,137 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 4 -> PlayedSong [count=1, songName=Bohemian Rhapsody]
2023-01-09 08:46:02,140 INFO  [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 2 -> PlayedSong [count=2, songName=Believe]

Docker:

docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-music-chart-app-quarkus:v23.01

Music Chart Result

If you inspect the output of the Music Chart service, you see how many times each song has been played; every time a song is played, its counter is updated.

Clean Up

Stop both services by pressing Ctrl+C in terminals 1 and 2.

To restart Kafka (and ZooKeeper), go to the docker compose terminal and stop the process by pressing Ctrl+C.

Then run:

docker compose rm

? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
 ⠿ Container zookeeper  Removed
 ⠿ Container kafka      Removed

docker compose up --remove-orphans

kafka      | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka      | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka      | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)