Developing Kafka Streams in Java
In this example, we build a music chart that shows how many times each song has been played.
The first service (player-app) periodically produces played songs to the played-songs topic.
This service does not use Kafka Streams; it uses the Kafka producer API under the covers of SmallRye Reactive Messaging for Kafka.
The second service is a Kafka Streams application that joins the songs and played-songs topics to get the song name. It also computes the number of times each song has been played by aggregating the joined records by song id.
Remember that both ZooKeeper and Apache Kafka must be correctly configured and up and running. If they are not, follow the [Setup Kafka] section to learn how to set them up using docker compose.
You need to open 2 terminal windows to run this example, one for each service.
Player Service
The producer code is at Player Service.
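The real producer lives in the linked repository. As a rough model of its behavior, the sketch below picks a random song id and user and emits one "played" event at a fixed interval. It uses plain Java only: the class name PlayerSketch, the PlayedEvent record and the nextPlay method are hypothetical names for illustration, and the events are printed instead of being sent to Kafka. The 2500 ms interval, the eight song ids and the user names are taken from the log output shown in this section.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PlayerSketch {

    // Hypothetical event type: which song was played, and by whom.
    record PlayedEvent(int songId, String user) {}

    // User names and song count as they appear in the sample logs.
    static final List<String> USERS = List.of("Alex", "Burr", "Edson", "Kamesh", "Sebi");
    static final int SONG_COUNT = 8;

    // Picks a random song id in 1..SONG_COUNT and a random user.
    static PlayedEvent nextPlay(Random random) {
        int songId = 1 + random.nextInt(SONG_COUNT);
        String user = USERS.get(random.nextInt(USERS.size()));
        return new PlayedEvent(songId, user);
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Assumption: printing stands in for producing to the played-songs topic.
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("Played " + nextPlay(random)),
                0, 2500, TimeUnit.MILLISECONDS);

        // Let the sketch run for a few events, then stop it.
        TimeUnit.SECONDS.sleep(8);
        scheduler.shutdownNow();
    }
}
```

In the actual service, the scheduling and the sending are handled by Quarkus and SmallRye Reactive Messaging rather than by a hand-rolled executor.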
Deploying Player Service
In this case, the Quarkus service is deployed to produce played songs.
You have two options: use the existing Docker image, or build the service and image yourself. Note that because the Quarkus extension quarkus-container-image-jib is configured, the container image is built automatically when you trigger the local build.
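In terminal 1, run one of these two options: build the service and image locally and then start the container, or skip the two build commands and start the container from the existing image: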
cd $TUTORIAL_HOME/apps/player-app/quarkus/player-app
./mvnw clean package
docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-player-app-quarkus:v23.01
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2023-01-09 08:45:18,099 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-0) SRMSG18258: Kafka producer kafka-producer-songs, connected to Kafka brokers 'kafka:9092', is configured to write records to 'songs'
2023-01-09 08:45:18,471 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-producer-thread-1) SRMSG18258: Kafka producer kafka-producer-played-songs, connected to Kafka brokers 'kafka:9092', is configured to write records to 'played-songs'
2023-01-09 08:45:18,573 INFO [org.acm.PlaySongsGenerator] (main) creating song reference data
2023-01-09 08:45:18,691 INFO [org.acm.PlaySongsGenerator] (main) starting to play random songs every 2500 ms
2023-01-09 08:45:18,739 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=1, name=The Good The Bad And The Ugly, author=Ennio Morricone]
2023-01-09 08:45:19,675 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=2, name=Believe, author=Cher]
2023-01-09 08:45:19,686 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=3, name=Still Loving You, author=Scorpions]
2023-01-09 08:45:19,689 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=4, name=Bohemian Rhapsody, author=Queen]
2023-01-09 08:45:19,691 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=5, name=Sometimes, author=James]
2023-01-09 08:45:19,692 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=6, name=Into The Unknown, author=Frozen II]
2023-01-09 08:45:19,694 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=7, name=Fox On The Run, author=Sweet]
2023-01-09 08:45:19,696 INFO [org.acm.PlaySongsGenerator] (main) producing -> Song[id=8, name=Perfect, author=Ed Sheeran]
2023-01-09 08:45:19,761 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 8 (Perfect) for user Edson
2023-01-09 08:45:19,785 INFO [io.quarkus] (main) player-app 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.15.2.Final) started in 7.027s.
2023-01-09 08:45:19,787 INFO [io.quarkus] (main) Profile prod activated.
2023-01-09 08:45:19,788 INFO [io.quarkus] (main) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2023-01-09 08:45:20,102 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-played-songs) [Producer clientId=kafka-producer-played-songs] Error while fetching metadata with correlation id 1 : {played-songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,106 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-songs) [Producer clientId=kafka-producer-songs] Error while fetching metadata with correlation id 1 : {songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,213 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-songs) [Producer clientId=kafka-producer-songs] Error while fetching metadata with correlation id 3 : {songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:20,220 WARN [org.apa.kaf.cli.NetworkClient] (kafka-producer-network-thread | kafka-producer-played-songs) [Producer clientId=kafka-producer-played-songs] Error while fetching metadata with correlation id 3 : {played-songs=LEADER_NOT_AVAILABLE}
2023-01-09 08:45:22,231 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 5 (Sometimes) for user Edson
2023-01-09 08:45:24,729 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 7 (Fox On The Run) for user Burr
2023-01-09 08:45:27,230 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Edson
2023-01-09 08:45:29,735 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 2 (Believe) for user Edson
2023-01-09 08:45:32,238 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 8 (Perfect) for user Alex
2023-01-09 08:45:34,743 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 7 (Fox On The Run) for user Kamesh
2023-01-09 08:45:37,230 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Kamesh
2023-01-09 08:45:39,728 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 3 (Still Loving You) for user Sebi
2023-01-09 08:45:42,229 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 4 (Bohemian Rhapsody) for user Edson
2023-01-09 08:45:44,726 INFO [org.acm.PlaySongsGenerator] (executor-thread-0) Played song id 2 (Believe) for user Burr
...
Kafka Streams with Java
The KStreams code is at Music Chart Service.
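The linked code builds this logic as a Kafka Streams topology. As a dependency-free model of what the join and the per-song count compute, the sketch below looks up each played song id in the song reference data and counts plays per id. It is plain Java, not the Kafka Streams API: the names MusicChartSketch, ChartEntry and buildChart are hypothetical, and dropping plays whose song id has no reference data is an assumption (inner-join behavior), not something the tutorial states.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MusicChartSketch {

    // Reference data, as it would arrive on the songs topic.
    record Song(int id, String name, String author) {}

    // One chart entry: play count plus the song name resolved by the join.
    record ChartEntry(long count, String songName) {}

    // Joins each played song id with its Song and counts plays per song id.
    static Map<Integer, ChartEntry> buildChart(Map<Integer, Song> songsById,
                                               List<Integer> playedSongIds) {
        Map<Integer, ChartEntry> chart = new LinkedHashMap<>();
        for (int songId : playedSongIds) {
            Song song = songsById.get(songId);
            if (song == null) {
                continue; // assumed inner join: no matching song, no chart update
            }
            chart.merge(songId,
                    new ChartEntry(1, song.name()),
                    (current, one) -> new ChartEntry(current.count() + 1, current.songName()));
        }
        return chart;
    }

    public static void main(String[] args) {
        Map<Integer, Song> songs = Map.of(
                3, new Song(3, "Still Loving You", "Scorpions"),
                8, new Song(8, "Perfect", "Ed Sheeran"));

        // Song id 99 has no reference data and is skipped.
        List<Integer> played = List.of(8, 3, 8, 3, 3, 99);

        buildChart(songs, played).forEach((id, entry) ->
                System.out.println("song id " + id + " -> " + entry));
    }
}
```

With the sample input in main, song 8 ends up with a count of 2 and song 3 with a count of 3. In the real application the counts live in a state store and are updated continuously as new records arrive, rather than being computed over a finished list.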
Deploying Music Chart Service
In this case, the Quarkus service is deployed to use Kafka Streams.
You have two options: use the existing Docker image, or build the service and image yourself. Note that because the Quarkus extension quarkus-container-image-jib is configured, the container image is built automatically when you trigger the local build.
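In terminal 2, run one of these two options: build the service and image locally and then start the container, or skip the two build commands and start the container from the existing image: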
cd $TUTORIAL_HOME/apps/music-chart-app/quarkus/music-chart-app
./mvnw clean package
docker run --rm --network=kafka-tutorial quay.io/rhdevelopers/kafka-tutorial-music-chart-app-quarkus:v23.01
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2023-01-09 08:45:54,755 WARN [org.apa.kaf.cli.adm.AdminClientConfig] (main) These configurations '[ssl.endpoint.identification.algorithm]' were supplied but are not used yet.
2023-01-09 08:45:55,340 INFO [org.apa.kaf.str.StreamsConfig] (main) StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = music-chart
application.server = localhost:9090
bootstrap.servers = [kafka:9092]
...
2023-01-09 08:46:00,952 INFO [org.apa.kaf.str.pro.int.StreamThread] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) stream-thread [music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2023-01-09 08:46:00,958 INFO [org.apa.kaf.str.KafkaStreams] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) stream-client [music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a] State transition from REBALANCING to RUNNING
2023-01-09 08:46:02,098 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 5 -> PlayedSong [count=1, songName=Sometimes]
2023-01-09 08:46:02,128 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 8 -> PlayedSong [count=2, songName=Perfect]
2023-01-09 08:46:02,133 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 7 -> PlayedSong [count=2, songName=Fox On The Run]
2023-01-09 08:46:02,135 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 3 -> PlayedSong [count=3, songName=Still Loving You]
2023-01-09 08:46:02,137 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 4 -> PlayedSong [count=1, songName=Bohemian Rhapsody]
2023-01-09 08:46:02,140 INFO [org.acm.MusicChartTopologyProducer] (music-chart-08427007-2905-4d91-b260-44bcb8ac5c6a-StreamThread-1) music chart updated for song id 2 -> PlayedSong [count=2, songName=Believe]
Clean Up
Stop both services by typing Ctrl+C in both terminals 1 and 2.
To restart Kafka (and ZooKeeper), go to the docker compose terminal and stop the process by typing Ctrl+C.
Then run:
docker compose rm
? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
⠿ Container zookeeper Removed
⠿ Container kafka Removed
docker compose up --remove-orphans
kafka | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)