Kafka in Kubernetes

Kubernetes is an open source container orchestration platform that automates many of the manual processes involved in deploying, managing, and scaling containerized applications.

Kubernetes is an ideal platform for hosting cloud-native applications that require rapid scaling, like real-time data streaming through Apache Kafka.

But deploying a Kafka cluster with - or in newer versions without ZooKeeper - and more than one broker, is not an easy task. It is particularly challenging to get the following three aspects right:

  • networking

  • storage

  • security

Instead of trying to manually configure everything from scratch which is both, cumbersome and error-prone, we should delegate this to Kubernetes operators. To do so, we can use Strimzi which offers a powerful and convenient way to deploy different Kafka cluster architectures to Kubernetes.

Kubernetes

For this part of the tutorial you need a Kubernetes cluster running.

Install Minikube

And then set environment variables:

  • MacOS and Linux

  • Windows

export MINIKUBE_HOME=$TUTORIAL_HOME;
export PATH=$MINIKUBE_HOME/bin:$PATH
export KUBECONFIG=$MINIKUBE_HOME/.kube/config
export KUBE_EDITOR="code -w"
set MINIKUBE_HOME=%TUTORIAL_HOME%
set PATH=%MINIKUBE_HOME%/bin:%PATH%
set KUBECONFIG=%MINIKUBE_HOME%/.kube/config
set KUBE_EDITOR="code -w"

Start Kubernetes

The following section shows how to start Kubernetes with required configurations:

The profile kafka is created to run the tutorial:

  • Minikube

  • OpenShift

Having minikube installed and in your PATH, then run:

MacOS

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --vm-driver=virtualbox -p kafka

Linux

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --vm-driver=kvm2 -p kafka

Windows:

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --vm-driver=hyperv -p kafka

And the output must be something similar like:

😄  [kafka] minikube v1.20.0 on Darwin 11.3
✅  Created a new profile : kafka
✅  minikube profile was successfully set to kafka
😄  [default] minikube v1.25.1 on Darwin 11.3
✨  Selecting 'virtualbox' driver from user configuration (alternates: [hyperkit])
🔥  Creating virtualbox VM (CPUs=2, Memory=8192MB, Disk=50000MB) ...
🐳  Preparing Kubernetes v1.23.1 on Docker '20.10.6' ...
    ▪ apiserver.enable-admission-plugins=LimitRanger,NamespaceExists,NamespaceLifecycle,ResourceQuota,ServiceAccount,DefaultStorageClass,MutatingAdmissionWebhook
🚜  Pulling images ...
🚀  Launching Kubernetes ...
⌛  Waiting for cluster to come online ...
🏄  Done! kubectl is now configured to use "kafka"

Finally configure to use minikube internal docker as docker host:

eval $(minikube docker-env -p kafka)

To run OpenShift4, you need to have one provisioned using try.openshift.com or can use any existing OpenShift4 cluster. Once you have your cluster, you can download the latest OpenShift client(oc) from here and add to your path.

oc version

You can check the OpenShift version using:

oc version

The output should show oc version >=4.7:

Client Version: 4.7.0-202102130115.p0-c66c03f
Kubernetes Version: v1.23.1

Then login into the OpenShift cluster using oc login

Strimzi

Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations.

Some of the features of Strimzi are:

Secure by Default

TLS and SCRAM-SHA supported. Automated Certificate Management.

Simple yet Configurable

NodePort, Load balancer, and Ingress options. Rack awareness for HA. Use dedicated nodes for Kafka.

Kubernetes-Native Experience

kubectl get kafka. Operator Based. Manage Kafka using gitops.

Installation of Strimzi Operator

Strimzi uses Kubernetes operators to manage the Kafka cluster. To install the Strimzi operator you need to run the following command:

  • Minikube

  • OpenShift

kubectl apply -f 'https://strimzi.io/install/latest?namespace=default' -n default
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created
serviceaccount/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created
kubectl get pods
NAME                                          READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-f696c85f7-9fggx      1/1     Running   0          6m53s

To install Kafka in OpenShift, you can go to Operators  OperatorHub and search for Kafka. Then select the Red Hat Integration - AMQ Streams and install it.

amqstreams

Check that everything has been installed by running the following commands:

