Developing Kafka Streams in Java

In this example, we build a music chart that shows how many times each song has been played.

One service (player-app) periodically produces played songs to the played-songs topic. This service does not use Kafka Streams.

The second service is a Kafka Streams service that joins the songs and played-songs topics to get the song name, and then updates the number of times the song has been played.
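
The join-then-count logic described above can be sketched in plain Java, without any Kafka dependency. This is only a simulation under stated assumptions, not the code of the Music Chart Service: the class name MusicChartSketch, the process method, and the choice to drop unknown song ids (inner-join behavior) are hypothetical and used for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of "join played-songs with songs, then count"; no Kafka involved. */
final class MusicChartSketch {

    /**
     * Replays a sequence of played song ids against a song lookup table
     * (id -> song name) and returns one update line per matched play, in order.
     */
    static List<String> process(Map<Integer, String> songs, List<Integer> playedSongIds) {
        Map<Integer, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (int songId : playedSongIds) {
            // "Join" step: look up the song name by id.
            String songName = songs.get(songId);
            if (songName == null) {
                // Assumption: plays of unknown songs are dropped, as in an inner join.
                continue;
            }
            // "Count" step: increment the running total for this song id.
            long count = counts.merge(songId, 1L, Long::sum);
            // Each play emits the updated total, formatted like the PlayedSong log entries.
            updates.add(songId + ", PlayedSong [count=" + count + ", songName=" + songName + "]");
        }
        return updates;
    }

    public static void main(String[] args) {
        Map<Integer, String> songs = Map.of(
                1, "The Good The Bad And The Ugly",
                2, "Believe");
        // Prints three update lines; the last one is "2, PlayedSong [count=2, songName=Believe]".
        process(songs, List.of(2, 1, 2)).forEach(System.out::println);
    }
}
```

Every play produces a new update rather than a final total, which is why the same song id shows up repeatedly with an increasing count in the Music Chart Service output.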

Remember to have Zookeeper and Apache Kafka up and running. If they are not, follow the [Setup Kafka] section to learn how to start them using docker-compose.

You need to open 2 terminal windows to run this example, one for each service.

Player Service

The producer code is in the Player Service.

Deploying Player Service

In this case, a Quarkus service is deployed to produce played songs. You have two options: run it with Docker or build the service yourself. In terminal 1, run one of these two options:

  • Building & Run

  • Docker

Building & Run:

cd $TUTORIAL_HOME/apps/player-app/quarkus/player-app
./mvnw clean package
docker build -f src/main/docker/Dockerfile.jvm -t quay.io/rhdevelopers/kafka-tutorial-player-app:quarkus .
docker run --rm -ti -p 8080:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-player-app:quarkus
exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/app.jar
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-06 10:36:57,049 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Deployment done... start processing
2020-05-06 10:36:57,092 INFO  [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Found incoming connectors: [smallrye-kafka]
2020-05-06 10:36:57,093 INFO  [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Found outgoing connectors: [smallrye-kafka]
2020-05-06 10:36:57,093 INFO  [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Channel manager initializing...
...

2020-05-06 10:36:57,460 INFO  [org.acm.PlaySongsGenerator] (main) Registered: {"author":"Ennio Morricone","id":1,"name":"The Good The Bad And The Ugly"}
2020-05-06 10:36:57,465 INFO  [org.acm.PlaySongsGenerator] (main) Registered: {"author":"Cher","id":2,"name":"Believe"}
2020-05-06 10:36:57,473 INFO  [org.acm.PlaySongsGenerator] (main) Registered: {"author":"Scorpions","id":3,"name":"Still Loving You"}
2020-05-06 10:36:57,485 INFO  [org.acm.PlaySongsGenerator] (main) Registered: {"author":"Queen","id":4,"name":"Bohemian Rhapsody"}
...
2020-05-06 10:37:02,657 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 8: Perfect played.

Docker:

docker run --rm -ti -p 8080:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-player-app:quarkus

Kafka Streams with Java

The Kafka Streams code is in the Music Chart Service.

Deploying Music Chart Service

In this case, a Quarkus service is deployed to use Kafka Streams. You have two options: run it with Docker or build the service yourself. In terminal 2, run one of these two options:

  • Building & Run

  • Docker

Building & Run:

cd $TUTORIAL_HOME/apps/music-chart-app/quarkus/music-chart-app
./mvnw clean package
docker build -f src/main/docker/Dockerfile.jvm -t quay.io/rhdevelopers/kafka-tutorial-music-chart-app:quarkus .
docker run --rm -ti -p 9090:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-music-chart-app:quarkus
exec java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/app.jar
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-06 11:12:25,856 INFO  [org.apa.kaf.str.StreamsConfig] (main) StreamsConfig values:
	application.id = music-chart
	application.server = localhost:9090
...
2020-05-06 11:12:27,705 INFO  [org.apa.kaf.str.pro.int.StreamThread] (music-chart-97e40fb8-ef19-4bd5-8a1f-e7bd4b391e1a-StreamThread-1) stream-thread [music-chart-97e40fb8-ef19-4bd5-8a1f-e7bd4b391e1a-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2020-05-06 11:12:27,707 INFO  [org.apa.kaf.str.KafkaStreams] (music-chart-97e40fb8-ef19-4bd5-8a1f-e7bd4b391e1a-StreamThread-1) stream-client [music-chart-97e40fb8-ef19-4bd5-8a1f-e7bd4b391e1a] State transition from REBALANCING to RUNNING
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=1, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=1, songName=The Good The Bad And The Ugly]
[KTABLE-TOSTREAM-0000000006]: 5, PlayedSong [count=1, songName=Sometimes]
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=2, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=2, songName=The Good The Bad And The Ugly]
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=3, songName=Believe]

Docker:

docker run --rm -ti -p 9090:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-music-chart-app:quarkus

Music Chart Result

If you inspect the output of the Music Chart service, you can see how many times each song has been played. Every time a song is played again, its counter is updated and a new line is printed.
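
Because each printed line is an update that supersedes the previous count for the same song, the current chart can be recovered by keeping only the latest count per song. The following is a minimal sketch of that idea, assuming the log line format shown in the service output above; the class name ChartLogReader and the method latestCounts are hypothetical helpers and are not part of the tutorial code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reduces PlayedSong log lines to the latest count per song. */
final class ChartLogReader {

    // Matches lines such as:
    // [KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=3, songName=Believe]
    private static final Pattern UPDATE = Pattern.compile(
            "^\\[[^\\]]+\\]: (\\d+), PlayedSong \\[count=(\\d+), songName=(.*)\\]$");

    /** Returns song name -> most recent count; lines that do not match are ignored. */
    static Map<String, Long> latestCounts(List<String> logLines) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = UPDATE.matcher(line);
            if (m.matches()) {
                // A later update for the same song overwrites the earlier one.
                latest.put(m.group(3), Long.parseLong(m.group(2)));
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=1, songName=Believe]",
                "[KTABLE-TOSTREAM-0000000006]: 5, PlayedSong [count=1, songName=Sometimes]",
                "[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=2, songName=Believe]");
        // Prints {Believe=2, Sometimes=1}
        System.out.println(latestCounts(lines));
    }
}
```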

Clean Up

Stop both services by pressing Ctrl+C in terminals 1 and 2.

To restart Kafka (and Zookeeper), go to the docker-compose terminal and stop the process by pressing Ctrl+C.

Then run:

docker-compose rm

Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka          ... done
Removing it_zookeeper_1 ... done

Then start Kafka and Zookeeper again:

docker-compose up --remove-orphans

kafka        | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka        | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)