Applying Content Based Routing EIP

At the end of this chapter you will be able to:

  • How to integrate Apache Kafka and Camel-K

  • Apply the Content Based Routing (CBR) Enterprise Integration Pattern(EIP)

Apache Camel supports numerous Enterprise Integration Patterns (EIPs) out-of-the-box, you can find the complete list of patterns on the Apache Camel website.

Content Based Router

The Content Based Router examines the message content and routes messages to a different channel based on the data contained in the message. The routing can be based on a number of criteria such as existence of fields, specific field values etc. When implementing a Content Based Router, special caution should be taken to make the routing function easy to maintain as the router can become a point of frequent maintenance. In more sophisticated integration scenarios, the Content Based Router can take on the form of a configurable rules engine that computes the destination channel based on a set of configurable rules. [1]

Application Overview

We will deploy a simple data streaming application that will use Camel-K and Knative to process the incoming data where that processed data is pushed out to to a reactive web application via Server-Sent Events (SSE) as shown below:

cbr app overview
Figure 1. Application Overview

The application has following components,

  • Data Producer: A Camel-K integration application which will produce data simulating the streaming data by sending the data to Apache Kafka

  • Data Processor: A Camel-K integration application which will process the streaming data from Kafka and send the default Knative Eventing Broker

  • Event Subscriber(Fruits UI): A Quarkus Java application, that will display the processed data from the Data Processor

  • Event Trigger: A Knative Event Trigger that applies a filter on the processed data to send to the Event Subscriber

The upcoming samples will deploy these individual components and after that we will test the integration by wiring them all together.

Just make sure:

Create default Broker

If you have deleted the default broker, then you need to create the default broker to be used by this chapter’s exercises.

Navigate to the eventing chapter folder eventing:

cd $TUTORIAL_HOME/eventing
---
apiVersion: eventing.knative.dev/v1
kind: broker
metadata:
 name: default
  • kn

  • kubectl

kn broker create default

Verify the created resources:

  • kn

  • kubectl

kn broker list
NAME      URL                                                                                AGE    CONDITIONS   READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default   112s   5 OK / 5     True
kubectl get broker.eventing.knative.dev -n knativetutorial
NAME      URL                                                                                AGE     READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default   2m57s   True

Finally navigate to the tutorial chapter’s folder advanced/camel-k:

cd $TUTORIAL_HOME/advanced/camel-k

Deploy Data Producer

This is a Knative Camel-K integration called fruits-producer which will use a public fruits API to retrieve the information about fruits and stream the data to Apache Kafka. The fruits-producer service retrieves the data from the fruits API, splits it using the Split EIP and then sends the data to a Kafka topic called fruits.

Fruits producer
- from:
    uri: timer:tick
    parameters:
      period: 5000(1)
    steps:
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - to: "https://fruityvice.com/api/fruit/all"(2)
      - split:
          jsonpath: "$.[*]"(3)
      - marshal:
          json: {}
      - log:
          message: "${body}"
      - to: "kafka:fruits?brokers=my-cluster-kafka-bootstrap.kafka:9092"(4)
1 Poll every 5 seconds from the REST API http://fruityvice.com.
2 Call the external REST API http://fruityvice.com to get the list of fruits to simulate the data streaming
3 Split the message
4 Send the processed data i.e. the individual fruit record as JSON to Apache Kafka Topic

Run the following command to deploy the fruit-producer integration:

kamel -n knativetutorial run \
 --wait \
 --dependency camel:log \
 --dependency camel:jackson \
 --dependency camel:jsonpath \
 eip/fruits-producer.yaml

The service deployment may take several minutes to become available, to monitor the status:

  • watch kubectl get pods or watch oc get pods

  • watch kamel get

  • watch kubectl get ksvc or watch oc get ksvc

watch kubectl -n knativetutorial get pods

A successful deploy will show the following pods:

fruits-producer service pods
NAME                                                READY   STATUS    AGE
camel-k-operator-5d74595cdf-4v9qz                   1/1     Running   4h4m
fruits-producer-nfngm-deployment-759c797c44-d6r52   2/2     Running   70s
kn -n knativetutorial service ls
fruit-producer Knative services
NAME              URL                                                           LATESTCREATED           LATESTREADY             READY   REASON
fruits-producer   http://fruits-producer.knativetutorial.192.168.64.13.nip.io   fruits-producer-gl575   fruits-producer-gl575   True

Verify Fruit Producer

