Apache Kafka with Reactive Streams

Mutiny is just part of the Reactive story. To complement it, we need Reactive Streams too. And an important service that can serve as the underlying implementation for our stream is Apache Kafka.

In this chapter we’re going to use Mutiny to create price request for beers to a remote service called price-generator using Kafka as the broker for our messages in a Kafka topic called beer. The price-generator will get the beer from this topic, add a tag price to it, and send the information back in a Kafka topic called priced-beer. Finally, we’ll read the priced beers from the topic and send it through our REST endpoint using SSE (Server-Side Events).

Add the Reactive Messaging Kafka extension

Just open a new terminal window, and make sure you’re at the root of your tutorial-app project, then run:

  • Maven

  • Quarkus CLI

mvn quarkus:add-extension -Dextensions=quarkus-smallrye-reactive-messaging-kafka
quarkus extension add quarkus-smallrye-reactive-messaging-kafka

Modify Beer POJO

Make sure you have the Beer Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;

import javax.json.bind.annotation.JsonbCreator;
import java.math.BigDecimal;

public class Beer {

    private String name;

    private String tagline;

    private double abv;

    private Beer(String name, String tagline, double abv) {
        this.name = name;
        this.tagline = tagline;
        this.abv = abv;
    }

    @JsonbCreator
    public static Beer of(String name, String tagline, double abv) {
        return new Beer(name, tagline, abv);
    }

    public PricedBeer withPrice(BigDecimal price) {
        return PricedBeer.of(this.name, price);
    }


    public String getName() {
        return name;
    }

    public String getTagline() {
        return tagline;
    }

    public double getAbv() {
        return abv;
    }

}

Modify PricedBeer POJO

Make sure you have the PricedBeer Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;

import java.math.BigDecimal;

import javax.json.bind.annotation.JsonbCreator;

public class PricedBeer {

    private String name;

    private BigDecimal price;

    private PricedBeer(String name, BigDecimal price) {
        this.name = name;
        this.price = price;
    }

    @JsonbCreator
    public static PricedBeer of(String name, double price) {
        return new PricedBeer(name, new BigDecimal(price).setScale(2));
    }

    public static PricedBeer of(String name, BigDecimal price) {
        return new PricedBeer(name, price);
    }

    public String getName() {
        return name;
    }

    public BigDecimal getPrice() {
        return price;
    }

}

Modify the BeerGenerator

Modify the BeerGenerator Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;


import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import javax.enterprise.context.ApplicationScoped;
import javax.json.bind.JsonbBuilder;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import org.eclipse.microprofile.rest.client.inject.RestClient;


@ApplicationScoped
public class BeerGenerator {

    @RestClient
    BeerService service;

    @Outgoing("beers")
    Multi<String> beers() {
        List<Beer>  beers = service.getBeers(10);
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) (1)
                .onOverflow().drop() (2)
                .map(tick -> beers.get(ThreadLocalRandom.current().nextInt(0, beers.size()))) (3)
                .map(JsonbBuilder.create()::toJson); (4)
    }
}
1 We’re creating a Multi that generates a new message every 1 second.
2 We apply backpressure by dropping the messages if the topic is not ready.
3 For each message we choose a random Beer from our list.
4 We map the Beer to JSON format.

Update BeerResource

Let’s modify the BeerResource Java class in src/main/java in the com.redhat.developers package with the following contents:

package com.redhat.developers;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.json.bind.JsonbBuilder;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.mutiny.Multi;
import org.jboss.resteasy.annotations.SseElementType;

@Path("/beer")
public class BeerResource {

    @RestClient
    BeerService beerService;

    @Channel("priced-beer") (1)
    Multi<String> pricedBeers;


    @GET
    @Path("all")
    @Produces(MediaType.APPLICATION_JSON)
    public Multi<Beer> beers() {
        return Multi.createBy().repeating()
                .supplier(
                        () -> new AtomicInteger(1),
                        i -> beerService.getBeers(i.getAndIncrement())
                )
                .until(List::isEmpty)
                .onItem().<Beer>disjoint()
                .select().where(b -> b.getAbv() > 15.0);
    }


    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    public Multi<PricedBeer> pricedBeers() {
        return pricedBeers.map(s -> JsonbBuilder.create().fromJson(s, PricedBeer.class)) (2)
                .ifNoItem().after(Duration.ofSeconds(1)).fail(); (3)
    }

}
1 We inject the Multi directly by using the @Channel annotation.
2 We just map the PricedBeer to JSON format.
3 If no item is retrieved after 1 second, we will assume that the call failed.

