Kafka in Kubernetes

Kubernetes is an open source container orchestration platform that automates many of the manual processes involved in deploying, managing, and scaling containerized applications.

Kubernetes is an ideal platform for hosting cloud-native applications that require rapid scaling, like real-time data streaming through Apache Kafka.

However, deploying a Kafka cluster with ZooKeeper and more than one broker is not an easy task. Strimzi makes it simple.

Kubernetes

To follow this part, you need a running Kubernetes cluster.

Install Minikube

Then set the following environment variables:

MacOS and Linux:

export MINIKUBE_HOME=$TUTORIAL_HOME
export PATH=$MINIKUBE_HOME/bin:$PATH
export KUBECONFIG=$MINIKUBE_HOME/.kube/config
export KUBE_EDITOR="code -w"

Windows:

set MINIKUBE_HOME=%TUTORIAL_HOME%
set PATH=%MINIKUBE_HOME%\bin;%PATH%
set KUBECONFIG=%MINIKUBE_HOME%\.kube\config
set KUBE_EDITOR="code -w"
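With the variables exported, a quick way to confirm that minikube and kubectl resolve on the updated PATH is a small shell function like the one below. It is a sketch for MacOS and Linux shells only; the name require_tools is hypothetical and not part of the tutorial, and which tools you check is up to you.

```bash
# Sketch (hypothetical helper): report every tool in the argument list that
# cannot be found on PATH; returns non-zero if at least one is missing.
require_tools() {
  local tool missing
  missing=0
  for tool in "$@"; do
    # command -v prints the resolved path and succeeds only if the tool exists.
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, require_tools minikube kubectl prints one "missing:" line per tool it cannot find and returns a non-zero status if any are absent.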

Start Kubernetes

The following section shows how to start Kubernetes with the required configuration. A profile named kafka is created to run the tutorial.

Minikube

With minikube installed and on your PATH, run:

MacOS:

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --driver=virtualbox -p kafka

Linux:

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --driver=kvm2 -p kafka

Windows:

minikube start --memory=8192 --cpus=3 --kubernetes-version=v1.23.1 --driver=hyperv -p kafka

The output should be similar to the following:

😄  [kafka] minikube v1.20.0 on Darwin 11.3
✅  Created a new profile : kafka
✅  minikube profile was successfully set to kafka
😄  [default] minikube v1.25.1 on Darwin 11.3
✨  Selecting 'virtualbox' driver from user configuration (alternates: [hyperkit])
🔥  Creating virtualbox VM (CPUs=2, Memory=8192MB, Disk=50000MB) ...
🐳  Preparing Kubernetes v1.23.1 on Docker '20.10.6' ...
    ▪ apiserver.enable-admission-plugins=LimitRanger,NamespaceExists,NamespaceLifecycle,ResourceQuota,ServiceAccount,DefaultStorageClass,MutatingAdmissionWebhook
🚜  Pulling images ...
🚀  Launching Kubernetes ...
⌛  Waiting for cluster to come online ...
🏄  Done! kubectl is now configured to use "kafka"

Finally, configure your shell to use minikube's internal Docker daemon as the Docker host:

eval $(minikube docker-env -p kafka)

OpenShift

To run on OpenShift 4, you need a cluster provisioned through try.openshift.com, or you can use any existing OpenShift 4 cluster. Once you have your cluster, download the latest OpenShift client (oc) and add it to your PATH.

You can check the OpenShift version using:

oc version

The output should show an oc version of 4.7 or later:

Client Version: 4.7.0-202102130115.p0-c66c03f
Kubernetes Version: v1.23.1

Then log in to the OpenShift cluster using oc login.

Strimzi

Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations.

Some of the features of Strimzi are:

Secure by Default

TLS and SCRAM-SHA supported. Automated Certificate Management.

Simple yet Configurable

NodePort, Load balancer, and Ingress options. Rack awareness for HA. Use dedicated nodes for Kafka.

Kubernetes-Native Experience

kubectl get kafka. Operator based. Manage Kafka using GitOps.

Installation of Strimzi Operator

Strimzi uses Kubernetes operators to manage the Kafka cluster. To install the Strimzi operator, run the following command:

Minikube

kubectl apply -f 'https://strimzi.io/install/latest?namespace=default' -n default
customresourcedefinition.apiextensions.k8s.io/kafkas.kafka.strimzi.io created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-entity-operator-delegation created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-topic-operator-delegation created
customresourcedefinition.apiextensions.k8s.io/kafkausers.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormaker2s.kafka.strimzi.io created
clusterrole.rbac.authorization.k8s.io/strimzi-entity-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-global created
clusterrolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator-kafka-broker-delegation created
rolebinding.rbac.authorization.k8s.io/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-cluster-operator-namespaced created
clusterrole.rbac.authorization.k8s.io/strimzi-topic-operator created
serviceaccount/strimzi-cluster-operator created
clusterrole.rbac.authorization.k8s.io/strimzi-kafka-broker created
customresourcedefinition.apiextensions.k8s.io/kafkatopics.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkabridges.kafka.strimzi.io created
deployment.apps/strimzi-cluster-operator created
customresourcedefinition.apiextensions.k8s.io/kafkaconnectors.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects2is.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkaconnects.kafka.strimzi.io created
customresourcedefinition.apiextensions.k8s.io/kafkamirrormakers.kafka.strimzi.io created

Then verify that the operator pod is running:

kubectl get pods
NAME                                          READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-59b99fc7cf-h4kf8     1/1     Running   0          6m53s

OpenShift

To install Kafka on OpenShift, go to Operators → OperatorHub and search for Kafka. Then select Red Hat Integration - AMQ Streams and install it.

Check that everything has been installed by running the following command:

kubectl get crds | grep kafka
NAME                                  CREATED AT
kafkabridges.kafka.strimzi.io         2020-05-07T07:50:38Z
kafkaconnectors.kafka.strimzi.io      2020-05-07T07:50:38Z
kafkaconnects.kafka.strimzi.io        2020-05-07T07:50:38Z
kafkaconnects2is.kafka.strimzi.io     2020-05-07T07:50:38Z
kafkamirrormaker2s.kafka.strimzi.io   2020-05-07T07:50:37Z
kafkamirrormakers.kafka.strimzi.io    2020-05-07T07:50:39Z
kafkas.kafka.strimzi.io               2020-05-07T07:50:37Z
kafkatopics.kafka.strimzi.io          2020-05-07T07:50:38Z
kafkausers.kafka.strimzi.io           2020-05-07T07:50:37Z
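Rather than reading the list by eye, you can pipe it into a check. The function below is a minimal sketch (missing_strimzi_crds is a hypothetical name) that reports which of the core Strimzi CRDs used in this tutorial are absent. Checking only kafkas, kafkatopics and kafkausers is an assumption, because the full set of CRDs varies between Strimzi versions.

```bash
# Sketch (hypothetical helper): read `kubectl get crds` output on stdin and
# print every expected Strimzi CRD that is missing; returns non-zero if any is.
# Assumption: only the core Kafka, KafkaTopic and KafkaUser CRDs are checked.
missing_strimzi_crds() {
  local listing crd status
  listing="$(cat)"
  status=0
  for crd in kafkas.kafka.strimzi.io kafkatopics.kafka.strimzi.io kafkausers.kafka.strimzi.io; do
    # Exact match against the first column (the CRD name).
    if ! printf '%s\n' "$listing" | awk -v name="$crd" '$1 == name { found = 1 } END { exit (found ? 0 : 1) }'; then
      echo "$crd"
      status=1
    fi
  done
  return "$status"
}
```

For example, kubectl get crds | missing_strimzi_crds prints nothing and succeeds when the three CRDs are installed.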

Deploying Kafka Cluster

To deploy the cluster, you create a Kafka custom resource, whose schema is defined by the Kafka Custom Resource Definition (CRD).

In this case, you are going to deploy a Kafka cluster with three brokers, a three-node ZooKeeper ensemble, and ephemeral storage.

The manifest to apply is the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.2.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.2"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
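The config section is what lets this cluster survive a broker outage. A producer that sends with acks=all has its writes accepted only while at least min.insync.replicas replicas of the partition are in sync, so with default.replication.factor: 3 and min.insync.replicas: 2, one replica can be unavailable (3 - 2 = 1) before such writes are rejected. The helper below only makes that arithmetic explicit; its name, tolerated_replica_failures, is hypothetical.

```bash
# Sketch (hypothetical helper): number of replicas of a partition that can be
# unavailable while producers using acks=all can still write. Inputs are the
# replication factor and min.insync.replicas from the Kafka resource's config.
tolerated_replica_failures() {
  local rf min_isr
  rf="$1"
  min_isr="$2"
  # The result is only meaningful when 1 <= min.insync.replicas <= replication factor.
  if [ "$min_isr" -lt 1 ] || [ "$min_isr" -gt "$rf" ]; then
    echo "invalid: min.insync.replicas should be between 1 and the replication factor" >&2
    return 1
  fi
  echo $((rf - min_isr))
}

# Values from the manifest: default.replication.factor 3, min.insync.replicas 2.
tolerated_replica_failures 3 2   # prints 1
```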

Notice how simple it is to deploy a Kafka cluster in Kubernetes: you only need a few lines of YAML. Save the manifest to a file and deploy Kafka by passing that file to kubectl apply -f. The output should be:

kafka.kafka.strimzi.io/my-cluster created

To wait until the Kafka cluster is up and running, run the following command:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n default
kafka.kafka.strimzi.io/my-cluster condition met

After that, you should have Kafka pods and services up and running:

kubectl get pods
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-77b5dd5594-hl7cf   3/3     Running   0          44s
my-cluster-kafka-0                            2/2     Running   0          70s
my-cluster-kafka-1                            2/2     Running   0          70s
my-cluster-kafka-2                            2/2     Running   0          70s
my-cluster-zookeeper-0                        2/2     Running   0          108s
my-cluster-zookeeper-1                        2/2     Running   0          108s
my-cluster-zookeeper-2                        2/2     Running   0          108s
strimzi-cluster-operator-59b99fc7cf-h4kf8     1/1     Running   0          6m53s

And services:

kubectl get services
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kubernetes                    ClusterIP   10.96.0.1        <none>        443/TCP                      11m
my-cluster-kafka-bootstrap    ClusterIP   10.103.67.179    <none>        9091/TCP,9092/TCP,9093/TCP   92s
my-cluster-kafka-brokers      ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   92s
my-cluster-zookeeper-client   ClusterIP   10.101.161.217   <none>        2181/TCP                     2m11s
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   2m11s
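The my-cluster-kafka-bootstrap service is the entry point for clients running inside Kubernetes: they connect to it on port 9092 (the plain listener) or 9093 (the tls listener) defined in the Kafka resource. Strimzi derives the service name from the cluster name, so the address can be built as in the sketch below (bootstrap_address is a hypothetical helper). The namespace-qualified form relies on standard Kubernetes service DNS and is only needed when the client runs in a different namespace.

```bash
# Sketch (hypothetical helper): build the in-cluster bootstrap address of a
# Strimzi Kafka cluster. Strimzi names the bootstrap Service
# "<cluster>-kafka-bootstrap"; the port is a listener port from the Kafka resource.
bootstrap_address() {
  local cluster port namespace
  cluster="$1"
  port="$2"
  namespace="${3:-}"
  if [ -n "$namespace" ]; then
    # Namespace-qualified form, resolvable from pods in any namespace.
    echo "${cluster}-kafka-bootstrap.${namespace}.svc:${port}"
  else
    # Short form, resolvable from pods in the same namespace.
    echo "${cluster}-kafka-bootstrap:${port}"
  fi
}
```

For example, bootstrap_address my-cluster 9092 prints my-cluster-kafka-bootstrap:9092.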

Deploying Services

Let’s deploy the example from [Developing Consumers and Producers in Java], but in Kubernetes instead of Docker. This time, the Quarkus versions of the song and song-indexer applications are deployed.

Deploy Song Service

To deploy the Song Service, go to:

cd $TUTORIAL_HOME/apps/song-app/quarkus/song-app

And use kubectl to deploy the service:

kubectl apply -f src/main/kubernetes/kubernetes.yml
serviceaccount/kafka-tutorial-song-app created
service/kafka-tutorial-song-app created
deployment.apps/kafka-tutorial-song-app created

Verify that the service has been deployed correctly:

kubectl get pods
NAME                                               READY   STATUS    RESTARTS   AGE
kafka-tutorial-song-app-84f8476bc8-vhr4k           1/1     Running   0          8m53s
my-cluster-entity-operator-77b5dd5594-hl7cf        3/3     Running   3          8h
my-cluster-kafka-0                                 2/2     Running   3          8h
my-cluster-kafka-1                                 2/2     Running   3          8h
my-cluster-kafka-2                                 2/2     Running   3          8h
my-cluster-zookeeper-0                             2/2     Running   2          8h
my-cluster-zookeeper-1                             2/2     Running   2          8h
my-cluster-zookeeper-2                             2/2     Running   2          8h
strimzi-cluster-operator-59b99fc7cf-h4kf8          1/1     Running   1          8h

Deploy Song Indexer Service

To deploy the Song Indexer Service, go to:

cd $TUTORIAL_HOME/apps/song-indexer-app/quarkus/song-indexer-app

And use kubectl to deploy the service:

kubectl apply -f src/main/kubernetes/kubernetes.yml
serviceaccount/kafka-tutorial-song-indexer-app created
service/kafka-tutorial-song-indexer-app created
deployment.apps/kafka-tutorial-song-indexer-app created

Verify that the service has been deployed correctly:

kubectl get pods
NAME                                               READY   STATUS    RESTARTS   AGE
kafka-tutorial-song-app-84f8476bc8-vhr4k           1/1     Running   0          8m53s
kafka-tutorial-song-indexer-app-58fbc7d6fd-979zw   1/1     Running   0          4m6s
my-cluster-entity-operator-77b5dd5594-hl7cf        3/3     Running   3          8h
my-cluster-kafka-0                                 2/2     Running   3          8h
my-cluster-kafka-1                                 2/2     Running   3          8h
my-cluster-kafka-2                                 2/2     Running   3          8h
my-cluster-zookeeper-0                             2/2     Running   2          8h
my-cluster-zookeeper-1                             2/2     Running   2          8h
my-cluster-zookeeper-2                             2/2     Running   2          8h
strimzi-cluster-operator-59b99fc7cf-h4kf8          1/1     Running   1          8h
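With this many pods, it helps to check readiness programmatically instead of scanning the READY column. The sketch below (all_pods_ready is a hypothetical name) reads kubectl get pods output and succeeds only when every pod is Running with all of its containers ready, printing the name of any pod that is not. Pods in other legitimate states, such as completed jobs, would be reported too; that is an accepted simplification here.

```bash
# Sketch (hypothetical helper): read `kubectl get pods` output on stdin and
# succeed only if every pod is Running with READY "n/n"; prints failing pods.
all_pods_ready() {
  awk '
    $1 == "NAME" { next }                 # skip the header row
    NF == 0 { next }                      # ignore blank lines
    {
      split($2, ready, "/")               # READY column, e.g. "2/2"
      if ($3 != "Running" || ready[1] != ready[2]) {
        print $1                          # report the pod that is not ready
        bad = 1
      }
    }
    END { exit (bad ? 1 : 0) }
  '
}
```

For example, kubectl get pods | all_pods_ready. kubectl can also block until pods are ready with kubectl wait --for=condition=Ready pods --all --timeout=300s, which avoids parsing table output.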

Test It

Let’s test that it works as expected.

First, you need to know the NodePort that each service exposes:

kubectl get services
NAME                              TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
kafka-tutorial-song-app           LoadBalancer   10.99.68.119     <pending>     8080:31834/TCP               5m15s
kafka-tutorial-song-indexer-app   LoadBalancer   10.103.172.33    <pending>     8080:30581/TCP               28s
kubernetes                        ClusterIP      10.96.0.1        <none>        443/TCP                      8h
my-cluster-kafka-bootstrap        ClusterIP      10.103.67.179    <none>        9091/TCP,9092/TCP,9093/TCP   8h
my-cluster-kafka-brokers          ClusterIP      None             <none>        9091/TCP,9092/TCP,9093/TCP   8h
my-cluster-zookeeper-client       ClusterIP      10.101.161.217   <none>        2181/TCP                     8h
my-cluster-zookeeper-nodes        ClusterIP      None             <none>        2181/TCP,2888/TCP,3888/TCP   8h

In this specific case, kafka-tutorial-song-app is exposed on port 31834 and kafka-tutorial-song-indexer-app on port 30581. Kubernetes assigns NodePorts when the services are created, so these values will probably differ every time you deploy the services.
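Because the NodePorts change between deployments, you may prefer to extract them instead of copying them by hand. The following sketch (node_port is a hypothetical helper) reads kubectl get services output and prints the node port from the first port:nodePort mapping of the named service.

```bash
# Sketch (hypothetical helper): read `kubectl get services` output on stdin and
# print the NodePort of the named service (first argument).
node_port() {
  awk -v name="$1" '
    $1 == name {
      for (i = 2; i <= NF; i++) {
        if ($i ~ /^[0-9]+:[0-9]+\//) {    # e.g. "8080:31834/TCP"
          split($i, a, ":")               # a[2] = "31834/TCP"
          split(a[2], b, "/")             # b[1] = "31834"
          print b[1]
          found = 1
          exit
        }
      }
    }
    END { exit (found ? 0 : 1) }
  '
}
```

For example, kubectl get services | node_port kafka-tutorial-song-app would print 31834 for the listing above. kubectl can also return the value directly with a JSONPath query, kubectl get service kafka-tutorial-song-app -o jsonpath='{.spec.ports[0].nodePort}', and on minikube, minikube service kafka-tutorial-song-app -p kafka --url prints a ready-to-use URL.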

Then you need the IP address to connect to, which is the minikube IP. You can get it by running the following command:

minikube ip -p kafka
192.168.99.107

Open two terminal windows: one to populate songs, and another to receive the results from the indexer service.

In terminal 1, run the following command:

http GET 192.168.99.107:30581/events --stream --timeout=600
HTTP/1.1 200 OK
Content-Type: text/event-stream
transfer-encoding: chunked

In terminal 2, run the following command:

http POST 192.168.99.107:31834/songs id=107 name=Portals author='Alan Silvestri'

Inspect the output of terminal 1 to check that the song has been processed by the indexer service:

data: {"author":"Alan Silvestri","id":107,"name":"Portals","op":"ADD"}
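The indexer streams its results as Server-Sent Events, so each result arrives on a data: line like the one above. If you want to script this check, a small filter that keeps only the JSON payloads is enough. The name sse_data is hypothetical, and the filter does not join multi-line events, an assumption that holds for single-line JSON payloads like this one.

```bash
# Sketch (hypothetical helper): strip Server-Sent Events framing and print only
# the payload of each "data:" line; other fields and blank lines are dropped.
sse_data() {
  # "data:" may be followed by one optional space, per the SSE format.
  sed -n 's/^data: \{0,1\}//p'
}
```

For example: http GET 192.168.99.107:30581/events --stream --timeout=600 | sse_data. When its output is piped, httpie prints only the response body by default, so the headers do not reach the filter.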

Clean Up

To clean up the namespace, run:

kubectl delete all --all