Apache Kafka with Reactive Streams
Mutiny is just one part of the reactive story. To complement it, we also need Reactive Streams, and an important service that can serve as the underlying implementation for our streams is Apache Kafka.
In this chapter we're going to use Mutiny to create price requests for beers and send them to a remote service called price-generator, using Kafka as the broker for our messages and a Kafka topic called beer. The price-generator will read the beers from this topic, add a price tag to each one, and send the information back on a Kafka topic called priced-beer.
Finally, we'll read the priced beers from that topic and stream them through our REST endpoint using Server-Sent Events (SSE).
Modify Beer POJO
Make sure you have the Beer Java class in src/main/java in the com.redhat.developers package with the following contents:
package com.redhat.developers;

import javax.json.bind.annotation.JsonbCreator;
import java.math.BigDecimal;

public class Beer {

  private String name;
  private String tagline;
  private double abv;

  private Beer(String name, String tagline, double abv) {
    this.name = name;
    this.tagline = tagline;
    this.abv = abv;
  }

  @JsonbCreator
  public static Beer of(String name, String tagline, double abv) {
    return new Beer(name, tagline, abv);
  }

  public PricedBeer withPrice(BigDecimal price) {
    return PricedBeer.of(this.name, price);
  }

  public String getName() {
    return name;
  }

  public String getTagline() {
    return tagline;
  }

  public double getAbv() {
    return abv;
  }
}
Modify PricedBeer POJO
Make sure you have the PricedBeer Java class in src/main/java in the com.redhat.developers package with the following contents:
package com.redhat.developers;

import java.math.BigDecimal;
import java.math.RoundingMode;

import javax.json.bind.annotation.JsonbCreator;

public class PricedBeer {

  private String name;
  private BigDecimal price;

  private PricedBeer(String name, BigDecimal price) {
    this.name = name;
    this.price = price;
  }

  @JsonbCreator
  public static PricedBeer of(String name, double price) {
    // Use valueOf and an explicit rounding mode so that doubles that are not exactly
    // representable in binary (e.g. 4.99) cannot trigger an ArithmeticException.
    return new PricedBeer(name, BigDecimal.valueOf(price).setScale(2, RoundingMode.HALF_UP));
  }

  public static PricedBeer of(String name, BigDecimal price) {
    return new PricedBeer(name, price);
  }

  public String getName() {
    return name;
  }

  public BigDecimal getPrice() {
    return price;
  }
}
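A quick note on the two factory methods above: only of(String, double) carries @JsonbCreator, so that is the factory JSON-B invokes when deserializing the payloads arriving from the priced-beer topic (exactly what BeerResource does later in this chapter). Below is a minimal illustration with made-up values; binding by parameter name relies on the -parameters compiler flag, which Quarkus-generated projects enable by default.

package com.redhat.developers;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

public class PricedBeerJsonExample {

  public static void main(String[] args) {
    Jsonb jsonb = JsonbBuilder.create();
    // JSON-B picks PricedBeer.of(String, double) because it is annotated with @JsonbCreator
    PricedBeer beer = jsonb.fromJson("{\"name\":\"Kingpin\",\"price\":4.50}", PricedBeer.class);
    System.out.println(beer.getName() + " costs " + beer.getPrice()); // Kingpin costs 4.50
  }
}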
Modify the BeerGenerator
Modify the BeerGenerator Java class in src/main/java in the com.redhat.developers package with the following contents:
package com.redhat.developers;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import javax.enterprise.context.ApplicationScoped;
import javax.json.bind.JsonbBuilder;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class BeerGenerator {

  @RestClient
  BeerService service;

  @Outgoing("beers")
  Multi<String> beers() {
    List<Beer> beers = service.getBeers(10);
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) (1)
        .onOverflow().drop() (2)
        .map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size()))) (3)
        .map(JsonbBuilder.create()::toJson); (4)
  }
}
1. We create a Multi that emits a new item every second.
2. We handle overflow by dropping items whenever the downstream Kafka connector is not ready to receive more messages.
3. For each tick we pick a random Beer from our list.
4. We serialize the Beer to JSON.
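Both BeerGenerator and the BeerResource below inject the BeerService MicroProfile REST client created in an earlier chapter; it is not repeated here. Purely as a reminder of the shape the code above relies on, a minimal sketch could look like the following — the @Path value, config key, and query parameter name are assumptions, so keep whatever you defined previously:

package com.redhat.developers;

import java.util.List;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;

import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

@Path("/v2/beers") // assumption: adjust to the path used in the REST client chapter
@RegisterRestClient(configKey = "beer-api") // assumption: illustrative config key
public interface BeerService {

  // Returns one list of beers for the given parameter (treated as a page number in this sketch).
  // BeerGenerator calls getBeers(10); BeerResource pages from 1 upwards until the list is empty.
  @GET
  List<Beer> getBeers(@QueryParam("page") int page);
}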
Update BeerResource
Let's modify the BeerResource Java class in src/main/java in the com.redhat.developers package with the following contents:
package com.redhat.developers;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.json.bind.JsonbBuilder;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.resteasy.annotations.SseElementType;

import io.smallrye.mutiny.Multi;

@Path("/beer")
public class BeerResource {

  @RestClient
  BeerService beerService;

  @Channel("priced-beer") (1)
  Multi<String> pricedBeers;

  @GET
  @Path("all")
  @Produces(MediaType.APPLICATION_JSON)
  public Multi<Beer> beers() {
    return Multi.createBy().repeating()
        .supplier(
            () -> new AtomicInteger(1),
            i -> beerService.getBeers(i.getAndIncrement())
        )
        .until(List::isEmpty)
        .onItem().<Beer>disjoint()
        .select().where(b -> b.getAbv() > 15.0);
  }

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  @SseElementType(MediaType.APPLICATION_JSON)
  public Multi<PricedBeer> pricedBeers() {
    return pricedBeers.map(s -> JsonbBuilder.create().fromJson(s, PricedBeer.class)) (2)
        .ifNoItem().after(Duration.ofSeconds(1)).fail(); (3)
  }
}
1. We inject the channel directly as a Multi by using the @Channel annotation.
2. We map each JSON payload read from the priced-beer channel to a PricedBeer.
3. If no item arrives within 1 second, we assume the stream has failed.
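Once the application is running in dev mode (the Kafka setup follows in the next sections), you can also try the plain JSON endpoint defined above; it keeps requesting pages from the remote beer service until an empty page comes back and only emits beers with an ABV above 15:

curl localhost:8080/beer/all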
Add the Reactive Messaging Kafka properties
Add the following property to your application.properties in src/main/resources:
kafka.bootstrap.servers=localhost:9092
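With only the bootstrap server set, Quarkus can infer the SmallRye Kafka connector automatically (it is the only connector present), and each channel then maps to a Kafka topic with the same name as the channel, i.e. beers and priced-beer. If the topics you need are named differently (for example the beer topic mentioned at the start of this chapter), they can be mapped explicitly; treat the entries below as an illustrative sketch rather than required configuration for this tutorial:

mp.messaging.outgoing.beers.connector=smallrye-kafka
mp.messaging.outgoing.beers.topic=beer
mp.messaging.incoming.priced-beer.connector=smallrye-kafka
mp.messaging.incoming.priced-beer.topic=priced-beer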
Create docker-compose configuration
The external dependencies required to run this chapter are:

- Kafka
- Zookeeper (required by Kafka)
- The price-generator service

We're going to use docker-compose to bootstrap these external services. Create a new file called docker-compose.yml in the root of your tutorial-app folder:
version: '3'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  price-generator:
    image: quay.io/rhdevelopers/quarkus-tutorial-price-generator:2.0
    network_mode: host
    depends_on:
      - kafka
Run docker-compose
Make sure you are in the same folder where you created the docker-compose.yml file (in our case, the root of the tutorial-app folder).
docker-compose up
kafka_1 | [2020-05-13 01:54:53,281] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2020-05-13 01:54:53,281] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2020-05-13 01:54:53,284] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1 | [2020-05-13 01:54:53,367] INFO Loading logs. (kafka.log.LogManager)
kafka_1 | [2020-05-13 01:54:53,504] INFO [Log partition=__consumer_offsets-38, dir=/tmp/kafka-logs] Loading producer state till offset 15 with message format version 2 (kafka.log.Log)
kafka_1 | [2020-05-13 01:54:53,531] INFO [ProducerStateManager partition=__consumer_offsets-38] Loading producer state from snapshot file '/tmp/kafka-logs/__consumer_offsets-38/00000000000000000015.snapshot' (kafka.log.ProducerStateManager)
kafka_1 | [2020-05-13 01:54:53,550] INFO [Log partition=__consumer_offsets-38, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 15 in 125 ms (kafka.log.Log)
Invoke the /beer endpoint
Run the following command:
curl -N localhost:8080/beer
data: {"name":"AB:21","price":1525.00}
data: {"name":"Neon Overlord","price":1640.00}
data: {"name":"Brewdog Vs Beavertown","price":1015.00}
data: {"name":"Ace Of Chinook","price":1871.00}
data: {"name":"Hop Shot","price":1180.00}
data: {"name":"Neon Overlord","price":213.00}
data: {"name":"Hop Shot","price":174.00}
data: {"name":"Hello My Name Is Ingrid 2016","price":1230.00}
data: {"name":"Twin Atlantic","price":1143.00}
data: {"name":"AB:21","price":1372.00}
data: {"name":"Self Assembly Pope","price":1667.00}
data: {"name":"Kingpin","price":1954.00}
data: {"name":"Crew Brew","price":460.00}
data: {"name":"Ace Of Chinook","price":307.00}
data: {"name":"Mango And Chili Barley Wine","price":318.00}
data: {"name":"Self Assembly Pope","price":339.00}
data: {"name":"Ace Of Chinook","price":1578.00}
data: {"name":"Chili Hammer","price":470.00}
data: {"name":"Science IPA","price":968.00}
data: {"name":"Casino Rye Ale","price":1349.00}
data: {"name":"Prototype Helles","price":1721.00}
data: {"name":"Kingpin","price":1992.00}
data: {"name":"Hello My Name Is Ingrid 2016","price":1933.00}
data: {"name":"Casino Rye Ale","price":234.00}
data: {"name":"Ace Of Chinook","price":1343.00}
data: {"name":"Small Batch: Rye IPA","price":167.00}
data: {"name":"Science IPA","price":234.00}
data: {"name":"Brewdog Vs Beavertown","price":1123.00}
data: {"name":"Prototype Helles","price":945.00}
data: {"name":"Kingpin","price":851.00}
data: {"name":"Ace Of Equinox","price":1422.00}
data: {"name":"Crew Brew","price":883.00}
data: {"name":"Prototype Helles","price":502.00}
data: {"name":"Brewdog Vs Beavertown","price":710.00}
data: {"name":"Casino Rye Ale","price":1848.00}
data: {"name":"Gin Blitz","price":278.00}
data: {"name":"Brewdog Vs Beavertown","price":719.00}
data: {"name":"Hello My Name Is Ingrid 2016","price":113.00}
Dev Services for Kafka
Because starting a Kafka broker can take a while and you want to iterate quickly in your local environment, Dev Services for Kafka is here to help!
Since the quarkus-smallrye-reactive-messaging-kafka extension is present, Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. You can disable Dev Services for Kafka by adding quarkus.kafka.devservices.enabled=false or by configuring kafka.bootstrap.servers in application.properties.
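For reference, either of the following application.properties entries keeps the automatic broker from starting:

# Option 1: switch Dev Services for Kafka off explicitly
quarkus.kafka.devservices.enabled=false

# Option 2: configuring an external broker also disables Dev Services
kafka.bootstrap.servers=localhost:9092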