Reactive Messaging

As a Java developer you’re likely familiar with JMS, the standard API for working with messages in Java. However, JMS is a blocking API, which makes it a poor fit for reactive principles.

Quarkus provides the SmallRye Reactive Messaging extension, an implementation of the Eclipse MicroProfile Reactive Messaging specification. Reactive Messaging allows us to implement messaging in a non-blocking, reactive way.

In this chapter we’re going to use SmallRye Reactive Messaging to generate beers once again. This time we’re going to add a (random) price to the beers. We’re going to be using messages instead of synchronous calls to do this.

To do so, we’re going to use an in-memory channel. This means that messages travel between components inside the application, with memory as the transport and no external broker involved.

In the following section, we’ll see what we need to change to start using an external broker for sending messages.

Add the Reactive Messaging extension

Open a new terminal window, and make sure you’re at the root of your tutorial-app project, then run:

  • Maven

./mvnw quarkus:add-extension -Dextension=messaging

  • Quarkus CLI

quarkus extension add messaging

Modify BeerResource

Let’s create a new endpoint that finds a beer and sends/emits a message to a beers channel.

Open the BeerResource class and add the following code.

In the imports section:

import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

And the business code:

    @Channel("beers")
    Emitter<JsonObject> emitter;

    @GET
    @Path("/emit/{beer}")
    public Response emitBeer(@PathParam("beer") int beerId) {
        beerService.getBeer(beerId) (1)
            .map(beers -> beers.get(0).asJsonObject()) (2)
            .subscribe().with(emitter::send); (3)
        return Response.ok().build(); (4)
    }
1 Finds the beer
2 Takes the first beer from the result and converts it to a JsonObject
3 Emits the beer to the beers channel
4 Returns an HTTP 200 OK acknowledgement to the caller

The previous code sends the beer as a JsonObject to the beers channel. Since we are using an in-memory channel, let’s create a new Java class in the same project that captures the messages sent to the channel.

This new class then sends another event to a different channel, which is consumed by yet another method.
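As a rough mental model of this flow, here is a minimal plain-Java sketch that chains two in-memory "channels" using the JDK's `java.util.concurrent.SubmissionPublisher`. This is only an analogy and not how SmallRye Reactive Messaging is implemented. The class name `ChannelPipelineSketch`, the `run` method, the fixed price, and the beer name "Example Ale" are all hypothetical and used for illustration only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical sketch: two in-memory channels chained together, JDK only.
class ChannelPipelineSketch {

    // Sends each beer name through "beers" -> processor -> "messages" -> printer
    // and returns what the final consumer received.
    static List<String> run(List<String> beerNames, int fixedPrice) {
        List<String> printed = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> beers = new SubmissionPublisher<>();
        SubmissionPublisher<String> messages = new SubmissionPublisher<>();

        // Final consumer: plays the role of the method that prints the message.
        CompletableFuture<Void> printerDone = messages.consume(printed::add);

        // Processor: reads from "beers", adds a price, writes to "messages".
        CompletableFuture<Void> processorDone =
                beers.consume(name -> messages.submit(name + " costs " + fixedPrice));

        beerNames.forEach(beers::submit); // plays the role of emitter.send(...)

        beers.close();          // no more beers; pending items are still delivered
        processorDone.join();   // wait until the processor has handled everything
        messages.close();
        printerDone.join();     // wait until the final consumer has seen everything
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Buzz", "Example Ale"), 8));
    }
}
```

The producer never calls the consumers directly: it only hands items to a channel, and each stage reacts when an item arrives. In the Quarkus application the same wiring is declared with annotations instead of being written by hand.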

Create BeerProcessor

Create a new BeerProcessor Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.bind.JsonbBuilder;


@ApplicationScoped
public class BeerProcessor {

    @Incoming("beers") (1)
    @Outgoing("messages") (2)
    public JsonObject processPrice(JsonObject beer) { (3)
        JsonObjectBuilder beerWithPrice = Json.createObjectBuilder(beer).add("price", getPrice());
        return beerWithPrice.build(); (4)
    }

    // Random price from 1 to 9: the upper bound of nextInt is exclusive
    private int getPrice() {
        return ThreadLocalRandom.current().nextInt(1, 10);
    }

    @Incoming("messages") (5)
    public void print(JsonObject beer) {
        System.out.println(JsonbBuilder.create().toJson(beer));
    }
}
1 Listens to messages from the beers channel
2 Sends/emits the result of the method call to the messages channel
3 The argument is the message received from the beers channel
4 The returned object is sent to the messages channel
5 Consumes messages from the messages channel
You can use the @Retry annotation from the SmallRye Fault Tolerance extension to add some resiliency.
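To illustrate what retrying buys you, here is a minimal plain-Java sketch of the idea. It is not the @Retry annotation itself, which is applied declaratively and needs the fault tolerance extension. The `RetrySketch` class, the `withRetry` helper, and the flaky price lookup are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of retry semantics in plain Java (not the @Retry annotation).
class RetrySketch {

    // Calls the action once, then retries up to maxRetries more times if it throws.
    static <T> T withRetry(int maxRetries, Supplier<T> action) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw lastFailure; // every attempt failed
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky price lookup: fails twice, then succeeds.
        int price = withRetry(3, () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("price service unavailable");
            }
            return 8;
        });
        System.out.println("price=" + price + " after " + calls.get() + " calls");
    }
}
```

With a retry in place, a transient failure while processing a message does not immediately fail the whole pipeline; the work is simply attempted again a bounded number of times.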

Invoke the endpoint

With these changes in place and the application running (either in dev mode or as a packaged application), invoke the service and inspect the Quarkus console for the output:

curl localhost:8080/beer/emit/1

In the Quarkus console, you’ll see the beer printed as JSON with the price field added:

{"id":1,"name":"Buzz","tagline":"A Real Bitter Experience."..."contributed_by":"Sam Mason <samjbmason>","price":8}