Developing Consumers and Producers in Java

In this example, we are going to develop an example where there is one service that is putting songs description to the songs topic.

By default, it is not necessary to create the Kafka topic manually, Kafka creates it automatically with default parameters.

Then there is another service that consumes from the songs topic these songs. Notice that usually this consumer would take the songs and process them (i.e adding them in a graph database like Neo4J, or an ElasticSearch for searching capabilities), but for the sake of simplicity, in this case, the service is printing them in the console.

Rememeber to have Zookeeper and Apache Kafka up and running. If not, you can follow [Setup Kafka] section to learn how to do it using docker-compose.

You need to open 4 terminal windows to run this example, 2 terminals for running the services, and 2 terminals for sending requests.

terminals song

Producing messages with Java

The producer code is at Song App.

Deploying Producer

In this case, the Spring Boot service is deployed to produce songs. You’ve got two options, using Docker or building yourself the service. In terminal 1, run one of these two options:

  • Building & Run

  • Docker

cd $TUTORIAL_HOME/apps/song-app/springboot/song-app
./mvnw clean package -DskipTests
docker build -t quay.io/rhdevelopers/kafka-tutorial-song-app:springboot .
docker run --rm -ti -p 8080:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-song-app:springboot
2020-04-29 09:38:23.290  INFO 3309 --- [main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 921 ms
2020-04-29 09:38:23.484  INFO 3309 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 09:38:23.670  INFO 3309 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-04-29 09:38:23.674  INFO 3309 --- [main] org.acme.song.app.SongAppApplication     : Started SongAppApplication in 1.719 seconds (JVM running for 2.096)
docker run --rm -ti -p 8080:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-song-app:springboot
2020-04-29 09:38:23.290  INFO 3309 --- [main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 921 ms
2020-04-29 09:38:23.484  INFO 3309 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 09:38:23.670  INFO 3309 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-04-29 09:38:23.674  INFO 3309 --- [main] org.acme.song.app.SongAppApplication     : Started SongAppApplication in 1.719 seconds (JVM running for 2.096)

Consuming messages with Java

The consumer code is at Song Indexer App.

Deploying Consumer

In this case, the Spring Boot service is deployed to consume songs. You’ve got two options, using Docker or building yourself the service. In terminal 2, run one of these two options:

  • Build & Run

  • Docker

cd $TUTORIAL_HOME/apps/song-indexer-app/springboot/song-indexer-app
./mvnw clean package -DskipTests
docker build -t quay.io/rhdevelopers/kafka-tutorial-song-indexer-app:springboot .
docker run --rm -ti -p 9090:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-song-indexer-app:springboot
2020-04-29 14:37:03.530  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Successfully joined group with generation 3
2020-04-29 14:37:03.534  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Setting newly assigned partitions: songs-0
2020-04-29 14:37:03.542  INFO 1 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-04-29 14:37:03.545  INFO 1 --- [main] o.a.s.i.app.SongIndexerAppApplication    : Started SongIndexerAppApplication in 3.062 seconds (JVM running for 3.524)
2020-04-29 14:37:03.556  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Setting offset for partition songs-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka:9092 (id: 0 rack: null), epoch=0}}
2020-04-29 14:37:03.565  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : G1: partitions assigned: [songs-0]
docker run --rm -ti -p 9090:8080 --link=kafka:kafka quay.io/rhdevelopers/kafka-tutorial-song-indexer-app:springboot
2020-04-29 14:37:03.530  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Successfully joined group with generation 3
2020-04-29 14:37:03.534  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Setting newly assigned partitions: songs-0
2020-04-29 14:37:03.542  INFO 1 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-04-29 14:37:03.545  INFO 1 --- [main] o.a.s.i.app.SongIndexerAppApplication    : Started SongIndexerAppApplication in 3.062 seconds (JVM running for 3.524)
2020-04-29 14:37:03.556  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=G1] Setting offset for partition songs-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka:9092 (id: 0 rack: null), epoch=0}}
2020-04-29 14:37:03.565  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : G1: partitions assigned:

Providing Songs

Song Indexer Service is exposing using Server-Side Events, the songs that are published to songs topic. In the terminal 3, run the following command to start listening for the events:

http GET localhost:9090/events --stream --timeout=600

Song Service exposes an endpoint to publish songs into songs topic. In the terminal 4, run the following command to publish a song:

http POST localhost:8080/songs id=1000 name=Portals author='Alan Silvestri'
HTTP/1.1 201
Connection: keep-alive
Content-Length: 0
Date: Wed, 29 Apr 2020 14:34:27 GMT
Keep-Alive: timeout=60

To verify that the song has been processed, check terminal 2 (song indexer service) for the next logline:

2020-04-29 14:34:18.340  INFO 1 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.1          : onSubscribe(EmitterProcessor.EmitterInner)
2020-04-29 14:34:18.343  INFO 1 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.1          : request(1)
2020-04-29 14:34:18.345  INFO 1 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.1          : onNext(ServerSentEvent [id = '20ad9cdd-4629-4683-8442-4943b0161354', event='null', retry=null, comment='null', data=Song 1000 processed])
2020-04-29 14:34:18.352  INFO 1 --- [         task-1] reactor.Flux.EmitterProcessor.1          : request(1)
Song [author=Alan Silvestri, id=1000, name=Portals] indexed.

And in the termianl 4, you should see that en event has been streamed:

id:04ad6bd0-463a-4fd7-a39c-59ce042f54a4
data:Song 1000 processed

Clean Up

Stop the processes that are running in the terminal 1 and 2 by typing Ctrl+C.

To restart Kafka (and Zookeeper) go to docker-compose terminal and stop the process by typing Ctrl+C.

Then run:

docker-compose rm

Going to remove kafka, it_zookeeper_1
Are you sure? [yN] y
Removing kafka          ... done
Removing it_zookeeper_1 ... done
docker-compose up --remove-orphans

kafka        | [2020-05-04 11:30:27,392] INFO [SocketServer brokerId=0] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
kafka        | [2020-05-04 11:30:27,396] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,397] INFO Kafka startTimeMs: 1588591827393 (org.apache.kafka.common.utils.AppInfoParser)
kafka        | [2020-05-04 11:30:27,398] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)