Reactive Messaging

As a Java developer, you probably know JMS as the standard API for working with messages. JMS, however, is a blocking API, which prevents us from applying reactive principles.

Quarkus ships SmallRye Reactive Messaging, an implementation of the Eclipse MicroProfile Reactive Messaging specification. Quarkus implements version 2.x of this specification and also provides many other extensions.

In this chapter we’re going to use SmallRye Reactive Messaging to generate beers having a price.

Add the Reactive Messaging extension

Just open a new terminal window, and make sure you’re at the root of your tutorial-app project, then run:

  • Maven

./mvnw quarkus:add-extension -Dextension=quarkus-smallrye-reactive-messaging

  • Quarkus CLI

quarkus extension add quarkus-smallrye-reactive-messaging

Modify Beer POJO

Modify the Beer Java class in src/main/java in the com.redhat.developers package to have the following contents:

package com.redhat.developers;

import jakarta.json.bind.annotation.JsonbCreator;
import java.math.BigDecimal;

public class Beer {

    private String name;

    private String tagline;

    private double abv;

    private Beer(String name, String tagline, double abv) {
        this.name = name;
        this.tagline = tagline;
        this.abv = abv;
    }

    @JsonbCreator
    public static Beer of(String name, String tagline, double abv) {
        return new Beer(name, tagline, abv);
    }

    public PricedBeer withPrice(BigDecimal price) {
        return PricedBeer.of(this.name, price);
    }


    public String getName() {
        return name;
    }

    public String getTagline() {
        return tagline;
    }

    public double getAbv() {
        return abv;
    }

}

Create PricedBeer POJO

Create a new PricedBeer Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;

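import java.math.BigDecimal;
import java.math.RoundingMode;

import jakarta.json.bind.annotation.JsonbCreator;

public class PricedBeer {

    private String name;

    private BigDecimal price;

    private PricedBeer(String name, BigDecimal price) {
        this.name = name;
        this.price = price;
    }

    @JsonbCreator
    public static PricedBeer of(String name, double price) {
        // new BigDecimal(double).setScale(2) throws ArithmeticException for most values (e.g. 9.99),
        // because the exact binary expansion cannot be cut to two decimals without rounding.
        return new PricedBeer(name, BigDecimal.valueOf(price).setScale(2, RoundingMode.HALF_UP));
    }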

    public static PricedBeer of(String name, BigDecimal price) {
        return new PricedBeer(name, price);
    }

    public String getName() {
        return name;
    }

    public BigDecimal getPrice() {
        return price;
    }

}
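
The conversion from double to BigDecimal in a factory such as of(String, double) deserves some care. The following is a minimal, JDK-only sketch that contrasts two ways of obtaining a two-decimal price from a double. The PriceRounding class and its helper methods are illustrative names, not part of the tutorial project, and HALF_UP is just one reasonable choice of rounding mode.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class PriceRounding {

    // Hypothetical helper: converts a double to a price with exactly two decimals.
    static BigDecimal toPrice(double value) {
        // valueOf goes through Double.toString, so 9.99 becomes exactly 9.99;
        // the explicit rounding mode keeps setScale safe for longer fractions.
        return BigDecimal.valueOf(value).setScale(2, RoundingMode.HALF_UP);
    }

    // Returns true when setScale(2) without a rounding mode rejects the value.
    static boolean exactConversionFails(double value) {
        try {
            // new BigDecimal(double) keeps the full binary expansion of the double.
            new BigDecimal(value).setScale(2);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(toPrice(9.99));              // 9.99
        System.out.println(toPrice(5.0));               // 5.00
        System.out.println(exactConversionFails(9.99)); // true
        System.out.println(exactConversionFails(0.5));  // false
    }
}
```

Values such as 0.5 have an exact binary representation and survive setScale(2) without a rounding mode, whereas most decimal prices, 9.99 among them, do not.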

Create BeerGenerator

Create a new BeerGenerator Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;


import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import org.eclipse.microprofile.rest.client.inject.RestClient;


@ApplicationScoped
public class BeerGenerator {

    @RestClient
    BeerService service;

    @Outgoing("beers")
    Multi<String> beers() {
        List<Beer>  beers = service.getBeers(10);
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) // (1)
                .onOverflow().drop() // (2)
                .map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size()))) // (3)
                .map(JsonbBuilder.create()::toJson); // (4)
    }

    @Incoming("beers")
    @Outgoing("groups")
    public Multi<List<String>> skipGroup(Multi<String> stream) {
        return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5); // (5)
    }


    @Incoming("groups")
    @Outgoing("messages")
    public String processGroup(List<String> list) {
        return String.join(",", list); // (6)
    }


    @Incoming("messages")
    public String print(String msg) {
        System.out.println(msg);
        return msg; // (7)
    }
}
1 We create a Multi that emits a new message every second.
2 We apply backpressure by dropping messages when the downstream consumer is not ready for them.
3 For each message we choose a random Beer from our list.
4 We map the Beer to JSON format.
5 The skipGroup method skips the messages sent during the first 10 milliseconds and groups the following ones into lists of 5.
6 The processGroup method takes each group and does further processing.
7 The print method is only there so that we can observe the output.
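
To make steps 5 and 6 more concrete, here is a plain-Java sketch (no Mutiny involved) of what grouping into lists of 5 and then joining a group amounts to. The GroupingSketch class and the groupsOf helper are hypothetical names used only for illustration. Emitting a trailing partial group is an assumption that applies to a finite input; the tutorial's stream of ticks never completes.

```java
import java.util.ArrayList;
import java.util.List;

final class GroupingSketch {

    // Splits items into consecutive lists of at most `size` elements.
    static List<List<String>> groupsOf(List<String> items, int size) {
        List<List<String>> groups = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            groups.add(new ArrayList<>(items.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "b", "c", "d", "e", "f", "g");
        List<List<String>> groups = groupsOf(items, 5);
        System.out.println(groups); // [[a, b, c, d, e], [f, g]]

        List<String> first = groups.get(0);
        // Passing the list joins its elements.
        System.out.println(String.join(",", first)); // a,b,c,d,e
        // Passing toString() hands String.join a single string, returned unchanged.
        System.out.println(String.join(",", first.toString())); // [a, b, c, d, e]
    }
}
```

Note the difference between the two String.join calls: only the first one actually uses the delimiter, because the second receives a single element and therefore has nothing to join.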

Block Processing

Invoking a blocking operation or service can take a long time. In such cases, you can keep the rest of the pipeline responsive by marking the method with the @Blocking annotation.

Let’s modify the BeerGenerator Java class in src/main/java in the com.redhat.developers package by adding the @Blocking annotation to the processGroup method. The version below also simulates slow work with Thread.sleep and renames skipGroup to group:

package com.redhat.developers;


import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import org.eclipse.microprofile.rest.client.inject.RestClient;


@ApplicationScoped
public class BeerGenerator {

    @RestClient
    BeerService service;

    @Outgoing("beers")
    Multi<String> beers() {
        List<Beer>  beers = service.getBeers(10);
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onOverflow().drop()
                .map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size())))
                .map(JsonbBuilder.create()::toJson);
    }

    @Incoming("beers")
    @Outgoing("groups")
    public Multi<List<String>> group(Multi<String> stream) {
        return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5);
    }


    @Incoming("groups")
    @Outgoing("messages")
    @Blocking
    public String processGroup(List<String> list) {
        try {
            Thread.sleep(1000);
            System.out.println(LocalDateTime.now().toLocalTime());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", list);
    }


    @Incoming("messages")
    public String print(String msg) {
        System.out.println(msg);
        return  msg;
    }
}

The processGroup method is now invoked on a worker thread. Use @Blocking with care, though, as it affects the concurrency of your implementation.
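
As a rough analogy for what running on a worker thread means, the following JDK-only sketch hands a slow task to a thread pool so that the calling thread can carry on. It is not how Quarkus implements @Blocking (Quarkus manages its own worker pool); WorkerOffloadSketch, runBlocking and the pool size are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class WorkerOffloadSketch {

    // Runs a slow task on the given pool and completes with the name of the thread that ran it.
    static CompletableFuture<String> runBlocking(ExecutorService workers, long sleepMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(sleepMillis); // stands in for a slow, blocking call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Thread.currentThread().getName();
        }, workers);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> result = runBlocking(workers, 100);
            // The caller is free to continue while the task sleeps.
            System.out.println("caller continues on " + Thread.currentThread().getName());
            System.out.println("blocking work ran on " + result.join());
        } finally {
            workers.shutdown();
        }
    }
}
```

The trade-off hinted at above is visible here as well: a fixed pool of two threads can run at most two blocking tasks at once, so slow tasks queue up behind each other.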

Retry Processing

In a new terminal window, at the root of your tutorial-app project, run the following to add the fault tolerance extension:

  • Maven

./mvnw quarkus:add-extension -Dextension=quarkus-smallrye-fault-tolerance

  • Quarkus CLI

quarkus extension add quarkus-smallrye-fault-tolerance

Even reactive applications can run into failures caused by unforeseen scenarios. Let’s assume that something unexpected occurs while the group method executes:

package com.redhat.developers;


import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import org.eclipse.microprofile.rest.client.inject.RestClient;


@ApplicationScoped
public class BeerGenerator {
    private final Random random = new Random();

    @RestClient
    BeerService service;

    @Outgoing("beers")
    Multi<String> beers() {
        List<Beer>  beers = service.getBeers(10);
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onOverflow().drop()
                .map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size())))
                .map(JsonbBuilder.create()::toJson);
    }

    @Incoming("beers")
    @Outgoing("groups")
    @Retry(maxRetries = 10, delay = 1, delayUnit = ChronoUnit.SECONDS) // (1)
    public Multi<List<String>> group(Multi<String> stream) {
        int i = random.nextInt(10);
        System.out.println("Show retry  for random number "+i);
        if (i > 1) {
            throw new RuntimeException("not working"); // (2)
        }
        return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5);
    }


    @Incoming("groups")
    @Outgoing("messages")
    @Blocking
    public String processGroup(List<String> list) {
        try {
            Thread.sleep(1000);
            System.out.println(LocalDateTime.now().toLocalTime());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", list);
    }


    @Incoming("messages")
    public String print(String msg) {
        System.out.println(msg);
        return  msg;
    }
}
1 If a RuntimeException is thrown, the @Retry annotation re-executes the same logic up to 10 times, with a delay of 1 second between attempts.
2 Throw a RuntimeException if the random number is greater than 1.
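
With maxRetries = 10, the method may run up to 11 times in total: the first attempt plus 10 retries. Because random.nextInt(10) > 1 holds for 8 of the 10 possible values, each attempt fails with probability 0.8, so all 11 attempts fail with probability 0.8^11, roughly 8.6%. The following plain-Java loop is a sketch of these retry semantics only. It is not how SmallRye Fault Tolerance is implemented, it ignores other settings of the specification such as jitter, and RetrySketch and withRetry are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {

    // Calls the action once, then retries up to maxRetries more times on RuntimeException.
    static <T> T withRetry(Supplier<T> action, int maxRetries, long delayMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) {
                    sleep(delayMillis); // wait before the next attempt
                }
            }
        }
        throw last; // every attempt failed: surface the last failure
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first three calls and succeeds on the fourth.
        String result = withRetry(() -> {
            if (calls.incrementAndGet() < 4) {
                throw new RuntimeException("not working");
            }
            return "ok";
        }, 10, 0);
        System.out.println(result + " after " + calls.get() + " attempts"); // ok after 4 attempts
    }
}
```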

Now restart Quarkus in Dev Mode and you should observe something similar to:

[io.quarkus] (Quarkus Main Thread) tutorial-app stopped in 0.024s
Show retry  for random number 6
Show retry  for random number 3
Show retry  for random number 0
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/