Developing Consumers and Producers in Java

In this example, we are going to develop two simple services which will produce data to and consume data from a Kafka topic named songs.

By default, it is not necessary to create the Kafka topic manually; Kafka creates it automatically with default parameters the first time it is used.
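
If you prefer to create the songs topic explicitly, for example to control the number of partitions, one possible sketch using Spring Kafka is shown below; the partition and replica counts are illustrative values only, not something the tutorial requires.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class SongsTopicConfig {

    // Spring Boot's KafkaAdmin detects NewTopic beans and creates the topic on startup.
    @Bean
    public NewTopic songsTopic() {
        return TopicBuilder.name("songs")  // the topic used throughout this tutorial
                .partitions(1)             // illustrative values only
                .replicas(1)
                .build();
    }
}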

The first service acts as a Kafka producer and writes song-related information (e.g. id, author, name) into the songs topic. The second service acts as a Kafka consumer and reads this data from the songs topic. Usually such a consumer would implement some "real" business logic, e.g. adding the songs to a graph database like Neo4j or building a full-text search index with Elasticsearch. For the sake of simplicity, however, our service just prints the consumed data to the console and exposes every consumed Kafka record over a server-sent events (SSE) stream to connected HTTP clients.
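
To give a rough idea of the payload the two services exchange, a song could be modeled as a small Java record like the sketch below; the field names are an assumption derived from the consumer log output shown later (id, name, author, op).

// Minimal sketch of the message payload; field names are assumed from the tutorial logs
// (the op field mirrors the op=ADD value printed by the indexer).
public record Song(String id, String name, String author, String op) {
}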

Remember that you need both ZooKeeper and Apache Kafka correctly configured and up and running. If not, you can follow the [Setup Kafka] section to learn how to do it using Docker Compose.

It’s best to have four terminal windows open for running this example: two for the producer and consumer services, and another two for sending HTTP requests and receiving responses.

Figure: terminal layout for the song example.

Producing messages with Java

The producer code is at Song App.
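
For the full implementation, refer to the linked repository. Purely as an orientation, the heart of such a producer could look like the following sketch, which accepts a POSTed song and writes it to the songs topic via KafkaTemplate (class and method names are illustrative, and the Song record sketched earlier is reused):

import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SongResource {

    private final KafkaTemplate<String, Song> kafkaTemplate;

    public SongResource(KafkaTemplate<String, Song> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Accepts JSON such as {"id":"1000","name":"Portals","author":"Alan Silvestri"}
    // and publishes it to the "songs" topic, keyed by the song id.
    @PostMapping("/songs")
    @ResponseStatus(HttpStatus.CREATED)  // matches the 201 response shown later
    public void publish(@RequestBody Song song) {
        kafkaTemplate.send("songs", song.id(), song);
    }
}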

Deploying Producer

In this case, the Spring Boot service is deployed to produce songs. You have two options: build and run the service yourself from the sources, or use the prebuilt Docker image. In terminal 1, run one of these two options:

  • Build & Run

  • Docker

Build & Run:
cd $TUTORIAL_HOME/apps/song-app/springboot/song-app
./mvnw clean package -DskipTests
docker build -t quay.io/rhdevelopers/kafka-tutorial-song-app-springboot:latest .
docker run --rm --network=kafka-tutorial -p 8080:8080 quay.io/rhdevelopers/kafka-tutorial-song-app-springboot:latest
2022-12-19T12:31:01.985Z  INFO 1 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-12-19T12:31:01.986Z  INFO 1 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 646 ms
2022-12-19T12:31:02.334Z  INFO 1 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-19T12:31:02.349Z  INFO 1 --- [           main] org.acme.song.app.SongApplication        : Started SongApplication in 1.382 seconds (process running for 1.821)

Docker:
docker run --rm --network=kafka-tutorial -p 8080:8080 quay.io/rhdevelopers/kafka-tutorial-song-app-springboot:v22.12
2022-12-19T12:31:01.985Z  INFO 1 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-12-19T12:31:01.986Z  INFO 1 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 646 ms
2022-12-19T12:31:02.334Z  INFO 1 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-12-19T12:31:02.349Z  INFO 1 --- [           main] org.acme.song.app.SongApplication        : Started SongApplication in 1.382 seconds (process running for 1.821)
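
Note that the container is attached to the kafka-tutorial network so it can reach the broker by its service name. Spring Boot would usually pick up the connection details from application properties; the Java-based sketch below only makes the moving parts explicit, and the kafka:9092 address is an assumption about the broker address inside that Docker network (from the host it would typically be localhost:29092).

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class SongProducerConfig {

    @Bean
    public ProducerFactory<String, Song> songProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumption: the broker is reachable as kafka:9092 inside the kafka-tutorial network.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Song> songKafkaTemplate(ProducerFactory<String, Song> factory) {
        return new KafkaTemplate<>(factory);
    }
}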

Consuming messages with Java

The consumer code is at Song Indexer App.
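
Again, the authoritative code is in the linked repository. As a rough sketch only, the core of the indexer could be a Spring Kafka listener such as the one below, which prints every consumed song and notifies an in-memory sink that feeds the SSE stream (the group id G1 matches the consumer logs shown further below; everything else, including the JSON deserialization of the Song record being configured elsewhere, is assumed):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Component
public class SongIndexer {

    // Best-effort, multicast sink used to push notifications to connected SSE clients.
    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    // Consumes records from the "songs" topic; the group id G1 matches the tutorial logs.
    @KafkaListener(topics = "songs", groupId = "G1")
    public void index(Song song) {
        // "Real" business logic (Neo4j, Elasticsearch, ...) would go here;
        // this tutorial service only logs the record and notifies SSE subscribers.
        System.out.println(song + " indexed.");
        sink.tryEmitNext("Song " + song.id() + " processed");
    }

    // Stream of notifications consumed by the /events SSE endpoint.
    public Flux<String> events() {
        return sink.asFlux();
    }
}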

Deploying Consumer

In this case, the Spring Boot service is deployed to consume songs. You have two options: build and run the service yourself from the sources, or use the prebuilt Docker image. In terminal 2, run one of these two options:

  • Build & Run

  • Docker

Build & Run:
cd $TUTORIAL_HOME/apps/song-indexer-app/springboot/song-indexer-app
./mvnw clean package -DskipTests
docker build -t quay.io/rhdevelopers/kafka-tutorial-song-indexer-app-springboot:latest .
docker run --rm --network=kafka-tutorial -p 9090:8080 quay.io/rhdevelopers/kafka-tutorial-song-indexer-app-springboot:latest
2022-12-15T09:35:14.545+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] (Re-)joining group
2022-12-15T09:35:14.549+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Successfully joined group with generation Generation{generationId=35, memberId='consumer-G1-1-678970e3-7deb-4679-a469-10b1b53a200c', protocol='range'}
2022-12-15T09:35:14.550+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Finished assignment for group at generation 35: {consumer-G1-1-678970e3-7deb-4679-a469-10b1b53a200c=Assignment(partitions=[songs-0])}
2022-12-15T09:35:14.555+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Successfully synced group in generation Generation{generationId=35, memberId='consumer-G1-1-678970e3-7deb-4679-a469-10b1b53a200c', protocol='range'}
2022-12-15T09:35:14.555+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Notifying assignor about the new Assignment(partitions=[songs-0])
2022-12-15T09:35:14.557+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Adding newly assigned partitions: songs-0
2022-12-15T09:35:14.564+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Setting offset for partition songs-0 to the committed offset FetchPosition{offset=49, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 0 rack: null)], epoch=0}}
2022-12-15T09:35:14.565+01:00  INFO 43531 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : G1: partitions assigned: [songs-0]

Docker:
docker run --rm --network=kafka-tutorial -p 9090:8080 quay.io/rhdevelopers/kafka-tutorial-song-indexer-app-springboot:v22.12
2022-12-15T09:44:02.594+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Finished assignment for group at generation 1: {consumer-G1-1-95c3358a-e4a1-4d93-bd10-37453f8d21ed=Assignment(partitions=[songs-0])}
2022-12-15T09:44:02.632+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Successfully synced group in generation Generation{generationId=1, memberId='consumer-G1-1-95c3358a-e4a1-4d93-bd10-37453f8d21ed', protocol='range'}
2022-12-15T09:44:02.633+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Notifying assignor about the new Assignment(partitions=[songs-0])
2022-12-15T09:44:02.637+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Adding newly assigned partitions: songs-0
2022-12-15T09:44:02.646+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Found no committed offset for partition songs-0
2022-12-15T09:44:02.651+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-G1-1, groupId=G1] Found no committed offset for partition songs-0
2022-12-15T09:44:02.660+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-G1-1, groupId=G1] Resetting offset for partition songs-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 0 rack: null)], epoch=0}}.
2022-12-15T09:44:02.682+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : G1: partitions assigned: [songs-0]

Providing Songs

The song indexer service consumes songs that have been produced to the songs topic. Besides printing each song to the console, it exposes every consumed record as a server-sent event (SSE) over HTTP. In terminal 3, run the following command to start listening for server-sent events over the HTTP connection:

http GET localhost:9090/events --stream --timeout=600
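
On the server side, /events is a reactive endpoint that turns those notifications into server-sent events. A minimal Spring WebFlux sketch, assuming the SongIndexer sketched earlier provides the notification Flux, could look like this (the real Song Indexer App may structure it differently):

import java.util.UUID;

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class EventsResource {

    private final SongIndexer indexer;

    public EventsResource(SongIndexer indexer) {
        this.indexer = indexer;
    }

    // Wraps every "Song <id> processed" notification in a server-sent event;
    // WebFlux serves Flux<ServerSentEvent> as a text/event-stream response.
    @GetMapping("/events")
    public Flux<ServerSentEvent<String>> events() {
        return indexer.events()
                .map(message -> ServerSentEvent.builder(message)
                        .id(UUID.randomUUID().toString())
                        .build());
    }
}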

The song service produces songs into the songs topic. For that, it exposes an HTTP POST endpoint which allows us to publish songs. In terminal 4, run the following command to publish a song:

http POST localhost:8080/songs id=1000 name='Portals' author='Alan Silvestri'
HTTP/1.1 201
Connection: keep-alive
Content-Length: 0
Date: Thu, 15 Dec 2022 08:46:57 GMT
Keep-Alive: timeout=60

To verify that the song has been processed, check terminal 2 (song indexer service) for the last couple of log lines:

2022-12-15T09:47:08.234+01:00  INFO 45680 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : G1: partitions assigned: [songs-0]
Song[id=1000, name=Portals, author=Alan Silvestri, op=ADD] indexed.
2022-12-15T09:49:09.068+01:00  INFO 46855 --- [ntainer#0-0-C-1] reactor.Flux.SinkManyBestEffort.1        : onNext(ServerSentEvent [id = '6fad2b6f-6400-45a1-9dc9-8bcfed7b5399', event='null', retry=null, comment='null', data=Song 1000 processed])

And in terminal 3, you should see that the server-sent event has been streamed successfully:

HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8
Vary: Origin
Vary: Access-Control-Request-Method
Vary: Access-Control-Request-Headers
transfer-encoding: chunked

id:6fad2b6f-6400-45a1-9dc9-8bcfed7b5399
data:Song 1000 processed

Clean Up

Stop the processes that are running in terminals 1 and 2 by typing Ctrl+C.

To restart Kafka (and ZooKeeper), go to the Docker Compose terminal and stop the process by typing Ctrl+C.

Then run:

docker compose rm

? Going to remove kafka, zookeeper (y/N) -> y
[+] Running 2/0
 ⠿ Container zookeeper  Removed
 ⠿ Container kafka      Removed
docker compose up --remove-orphans

kafka      | [2022-12-15 08:59:25,688] INFO Kafka version: 3.3.1 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka commitId: e23c59d00e687ff5 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,688] INFO Kafka startTimeMs: 1671094765681 (org.apache.kafka.common.utils.AppInfoParser)
kafka      | [2022-12-15 08:59:25,689] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
kafka      | [2022-12-15 08:59:25,750] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
kafka      | [2022-12-15 08:59:25,751] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use broker kafka:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)