Open a new terminal and run the start Kafka consumer using the script $TUTORIAL_HOME/bin/kafka-consumer.sh with parameter fruits:

$TUTORIAL_HOME/bin/kafka-consumer.sh fruits

If the fruit-producer executed well then you should the Kafka Consumer terminal show something like:

{"genus":"Citrullus","name":"Watermelon","id":25,"family":"Cucurbitaceae","order":"Cucurbitales","nutritions":{"carbohydrates":8,"protein":0.6,"fat":0.2,"calories":30,"sugar":6}}

Since the fruits API returns a static set of fruit data consistently, you can call it as needed to simulate data streaming and it will always be the same data.

Deploy Data Processor

Let us now deploy a Kamelet called fruits-processor, that can handle and process the streaming data from the Kafka topic fruits. The fruits-processor Kamelet applies the Content Based Router EIP to process the data:

Kamelet fruits-processor
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: fruits-processor
  annotations:
    camel.apache.org/kamelet.support.level: "Preview"
    camel.apache.org/catalog.version: "main-SNAPSHOT"
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "Kafka"
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    title: "Kafka Not Secured Source"
    description: |-
      Receive data from Kafka topics on an insecure broker.
    required:
      - topic
      - brokers
    type: object
    properties:
      topic:
        title: Topic Names
        description: Comma separated list of Kafka topic names
        type: string
      brokers:
        title: Brokers
        description: Comma separated list of Kafka Broker URLs
        type: string
  dependencies:
    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:0.10.0"
    - "camel:kafka"
    - "camel:kamelet"
    - "camel:jackson"
    - "camel:core"
    - "camel:log"
  template:
    from:
      uri: "kafka:fruits?brokers=my-cluster-kafka-bootstrap.kafka:9092"(1)
      steps:
        - log:
            message: "Received Body ${body}"
        - unmarshal:
            json: {}(2)
        - choice: (3)
            when:
              - simple: "${body[nutritions][sugar]} <= 5"
                steps:
                  - remove-headers: "*"
                  - marshal:
                      json: {}
                  - set-header: (4)
                      name: ce-type
                      constant: low-sugar
                  - set-header:
                      name: fruit-sugar-level
                      constant: low
                  - to: "log:low?showAll=true&multiline=true"
              - simple: "${body[nutritions][sugar]} > 5 || ${body[nutritions][sugar]} <= 10"
                steps:
                  - remove-headers: "*"
                  - marshal:
                      json: {}
                  - set-header:
                      name: ce-type
                      constant: medium-sugar
                  - set-header:
                      name: fruit-sugar-level
                      constant: medium
                  - to: "log:medium?showAll=true&multiline=true"
            otherwise:
              steps:
                - remove-headers: "*"
                - marshal:
                    json: {}
                - set-header:
                    name: ce-type
                    constant: high-sugar
                - set-header:
                    name: fruit-sugar-level
                    constant: high
                - to: "log:high?showAll=true&multiline=true"
1 The Camel route connects to Apache Kafka broker and the topic fruits.
2 Once the data is received it is transformed into a JSON payload.
3 The Content Based Router pattern using the Choice EIP. In the data processing you classify the fruits as low (sugar <= 5), medium(sugar between 5 to 10) and high(sugar > 10) based on the sugar level present in their nutritions data.
4 Based on the data classification you will be setting the CloudEvents type header to be low-high, medium-sugar and high-sugar. This header is used as one of the filter attributes in the Knative Eventing Trigger.

Let’s deploy the Kamelet using the following command:

kubectl apply -n knativetutorial -f eip/fruit-processor-kamelet.yaml

The last step is to send the processed data to the Knative Eventing Broker named default. We can do that by using a KameletBinding:

KameletBinding fruits-processor
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: fruits-processor-to-knative
spec:
  source:
    ref: (1)
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: fruits-processor
    properties:
      topic: "fruits"
      brokers: "my-cluster-kafka-bootstrap.kafka:9092"
  sink: (2)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 Kubernetes reference to the previously created Kamelet.
2 Sink the input to the default broker.

Let’s deploy the KameletBinding using:

kubectl apply -n knativetutorial -f eip/fruits-processor-to-knative.yaml

As the Camel-K controller takes few minutes to deploy the KameletBinding, you can watch the pods of the knativetutorial namespace for its status:

