Reactive Messaging
As a Java developer you are aware that the JMS is the standard when comes to working with messages. JMS is a blocking API which blocks us from implementing the reactive principles.
Quarkus has SmallRye Reactive Messaging which is an implementation of the Eclipse MicroProfile Reactive Messaging specification. Quarkus implements version 2.x of this specification but also provides many other extensions.
In this chapter we’re going to use SmallRye Reactive Messaging to generate beers having a price.
Modify Beer POJO
Modify the Beer
Java class in src/main/java
in the com.redhat.developers
package to have the following contents:
package com.redhat.developers;
import jakarta.json.bind.annotation.JsonbCreator;
import java.math.BigDecimal;
public class Beer {
private String name;
private String tagline;
private double abv;
private Beer(String name, String tagline, double abv) {
this.name = name;
this.tagline = tagline;
this.abv = abv;
}
@JsonbCreator
public static Beer of(String name, String tagline, double abv) {
return new Beer(name, tagline, abv);
}
public PricedBeer withPrice(BigDecimal price) {
return PricedBeer.of(this.name, price);
}
public String getName() {
return name;
}
public String getTagline() {
return tagline;
}
public double getAbv() {
return abv;
}
}
Create PricedBeer POJO
Create a new PricedBeer
Java class in src/main/java
in the com.redhat.developers
package with the following contents:
package com.redhat.developers;
import java.math.BigDecimal;
import jakarta.json.bind.annotation.JsonbCreator;
public class PricedBeer {
private String name;
private BigDecimal price;
private PricedBeer(String name, BigDecimal price) {
this.name = name;
this.price = price;
}
@JsonbCreator
public static PricedBeer of(String name, double price) {
return new PricedBeer(name, new BigDecimal(price).setScale(2));
}
public static PricedBeer of(String name, BigDecimal price) {
return new PricedBeer(name, price);
}
public String getName() {
return name;
}
public BigDecimal getPrice() {
return price;
}
}
Create BeerGenerator
Create a new BeerGenerator
Java class in src/main/java
in the com.redhat.developers
package with the following contents:
package com.redhat.developers;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
@ApplicationScoped
public class BeerGenerator {
@RestClient
BeerService service;
@Outgoing("beers")
Multi<String> beers() {
List<Beer> beers = service.getBeers(10);
return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) (1)
.onOverflow().drop() (2)
.map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size()))) (3)
.map(JsonbBuilder.create()::toJson); (4)
}
@Incoming("beers")
@Outgoing("groups")
public Multi<List<String>> skipGroup(Multi<String> stream) {
return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5); (5)
}
@Incoming("groups")
@Outgoing("messages")
public String processGroup(List<String> list) {
return String.join(",", list.toString()); (6)
}
@Incoming("messages")
public String print(String msg) {
System.out.println(msg);
return msg; (7)
}
}
1 | We’re creating a Multi that generates a new message every 1 second. |
2 | We apply backpressure by dropping the messages if the topic is not ready. |
3 | For each message we choose a random Beer from our list. |
4 | We map the Beer to JSON format. |
5 | The method `skipGroup' skips the messages sent during the first 10 milliseconds and groups next into lists of 5. |
6 | The processGroup method takes each group and does further processing. |
7 | The print method is just to have a manner to observe the output. |
Block Processing
Invoking a blocking operation/service can take longer time. In such cases you can protect your implementation
by using @Blocking
annotation.
Let’s modify the BeerGenerator
Java class in src/main/java
in the com.redhat.developers
package
by adding the @Blocking
annotation to the processGroup
method:
package com.redhat.developers;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
@ApplicationScoped
public class BeerGenerator {
@RestClient
BeerService service;
@Outgoing("beers")
Multi<String> beers() {
List<Beer> beers = service.getBeers(10);
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size())))
.map(JsonbBuilder.create()::toJson);
}
@Incoming("beers")
@Outgoing("groups")
public Multi<List<String>> group(Multi<String> stream) {
return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5);
}
@Incoming("groups")
@Outgoing("messages")
@Blocking
public String processGroup(List<String> list) {
try {
Thread.sleep(1000);
System.out.println(LocalDateTime.now().toLocalTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return String.join(",", list.toString());
}
@Incoming("messages")
public String print(String msg) {
System.out.println(msg);
return msg;
}
}
The processGroup
method is invoked on a worker thread, but please be careful when using @Blocking
as it impacts the
concurrency aspect of your implementation.
Retry Processing
Please run the following in a new terminal window, being at the root of your tutorial-app
project:
./mvnw quarkus:add-extension -Dextension=quarkus-smallrye-fault-tolerance
quarkus extension add quarkus-smallrye-fault-tolerance
Even in the case of reactive applications you can experience faulty situations caused by unforeseen scenarios.
Let’s assume that something not expected occurs during processGroup
logic execution.
package com.redhat.developers;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.json.bind.JsonbBuilder;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
@ApplicationScoped
public class BeerGenerator {
private final Random random = new Random();
@RestClient
BeerService service;
@Outgoing("beers")
Multi<String> beers() {
List<Beer> beers = service.getBeers(10);
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size())))
.map(JsonbBuilder.create()::toJson);
}
@Incoming("beers")
@Outgoing("groups")
@Retry(maxRetries = 10, delay = 1, delayUnit = ChronoUnit.SECONDS) (1)
public Multi<List<String>> group(Multi<String> stream) {
int i = random.nextInt(10);
System.out.println("Show retry for random number "+i);
if (i > 1) {
throw new RuntimeException("not working"); (2)
}
return stream.skip().first(Duration.ofMillis(10)).group().intoLists().of(5);
}
@Incoming("groups")
@Outgoing("messages")
@Blocking
public String processGroup(List<String> list) {
try {
Thread.sleep(1000);
System.out.println(LocalDateTime.now().toLocalTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return String.join(",", list.toString());
}
@Incoming("messages")
public String print(String msg) {
System.out.println(msg);
return msg;
}
}
1 | In case of a RuntimeException , the @Retry annotation will attempt execution of the same logic maxium 10 times,
at an interval of 1 second. |
2 | Throw RuntimeException if the random number is greater than 1. |
Now restart Quarkus in Dev Mode and you should observe something similar to:
[io.quarkus] (Quarkus Main Thread) tutorial-app stopped in 0.024s
Show retry for random number 6
Show retry for random number 3
Show retry for random number 0
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/