Add the Reactive Messaging Kafka properties

Add the following properties to your application.properties in src/main/resources:

kafka.bootstrap.servers=localhost:9092

Create docker-compose configuration

The external dependencies required to run this chapter are:

  • Kafka

  • Zookeeper (required by Kafka)

  • The price-generator service

We’re going to use docker-compose to bootstrap these external services.

Create a new file called docker-compose.yml in the root of your tutorial-app folder:

version: '3'
services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
        "sh", "-c",
        "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
  kafka:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
        "sh", "-c",
        "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  price-generator:
    image: quay.io/rhdevelopers/quarkus-tutorial-price-generator:2.0
    network_mode: host
    depends_on:
      - kafka

Run docker-compose

Make sure you are in the same folder that you’ve created the docker-compose.yml file (in our case, the root of our tutorial-app folder).

docker-compose up
kafka_1            | [2020-05-13 01:54:53,281] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1            | [2020-05-13 01:54:53,281] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1            | [2020-05-13 01:54:53,284] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
kafka_1            | [2020-05-13 01:54:53,367] INFO Loading logs. (kafka.log.LogManager)
kafka_1            | [2020-05-13 01:54:53,504] INFO [Log partition=__consumer_offsets-38, dir=/tmp/kafka-logs] Loading producer state till offset 15 with message format version 2 (kafka.log.Log)
kafka_1            | [2020-05-13 01:54:53,531] INFO [ProducerStateManager partition=__consumer_offsets-38] Loading producer state from snapshot file '/tmp/kafka-logs/__consumer_offsets-38/00000000000000000015.snapshot' (kafka.log.ProducerStateManager)
kafka_1            | [2020-05-13 01:54:53,550] INFO [Log partition=__consumer_offsets-38, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 15 in 125 ms (kafka.log.Log)

Invoke the /beer endpoint

Run the following command:

curl -N localhost:8080/beer
data: {"name":"AB:21","price":1525.00}

data: {"name":"Neon Overlord","price":1640.00}

data: {"name":"Brewdog Vs Beavertown","price":1015.00}

data: {"name":"Ace Of Chinook","price":1871.00}

data: {"name":"Hop Shot","price":1180.00}

data: {"name":"Neon Overlord","price":213.00}

data: {"name":"Hop Shot","price":174.00}

data: {"name":"Hello My Name Is Ingrid 2016","price":1230.00}

data: {"name":"Twin Atlantic","price":1143.00}

data: {"name":"AB:21","price":1372.00}

data: {"name":"Self Assembly Pope","price":1667.00}

data: {"name":"Kingpin","price":1954.00}

data: {"name":"Crew Brew","price":460.00}

data: {"name":"Ace Of Chinook","price":307.00}

data: {"name":"Mango And Chili Barley Wine","price":318.00}

data: {"name":"Self Assembly Pope","price":339.00}

data: {"name":"Ace Of Chinook","price":1578.00}

data: {"name":"Chili Hammer","price":470.00}

data: {"name":"Science IPA","price":968.00}

data: {"name":"Casino Rye Ale","price":1349.00}

data: {"name":"Prototype Helles","price":1721.00}

data: {"name":"Kingpin","price":1992.00}

data: {"name":"Hello My Name Is Ingrid 2016","price":1933.00}

data: {"name":"Casino Rye Ale","price":234.00}

data: {"name":"Ace Of Chinook","price":1343.00}

data: {"name":"Small Batch: Rye IPA","price":167.00}

data: {"name":"Science IPA","price":234.00}

data: {"name":"Brewdog Vs Beavertown","price":1123.00}

data: {"name":"Prototype Helles","price":945.00}

data: {"name":"Kingpin","price":851.00}

data: {"name":"Ace Of Equinox","price":1422.00}

data: {"name":"Crew Brew","price":883.00}

data: {"name":"Prototype Helles","price":502.00}

data: {"name":"Brewdog Vs Beavertown","price":710.00}

data: {"name":"Casino Rye Ale","price":1848.00}

data: {"name":"Gin Blitz","price":278.00}

data: {"name":"Brewdog Vs Beavertown","price":719.00}

data: {"name":"Hello My Name Is Ingrid 2016","price":113.00}

Dev Services for Kafka

Because starting a Kafka broker can be long and you need to develop fast in your local environment, Dev Services for Kafka is here to help you!

Since quarkus-smallrye-reactive-messaging-kafka extension is present, Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests.

You can disable Dev Services for Kafka by adding quarkus.kafka.devservices.enabled=false or configuring kafka.bootstrap.servers in application.properties.