Apache Kafka with Reactive Streams

Mutiny is just one part of the Reactive story. To complement it, we could use Reactive Streams too. An important service that can serve as the underlying implementation for our stream is Apache Kafka.

In this chapter, we’ll make a small change: We will send beers with a price to a Kafka broker instead of using an in-memory channel.

Add the Reactive Messaging Kafka extension

Open a new terminal window, and make sure you’re at the root of your tutorial-app project, then run:

  • Maven

./mvnw quarkus:add-extension -Dextensions=messaging-kafka

  • Quarkus CLI

quarkus extension add messaging-kafka

Modify BeerProcessor

In the BeerProcessor Java class in the org.acme package under src/main/java, comment out the print method. It’s not necessary anymore because the content of the messages channel will be sent to a Kafka topic:

/**@Incoming("messages")
public void print(JsonObject beer) {
    System.out.println(JsonbBuilder.create().toJson(beer));
}**/

Add the Reactive Messaging Kafka properties

Add the following properties to your application.properties in src/main/resources to configure the messages channel to be backed by a Kafka topic instead of an in-memory channel:

mp.messaging.outgoing.messages.connector=smallrye-kafka (1)
mp.messaging.outgoing.messages.topic=pricedbeers (2)
1 The messages channel is backed by Kafka.
2 The messages channel sends events to the pricedbeers topic.
If all channels are backed by Kafka, it’s not necessary to set the connector property.
If the channel name is the same as the topic name, it’s not necessary to set the topic property.
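
To make the topic default concrete, here is a minimal sketch in plain Java of the rule described above: use the configured topic if there is one, otherwise fall back to the channel name. The ChannelTopicResolver class and its resolveTopic method are hypothetical names made up for illustration; this is not the connector’s actual code, only a model of the behavior.

```java
import java.util.Properties;

/** Illustrative only: models the topic defaulting rule, not the connector's real implementation. */
final class ChannelTopicResolver {

    private ChannelTopicResolver() {
    }

    /** Returns the topic configured for an outgoing channel, or the channel name if none is set. */
    static String resolveTopic(Properties config, String channel) {
        String key = "mp.messaging.outgoing." + channel + ".topic";
        return config.getProperty(key, channel);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("mp.messaging.outgoing.messages.topic", "pricedbeers");

        // Explicit topic property: the messages channel maps to pricedbeers.
        System.out.println(resolveTopic(config, "messages")); // prints "pricedbeers"

        // No topic property: a channel named pricedbeers maps to a topic of the same name.
        System.out.println(resolveTopic(config, "pricedbeers")); // prints "pricedbeers"
    }
}
```

In other words, renaming the channel to pricedbeers would let us drop the topic property, at the cost of updating every annotation that refers to the messages channel.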

Dev Services for Kafka

Because starting a Kafka broker can take a while and you need to develop fast in your local environment, Dev Services for Kafka is here to help you!

Since we have added the quarkus-messaging-kafka extension, Quarkus Dev Services automatically starts a containerized Kafka broker in dev mode and when running tests.

You can disable Dev Services for Kafka by adding quarkus.kafka.devservices.enabled=false or configuring kafka.bootstrap.servers in application.properties.
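
The two switches mentioned above can be summarized in a small plain-Java sketch. The KafkaDevServicesCheck class and its wouldStartBroker method are hypothetical and exist only for illustration; Quarkus applies further conditions of its own (for example, a container runtime must be available), so treat this as a simplified model and not as the real decision logic.

```java
import java.util.Properties;

/** Illustrative only: models the two opt-out switches described in the text. */
final class KafkaDevServicesCheck {

    private KafkaDevServicesCheck() {
    }

    /** Returns true when neither opt-out from the text is present in the configuration. */
    static boolean wouldStartBroker(Properties config) {
        // Assumption: the flag is treated as true when it is absent.
        boolean enabled = Boolean.parseBoolean(
                config.getProperty("quarkus.kafka.devservices.enabled", "true"));
        // Pointing the application at an existing broker also opts out.
        boolean brokerConfigured = config.getProperty("kafka.bootstrap.servers") != null;
        return enabled && !brokerConfigured;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        System.out.println(wouldStartBroker(defaults)); // prints "true"

        Properties external = new Properties();
        external.setProperty("kafka.bootstrap.servers", "localhost:9092");
        System.out.println(wouldStartBroker(external)); // prints "false"
    }
}
```

The localhost:9092 address is only a placeholder for whatever broker you would point the application at.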

Invoke the endpoint

There’s not really any code to add at this point. Just by having Docker/Podman running on our computer, and starting the service in dev mode, we can now send the same request as in the previous chapter, but it will be sent to a Kafka topic instead of an in-memory channel. Let’s try it:

curl -w '\n' localhost:8080/beer/emit/1

As you can see, nothing is shown in the return message, nor is there anything in the Quarkus terminal, because the event is simply sent to a Kafka topic. We could write some additional code to retrieve the message from Kafka, but in this case we’re going to use the Dev UI, where we can inspect the contents of the Kafka topic by pointing our browser to http://localhost:8080/q/dev-ui/io.quarkus.quarkus-kafka-client/topics