Reactive Messaging

As a Java developer, you probably know that JMS is the standard API for working with messages. JMS, however, is a blocking API, which prevents us from implementing reactive principles.

Quarkus includes SmallRye Reactive Messaging, an implementation of the Eclipse MicroProfile Reactive Messaging specification. Quarkus implements version 2.x of this specification and also provides many other extensions.

In this chapter, we’re going to use SmallRye Reactive Messaging to add a price to beers, using messages instead of synchronous calls.

For now, we’re going to use an in-memory channel, which means that messages are exchanged within the same application, with memory as the transport.
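To make the idea of an in-memory channel concrete, the following plain-Java sketch models one as a named list of subscribers living in the same JVM. It is not how SmallRye Reactive Messaging is implemented; the class InMemoryChannelSketch and its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: a "channel" that lives entirely in the
// application's memory. No broker or network hop is involved.
class InMemoryChannelSketch<T> {

    private final String name;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    InMemoryChannelSketch(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    // Register a consumer that is called for every message sent afterwards.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the message to every subscriber, in the calling thread.
    void send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        InMemoryChannelSketch<String> beers = new InMemoryChannelSketch<>("beers");
        beers.subscribe(beer -> System.out.println("Received on " + beers.name() + ": " + beer));
        beers.send("Buzz");
    }
}
```

The sender and the receiver only share the channel object, which is roughly the role the channel name plays in the annotations used later in this chapter.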

In the following section, we’ll see what we need to change to start using an external broker for sending messages.

Add the Reactive Messaging extension

Just open a new terminal window, and make sure you’re at the root of your tutorial-app project, then run:

  • Maven

./mvnw quarkus:add-extension -Dextension=quarkus-smallrye-reactive-messaging

  • Quarkus CLI

quarkus extension add quarkus-smallrye-reactive-messaging

Modify BeerResource

Let’s create a new endpoint that finds a beer and sends (emits) a message containing that beer to the beers channel.

Open the BeerResource class and add the following code.

In the imports section:


import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

And the business code:

@Channel("beers") // binds the emitter to the beers channel
Emitter<JsonObject> emitter;

@GET
@Path("/emit/{beer}") // assumes BeerResource itself is mapped to /beer
public Response emitBeer(@PathParam("beer") int beerId) {
    beerService.getBeer(beerId) // (1)
        .map(beers -> beers.get(0).asJsonObject()) // (2)
        .subscribe().with(emitter::send); // (3)
    return Response.ok().build(); // (4)
}

1 Finds the beer
2 Gets the first beer
3 Emits the beer to the beers channel
4 Sends an ack to the caller
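Note that emitBeer returns its response without waiting for the subscription to complete. Assuming getBeer resolves asynchronously, the ack in callout 4 only confirms that the request was accepted, not that the message has been emitted or processed. The plain-Java sketch below illustrates that ordering, with CompletableFuture standing in for the reactive pipeline; FireAndForgetSketch, handleRequest and the event labels are hypothetical names, not part of Quarkus or Mutiny.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class FireAndForgetSketch {

    // Records the order in which things happen.
    static final List<String> events = new CopyOnWriteArrayList<>();

    // Mimics the endpoint: start the asynchronous work, then answer right away.
    static CompletableFuture<Void> handleRequest(CountDownLatch lookupMayFinish) {
        CompletableFuture<Void> pipeline = CompletableFuture
            .supplyAsync(() -> {
                awaitLatch(lookupMayFinish); // stands in for a slow beer lookup
                return "Buzz";
            })
            .thenAccept(beer -> events.add("emitted " + beer));
        events.add("response returned"); // recorded without waiting for the pipeline
        return pipeline;
    }

    private static void awaitLatch(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CountDownLatch lookupMayFinish = new CountDownLatch(1);
        CompletableFuture<Void> pipeline = handleRequest(lookupMayFinish);
        lookupMayFinish.countDown(); // let the simulated lookup complete
        pipeline.join();             // wait only so the demo can print both events
        System.out.println(events);  // prints [response returned, emitted Buzz]
    }
}
```

If the caller needs to know that the message was actually emitted, the endpoint would have to return something tied to the pipeline's completion instead of an immediate response.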

The previous code sends the beer as a JsonObject to the beers channel. Since we are using an in-memory channel, let’s create a new Java class in the same project that captures the messages sent to the channel.

Moreover, this new class will send another event to a different channel, which will be captured by another method.

Create BeerProcessor

Create a new BeerProcessor Java class in src/main/java in the org.acme package with the following contents:

package org.acme;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.bind.JsonbBuilder;

@ApplicationScoped
public class BeerProcessor {

    @Incoming("beers") // (1)
    @Outgoing("messages") // (2)
    public JsonObject processPrice(JsonObject beer) { // (3)
        JsonObjectBuilder beerWithPrice = Json.createObjectBuilder(beer).add("price", getPrice());
        return beerWithPrice.build(); // (4)
    }

    private int getPrice() {
        return ThreadLocalRandom.current().nextInt(1, 10);
    }

    @Incoming("messages") // (5)
    public void print(JsonObject beer) {
        // Body reconstructed from the imports and the console output shown below
        System.out.println(JsonbBuilder.create().toJson(beer));
    }
}

1 Listens for events from the beers channel
2 Sends (emits) the result of the method call to the messages channel
3 The argument is the message received from the beers channel
4 The returned object is sent to the messages channel
5 Captures events from the messages channel

You can use the @Retry annotation from the SmallRye Fault Tolerance extension to add some resiliency.
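One detail in getPrice is easy to miss: ThreadLocalRandom.nextInt(origin, bound) includes the origin but excludes the bound, so nextInt(1, 10) yields prices from 1 to 9 and never 10. The sketch below checks this empirically; PriceRangeSketch and getPriceUpToTen are hypothetical names used only here.

```java
import java.util.concurrent.ThreadLocalRandom;

class PriceRangeSketch {

    // Same call as in BeerProcessor.getPrice(): origin 1 is inclusive, bound 10 is exclusive.
    static int getPrice() {
        return ThreadLocalRandom.current().nextInt(1, 10);
    }

    // Variant that also allows 10, in case an inclusive upper bound is wanted.
    static int getPriceUpToTen() {
        return ThreadLocalRandom.current().nextInt(1, 11);
    }

    public static void main(String[] args) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            int price = getPrice();
            min = Math.min(min, price);
            max = Math.max(max, price);
        }
        // The observed values always stay within 1..9 for getPrice().
        System.out.println("min=" + min + ", max=" + max);
    }
}
```

If 10 should be a possible price, the bound has to be 11, as in getPriceUpToTen.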

Invoke the endpoint

With all these changes done and the application running (either in dev mode or as a packaged application), invoke the service and inspect the Quarkus console for the output:

curl localhost:8080/beer/emit/1

In the Quarkus console, you’ll see the beer printed in JSON format, with the price field added:

{"id":1,"name":"Buzz","tagline":"A Real Bitter Experience."..."contributed_by":"Sam Mason <samjbmason>","price":8}