kubectl get crds | grep kafka
NAME                                  CREATED AT
kafkabridges.kafka.strimzi.io         2022-12-19T07:16:50Z
kafkaconnectors.kafka.strimzi.io      2022-12-19T07:16:50Z
kafkaconnects.kafka.strimzi.io        2022-12-19T07:16:50Z
kafkamirrormaker2s.kafka.strimzi.io   2022-12-19T07:16:50Z
kafkamirrormakers.kafka.strimzi.io    2022-12-19T07:16:50Z
kafkarebalances.kafka.strimzi.io      2022-12-19T07:16:50Z
kafkas.kafka.strimzi.io               2022-12-19T07:16:50Z
kafkatopics.kafka.strimzi.io          2022-12-19T07:16:50Z
kafkausers.kafka.strimzi.io           2022-12-19T07:16:50Z

Deploying Kafka Cluster

To deploy the cluster, you need to create a Kafka resource file using the Kafka Custom Resource Definition (CRD).

In this case, you are going to deploy a Kafka cluster with 3 instances and ephemeral storage.

The file you are going to apply is shown below:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.3.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.3"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

Notice how simple it is to deploy a Kafka cluster in Kubernetes, you just need a few YAML lines. Run the following command to deploy Kafka:

kafka.kafka.strimzi.io/my-cluster created

To wait until Kafka cluster is up and running, you can run the following command:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n default
kafka.kafka.strimzi.io/my-cluster condition met

After that, you should have Kafka pods and services up and running:

kubectl get pods
NAME                                          READY   STATUS    RESTARTS      AGE
my-cluster-entity-operator-7dfb85ccf9-28dqd   3/3     Running   0             95s
my-cluster-kafka-0                            1/1     Running   0             118s
my-cluster-kafka-1                            1/1     Running   0             118s
my-cluster-kafka-2                            1/1     Running   0             118s
my-cluster-zookeeper-0                        1/1     Running   0             2m21s
my-cluster-zookeeper-1                        1/1     Running   0             2m21s
my-cluster-zookeeper-2                        1/1     Running   0             2m21s
strimzi-cluster-operator-f696c85f7-9fggx      1/1     Running   0             7m49s

And services:

kubectl get services
NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                               AGE
kubernetes                    ClusterIP   10.96.0.1       <none>        443/TCP                               10m33s
my-cluster-kafka-bootstrap    ClusterIP   10.98.94.157    <none>        9091/TCP,9092/TCP,9093/TCP            3m12s
my-cluster-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   3m12s
my-cluster-zookeeper-client   ClusterIP   10.104.185.52   <none>        2181/TCP                              3m35s
my-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP            3m35s

Deploying Services

Let’s deploy the example done at [Developing Consumers and Producers in Java], but in Kubernetes instead of Docker. This time, the Quarkus versions of the song / song-indexer applications are deployed.

Deploy Song Service

To deploy the song-app service go to its folder and build the application

cd $TUTORIAL_HOME/apps/song-app/quarkus/song-app
./mvnw clean package

After a successful build use kubectl to deploy it:

kubectl apply -f target/kubernetes/kubernetes.yml
service/kafka-tutorial-song-app-quarkus created
deployment.apps/kafka-tutorial-song-app-quarkus created

Verify that the service has been deployed correctly:

kubectl get pods
NAME                                                       READY   STATUS    RESTARTS      AGE
kafka-tutorial-song-app-quarkus-55df95589f-fg5v2           1/1     Running   0             7m23s
my-cluster-entity-operator-7dfb85ccf9-28dqd                3/3     Running   0             21m
my-cluster-kafka-0                                         1/1     Running   0             21m
my-cluster-kafka-1                                         1/1     Running   0             21m
my-cluster-kafka-2                                         1/1     Running   0             21m
my-cluster-zookeeper-0                                     1/1     Running   0             22m
my-cluster-zookeeper-1                                     1/1     Running   0             22m
my-cluster-zookeeper-2                                     1/1     Running   0             22m
strimzi-cluster-operator-f696c85f7-9fggx                   1/1     Running   0             37m49s

Deploy Song Indexer Service

To deploy the song-indexer-app service go to its folder and build the application

cd $TUTORIAL_HOME/apps/song-indexer-app/quarkus/song-indexer-app
./mvnw clean package

And use kubectl to deploy the it:

kubectl apply -f target/kubernetes/kubernetes.yml
service/kafka-tutorial-song-indexer-app-quarkus created
deployment.apps/kafka-tutorial-song-indexer-app-quarkus created

Verify that the service has been deployed correctly:

kubectl get pods
NAME                                                       READY   STATUS    RESTARTS      AGE
kafka-tutorial-song-app-quarkus-55df95589f-fg5v2           1/1     Running   0             9m21s
kafka-tutorial-song-indexer-app-quarkus-6c4b56f448-xgshm   1/1     Running   0             1m10s
my-cluster-entity-operator-7dfb85ccf9-28dqd                3/3     Running   0             23m
my-cluster-kafka-0                                         1/1     Running   0             23m
my-cluster-kafka-1                                         1/1     Running   0             23m
my-cluster-kafka-2                                         1/1     Running   0             23m
my-cluster-zookeeper-0                                     1/1     Running   0             24m
my-cluster-zookeeper-1                                     1/1     Running   0             24m
my-cluster-zookeeper-2                                     1/1     Running   0             24m
strimzi-cluster-operator-f696c85f7-9fggx                   1/1     Running   0             39m33s

Test It

Let’s test that it works as expected.

The first thing we do is create tunnels to the corresponding services of the two applications. This can be easily done with the minikube service command.

In a separate terminal window run the following:

minikube service kafka-tutorial-song-indexer-app-quarkus

Your console output should look similar (minikube IP and ports in use may vary of course). In case a browser window opens automatically you can close it because it’s not needed in our case.

|-----------|---------------------------------|-------------|---------------------------|
| NAMESPACE |              NAME               | TARGET PORT |            URL            |
|-----------|---------------------------------|-------------|---------------------------|
| default   | kafka-tutorial-song-app-quarkus | http/80     | http://192.168.49.2:30588 |
|-----------|---------------------------------|-------------|---------------------------|
🏃  Starting tunnel for service kafka-tutorial-song-app-quarkus.
|-----------|---------------------------------|-------------|------------------------|
| NAMESPACE |              NAME               | TARGET PORT |          URL           |
|-----------|---------------------------------|-------------|------------------------|
| default   | kafka-tutorial-song-app-quarkus |             | http://127.0.0.1:65000 |
|-----------|---------------------------------|-------------|------------------------|
🎉  Opening service default/kafka-tutorial-song-app-quarkus in default browser...

The important part is shown in the lower right box, namely use 127.0.0.1:65000 as host and port for any HTTP commands against the song-app.

Again, in a separate terminal window run the following:

minikube service kafka-tutorial-song-indexer-app-quarkus

Your console output should look similar (minikube IP and ports in use may vary of course). In case a browser window opens automatically you can close it because it’s not needed in our case.

|-----------|-----------------------------------------|-------------|---------------------------|
| NAMESPACE |                  NAME                   | TARGET PORT |            URL            |
|-----------|-----------------------------------------|-------------|---------------------------|
| default   | kafka-tutorial-song-indexer-app-quarkus | http/80     | http://192.168.49.2:32548 |
|-----------|-----------------------------------------|-------------|---------------------------|
🏃  Starting tunnel for service kafka-tutorial-song-indexer-app-quarkus.
|-----------|-----------------------------------------|-------------|------------------------|
| NAMESPACE |                  NAME                   | TARGET PORT |          URL           |
|-----------|-----------------------------------------|-------------|------------------------|
| default   | kafka-tutorial-song-indexer-app-quarkus |             | http://127.0.0.1:64938 |
|-----------|-----------------------------------------|-------------|------------------------|
🎉  Opening service default/kafka-tutorial-song-indexer-app-quarkus in default browser...

The important part is shown in the lower right box, namely use 127.0.0.1:64938 as host and port for any HTTP commands against the song-indexer-app.

With the above, you successfully created tunnels to the two services and you are now ready to communicate with the applications from you host machine.

Open another 2 terminal windows, one for populating songs, and another one to get the SSE stream from the indexer service.

In the terminal 1 run the following command against the song-indexer-app:

http GET 127.0.0.1:64938/events --stream --timeout=600
HTTP/1.1 200 OK
Content-Type: text/event-stream
transfer-encoding: chunked

In the terminal 2 run the following command against the song-app:

http POST 127.0.0.1:65000/songs id=107 name=Portals author='Alan Silvestri'
HTTP/1.1 201 Created
content-length: 0

Inspect the output of terminal 1, to check that the published song data has been processed by the song-indexer-app:

id:7d92d0ed-ab04-4182-96b0-0bc4f97156c5
data:Song 107 processed

Clean Up

To clean up the whole namespace run:

kubectl delete all --all