Apache Kafka with Reactive Streams
Mutiny is just one part of the Reactive story. To complement it, we could use Reactive Streams too. An important service that can serve as the underlying implementation for our stream is Apache Kafka.
In this chapter, we’ll make a small change: We will send beers with a price to a Kafka broker instead of using an in-memory channel.
Modify BeerProcessor
In the BeerProcessor Java class in src/main/java in the org.acme package, comment out the print method. It is no longer necessary because the content of the messages channel will be sent to a Kafka topic:
/**@Incoming("messages")
public void print(JsonObject beer) {
    System.out.println(JsonbBuilder.create().toJson(beer));
}**/
Add the Reactive Messaging Kafka properties
Add the following properties to your application.properties in src/main/resources to configure the messages channel to be backed by a Kafka topic instead of an in-memory channel:
mp.messaging.outgoing.messages.connector=smallrye-kafka (1)
mp.messaging.outgoing.messages.topic=pricedbeers (2)
(1) The messages channel is backed by Kafka.
(2) The messages channel sends events to the pricedbeers topic.
If all channels are backed by Kafka, it’s not necessary to set the connector property.
If the channel name is the same as the topic name, it’s not necessary to set the topic property.
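The topic fallback described above can be illustrated with a small, self-contained sketch. This is not how SmallRye Reactive Messaging is implemented internally; ChannelConfigSketch, parse, and resolveTopic are hypothetical names used only to model the rule that an outgoing channel without a topic property uses its own name as the topic.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ChannelConfigSketch {

    // Hypothetical helper: returns the topic configured for an outgoing channel,
    // falling back to the channel name when no topic property is present.
    public static String resolveTopic(Properties config, String channel) {
        String key = "mp.messaging.outgoing." + channel + ".topic";
        return config.getProperty(key, channel);
    }

    // Parses application.properties-style text into a Properties object.
    public static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Explicit topic, as configured in this chapter.
        Properties explicit = parse("mp.messaging.outgoing.messages.topic=pricedbeers\n");
        // No topic property: the channel name is used instead.
        Properties implicit = parse("mp.messaging.outgoing.messages.connector=smallrye-kafka\n");

        System.out.println(resolveTopic(explicit, "messages")); // prints "pricedbeers"
        System.out.println(resolveTopic(implicit, "messages")); // prints "messages"
    }
}
```

With the configuration from this chapter the topic property is required, because the channel is named messages while the topic is named pricedbeers.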
Dev Services for Kafka
Because starting a Kafka broker can take a long time and you need to develop fast in your local environment, Dev Services for Kafka is here to help you!
Since we have added the quarkus-messaging-kafka extension, Quarkus Dev Services automatically starts a containerized Kafka broker in dev mode and when running tests.
You can disable Dev Services for Kafka by adding quarkus.kafka.devservices.enabled=false or by configuring kafka.bootstrap.servers in application.properties.
Invoke the endpoint
There’s not really any code to add at this point. Just by having Docker/Podman running on our computer, and starting the service in dev mode, we can now send the same request as in the previous chapter, but it will be sent to a Kafka topic instead of an in-memory channel. Let’s try it:
curl -w '\n' localhost:8080/beer/emit/1
As you can see, nothing is shown in the response, nor is there anything in the Quarkus terminal, because the event is simply sent to a Kafka topic. We could write some additional code to retrieve the message from Kafka, but in this case we’re going to use the Dev UI, where we can inspect the contents of the Kafka topic by pointing our browser to http://localhost:8080/q/dev-ui/io.quarkus.quarkus-kafka-client/topics