Author Toader Sebastian

Kafka ACLs on Kubernetes over Istio mTLS

Running Kafka on Istio with mTLS is, in and of itself, an interesting topic, but before we talk about how Banzai Cloud's Supertubes allows us to do that, let's take a step back and look at how SSL works in Kafka. Then we can answer the question: why do we need Kafka on Istio with mTLS at all?

Supertubes is Banzai Cloud's Kafka as a Service, which runs on Kubernetes inside an Istio service mesh. In this setup, each Kafka broker has an Envoy Proxy injected as a sidecar container, forming the data plane. The lifecycle and configuration of the data plane are managed by Istio, which is the control plane shown in the Supertubes architecture diagram.

While there are different ways to authenticate a client application (SSL, SASL), in this post we look only at client authentication through SSL in Kafka. When Kafka brokers are configured to use 2-way SSL, client applications are authenticated by the subject field of the client certificate they present when connecting to a broker. This subject field is then mapped to a Kafka user according to SSL principal mapping rules. This method gives us encrypted communication with brokers, as well as authentication and authorization of the client application.
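
As a minimal sketch of such a mapping rule, the snippet below writes a broker config fragment that uses Kafka's `ssl.principal.mapping.rules` property and then previews the first rule with `sed`. The file name, the example subject and the `preview_principal` helper are illustrative assumptions, not part of Supertubes.

```shell
# Hypothetical broker config fragment (the file name is illustrative).
cat > principal-mapping.properties <<'EOF'
# "CN=my-app,OU=ServiceUsers,O=Example" -> "my-app"; any other subject
# falls through to DEFAULT and keeps its full distinguished name.
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,DEFAULT
EOF

# Rough preview of the first rule with sed. POSIX ERE has no lazy
# quantifier, so [^,]* stands in for (.*?).
preview_principal() {
  printf '%s\n' "$1" | sed -E 's/^CN=([^,]*),OU=ServiceUsers.*$/\1/'
}

preview_principal "CN=my-app,OU=ServiceUsers,O=Example"
# prints: my-app
```

A subject that does not match the pattern is printed unchanged, which mirrors the `DEFAULT` fallback.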

Configuring SSL for brokers consists of the following steps:

  1. Obtain a private key and a public certificate for each broker, signed by the same CA or chaining up to a common root certificate.
  2. Add the private key and certificate to each broker's keystore, and the CA certificate to its truststore.
  3. Add listener(s) with SSL protocol to the broker configuration.
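
The steps above typically end up in the broker configuration as something like the following sketch. The property names are standard Kafka broker settings; the file name, port, paths and passwords are placeholders assumed for illustration.

```shell
# Write a hypothetical server.properties fragment; paths, port and
# passwords are placeholders, not values taken from this post.
cat > broker-ssl.properties <<'EOF'
# Step 3: expose an SSL listener and use it between brokers as well.
listeners=SSL://:9093
security.inter.broker.protocol=SSL
# Step 2: the keystore holds the broker's private key and certificate,
# the truststore holds the CA certificate(s) used to verify peers.
ssl.keystore.location=/var/private/ssl/broker.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/var/private/ssl/broker.truststore.jks
ssl.truststore.password=changeit
# Require client certificates (2-way SSL).
ssl.client.auth=required
EOF
```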

Best practice dictates that we use short-lived certificates, renew them before they expire, and push the renewed certificates down to the brokers. While Kafka brokers can pick up renewed certificates dynamically, the same cannot be said of all the accompanying applications that make up the ecosystem. Those applications have to be restarted to become aware of renewed certificates. Istio can save us from all that by adding mTLS at the network layer to all interactions between the services in an Istio mesh.

This is just one of the benefits of running Kafka on Istio. If you're interested in learning more about the enormous untapped potential of Kafka on Istio, check out our post, "The benefits of integrating Apache Kafka with Istio".

The point we're making here is that we don't have to bother with configuring the applications to use TLS when mTLS is provided by Istio. From Kafka's perspective, this means that no keystore and truststore files need to be created, no SSL configuration is required, and listeners can be configured with the PLAINTEXT protocol.

As you might have guessed, there's a problem here. If Kafka communicates in PLAINTEXT mode and the client applications are authenticated as ANONYMOUS, then Kafka can't determine which ACL rule is in effect for the client application since it doesn't know its identity.

Authorization in Kafka relies on two main building blocks: the authorizer and the principal.

An authorizer controls whether or not to authorize an operation based on the principal and the resources being accessed. Kafka ships with a default authorizer implementation called kafka.security.authorizer.AclAuthorizer which stores Kafka ACL information in ZooKeeper and controls what operation can be executed by principals on Kafka resources.

If you are using a Kafka version that predates 2.4, then the default authorizer is kafka.security.auth.SimpleAclAuthorizer.

To enable client authorization, pass the fully qualified name of the authorizer implementation into the authorizer.class.name Kafka configuration property:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

Principal is the identity of an authenticated client, which is determined based on the protocol of the listener the client is connected to.

By default, Kafka communicates in PLAINTEXT mode, which means that inbound and outbound broker traffic is unencrypted, and the client application is authenticated as an ANONYMOUS Kafka principal. When a client is identified as ANONYMOUS, Kafka cannot enforce any ACLs; it doesn't know which Kafka user or group the client application is acting on behalf of. Thus, Kafka rejects the client application unless allow.everyone.if.no.acl.found has been set to true.

In the following section we'll describe how Supertubes solves this problem.

Check out Supertubes in action on your own clusters:

curl https://getsupertubes.sh | sh
supertubes install -a --no-demo-cluster --kubeconfig <path-to-k8s-cluster-kubeconfig-file>

or read the documentation for details.


Client application identity

When Kafka runs in PLAINTEXT mode, no SSL handshake can occur, so the authentication flow is not executed within Kafka. We know that all the traffic to/from a Kafka broker goes through the Envoy Proxy, which is deployed as a sidecar container by Istio. To solve this problem, we just have to make sure that the Envoy Proxy is smart enough to extract the identity of the client application and pass it to Kafka. The Envoy Proxy is not capable of performing this by default, but its capabilities can be extended through pluggable WASM filters for Envoy. Supertubes deploys our Kafka ACL WASM filter for Kafka brokers, which reads the client certificate information that comes with mTLS traffic, and extracts the field that identifies the client application. The filter enriches the stream that targets Kafka with the extracted client identity to pass it on to Kafka.

Client applications running outside the Istio mesh

When a client application connects from outside the Istio mesh, the value of the subject field (extracted from the certificate of the client application) is used as the application's identity.

The following diagram illustrates the flow when a client application connects from outside the Istio mesh:

[Figure: Client application outside of the Istio mesh]

When the client application is external to the Kubernetes cluster, this flow differs from the above in that the traffic from the application passes through a LoadBalancer and Ingress gateway:

[Figure: External client application]

Client applications running inside the Istio mesh

When client applications that connect to Kafka run in the same Istio mesh as Kafka, they don't need to send certificates. Istio provides one for them out-of-the-box. This certificate is special in that it carries information about the namespace and the service account of the application. The filter collects this information from the certificate and uses CN=<namespace>-<service-account-name> as the application's identity.
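
To make the naming scheme concrete, here is a small sketch of the mapping. It assumes the namespace and service account are read from the SPIFFE URI that Istio places in workload certificates (`spiffe://<trust-domain>/ns/<namespace>/sa/<service-account>`). The helper name is hypothetical, and the code only illustrates the resulting identity, not how the WASM filter is actually implemented.

```shell
# Hypothetical helper: derive the Kafka identity from an Istio SPIFFE URI.
spiffe_to_kafka_identity() {
  local uri="$1"
  # Reject anything that does not look like an Istio workload identity.
  case "$uri" in
    spiffe://*/ns/*/sa/*) ;;
    *) return 1 ;;
  esac
  # Drop "spiffe://<trust-domain>/ns/", leaving "<namespace>/sa/<service-account>".
  local rest="${uri#spiffe://*/ns/}"
  local namespace="${rest%%/sa/*}"
  local service_account="${rest##*/sa/}"
  printf 'CN=%s-%s\n' "$namespace" "$service_account"
}

spiffe_to_kafka_identity "spiffe://cluster.local/ns/kafka/sa/default"
# prints: CN=kafka-default
```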

Note that this simplifies things even further, as no certificate has to be created for the client application; we simply deploy the client application into the same Istio mesh as the Kafka cluster and it gets automatically identified by the service account it runs with.

In upcoming Supertubes releases, an even more simplified flow will be supported. KafkaUser custom resources will be created automatically, based on the access rights granted to the KafkaTopic custom resources for service accounts. This will allow users to define access rights to Kafka topics through the Kubernetes RBAC.

The following diagram illustrates the flow when a client application connects from inside the Istio mesh:

[Figure: Client application inside the Istio mesh]

Extended Kafka authentication in PLAINTEXT mode

Kafka authenticates client applications when they establish a connection to a broker. As already mentioned, the authentication flow is not exercised in PLAINTEXT mode. Unfortunately, the authentication mechanism is not pluggable, so we have to extend it to authenticate client applications. The extended mechanism checks whether an application identity was passed in by the Envoy Proxy filter and, if so, adds it to the authentication context. A PrincipalBuilder implementation can access this information through the following authentication context object:

org.apache.kafka.common.security.auth.banzaicloud.AuthenticationContextWithOptionalAuthId

Try it out

Let's explore how Kafka ACLs with Istio mTLS work through an example. We're going to need a Kubernetes cluster (version 1.15 or later) with at least 8 vCPUs and 12 GB of memory, and with the capability to provision LoadBalancer Kubernetes services.

Deploy the Kafka demo cluster with Supertubes

Download and install the Supertubes CLI, and run the following command to spin up the necessary ecosystem (Supertubes, Istio, ZooKeeper, etc.) alongside a demo Kafka cluster:

supertubes install -a --kubeconfig <path-to-kubeconfig-file>

Verify that the deployed Kafka cluster is up and running:

supertubes cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>

Namespace  Name   State           Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
kafka      kafka  ClusterRunning  banzaicloud/kafka:2.13-2.5.0-bzc.1  0       CruiseControlTopicReady      0

Enable ACLs and configure an external listener

By default, the deployed Kafka cluster has neither ACLs nor external access enabled, so we enable both by applying the following change:

supertubes cluster update --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file> -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
spec:
  ingressController: "istioingress"
  istioIngressConfig:
    gatewayConfig:
      mode: PASSTHROUGH
  readOnlyConfig: |
    auto.create.topics.enable=false
    offsets.topic.replication.factor=2
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    allow.everyone.if.no.acl.found=false
  listenersConfig:
    externalListeners:
      - type: "plaintext"
        name: "external"
        externalStartingPort: 19090
        containerPort: 9094
EOF

This update triggers a rolling upgrade of the Kafka cluster, which is reflected in the state of the cluster:

supertubes cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>

Namespace  Name   State                    Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
kafka      kafka  ClusterRollingUpgrading  banzaicloud/kafka:2.13-2.5.0-bzc.1  0       CruiseControlTopicReady      0

Wait until the re-configuration is finished and the cluster is in the ClusterRunning state. This will take a while, as the rolling upgrade applies changes on a broker-by-broker basis.
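
Rather than re-running the command by hand, the wait can be scripted. The following is a minimal sketch: `wait_for_state` is a hypothetical helper that polls any status command until its output contains the wanted state, and the retry count and delay in the usage comment are arbitrary.

```shell
# Poll a status command until its output contains the wanted state.
# Arguments: <state> <max-attempts> <delay-seconds> <command...>
wait_for_state() {
  local wanted="$1" attempts="$2" delay="$3"
  shift 3
  local i=0
  while [ "$i" -lt "$attempts" ]; do
    # Succeed as soon as the state shows up in the command output.
    if "$@" 2>/dev/null | grep -q "$wanted"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage (requires the Supertubes CLI and cluster access), left commented out:
# wait_for_state ClusterRunning 60 10 \
#   supertubes cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>
```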

At this stage, all listeners of the Kafka cluster use the PLAINTEXT protocol, no SSL is configured in Kafka, and mTLS is provided by Istio. We can quickly verify that mTLS is in use by connecting with openssl without a client certificate. A server certificate should be returned and the SSL handshake should fail:

kubectl run openssl --image=frapsoft/openssl --restart=Never -it --rm -- s_client -connect kafka-all-broker.kafka.svc.cluster.local:29092

Now let's create an example-topic and an another-example-topic Kafka topic, which we'll use to verify whether the ACLs are working as expected:

kubectl apply -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: example-topic
  namespace: kafka
spec:
  clusterRef:
    name: kafka
  name: example-topic
  partitions: 3
  replicationFactor: 2
  config:
    "retention.ms": "604800000"
    "cleanup.policy": "delete"
EOF


kubectl apply -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: another-example-topic
  namespace: kafka
spec:
  clusterRef:
    name: kafka
  name: another-example-topic
  partitions: 3
  replicationFactor: 2
  config:
    "retention.ms": "604800000"
    "cleanup.policy": "delete"
EOF

Client application types

We distinguish between three different client types based on where they run:

  1. Client applications which reside inside the same Istio mesh as the Kafka cluster
  2. Client applications which reside on the same Kubernetes cluster as the Kafka cluster but outside of the Istio mesh
  3. Client applications which reside outside the Kubernetes cluster

We'll look at each of these separately.

Client applications inside an Istio mesh

In this scenario, the client application is identified as CN=<namespace>-<service-account-name>. We use kafkacat as the client application, which we deploy into the same namespace as Kafka to ensure that they run in the same Istio mesh.

kubectl create -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  namespace: kafka
  name: kafka-client
spec:
  containers:
  - name: kafka-client
    image: solsson/kafkacat:alpine
    # Just spin & wait forever
    command: [ "/bin/bash", "-c", "--" ]
    args: [ "while true; do sleep 3000; done;" ]
EOF

Now let's try to list the topics using this client application:

kubectl exec -it -n kafka kafka-client -- bash
kafkacat -L -b kafka-all-broker:29092

Metadata for all topics (from broker -1: kafka-all-broker:29092/bootstrap):
 2 brokers:
  broker 0 at kafka-0.kafka.svc.cluster.local:29092 (controller)
  broker 1 at kafka-1.kafka.svc.cluster.local:29092
 0 topics:

The client application retrieved 0 topics. This is because the (principal) user the application is identified as doesn't exist in Kafka. Since we deployed kafkacat into the kafka namespace with the default service account, it's identified as CN=kafka-default. Let's create a Kafka user kafka-default with access to only the example-topic topic.

kubectl create -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: kafka-default
  namespace: kafka
spec:
  clusterRef:
    name: kafka
  secretName: ""
  createCert: false
  topicGrants:
    - topicName: example-topic
      accessType: read
    - topicName: example-topic
      accessType: write
EOF

Now let's repeat the topic list operation:

kafkacat -L -b kafka-all-broker:29092

Metadata for all topics (from broker -1: kafka-all-broker:29092/bootstrap):
 2 brokers:
  broker 0 at kafka-0.kafka.svc.cluster.local:29092 (controller)
  broker 1 at kafka-1.kafka.svc.cluster.local:29092
 1 topics:
  topic "example-topic" with 3 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 0,1
    partition 2, leader 0, replicas: 0,1, isrs: 0,1

As you can see, the client application was able to list the only topic, example-topic, which it has access to. It is important to emphasize that we executed kafkacat without the need to configure a client certificate to identify the application. The application was automatically authenticated and authorized by its service account.
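
Listing topics only exercises part of the grants. As a sketch of a fuller check, the commands below produce to and consume from both topics; they are meant to be run inside the kafka-client pod. The `acl_smoke_test` helper is hypothetical, and the exact error text for the rejected request depends on the kafkacat version, so none is shown.

```shell
# Hypothetical smoke test, intended to run inside the kafka-client pod.
acl_smoke_test() {
  local brokers="kafka-all-broker:29092"

  # Should succeed: kafka-default has write and read grants on example-topic.
  echo "hello" | kafkacat -P -b "$brokers" -t example-topic
  kafkacat -C -b "$brokers" -t example-topic -o beginning -e

  # Expected to be rejected: no grant exists for another-example-topic.
  echo "hello" | kafkacat -P -b "$brokers" -t another-example-topic
}

# Run it manually once inside the pod:
# acl_smoke_test
```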

Client applications outside an Istio mesh

When the client application resides outside the Istio mesh, the client application has to identify itself by presenting a client certificate. On the client application side things are more complex than in the previous scenario.

We'll use cert-manager to issue client certificates which will represent the client application.

kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v0.11.0/cert-manager.yaml

We need to specify a cluster issuer for cert-manager that has the same CA or root certificate as the Istio mesh; otherwise, the application's client certificate won't be valid for the mTLS enforced by Istio.

Get the CA certificate used by Istio:

kubectl get secrets -n istio-system istio-ca-secret -o yaml

Since this secret has different fields than what cert-manager expects, we need to create a new secret from this in a format that works for cert-manager.

kubectl create -f - <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: ca-key-pair
  namespace: cert-manager
data:
  tls.crt: <tls-crt-from-istio-ca-secret>
  tls.key: <your-tls-key-from-istio-ca-secret>
EOF
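
One way to fill in the two placeholders above is sketched below. It assumes that the CA certificate and key are stored in istio-ca-secret under the `ca-cert.pem` and `ca-key.pem` fields; check the output of the `kubectl get secrets` command above to confirm the field names in your installation. The `render_ca_key_pair` helper is hypothetical.

```shell
# Print a cert-manager-compatible Secret manifest from values that are
# already base64-encoded (as they are in the data section of a Secret).
render_ca_key_pair() {
  local crt_b64="$1" key_b64="$2"
  cat <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: ca-key-pair
  namespace: cert-manager
data:
  tls.crt: ${crt_b64}
  tls.key: ${key_b64}
EOF
}

# Usage (requires cluster access), left commented out.
# Assumed field names in istio-ca-secret: ca-cert.pem and ca-key.pem.
# crt=$(kubectl get secret -n istio-system istio-ca-secret -o jsonpath='{.data.ca-cert\.pem}')
# key=$(kubectl get secret -n istio-system istio-ca-secret -o jsonpath='{.data.ca-key\.pem}')
# render_ca_key_pair "$crt" "$key" | kubectl create -f -
```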


kubectl create -f - <<EOF
apiVersion: cert-manager.io/v1alpha2
kind: ClusterIssuer
metadata:
  name: ca-issuer
  namespace: cert-manager
spec:
  ca:
    secretName: ca-key-pair
EOF

In the next step, we'll create a Kafka user that we'll use so the client application can identify itself. Also, we'll grant this user access to only the example-topic topic, just as before.

kubectl create -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: external-kafkauser
  namespace: default
spec:
  clusterRef:
    name: kafka
    namespace: kafka
  secretName: external-kafkauser-secret
  pkiBackendSpec:
    pkiBackend: "cert-manager"
    issuerRef:
      name: "ca-issuer"
      kind: "ClusterIssuer"
  topicGrants:
    - topicName: example-topic
      accessType: read
    - topicName: example-topic
      accessType: write
EOF

Now deploy the client application kafkacat into the default namespace, which is outside the Istio mesh.

kubectl create -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: external-kafka-client
  namespace: default
spec:
  containers:
  - name: external-kafka-client
    image: solsson/kafkacat:alpine
    # Just spin & wait forever
    command: [ "/bin/bash", "-c", "--" ]
    args: [ "while true; do sleep 3000; done;" ]
    volumeMounts:
    - name: sslcerts
      mountPath: "/ssl/certs"
  volumes:
  - name: sslcerts
    secret:
      secretName: external-kafkauser-secret
EOF

Now let's try to list the topics using this client application:

kubectl exec -it external-kafka-client -- bash

kafkacat -L -b kafka-all-broker.kafka:29092
% ERROR: Failed to acquire metadata: Local: Timed out

What we see is that the client application cannot connect to any of the brokers. This is because the client application didn't present a client certificate, which is required by the Istio-enforced mTLS.

Let's retry the command but this time provide the certificate that represents the previously created external-kafkauser Kafka user.

kafkacat -L -b kafka-all-broker.kafka:29092 -X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt

Metadata for all topics (from broker -1: ssl://kafka-all-broker.kafka:29092/bootstrap):
 2 brokers:
  broker 0 at kafka-0.kafka.svc.cluster.local:29092 (controller)
  broker 1 at kafka-1.kafka.svc.cluster.local:29092
 1 topics:
  topic "example-topic" with 3 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 0,1
    partition 2, leader 0, replicas: 0,1, isrs: 0,1

As expected, the client application was able to list only the topic it has access to, example-topic.

Client applications outside the Kubernetes cluster

This scenario is much the same as the previous one, the only difference being that the client application connects to the endpoint bound to the Kafka cluster's external listener.

To get the endpoint bound to the external listener of the Kafka cluster run:

kubectl get svc -n kafka kafka-meshgateway

NAME                TYPE           CLUSTER-IP     EXTERNAL-IP                                                                PORT(S)
kafka-meshgateway   LoadBalancer   10.10.44.209   aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com   19090:32480/TCP,19091:30972/TCP,29092:30748/TCP

Now try to list the topics through this endpoint without a client certificate:

kafkacat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092
% ERROR: Failed to acquire metadata: Local: Timed out

Unsurprisingly, the client application was unable to connect to the Kafka cluster, since it didn't present a client certificate.

Now let's run it again with the client certificate stored in external-kafkauser-secret which represents Kafka user external-kafkauser:

kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "ca.crt"}}' | base64 --decode > /var/tmp/ca.crt
kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.crt"}}' | base64 --decode > /var/tmp/tls.crt
kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.key"}}' | base64 --decode > /var/tmp/tls.key

kafkacat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.key.location=/var/tmp/tls.key -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.ca.location=/var/tmp/ca.crt

Metadata for all topics (from broker -1: ssl://aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092/bootstrap):
 2 brokers:
  broker 0 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19090 (controller)
  broker 1 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19091
 1 topics:
  topic "example-topic" with 3 partitions:
    partition 0, leader 0, replicas: 0,1, isrs: 0,1
    partition 1, leader 1, replicas: 1,0, isrs: 0,1
    partition 2, leader 0, replicas: 0,1, isrs: 0,1

Kafka 2.5 is out

Kafka 2.5 was recently released, and Banzai Cloud's Supertubes fully supports it. Those of you who are still on Kafka 2.4 can run the following command to upgrade your Kafka cluster to version 2.5:

supertubes cluster update --kafka-image "banzaicloud/kafka:2.13-2.5.0-bzc.1"

Conclusion

The case we described in this post shows the power of integrating applications with Istio through the use of pluggable WASM filters for Envoy. In Kafka's context, we believe this lets us provide easy-to-use, well-integrated solutions to complex business problems and requirements that would otherwise be very difficult or prohibitively cumbersome to implement.
