The Koperator makes securing your Apache Kafka cluster with SSL simple.

Enable SSL encryption in Apache Kafka

To create an Apache Kafka cluster that has one or more listeners with SSL encryption enabled, you must enable SSL encryption and configure the secrets in the listenersConfig section of your KafkaCluster custom resource. You can either provide your own CA certificate and the corresponding private key, or let the operator create them for you from your cluster configuration. Using sslSecrets, Koperator generates client and server certificates signed by the CA. The server certificate is shared across listeners. The client certificate is used by Koperator, Cruise Control, and Cruise Control Metrics Reporter to communicate with Kafka brokers through listeners that have SSL enabled.

Custom certificates per listener are supported in Koperator version 0.21.0 and later. Configurations where certain external listeners use user-provided certificates while others rely on the certificates auto-generated by Koperator are also supported. See the details below.

Using auto-generated certificates (sslSecrets)

CAUTION:

After the cluster is created, you cannot change the way the listeners are configured without an outage. If a cluster is created with an unencrypted (plain text) listener and you want to switch to SSL-encrypted listeners (or the other way around), you must manually delete each broker pod. The operator will restart the pods with the new listener configuration.

The following example enables SSL and automatically generates the certificates:

listenersConfig:
  externalListeners:
    - type: "ssl"
      name: "external"
      externalStartingPort: 19090
      containerPort: 29092
  internalListeners:
    - type: "ssl"
      name: "internal"
      containerPort: 29092
      usedForInnerBrokerCommunication: true
    - type: "ssl"
      name: "controller"
      containerPort: 29093
      usedForInnerBrokerCommunication: false
      usedForControllerCommunication: true
  sslSecrets:
    tlsSecretName: "test-kafka-operator"
    jksPasswordName: "test-kafka-operator-pass"
    create: true

If sslSecrets.create is false, the operator will look for the secret at sslSecrets.tlsSecretName and expect these values:

Key     Value
caCert  The CA certificate
caKey   The CA private key
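
When sslSecrets.create is false, the CA secret has to exist before the operator looks for it. The following is a minimal sketch of preparing such a secret, assuming the CA certificate and key are available as PEM files and that the secret belongs in the same namespace as the KafkaCluster. The function name ca_secret_manifest and the file names are hypothetical and not part of Koperator.

```shell
# ca_secret_manifest NAME NAMESPACE CERT_FILE KEY_FILE
# Prints a Kubernetes Secret manifest that stores an existing CA pair under
# the key names listed above (caCert and caKey).
ca_secret_manifest() {
  name="$1"; namespace="$2"; cert_file="$3"; key_file="$4"

  # Secret data values must be base64-encoded on a single line.
  cert_b64=$(base64 < "$cert_file" | tr -d '\n') || return 1
  key_b64=$(base64 < "$key_file" | tr -d '\n') || return 1

  cat <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: ${name}
  namespace: ${namespace}
type: Opaque
data:
  caCert: ${cert_b64}
  caKey: ${key_b64}
EOF
}
```

For example, `ca_secret_manifest test-kafka-operator kafka ca.crt ca.key | kubectl apply -f -` would create a secret with the name used for tlsSecretName in the example above.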

Using your own certificates

Listeners not used for internal broker communication

In this KafkaCluster custom resource, SSL is enabled for all listeners, and certificates are automatically generated for the “inner” and “controller” listeners. The “external” and “internal” listeners use the user-provided certificates. The serverSSLCertSecret key references the Kubernetes secret that contains the server certificate the listener uses for SSL communication.

In the server secret the following keys must be set:

Key             Value
keystore.jks    Certificate and private key in JKS format
truststore.jks  Trusted CA certificate in JKS format
password        Password for the key and trust store

Koperator uses JKS-format certificates for the listener configuration.
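
Before creating the server secret, it can help to check that the directory holding the files contains the key names listed above. The following is a sketch of such a check. The function name check_server_secret_dir and the directory layout are assumptions for illustration, not part of Koperator.

```shell
# check_server_secret_dir DIR
# Succeeds only if DIR contains a non-empty file for every key that the
# server secret must provide: keystore.jks, truststore.jks, and password.
check_server_secret_dir() {
  dir="$1"
  missing=0
  for key in keystore.jks truststore.jks password; do
    if [ ! -s "$dir/$key" ]; then
      echo "missing or empty: $dir/$key" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

If the check passes, a command such as `kubectl create secret generic server-ssl-cert-secret -n kafka --from-file=./certs` creates a secret whose keys are the file names in the directory. The secret name and the ./certs path are placeholders.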

Listeners used for internal broker communication

In this KafkaCluster custom resource, SSL is enabled for all listeners, and user-provided certificates are used. When a custom certificate is used for a listener that handles internal broker communication, you must also specify the client certificate. Koperator, Cruise Control, and Cruise Control Metrics Reporter use the client certificate to communicate over SSL. The clientSSLCertSecret key references the Kubernetes secret that provides the custom client SSL certificate. The client secret must contain the same data keys as the server secret. The client certificate must be signed by the same CA as the server certificate of the corresponding listener. Set clientSSLCertSecret in the spec field of the KafkaCluster custom resource.

Generate JKS certificate

Certificates in JKS format can be generated using OpenSSL and a keystore tool such as keytool. You can also use this script.
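
One possible way to do this by hand with OpenSSL and keytool is sketched below. It assumes you already have a PEM certificate, its private key, and the CA certificate. The function name build_jks, the alias names, and the output layout are illustrative only, and keytool requires a password of at least six characters.

```shell
# build_jks CERT KEY CA_CERT PASSWORD OUT_DIR
# Writes keystore.jks, truststore.jks, and password into OUT_DIR.
build_jks() {
  cert="$1"; key="$2"; ca="$3"; pass="$4"; out="$5"
  mkdir -p "$out" || return 1

  # Bundle the certificate, private key, and CA certificate into PKCS#12
  # first, because keytool cannot import a PEM private key directly.
  openssl pkcs12 -export -in "$cert" -inkey "$key" -certfile "$ca" \
    -name kafka -out "$out/keystore.p12" -passout "pass:$pass" || return 1

  # Convert the PKCS#12 bundle into a JKS keystore.
  keytool -importkeystore -noprompt \
    -srckeystore "$out/keystore.p12" -srcstoretype PKCS12 -srcstorepass "$pass" \
    -destkeystore "$out/keystore.jks" -deststoretype JKS -deststorepass "$pass" \
    || return 1

  # Import the CA certificate into a separate JKS truststore.
  keytool -importcert -noprompt -alias ca -file "$ca" \
    -keystore "$out/truststore.jks" -storetype JKS -storepass "$pass" || return 1

  # Keep the password in a file named after the secret key that holds it.
  printf '%s' "$pass" > "$out/password"
  rm -f "$out/keystore.p12"
}
```

The three resulting files use the same names as the keys expected in the server secret, so the output directory can be turned into a secret directly.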

Kafka listeners use two-way SSL (mutual authentication), so you must properly set the CN (Common Name) fields and, if needed, the SAN (Subject Alternative Name) fields in the certificates. The following description assumes that the Kafka cluster is in the kafka namespace.

  • For the client certificate, the CN must be “kafka-controller.kafka.mgt.cluster.local” (where .kafka. is the namespace of the Kafka cluster).

  • For internal listeners which are exposed by a headless service (kafka-headless), the CN must be “kafka-headless.kafka.svc.cluster.local”, and the SAN field must contain the following:

    • *.kafka-headless.kafka.svc.cluster.local
    • kafka-headless.kafka.svc.cluster.local
    • *.kafka-headless.kafka.svc
    • kafka-headless.kafka.svc
    • *.kafka-headless.kafka
    • kafka-headless.kafka
    • kafka-headless
  • For internal listeners which are exposed by a normal service (kafka-all-broker), the CN must be “kafka-all-broker.kafka.svc.cluster.local”.

  • For external listeners, use the advertised load balancer hostname as the CN. The hostname needs to be specified in the KafkaCluster custom resource with hostnameOverride, and the accessMethod has to be “LoadBalancer”. For details about this override, see Step 5 in Expose cluster using a LoadBalancer.
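
The SAN list for a headless service follows a regular pattern, so it can be generated instead of typed by hand. The helper below is a sketch under the assumptions of this section (service kafka-headless, namespace kafka, cluster domain cluster.local). The function name headless_sans is hypothetical, and the DNS: prefix is the form OpenSSL uses for subjectAltName entries.

```shell
# headless_sans SERVICE NAMESPACE [CLUSTER_DOMAIN]
# Prints one "DNS:<name>" entry per line for every SAN listed above.
headless_sans() {
  svc="$1"; ns="$2"; domain="${3:-cluster.local}"
  for suffix in ".${ns}.svc.${domain}" ".${ns}.svc" ".${ns}"; do
    printf 'DNS:*.%s%s\n' "$svc" "$suffix"   # wildcard for the broker pods
    printf 'DNS:%s%s\n' "$svc" "$suffix"     # the service name itself
  done
  printf 'DNS:%s\n' "$svc"
}
```

Running `headless_sans kafka-headless kafka | paste -sd, -` joins the entries into a single comma-separated value that can be used for the subjectAltName extension when the certificate is issued.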

Using Kafka ACLs with SSL

Note: Koperator provides only basic ACL support. For a more complete and robust solution, consider using the Streaming Data Manager product.

The Koperator is a core part of Banzai Cloud Supertubes that helps you create production-ready Apache Kafka clusters on Kubernetes, with scaling, rebalancing, and alert-based self-healing. While the Koperator itself is an open-source project, the Banzai Cloud Supertubes product extends the functionality of the Koperator with commercial features (for example, declarative ACL handling, built-in monitoring, and multiple ways of disaster recovery). Read a detailed comparison of Supertubes and the Koperator.

If you choose not to enable ACLs for your Apache Kafka cluster, you may still use the KafkaUser resource to create new certificates for your applications. You can leave the topicGrants out as they will not have any effect.

  1. To enable ACL support for your Apache Kafka cluster, pass the following configurations along with your brokerConfig:

    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    allow.everyone.if.no.acl.found=false
    
  2. The operator ensures that Cruise Control and the operator itself can still access the cluster. However, to create new clients, you need to generate new certificates signed by the CA and make sure that the required ACLs are set on the topic. The operator can automate this process for you using the KafkaUser CRD. For example, to create a new producer for the topic test-topic against the KafkaCluster kafka, apply the following configuration:

    cat << EOF | kubectl apply -n kafka -f -
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: example-producer
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      secretName: example-producer-secret
      topicGrants:
        - topicName: test-topic
          accessType: write
    EOF
    

    This will create a user and store its credentials in the secret example-producer-secret. The secret contains these fields:

    Key      Value
    ca.crt   The CA certificate
    tls.crt  The user certificate
    tls.key  The user private key
  3. You can then mount these secrets to your pod. Alternatively, you can write them to your local machine by running:

    kubectl get secret example-producer-secret -n kafka -o jsonpath="{['data']['ca\.crt']}" | base64 -d > ca.crt
    kubectl get secret example-producer-secret -n kafka -o jsonpath="{['data']['tls\.crt']}" | base64 -d > tls.crt
    kubectl get secret example-producer-secret -n kafka -o jsonpath="{['data']['tls\.key']}" | base64 -d > tls.key
    
  4. To create a consumer for the topic, run this command:

    cat << EOF | kubectl apply -n kafka -f -
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: example-consumer
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      secretName: example-consumer-secret
      includeJKS: true
      topicGrants:
        - topicName: test-topic
          accessType: read
    EOF
    
  5. The operator can also include a Java keystore (JKS) in your user secret. Add includeJKS: true to the spec as shown above, and the user secret gains these additional fields:

    Key       Value
    tls.jks   The Java keystore containing both the user keys and the CA (use this for both your keystore and your truststore)
    pass.txt  The password to decrypt the JKS (randomly generated)
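
A Java-based Kafka client can consume the JKS fields through the standard ssl.* client settings. The sketch below assumes that tls.jks and pass.txt have already been mounted or written to a local directory, and that the generated password contains no characters that need escaping in a properties file. The function name write_client_properties is hypothetical.

```shell
# write_client_properties SECRET_DIR OUT_FILE
# Writes Kafka client SSL settings that use tls.jks from SECRET_DIR as both
# keystore and truststore, protected by the password stored in pass.txt.
write_client_properties() {
  secret_dir="$1"; out_file="$2"
  jks_pass=$(cat "$secret_dir/pass.txt") || return 1

  cat > "$out_file" <<EOF
security.protocol=SSL
ssl.keystore.type=JKS
ssl.keystore.location=${secret_dir}/tls.jks
ssl.keystore.password=${jks_pass}
ssl.truststore.type=JKS
ssl.truststore.location=${secret_dir}/tls.jks
ssl.truststore.password=${jks_pass}
EOF
}
```

The resulting file can then be passed to a Kafka client as its configuration, for example for the consumer created in step 4.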