Author Balint Molnar

Kafka on Kubernetes - using etcd


At Banzai Cloud we are building a cloud-agnostic, open source, next-generation CloudFoundry/Heroku-like PaaS, Pipeline, while running several big data workloads natively on Kubernetes. Apache Kafka is one of the cloud native workloads we support out-of-the-box, alongside Apache Spark and Apache Zeppelin.

If you’re interested in running big data workloads on Kubernetes, please read the following blog series as well.

Apache Kafka on Kubernetes series:
Kafka on Kubernetes with Local Persistent Volumes
Monitoring Apache Kafka with Prometheus

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Apache Spark CI/CD workflow howto
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive

Introduction

Update: we have opened the KAFKA-6598 ticket to help get the community involved.

Apache Kafka, which was originally developed at LinkedIn, started as a distributed commit log. Since then, it has evolved into a distributed streaming platform.

It was open sourced back in 2011 and became popular extremely fast. It’s a simple, easy-to-use tool that keeps evolving and has a vibrant community. One of the biggest headaches we hear Kafka users complain about is the Zookeeper dependency and the need to maintain a Zookeeper cluster. So what is Zookeeper?

According to its homepage, Zookeeper is “a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.” It’s useful when creating distributed applications, and that’s why Kafka relies on it. Zookeeper is based on the Zookeeper Atomic Broadcast protocol (ZAB), a consensus protocol which shares some key aspects with Paxos. For more details, take a look at this page.

So what’s all the fuss? Well, we’re not saying that ZK is bad, but…

  • Unlike Kafka, it does not have a large and vibrant community (merge those PRs please, anyone?)
  • Its protocol is hard to understand, and a large Zookeeper cluster is hard to maintain
  • It’s a bit outdated, compared with, say, Raft
  • It’s written in Java (yes, this is a subjective problem, but this is a problem for us, because ZK is an infrastructure component)
  • We run everything in Kubernetes, and Kubernetes already relies by default on etcd, which is built on Raft
  • Linearizability (if you’ll forgive us minting a word) - check this comparison chart
  • Performance and inherent scalability issues
  • Client-side complexity and thick clients
  • Lack of service discovery

The list above might grow, but I think we’ve made our point. So we have a Kafka cluster that is easy to use and maintain, and a Zookeeper cluster dependency that is difficult to understand and hard to maintain. You may ask why the community hasn’t already gotten rid of this Zookeeper dependency, or at least made Kafka pluggable, so that end users can choose for themselves. Kafka was designed and built around Zookeeper, so it’s really hard to just throw it away, but the community has recently put together a huge refactor of the parts of the Kafka code base that handle Zookeeper interactions, so factoring out Zookeeper has become easier. Thanks a lot!

We’re happy to announce that we at Banzai Cloud have removed the Zookeeper dependency, and use etcd instead. Today, we’re open sourcing the code so we can all try, adjust, make or propose fixes, listen to the Apache Kafka community and eventually submit it as a PR upstream. The code is available on the Banzai Cloud GitHub repository.

Kafka on etcd

etcd is a “distributed reliable key-value store for the most critical data of a distributed system”. It uses the Raft consensus algorithm, which was designed to be easy to understand, to scale, and to operate. The protocol and the etcd implementation were very quickly adopted by large distributed systems like Kubernetes, large distributed databases and messaging frameworks, where consensus and strong consistency are a must. It has a vibrant community and is easy to use: on GitHub alone, more than 500 projects use it. It’s written in Go, so it is not directly embeddable in Kafka; however, CoreOS released a project called jetcd, which allows us to interact with etcd from Scala/Java.

During the refactoring of Apache Kafka to run on etcd we faced several challenges. Although etcd claims that it can replace Zookeeper completely, several of its approaches differ significantly from Zookeeper’s. Below are the biggest incongruities we faced during the integration.

etcd and Zookeeper differences

  • Zookeeper uses a so-called Znode, which can store information but can also have child nodes. Moreover, users can register watchers on a parent node, and Zookeeper will, for example, report child nodes whenever they’re created. In etcd, by contrast, every entry is just a key with a value: users cannot register new keys as children of other keys, so, by default, it’s not possible to register a watcher on a parent key that will inform us when new child keys are created. Kafka relies heavily on this Zookeeper behavior, so we had to figure out ways of imitating it with etcd. For child nodes, we simply concatenate their paths, so if Kafka wants to create, for example, “id” under the “broker” node, we create a key in etcd as “/broker/id”. To solve the watcher problem, we first check whether the created node has a parent entry. If it has one and a watcher is registered on it, then we register a new watcher on the newly created key/value.
  • Zookeeper can have nodes that contain no information, but etcd cannot create a key without value, so, until we figure out how to handle this in a nicer way, we’re putting in “[null]” strings.
  • Zookeeper has a so-called ephemeral node, which Kafka uses for broker liveness checks. This node type requires frequent heartbeats from Kafka, otherwise Zookeeper deletes the node. etcd does not have ephemeral nodes, but it has leases. If a lease goes unrenewed beyond a configurable time, then etcd deletes the key/value attached to it.
  • Recently, Kafka has started to use persistent sequential Zookeeper nodes. Each time such a node is created, Zookeeper appends an increasing counter to its name. In etcd we use a counter bound to the key/value, which also changes whenever an interaction occurs.
  • We use etcd transactions for put, get and exists operations but, unfortunately, jetcd contains a bug that affects all transaction operations. We blogged about this problem and its resolution quite some time ago.
  • We also put significant effort into a metastore. We’re introducing a new KafkaMetaStore trait, which allows the users to implement their own metastore for Kafka. But, keep in mind that, for now, Kafka depends so much on Zookeeper that it requires tremendous effort to remove all things Zookeeper-related. Our current solution is to map etcd code back to Zookeeper’s. We hope that the community will become engaged and help us to refactor this part of Kafka as well.
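
To make the first few differences in the list above more concrete, here is a minimal in-memory sketch of the emulation ideas: concatenated paths for child nodes, watchers inherited from a watched parent, the “[null]” placeholder for nodes without data, and leases standing in for ephemeral nodes. It is an illustration under stated assumptions, not our integration code: FlatStore, NULL_PLACEHOLDER and the explicit now clock argument are hypothetical names, and nothing here calls the etcd or jetcd API.

```python
NULL_PLACEHOLDER = "[null]"  # the post's stand-in for "node without data"


class FlatStore:
    """Hypothetical in-memory model of a flat key/value store; not the etcd or jetcd API."""

    def __init__(self):
        self.data = {}      # full path -> (value, lease deadline or None)
        self.watchers = {}  # full path -> list of callbacks(kind, path)

    def watch(self, path, callback):
        self.watchers.setdefault(path, []).append(callback)

    def create(self, path, value=None, ttl=None, now=0.0):
        """Store a child as a concatenated path, e.g. "/broker/id"."""
        is_new = path not in self.data
        deadline = None if ttl is None else now + ttl
        self.data[path] = (NULL_PLACEHOLDER if value is None else value, deadline)
        if not is_new:
            return
        parent = path.rsplit("/", 1)[0]
        for callback in list(self.watchers.get(parent, [])):
            # The parent is watched, so watch the new key too and report it.
            self.watch(path, callback)
            callback("created", path)

    def children(self, parent):
        """Emulate a child listing with a prefix scan over the flat keys."""
        prefix = parent.rstrip("/") + "/"
        return sorted(k for k in self.data
                      if k.startswith(prefix) and "/" not in k[len(prefix):])

    def renew(self, path, ttl, now):
        """Heartbeat: push the lease deadline of an existing key forward."""
        value, _ = self.data[path]
        self.data[path] = (value, now + ttl)

    def expire(self, now):
        """Drop every key whose lease ran out, like an ephemeral node vanishing."""
        dead = sorted(k for k, (_, d) in self.data.items() if d is not None and d <= now)
        for key in dead:
            del self.data[key]
            self.watchers.pop(key, None)
        return dead


store = FlatStore()
events = []
store.watch("/brokers", lambda kind, path: events.append((kind, path)))
store.create("/brokers/ids")                       # no data, gets the placeholder
store.create("/brokers/ids/0", "broker-0", ttl=10, now=0)
store.create("/brokers/ids/1", "broker-1", ttl=10, now=0)
store.renew("/brokers/ids/1", ttl=10, now=8)       # only broker 1 keeps heartbeating
expired = store.expire(now=12)
print(expired, store.children("/brokers/ids"))
```

In this run only the broker that renewed its lease survives: the script prints ['/brokers/ids/0'] ['/brokers/ids/1']. Passing the clock in explicitly keeps the sketch deterministic; a real lease is timed by the etcd server. Sequential nodes and transactions are left out of the sketch.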

Try it out

Create a Kubernetes cluster

To try out Kafka we created a Kubernetes cluster on Microsoft Azure’s managed Kubernetes service, AKS, with Pipeline. Just to recap, Pipeline can provision Kubernetes clusters across all major cloud providers and automate Helm deployments through a RESTful API. It also has a CI/CD component, in which cluster creation, artifact builds and deployments can be wired into a workflow.

If you’d like to use Pipeline to create Kubernetes clusters, please follow this how-to. All the RESTful API calls are available through this Postman collection (e.g., to create a Kubernetes cluster and get the Kubernetes config).


Deploy the Kafka Helm charts to a Kubernetes cluster

The example below should work on any Kubernetes cluster, and it’s not tied to Pipeline. You can take the Helm chart from the Banzai Cloud charts repo.

Using the Postman collection mentioned above, you can deploy the Kafka Helm chart by calling Deployment Create with a modified body, which should look like this:

{"name": "banzaicloud-stable/kafka"}

You can check Kafka cluster creation by using kubectl get pods (remember to set your kubecontext properly):

>kubectl get pods
NAME                                         READY     STATUS    RESTARTS   AGE
etcd-cluster-0000                            1/1       Running   0          3m
etcd-cluster-0001                            1/1       Running   0          3m
etcd-cluster-0002                            1/1       Running   0          3m
kafka-0                                      1/1       Running   0          4m
kafka-1                                      1/1       Running   0          2m
kafka-2                                      1/1       Running   0          1m
nosy-alpaca-etcd-operator-57f46478fd-dt5q8   1/1       Running   0          4m

Produce and Consume messages

At this point your Kafka cluster is only accessible inside the Kubernetes cluster, so you have to create a kafka-test pod. (Pipeline spotguides automate this process.) Use the following YAML:

apiVersion: v1
kind: Pod
metadata:
  name: kafka-test
spec:
  containers:
  - name: kafka-test
    image: banzaicloud/kafka:2.12-1.2.0-etcd-0.0.1
    # Just spin & wait forever
    command: [ "/bin/bash", "-c", "--" ]
    args: [ "while true; do sleep 3000; done;" ]

Create the pod with kubectl create -f kafka-test.yaml; it gives you a simple pod from which to try out Kafka. The next Pipeline release will contain the Kafka spotguide as well, so Kafka will become accessible from outside the cluster. Now exec into this pod by using kubectl exec -it kafka-test bash. Once you are inside the container, create a topic:

./bin/kafka-topics.sh --zookeeper etcd://etcd-cluster-client:2379 --create --topic kafka-test --partitions 1 --replication-factor 3
Created topic "kafka-test".

Once the topic is created, we’ll produce some messages:

root@kafka-test:/opt/kafka# ./bin/kafka-console-producer.sh --broker-list bootstrap:9092 --topic kafka-test
>welcome
>kafka
>on
>etcd
>good
>you
>are
>here

Let’s consume these messages:

./bin/kafka-console-consumer.sh --bootstrap-server bootstrap:9092 --topic kafka-test --from-beginning
welcome
kafka
on
etcd
good
you
are
here

As you can see, all the messages sent by the producer arrived.

Handling broker failures

Now we’re going to simulate a broker failure in the cluster. From etcd we can see that the broker with id 0 is the partition’s leader, and all other brokers are in sync:

/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}

Kill the broker with id 0, and check whether we can still consume all the messages.

kubectl delete pod kafka-0
kubectl get pods
NAME                                         READY     STATUS        RESTARTS   AGE
etcd-cluster-0000                            1/1       Running       0          24m
etcd-cluster-0001                            1/1       Running       0          24m
etcd-cluster-0002                            1/1       Running       0          24m
kafka-0                                      1/1       Terminating   0          24m
kafka-1                                      1/1       Running       0          23m
kafka-2                                      1/1       Running       0          22m
kafka-test                                   1/1       Running       0          11m
nosy-alpaca-etcd-operator-57f46478fd-dt5q8   1/1       Running       0          24m

/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2]}

./bin/kafka-console-consumer.sh --bootstrap-server bootstrap:9092 --topic kafka-test --from-beginning
welcome
kafka
on
etcd
good
you
are
here

/brokers/topics/kafka-test/partitions/0/state
{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2,0]}

As you can see, leader election was successful, and all messages are consumable.
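
The three partition state snapshots above can also be compared programmatically. The following is a small sketch, using only the JSON values printed above, that reports what changed between two snapshots; describe_failover is a hypothetical helper written for this illustration, not part of Kafka or etcd.

```python
import json

# Partition state values copied from the etcd output shown above.
before = json.loads('{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}')
during = json.loads('{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2]}')
after = json.loads('{"controller_epoch":2,"leader":1,"version":1,"leader_epoch":1,"isr":[1,2,0]}')


def describe_failover(old, new):
    """Summarise what changed between two partition state snapshots (hypothetical helper)."""
    return {
        "leader_changed": old["leader"] != new["leader"],
        # A clean election picks the new leader from the previous in-sync replica set.
        "new_leader_was_in_sync": new["leader"] in old["isr"],
        "left_isr": sorted(set(old["isr"]) - set(new["isr"])),
        "joined_isr": sorted(set(new["isr"]) - set(old["isr"])),
    }


failure = describe_failover(before, during)
recovery = describe_failover(during, after)
print(failure)
print(recovery)
```

The first comparison shows the leader moving from broker 0 to broker 1, which was already in sync, while broker 0 drops out of the ISR; the second shows broker 0 rejoining the ISR once its pod is back, with the leader unchanged.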

What’s next

After a code refactor, we’d like to contribute this code back to the Apache Kafka community and begin a conversation about improvements, future plans and changes. We’d like to give Kafka users the chance to choose whether they’d like to use Zookeeper or etcd. This choice is especially important for users who deploy Kafka to Kubernetes, like us.

There’s a list of unsupported features, about which we’d love to receive feedback. At Banzai Cloud all our workloads are cloud based (mostly managed by Kubernetes), and we rely heavily on cloud providers’ security features. Pipeline, k8s clusters internally, and interactions with third parties all use OAuth tokens that are stored/leased by Vault (for our internal security architecture read this post). This model is a little bit different from how Kafka currently deals with security, thus the unsupported features are:

  • ACL support
  • Kerberos (etcd does not support Kerberos)

We are running proofs of concept internally; however, we believe in the power of community, and we know that these features are considerably more difficult than the changes we’ve already made. We invite anybody interested to join the project; let’s make Kafka a first class citizen on Kubernetes.

So far, all our pilot users have considered Kerberos overkill, and already use OAuth2 or OpenID, though they are all Web 2.0 (or 3.0?) companies, with their deployments primarily in the cloud.

If you are interested in our technology and open source projects, follow us on GitHub, LinkedIn or Twitter:
