Apache Kafka Events with Knative Eventing

At the end of this chapter you will be able to:

  • Use KafkaSource with Knative Eventing

  • Source Kafka events to a sink

  • Autoscale Knative Services with Apache Kafka events

cd $TUTORIAL_HOME/eventing

Deploy Knative Eventing KafkaSource

The Knative Eventing KafkaSource needs to be deployed for Kafka messages to flow through the Knative Eventing Channels. You can deploy the Knative KafkaSource by running the command:
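A minimal install sketch, assuming the KafkaSource ships as a `source.yaml` artifact in the same eventing-kafka release as the channel manifest used later in this chapter (the exact URL and version are assumptions; adjust them to the release you are using):

```shell
# Assumption: KafkaSource is published as source.yaml in the knative-v1.8.1
# release of knative-sandbox/eventing-kafka (match your channel manifest version)
kubectl apply -f "https://github.com/knative-sandbox/eventing-kafka/releases/download/knative-v1.8.1/source.yaml"
```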

The previous step deploys the Knative KafkaSource controller in the knative-eventing namespace, along with a CRD, ServiceAccount, ClusterRole, etc. Verify that the knative-eventing namespace includes a pod prefixed with kafka-controller-manager:

watch "kubectl get pods -n knative-eventing"

The command above should show the following output:

NAME                                        READY   STATUS    RESTARTS         AGE
eventing-controller-6cd47bff4b-vlsjb        1/1     Running   1                92m
eventing-webhook-87b4f6cb5-btfr4            1/1     Running   12 (2m45s ago)   92m
imc-controller-5bd45cf5b-2wr9v              1/1     Running   12 (2m43s ago)   92m
imc-dispatcher-7b9bf546b8-cmf65             1/1     Running   14 (3m27s ago)   92m
kafka-controller-manager-7c4498d478-fsjxl   1/1     Running   0                67s
mt-broker-controller-5bb47f9cf5-dfs2z       1/1     Running   2 (52m ago)      92m
mt-broker-filter-7f468cbd7b-6r28q           1/1     Running   6 (4m35s ago)    92m
mt-broker-ingress-57db965447-8bv57          1/1     Running   5 (22m ago)      92m

You should also deploy the Knative KafkaChannel, which connects a Knative Eventing Channel to an Apache Kafka cluster backend. To deploy the Knative KafkaChannel, run:

curl -L "https://github.com/knative-sandbox/eventing-kafka/\
releases/download/knative-v1.8.1/channel-consolidated.yaml" \
 | sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/' \
 | kubectl apply --filename -
  • "my-cluster-kafka-bootstrap.kafka:9092" comes from kubectl get services -n kafka

  • On OpenShift, the Knative Eventing Kafka operator that was installed earlier also installs the Knative KafkaChannel.

Look for 3 new pods in namespace knative-eventing with the prefix "kafka":

watch "kubectl get pods -n knative-eventing"

The command should show an output like:

NAME                                        READY   STATUS              RESTARTS        AGE
eventing-controller-6cd47bff4b-vlsjb        1/1     Running             1               94m
eventing-webhook-87b4f6cb5-btfr4            1/1     Running             12 (72s ago)    94m
imc-controller-5bd45cf5b-2wr9v              1/1     Running             13 (26s ago)    94m
imc-dispatcher-7b9bf546b8-cmf65             1/1     Running             15 (74s ago)    94m
kafka-ch-controller-566bb77694-5hmc9        1/1     Running             0               27s
kafka-controller-manager-7c4498d478-fsjxl   1/1     Running             1 (71s ago)     3m5s
kafka-webhook-66fcb6588b-gqpfd              1/1     Running             0               27s
mt-broker-controller-5bb47f9cf5-dfs2z       1/1     Running             2 (54m ago)     94m
mt-broker-filter-7f468cbd7b-6r28q           1/1     Running             6 (6m33s ago)   94m
mt-broker-ingress-57db965447-8bv57          1/1     Running             5 (24m ago)     94m

You should also find some new API resources, as shown:

  • kn

  • kubectl

kn source list-types
kubectl api-resources --api-group='sources.knative.dev'

The command should show the following APIs in the sources.knative.dev group:

TYPE              NAME                                   DESCRIPTION
ApiServerSource   apiserversources.sources.knative.dev   Watch and send Kubernetes API events to addressable
CamelSource       camelsources.sources.knative.dev
ContainerSource   containersources.sources.knative.dev   Generate events by Container image and send to addressable
KafkaSource       kafkasources.sources.knative.dev       Route events from Apache Kafka Server to addressable
PingSource        pingsources.sources.knative.dev        Send periodically ping events to addressable
SinkBinding       sinkbindings.sources.knative.dev       Binding for connecting a PodSpecable to addressable

The messaging API resources should now include the KafkaChannel; list them with:

kubectl api-resources --api-group='messaging.knative.dev'

The command should show the following APIs in messaging.knative.dev :

NAME               SHORTNAMES   APIGROUP                NAMESPACED   KIND
channels           ch           messaging.knative.dev   true         Channel
inmemorychannels   imc          messaging.knative.dev   true         InMemoryChannel
kafkachannels      kc           messaging.knative.dev   true         KafkaChannel
subscriptions      sub          messaging.knative.dev   true         Subscription

Using Kafka Channel as Default Knative Channel

Persistence and durability are two very important features of any messaging-based architecture. The Knative Channel has built-in support for durability, but durability of messages is ineffective if the Knative Eventing Channel does not support persistence: without persistence, the channel cannot deliver messages to subscribers that are offline at the time of message delivery.

By default, all Knative Channels created by the Knative Eventing API use the InMemoryChannel (imc), which cannot persist messages. To enable persistence we need to use one of the supported channel backends, such as GCP Pub/Sub, Kafka, or NATS Streaming (natss), as the default Knative Channel.

We installed Apache Kafka earlier in this tutorial; let us now configure it to be the default Knative Channel backend:

Knative Default Channel ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel (1)
    namespaceDefaults:
      knativetutorial:
        apiVersion: messaging.knative.dev/v1beta1
        kind: KafkaChannel (2)
        spec:
          numPartitions: 2
          replicationFactor: 1
1 For the cluster we will still use the default InMemoryChannel
2 For the namespace knativetutorial, all Knative Eventing Channels will use KafkaChannel as default

Run the following command to apply the Knative Eventing Channel configuration:

kubectl apply -f default-channel-config.yaml
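To double-check that the configuration was picked up, you can read the ConfigMap back with a standard kubectl query (the ConfigMap name and namespace come from the listing above):

```shell
# Print the default channel configuration the webhook will apply
kubectl get configmap default-ch-webhook -n knative-eventing \
  -o jsonpath='{.data.default-ch-config}'
```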

Since all Knative Eventing Channels in knativetutorial are now KafkaChannels, creating a Knative Eventing Channel in the namespace knativetutorial will result in a corresponding Kafka topic being created. Let us verify this by creating a sample Channel, as shown in the listing:

  • kn

  • kubectl

kn channel create my-events-ch

Creating the Channel my-events-ch will result in the following resources in the knativetutorial namespace.

A channel.messaging.knative.dev called my-events-ch:

  • kn

  • kubectl

kn channel -n knativetutorial ls
NAME           TYPE           URL                                                                AGE     READY   REASON
my-events-ch   KafkaChannel   http://my-events-ch-kn-channel.knativetutorial.svc.cluster.local   2m55s   True
kubectl get -n knativetutorial channels

And a corresponding kafkachannel.messaging.knative.dev with the same name my-events-ch:

kubectl get -n knativetutorial kafkachannels

When you now list the topics that are available in Kafka using the script $TUTORIAL_HOME/bin/kafka-list-topics.sh, you should see a topic corresponding to your Channel my-events-ch:

$TUTORIAL_HOME/bin/kafka-list-topics.sh

The command should return an output like:

knative-messaging-kafka.knativetutorial.my-events-ch

For each Knative Eventing Channel that you create, a Kafka topic is created; the topic's name follows the convention knative-messaging-kafka.<your-channel-namespace>.<your-channel-name>.
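The naming convention can be sketched in pure shell (illustrative only; the namespace and channel names are the ones used in this tutorial):

```shell
# Compose the backing Kafka topic name for a Knative Channel:
# knative-messaging-kafka.<namespace>.<channel-name>
ns="knativetutorial"
channel="my-events-ch"
topic="knative-messaging-kafka.${ns}.${channel}"
echo "${topic}"   # → knative-messaging-kafka.knativetutorial.my-events-ch
```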

When listing the resources of knative-eventing, you should see an extra deployment called kafka-ch-dispatcher with its corresponding pod started:

kubectl get -n knative-eventing pods
NAME                                    READY   STATUS    RESTARTS   AGE
broker-controller-56b4d58667-tz77k      1/1     Running   0          19h
broker-filter-5bdbc8d8dd-2b657          1/1     Running   0          19h
broker-ingress-d896b6b46-xss59          1/1     Running   0          19h
eventing-controller-5fc5645584-fqz72    1/1     Running   0          19h
eventing-webhook-7674b867dc-x2lg2       1/1     Running   0          19h
imc-controller-6b548d6468-v4pr8         1/1     Running   0          19h
imc-dispatcher-655cdf6ff6-xftlk         1/1     Running   0          19h
kafka-ch-controller-5cf4bdc98-l8lqg     1/1     Running   0          32m
kafka-ch-dispatcher-7fb7896db4-6r7m6    1/1     Running   0          24m
kafka-webhook-5f8895ccdf-hf28p          1/1     Running   0          32m
mt-broker-controller-6d66c4c6f6-56s9s   1/1     Running   0          19h

We can delete the example my-events-ch channel using the command:

  • kn

  • kubectl

kn channel delete -n knativetutorial my-events-ch
kubectl -n knativetutorial delete  channels.messaging.knative.dev my-events-ch

Connecting Kafka Source to Sink

Now that all of your infrastructure is configured, you can deploy the Knative Serving Service (the sink) by running the command:

  • kn

  • kubectl

kn service create eventinghello \
  --concurrency-target=1 \
  --image=quay.io/rhdevelopers/eventinghello:0.0.2

Check the Knative Service that was created by the command above:

kn service ls

The command should show an output like:

NAME            URL                                                LATEST                  AGE     CONDITIONS   READY   REASON
event-display   http://event-display.knativetutorial.example.com   event-display-2mhz8     4h11m   3 OK / 3     True
eventinghello  http://eventinghello.knativetutorial.example.com   eventinghello-krmch-1   34s     3 OK / 3     True

Make sure to follow the logs using stern:

stern eventinghello -c user-container

The initial deployment of eventinghello will cause it to scale up to one pod, which stays up until it hits its scale-down time limit. Allow it to scale down to zero pods before continuing.

Create a KafkaTopic called my-topic as shown below:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 1

Apply the topic in the kafka namespace:

kubectl apply -n kafka \
  -f kafka-topic-my-topic.yaml

Check the created topics by:

kubectl get -n kafka kafkatopics

The command should show an output like:

NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           1
my-topic                                                      10           1

Create a KafkaSource for my-topic by connecting your Kafka topic my-topic to eventinghello:

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092 (1)
  topics:
   - my-topic (2)
  sink: (3)
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: eventinghello
1 "my-cluster-kafka-bootstrap.kafka:9092" can be found via kubectl get -n kafka services or oc get -n kafka services
2 my-topic is the KafkaTopic that was created in the previous step
3 This is another example of a direct Source to Sink

The deployment of the KafkaSource will result in a new pod prefixed with "mykafka-source":

kubectl -n knativetutorial apply -f mykafka-source.yaml
watch kubectl get pods

When the KafkaSource is ready, it will show the following output (listing only the relevant pod):

kafkasource-mykafka-source-f0e52a48-256a-4748-bd6a-43422a0rdqtg   1/1     Running   0     108s

Check the created sources:

kn sources -n knativetutorial ls
NAME             TYPE          RESOURCE                           SINK                 READY
mykafka-source   KafkaSource   kafkasources.sources.knative.dev   ksvc:eventinghello   True

Since we had test messages of "one", "two" and "three" from earlier, you might see the eventinghello service awaken to process those messages.

Wait for eventinghello to scale down to zero pods before moving on, then push more Kafka messages into my-topic:

$TUTORIAL_HOME/bin/kafka-producer.sh

And then enter the following messages:

Hello World

Hola Mundo

Bonjour Le Monde

Namaste Duniya

Knative Eventing events flowing through the Kafka Source must be JSON formatted.

While making sure to monitor the logs of the eventinghello pod:

stern eventinghello -c user-container
(output trimmed for brevity)
ce-id=partition:1/offset:1
ce-source=/apis/v1/namespaces/kafka/kafkasources/mykafka-source#my-topic
ce-specversion=1.0
ce-time=2020-01-01T01:16:12.886Z
ce-type=dev.knative.kafka.event
content-type=null
content-length=17
POST:Namaste Duniya
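Note how ce-id encodes the Kafka coordinates of the message as partition:<p>/offset:<o>. A small shell sketch (using the sample value from the log above) splits it into its parts:

```shell
# Split a KafkaSource ce-id of the form "partition:<p>/offset:<o>"
ce_id="partition:1/offset:1"
partition="${ce_id%%/*}"             # "partition:1"
partition="${partition#partition:}"  # "1"
offset="${ce_id##*offset:}"          # "1"
echo "partition=${partition} offset=${offset}"   # → partition=1 offset=1
```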

If the eventinghello sink was scaled down to zero by Knative due to inactivity, you will see the service come up to serve the request once the message is sent.

Auto Scaling with Apache Kafka

Allow all the eventinghello sink pods to scale down to zero.

Open a new terminal and watch the pods of knativetutorial namespace:

watch kubectl get pods -n knativetutorial

We will use a utility pod called kafka-spammer to send a burst of messages to the KafkaTopic my-topic, which in turn will trigger the Knative sink service eventinghello to scale up to handle the KafkaTopic message events.

Deploy kafka-spammer application using the command:

kubectl -n kafka run kafka-spammer \
--image=quay.io/rhdevelopers/kafkaspammer:1.0.2

Wait for the kafka-spammer pod to be up and running; you can watch the kafka namespace for the pod:

watch kubectl -n kafka get pods

Once the kafka-spammer is up and running, open a new terminal and exec into the pod by running:

kubectl -n kafka exec -it kafka-spammer -- /bin/sh

Once you are in the shell of the kafka-spammer pod run the following command:

curl localhost:8080/3

The command sends three concurrent messages to the KafkaTopic my-topic. As the eventinghello sink pods were scaled down to zero, you should see three or more eventinghello sink pods being scaled up to serve the requests.

NAME                                                              READY   STATUS    RESTARTS   AGE
camel-k-operator-65db5d46bb-llc6g                                 1/1     Running   0          20h
eventinghello-v1-deployment-57c686cc96-6k9r2                      2/2     Running   0          15s
eventinghello-v1-deployment-57c686cc96-jcv8b                      2/2     Running   0          13s
eventinghello-v1-deployment-57c686cc96-lh8xr                      2/2     Running   0          15s
eventinghello-v1-deployment-57c686cc96-n2slh                      2/2     Running   0          16s
kafkasource-mykafka-source-a29a50ca-4d76-4e65-8b96-1507372bfphb   1/1     Running   0          119s

Cleanup

kubectl delete -n kafka pod kafka-spammer
kubectl delete -n knativetutorial -f mykafka-source.yaml
kubectl delete -n knativetutorial -f eventing-hello-sink.yaml
kubectl delete -n kafka -f kafka-topic-my-topic.yaml