watch kubectl -n knativetutorial get pods
fruit-processor Knative service pods
NAME                                               READY   STATUS    AGE
camel-k-operator-5d74595cdf-4v9qz                  1/1     Running   4h17m
fruit-processor-to-knative-595945f8d7-wc9qz      1/1     Running   4h42m

Wondering why fruit-producer is not listed ?

fruit-producer is a Knative service, hence it wil be scaled down to zero in 60-90 seconds.

You can check if your KameletBinding is Ready by running:

kn source kamelet binding list -n knativetutorial

When the KamelBinding is successful you will see it in READY state as shown:

NAME                           READY   REASON   AGE
fruits-processor-to-knative    True             2m22s

Deploy Event Subscriber

Now deploy a Reactive Web application called fruit-events-display. It is a Quarkus Java application, that will update UI(reactively) as and when it receives the processed data from the Knative Eventing backend.

You can deploy the fruit-events-display application using the command:

kubectl apply -n knativetutorial \
  -f $TUTORIAL_HOME/install/utils/fruit-events-display.yaml

Verify if the fruit-events-display application is up and running:

watch kubectl -n knativetutorial get pods

Once the fruit-events-display is running you will see the following pods in the knativetutorial:

Pods list
NAME                                       READY   STATUS    AGE
camel-k-operator-5d74595cdf-4v9qz          1/1     Running   4h21m
fruit-events-display-8d47bc98f-6r7zt       1/1     Running   15s
fruits-processor-h45f7-6fdfd74cf9-nmfkn    1/1     Running   4m12s

The web fruit-events-display application will refresh its UI as and when it receives the processed data, you need you open the web application in your browser.

  • Minikube

  • OpenShift

minikube -p knativetutorial -n knativetutorial service fruit-events-display
oc expose -n knativetutorial service fruit-events-display

Once you have exposed the service, you can open the OpenShift route in the web browser:

oc get -n knativetutorial route fruit-events-display

The fruit-events-display UI will be empty as shown below:

cbr app ui empty
Figure 2. Fruit events Display Web Application

Apply Knative Filter

As a last step we need to deploy a Knative Event Trigger called fruits-trigger. The trigger consumes the events from the Knative Event Broker named default, when the fruit event is received it will dispatch the events to the subscriber which is the fruit-events-display service.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: sugary-fruits
spec:
  broker: default (1)
  filter: (2)
    attributes:
      type: low-sugar
  subscriber: (3)
    ref:
      apiVersion: v1
      kind: Service
      name: fruit-events-display
1 The Knative Event Broker that this Trigger listens to for Knative events. Events originate from the CamelSource called fruits-processor and are sent to the Knative Eventing Broker named default.
2 The filter attribute restricts the events that fruit-events-display will receive. In this example, it is configured to filter the events for the type low-sugar. You could also use the other classifications of fruits such as medium-sugar or high-sugar.
3 Set the subscriber as the fruit-events-display Kubernetes service to receive the filtered event data.

You can deploy the Knative Event Trigger using the following command:

kubectl apply -n knativetutorial -f eip/sugary-fruits.yaml

Let us check the status of the Trigger using the command kubectl -n knativetutorial get triggers which should return one trigger called sugary-fruits with ready state as shown below.

kubectl -n knativetutorial get triggers

As the trigger will dispatch its filtered event to fruit-events-display , the subscriber URI of the Trigger will the fruit-events-display service.

NAME           READY BROKER    SUBSCRIBER_URI
sugary-fruits  True  default   http://fruits-events-display.knativetutorial.svc.cluster.local/

Verify end to end

Now that we have all the components for the Application Overview, let’s verify the end to end flow:

To verify the data flow and processing call the fruits-producer service using the script $TUTORIAL_HOME/bin/call.sh with parameters fruits-producer and ''.

$TUTORIAL_HOME/bin/call.sh fruits-producer ''

Assuming everything worked well, you should see the low-sugar fruits listed in the fruits-event-display as shown below:

cbr app ui with data
Figure 3. Fruit events

Cleanup

kamel delete  -n knativetutorial fruits-producer
kubectl delete  -n knativetutorial -f eip/fruits-processor-kamelet.yaml
kubectl delete  -n knativetutorial -f eip/fruits-processor-to-knative.yaml
kubectl delete  -n knativetutorial -f eip/sugary-fruits.yaml
kubectl delete -n knativetutorial -f $TUTORIAL_HOME/install/utils/fruit-events-display.yaml
kn broker delete  -n knativetutorial default
kamel -n knativetutorial reset
kamel -n knativetutorial uninstall