Apache Kafka Events with Knative Eventing
At the end of this chapter you will be able to:
- Use KafkaSource with Knative Eventing
- Source Kafka events to a sink
- Autoscale Knative Services with Apache Kafka events
cd $TUTORIAL_HOME/eventing
Deploy Knative Eventing KafkaSource
The Knative Eventing KafkaSource needs to be deployed for Kafka messages to flow through the Knative Eventing Channels. You can deploy the Knative KafkaSource by running the command:
The previous step deploys the Knative KafkaSource in the knative-eventing namespace, as well as a CRD, ServiceAccount, ClusterRole, etc. Verify that the knative-eventing namespace includes the kafka-controller-manager pod:
watch "kubectl get pods -n knative-eventing"
The command above should show the following output:
NAME READY STATUS RESTARTS AGE
eventing-controller-6cd47bff4b-vlsjb 1/1 Running 1 92m
eventing-webhook-87b4f6cb5-btfr4 1/1 Running 12 (2m45s ago) 92m
imc-controller-5bd45cf5b-2wr9v 1/1 Running 12 (2m43s ago) 92m
imc-dispatcher-7b9bf546b8-cmf65 1/1 Running 14 (3m27s ago) 92m
kafka-controller-manager-7c4498d478-fsjxl 1/1 Running 0 67s
mt-broker-controller-5bb47f9cf5-dfs2z 1/1 Running 2 (52m ago) 92m
mt-broker-filter-7f468cbd7b-6r28q 1/1 Running 6 (4m35s ago) 92m
mt-broker-ingress-57db965447-8bv57 1/1 Running 5 (22m ago) 92m
You should also deploy the Knative Kafka Channel, which connects a Knative Eventing Channel to an Apache Kafka cluster backend. To deploy the Knative Kafka Channel, run:
curl -L "https://github.com/knative-sandbox/eventing-kafka/\
releases/download/knative-v1.8.1/channel-consolidated.yaml" \
| sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/' \
| kubectl apply --filename -
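The sed step in the pipeline above substitutes the REPLACE_WITH_CLUSTER_URL placeholder in the downloaded manifest with the in-cluster bootstrap address of your Kafka cluster. In isolation the substitution behaves like this (the sample input line is illustrative, not the actual manifest content):

```shell
# Illustrative input line; the real manifest contains the same placeholder
echo "bootstrapServers: REPLACE_WITH_CLUSTER_URL" \
  | sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/'
# -> bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092
```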
Look for 3 new pods with the prefix "kafka" in the namespace knative-eventing:
watch "kubectl get pods -n knative-eventing"
The command will show an output like:
NAME READY STATUS RESTARTS AGE
eventing-controller-6cd47bff4b-vlsjb 1/1 Running 1 94m
eventing-webhook-87b4f6cb5-btfr4 1/1 Running 12 (72s ago) 94m
imc-controller-5bd45cf5b-2wr9v 1/1 Running 13 (26s ago) 94m
imc-dispatcher-7b9bf546b8-cmf65 1/1 Running 15 (74s ago) 94m
kafka-ch-controller-566bb77694-5hmc9 1/1 Running 0 27s
kafka-controller-manager-7c4498d478-fsjxl 1/1 Running 1 (71s ago) 3m5s
kafka-webhook-66fcb6588b-gqpfd 1/1 Running 0 27s
mt-broker-controller-5bb47f9cf5-dfs2z 1/1 Running 2 (54m ago) 94m
mt-broker-filter-7f468cbd7b-6r28q 1/1 Running 6 (6m33s ago) 94m
mt-broker-ingress-57db965447-8bv57 1/1 Running 5 (24m ago) 94m
You should also find some new API resources. The sources.knative.dev API group now includes the following source types:
TYPE NAME DESCRIPTION
ApiServerSource apiserversources.sources.knative.dev Watch and send Kubernetes API events to addressable
CamelSource camelsources.sources.knative.dev
ContainerSource containersources.sources.knative.dev Generate events by Container image and send to addressable
KafkaSource kafkasources.sources.knative.dev Route events from Apache Kafka Server to addressable
PingSource pingsources.sources.knative.dev Send periodically ping events to addressable
SinkBinding sinkbindings.sources.knative.dev Binding for connecting a PodSpecable to addressable
List the API resources in the messaging.knative.dev group:
kubectl api-resources --api-group='messaging.knative.dev'
The command should show the following APIs in messaging.knative.dev:
NAME SHORTNAMES APIGROUP NAMESPACED KIND
channels ch messaging.knative.dev true Channel
inmemorychannels imc messaging.knative.dev true InMemoryChannel
kafkachannels kc messaging.knative.dev true KafkaChannel
subscriptions sub messaging.knative.dev true Subscription
Using Kafka Channel as Default Knative Channel
Persistence and durability are two very important features of any messaging-based architecture. The Knative Channel has built-in support for durability, but durability is ineffective if the underlying Channel does not support persistence: without persistence, messages cannot be delivered to subscribers that are offline at the time of delivery.
By default, all Knative Channels created by the Knative Eventing API use the InMemoryChannel (imc), which cannot persist messages. To enable persistence we need to use one of the supported persistent channels, such as GCP Pub/Sub, Kafka, or NATS Streaming, as the default Knative Channel backend.
We installed Apache Kafka earlier in this tutorial; let us now configure it to be the default Knative Channel backend:
apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel (1)
    namespaceDefaults:
      knativetutorial:
        apiVersion: messaging.knative.dev/v1beta1
        kind: KafkaChannel (2)
        spec:
          numPartitions: 2
          replicationFactor: 1
1 | For the cluster as a whole, we will still use the default InMemoryChannel |
2 | For the namespace knativetutorial, all Knative Eventing Channels will use KafkaChannel as the default |
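If you instead wanted KafkaChannel to be the default for the whole cluster rather than a single namespace (not what this tutorial does), the clusterDefault stanza of the same ConfigMap could be changed. A hypothetical sketch:

```yaml
# Hypothetical alternative (not applied in this tutorial): make KafkaChannel
# the cluster-wide default instead of InMemoryChannel
default-ch-config: |
  clusterDefault:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    spec:
      numPartitions: 2
      replicationFactor: 1
```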
Run the following command to apply the Knative Eventing Channel configuration:
kubectl apply -f default-channel-config.yaml
Since all Knative Eventing Channels in knativetutorial now default to KafkaChannel, creating a Knative Eventing Channel in the namespace knativetutorial will result in a corresponding Kafka topic being created. Let us verify that by creating a sample Channel as shown in the listing:
kn channel create my-events-ch
kubectl apply -f ${TUTORIAL_HOME}/eventing/default-kafka-channel.yaml
The creation of the Channel my-events-ch results in the following resources in the knativetutorial namespace.
A channel.messaging.knative.dev called my-events-ch:
kn channel -n knativetutorial ls
NAME TYPE URL AGE READY REASON
my-events-ch KafkaChannel http://my-events-ch-kn-channel.knativetutorial.svc.cluster.local 2m55s True
kubectl get -n knativetutorial channels
NAME READY URL
my-events-ch True http://my-events-ch-kn-channel.knativetutorial.svc.cluster.local
And a corresponding kafkachannel.messaging.knative.dev with the same name my-events-ch:
kubectl get -n knativetutorial kafkachannels
NAME READY URL
my-events-ch True http://my-events-ch-kn-channel.knativetutorial.svc.cluster.local
When you now list the topics available in Kafka using the script $TUTORIAL_HOME/bin/kafka-list-topics.sh, you should see a topic corresponding to your Channel my-events-ch:
$TUTORIAL_HOME/bin/kafka-list-topics.sh
The command should return an output like:
knative-messaging-kafka.knativetutorial.my-events-ch
For each Knative Eventing Channel that you create, a Kafka topic will be created; the topic's name follows the convention knative-messaging-kafka.<your-channel-namespace>.<your-channel-name>.
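The naming convention can be sketched as a small shell helper; channel_topic is a hypothetical function name, not part of the tutorial scripts:

```shell
# Build the Kafka topic name for a Knative Channel following the
# knative-messaging-kafka.<namespace>.<channel-name> convention
channel_topic() {
  echo "knative-messaging-kafka.${1}.${2}"
}

channel_topic knativetutorial my-events-ch
# -> knative-messaging-kafka.knativetutorial.my-events-ch
```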
When listing the resources of knative-eventing, you should see an extra deployment called kafka-ch-dispatcher with its corresponding pod started:
kubectl get -n knative-eventing pods
NAME READY STATUS RESTARTS AGE
broker-controller-56b4d58667-tz77k 1/1 Running 0 19h
broker-filter-5bdbc8d8dd-2b657 1/1 Running 0 19h
broker-ingress-d896b6b46-xss59 1/1 Running 0 19h
eventing-controller-5fc5645584-fqz72 1/1 Running 0 19h
eventing-webhook-7674b867dc-x2lg2 1/1 Running 0 19h
imc-controller-6b548d6468-v4pr8 1/1 Running 0 19h
imc-dispatcher-655cdf6ff6-xftlk 1/1 Running 0 19h
kafka-ch-controller-5cf4bdc98-l8lqg 1/1 Running 0 32m
kafka-ch-dispatcher-7fb7896db4-6r7m6 1/1 Running 0 24m
kafka-webhook-5f8895ccdf-hf28p 1/1 Running 0 32m
mt-broker-controller-6d66c4c6f6-56s9s 1/1 Running 0 19h
We can delete the example my-events-ch channel using the command:
kn channel delete -n knativetutorial my-events-ch
kubectl -n knativetutorial delete channels.messaging.knative.dev my-events-ch
Connecting Kafka Source to Sink
Now that all of your infrastructure is configured, you can deploy the Knative Serving Service (the sink) by running the command:
kn service create eventinghello \
--concurrency-target=1 \
--image=quay.io/rhdevelopers/eventinghello:0.0.2
kubectl apply -f ${TUTORIAL_HOME}/eventing/eventing-hello-sink.yaml
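The --concurrency-target=1 flag asks the Knative autoscaler to aim for one in-flight request per pod, which is what makes the scale-up in the autoscaling exercise later so visible. On the Service's revision template it corresponds to the autoscaling target annotation (a sketch; the actual eventing-hello-sink.yaml may differ):

```yaml
spec:
  template:
    metadata:
      annotations:
        # Target of 1 concurrent request per pod; with N concurrent
        # messages the autoscaler scales toward N pods
        autoscaling.knative.dev/target: "1"
```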
Check the Knative Service that was created by the command above:
kn service ls
The command should show an output like:
NAME URL LATEST AGE CONDITIONS READY REASON
event-display http://event-display.knativetutorial.example.com event-display-2mhz8 4h11m 3 OK / 3 True
eventinghello http://eventinghello.knativetutorial.example.com eventinghello-krmch-1 34s 3 OK / 3 True
Make sure to follow the logs using stern:
stern eventinghello -c user-container
The initial deployment of eventinghello will cause it to scale up to one pod, which remains until it hits its scale-down time limit. Allow it to scale down to zero pods before continuing.
Create a KafkaTopic called my-topic as shown below:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 1
kubectl apply -n kafka \
-f kafka-topic-my-topic.yaml
Check the created topics by:
kubectl get -n kafka kafkatopics
The command should show an output like:
NAME PARTITIONS REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a 50 1
my-topic 10 1
Create a KafkaSource that connects your Kafka topic my-topic to eventinghello:
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092 (1)
  topics:
    - my-topic (2)
  sink: (3)
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: eventinghello
1 | "my-cluster-kafka-bootstrap.kafka:9092" can be found via kubectl get -n kafka services or oc get -n kafka services |
2 | my-topic was created in an earlier section, when deploying Apache Kafka |
3 | This is another example of a direct Source to Sink |
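The sink here references a Knative Service, but the sink contract is duck-typed: any addressable resource can be the target. A hedged sketch of an alternative sink stanza that would route the Kafka messages into an Eventing Channel instead (assuming a Channel like the my-events-ch example from earlier still exists):

```yaml
# Hypothetical alternative sink: deliver events to a Knative Channel
# instead of directly to a Service
sink:
  ref:
    apiVersion: messaging.knative.dev/v1
    kind: Channel
    name: my-events-ch
```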
The deployment of the KafkaSource will result in a new pod prefixed with "mykafka-source".
kubectl -n knativetutorial apply -f mykafka-source.yaml
watch kubectl get pods
When the KafkaSource is ready it will show the following output (listing only the relevant pods):
kafkasource-mykafka-source-f0e52a48-256a-4748-bd6a-43422a0rdqtg 1/1 Running 0 108s
Check the created sources:
kn sources -n knativetutorial ls
NAME TYPE RESOURCE SINK READY
mykafka-source KafkaSource kafkasources.sources.knative.dev ksvc:eventinghello True
Since we had test messages of "one", "two" and "three" from earlier, you might see the eventinghello service awaken to process those messages. Wait for eventinghello to scale down to zero pods before moving on, then push more Kafka messages into my-topic.
$TUTORIAL_HOME/bin/kafka-producer.sh
And then enter the following messages:
Hello World
Hola Mundo
Bonjour Le Monde
Namaste Duniya
Knative Eventing events sent through the Kafka Source must be JSON formatted.
While doing so, monitor the logs of the eventinghello pod:
stern eventinghello -c user-container
Output shortened for brevity.
ce-id=partition:1/offset:1
ce-source=/apis/v1/namespaces/kafka/kafkasources/mykafka-source#my-topic
ce-specversion=1.0
ce-time=2020-01-01T01:16:12.886Z
ce-type=dev.knative.kafka.event
content-type=null
content-length=17
POST:Namaste Duniya
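Note that the ce-id attribute encodes the Kafka partition and offset that produced the event ("partition:1/offset:1"). A small shell sketch to pull those values apart (illustrative, not part of the tutorial scripts):

```shell
ce_id="partition:1/offset:1"

# Split "partition:<p>/offset:<o>" into its two components
partition="${ce_id%%/*}"          # -> "partition:1"
partition="${partition#partition:}"
offset="${ce_id##*offset:}"

echo "partition=$partition offset=$offset"
# -> partition=1 offset=1
```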
Auto Scaling with Apache Kafka
Allow all the eventinghello pods to scale down to zero before starting this exercise.
Open a new terminal and watch the pods of knativetutorial
namespace:
watch kubectl get pods -n knativetutorial
We will use a utility pod called kafka-spammer to send a bunch of messages to the KafkaTopic my-topic, which in turn will trigger the Knative sink service eventinghello to scale up to handle the KafkaTopic message events.
Deploy the kafka-spammer application using the command:
kubectl -n kafka run kafka-spammer \
--image=quay.io/rhdevelopers/kafkaspammer:1.0.2
Wait for the kafka-spammer pod to be up and running; you can watch the kafka namespace for the pod:
watch kubectl -n kafka get pods
Once the kafka-spammer is up and running, open a new terminal and exec into the pod by running:
kubectl -n kafka exec -it kafka-spammer -- /bin/sh
Once you are in the shell of the kafka-spammer pod, run the following command:
curl localhost:8080/3
The command sends three concurrent messages to the KafkaTopic my-topic. As the eventinghello sink pods were scaled down to zero, you should see three or more eventinghello sink pods being scaled up to serve the requests.
NAME READY STATUS RESTARTS AGE
camel-k-operator-65db5d46bb-llc6g 1/1 Running 0 20h
eventinghello-v1-deployment-57c686cc96-6k9r2 2/2 Running 0 15s
eventinghello-v1-deployment-57c686cc96-jcv8b 2/2 Running 0 13s
eventinghello-v1-deployment-57c686cc96-lh8xr 2/2 Running 0 15s
eventinghello-v1-deployment-57c686cc96-n2slh 2/2 Running 0 16s
kafkasource-mykafka-source-a29a50ca-4d76-4e65-8b96-1507372bfphb 1/1 Running 0 119s