Brokers and Triggers

Use the Knative Eventing Broker and Trigger Custom Resources to allow for CloudEvent attribute filtering.

Broker

---
apiVersion: eventing.knative.dev/v1
kind: broker
metadata:
 name: default
  • kn

  • kubectl

kn broker create default

Verify the created resources:

  • kn

  • kubectl

kn broker list
NAME      URL                                                                                AGE    CONDITIONS   READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default   112s   5 OK / 5     True
kubectl get broker.eventing.knative.dev -n knativetutorial
NAME      URL                                                                                AGE     READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default   2m57s   True

Service

Now, that you have the broker configured, you need to create the sinks eventingaloha and eventingbonjour, which will receive the filtered events.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: eventingaloha
spec:
  template:
    metadata:
      name: eventingaloha-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.2
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: eventingbonjour
spec:
  template:
    metadata:
      name: eventingbonjour-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: quay.io/rhdevelopers/eventinghello:0.0.2

The image being used by both of these services is identical. However, the difference in name aloha vs bonjour will make obvious which one is receiving a particular event.

Deploy Service

Run the following commands to deploy the eventingaloha and eventingbonjour services:

Deploy Aloha

  • kn

  • kubectl

kn service create eventingaloha \
  --concurrency-target=1 \
  --revision-name=eventingaloha-v1 \
  --image=quay.io/rhdevelopers/eventinghello:0.0.2

Deploy Bonjour

  • kn

  • kubectl

kn service create eventingbonjour \
  --concurrency-target=1 \
  --revision-name=eventingbonjour-v1 \
  --image=quay.io/rhdevelopers/eventinghello:0.0.2

Wait approximately 60 seconds for eventingaloha and eventingbonjour to terminate, scale-down to zero before proceeding.

Trigger

Now create the the trigger for eventingaloha that will associate the filtered events to a service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: helloaloha
spec:
  broker: default
  filter:
    attributes:
      type: aloha (1)
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: eventingaloha
1 The type is the CloudEvent type that is mapped to the ce-type HTTP header. A Trigger can filter by CloudEvent attributes such as type, source or extension.

Now create the the trigger for eventingaloha that will associate the filtered events to a service:

Create Aloha Trigger

  • kn

  • kubectl

kn trigger create helloaloha \
  --broker=default \
  --sink=ksvc:eventingaloha \
  --filter=type=aloha

Create Bonjour Trigger

Now create the the trigger for eventingbonjour that will associate the filtered events to a service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: hellobonjour
spec:
  broker: default
  filter:
    attributes:
      type: bonjour
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: eventingbonjour

Run the following commands to create the trigger:

  • kn

  • kubectl

kn trigger create hellobonjour \
  --broker=default \
  --sink=ksvc:eventingbonjour \
  --filter=type=bonjour

Verify that your triggers are ready:

  • kn

  • kubectl

kn trigger list
NAME           BROKER    SINK                   AGE   CONDITIONS   READY   REASON
helloaloha     default   ksvc:eventingaloha     12m   5 OK / 5     True
hellobonjour   default   ksvc:eventingbonjour   11m   5 OK / 5     True
kubectl get triggers.eventing.knative.dev
NAME         READY BROKER  SUBSCRIBER_URI                                      AGE
helloaloha   True  default http://eventingaloha.knativetutorial.svc.cluster.local   24s
hellobonjour True  default http://eventingbonjour.knativetutorial.svc.cluster.local 48s

Verification

Pull out the subscriberUri for eventhingaloha:

kubectl get trigger helloaloha -o jsonpath='{.status.subscriberUri}'

The command should show the output as:

http://eventingaloha.knativetutorial.svc.cluster.local

Pull out the subscriberUri for eventhingbonjour:

kubectl get trigger hellobonjour -o jsonpath='{.status.subscriberUri}'

The command should show the output as:

http://eventingbonjour.knativetutorial.svc.cluster.local

As well as broker’s subscriberUri:

kubectl get broker default -o jsonpath='{.status.address.url}'

The command should show the output as:

http://broker-ingress.knative-eventing.svc.cluster.local/{tutorial-namespace}/default

You should notice that the subscriberURIs are Kubernetes services with the suffix of knativetutorial.svc.cluster.local. This means they can be interacted with from another pod within the Kubernetes cluster.

Now that you have setup the Broker and Triggers you need to send in some test messages to see the behavior.

First start streaming the logs for the event consumers:

stern eventing -c user-container

Then create a pod for using the curl command:

apiVersion: v1
kind: Pod
metadata:
  labels:
    run: curler
  name: curler
spec:
  containers:
  - name: curler
    image: fedora:29 (1)
    tty: true
1 You can use any image that includes a curl command.

Wait for the curler pod to be running and then exec into the curler pod:

kubectl -n knativetutorial apply -f curler.yaml

Exec into the curler pod:

kubectl -n knativetutorial exec -it curler -- /bin/bash

Using the curler pod’s shell, curl the broker URI for eventingaloha:

curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default" \
-X POST \
-H "Ce-Id: say-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: aloha" \
-H "Ce-Source: mycurl" \
-H "Content-Type: application/json" \
-d '{"key":"from a curl"}'

You will then see eventingaloha will scale-up to respond to that event:

watch kubectl get pods

The command above should show the following output:

NAME                                        READY STATUS  AGE
curler                                      1/1   Running 59s
eventingaloha-1-deployment-6cdc888d9d-9xnnn 2/2   Running 30s

Next, curl the broker URI for eventingbonjour:

curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default" \
-X POST \
-H "Ce-Id: say-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: bonjour" \
-H "Ce-Source: mycurl" \
-H "Content-Type: application/json" \
-d '{"key":"from a curl"}'

And you will see the eventingbonjour pod scale up:

watch kubectl get pods

The command above should show the following output:

NAME                                         READY STATUS  AGE
curler                                       1/1   Running 82s
eventingbonjour-1-deployment-fc7858b5b-s9prj 2/2   Running 5s

In the previous examples you sent to the broker a payload configured with Ce-Type: aloha or Ce-Type: bonjour. Each trigger filtered the incoming message and sent it to the configured Sink Service in case of a match.

You can experiment by using different type filters in the Subscription to see how the different subscribed services respond. Filters may use an CloudEvent attribute for its criteria.

Cleanup

  • kn

  • kubectl

kn service delete eventingbonjour
kn service delete eventingaloha
kn trigger delete helloaloha
kn trigger delete hellobonjour
kubectl delete pod curler
kn broker delete default
kubectl delete -f eventing-aloha-sink.yaml
kubectl delete -f eventing-bonjour-sink.yaml
kubectl delete -f trigger-helloaloha.yaml
kubectl delete -f trigger-hellobonjour.yaml
kubectl delete -f curler.yaml
kubectl delete -f default-broker.yaml