Create Topic 🔗︎

Topic creation by default is enabled in Apache Kafka, but if it is configured otherwise, you’ll need to create a topic first.

  • You can use the KafkaTopic CRD to create a topic called my-topic like this:

    cat << EOF | kubectl apply -n kafka -f -
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
        name: my-topic
    spec:
        clusterRef:
            name: kafka
        name: my-topic
        partitions: 1
        replicationFactor: 1
        config:
            "retention.ms": "604800000"
            "cleanup.policy": "delete"
    EOF

    Note: The previous command will fail if the cluster has not finished provisioning.

  • To create a sample topic from the CLI you can run the following:

    kubectl -n kafka run kafka-topics -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-client.zookeeper:2181 --topic my-topic --create --partitions 1 --replication-factor 1
    

After you have created a topic, produce and consume some messages:

Send and receive messages without SSL within a cluster 🔗︎

You can use the following commands to send and receive messages within a Kubernetes cluster when SSL encryption is disabled for Kafka.

  • Produce messages:

    kubectl -n kafka run kafka-producer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka-headless:29092 --topic my-topic
    

    And type some test messages.

  • Consume messages:

    kubectl -n kafka run kafka-consumer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-headless:29092 --topic my-topic --from-beginning
    

    You should see the messages you have created.

Send and receive messages with SSL within a cluster 🔗︎

You can use the following procedure to send and receive messages within a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.

To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.

  1. Create a Kafka user. The client will use this user account to access Kafka. You can use the KafkaUser custom resource to customize the access rights as needed. For example:

    kubectl create -n kafka -f - <<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: example-kafkauser
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      secretName: example-kafkauser-secret
    EOF
  2. To use Kafka inside the cluster, create a Pod which contains Kafkacat. Create a kafka-test pod in the kafka namespace. Note that the value of the secretName parameter must be the same as you used when creating the KafkaUser resource, for example, example-kafkauser-secret.

    kubectl create -n kafka -f - <<EOF
    apiVersion: v1
    kind: Pod
    metadata:
      name: kafka-test
    spec:
      containers:
      - name: kafka-test
        image: solsson/kafkacat
        # Just spin & wait forever
        command: [ "/bin/sh", "-c", "--" ]
        args: [ "while true; do sleep 3000; done;" ]
        volumeMounts:
        - name: sslcerts
          mountPath: "/ssl/certs"
      volumes:
      - name: sslcerts
        secret:
          secretName: example-kafkauser-secret
    EOF
  3. Wait until the pod is created, then exec into the container:

    kubectl exec -it -n kafka kafka-test -- sh
    
  4. Run the following command to check that you can connect to the brokers.

    kafkacat -L -b kafka-headless:29092 -X security.protocol=SSL -X ssl.key.location=/ssl/certs/tls.key -X ssl.certificate.location=/ssl/certs/tls.crt -X ssl.ca.location=/ssl/certs/ca.crt
    

    The first line of the output should indicate that the communication is encrypted, for example:

    Metadata for all topics (from broker -1: ssl://kafka-headless:29092/bootstrap):
    
  5. Produce some test messages. Run:

    kafkacat -P -b kafka-headless:29092 -t my-topic \
    -X security.protocol=SSL \
    -X ssl.key.location=/ssl/certs/tls.key \
    -X ssl.certificate.location=/ssl/certs/tls.crt \
    -X ssl.ca.location=/ssl/certs/ca.crt
    

    And type some test messages.

  6. Consume some messages. The following command will use the certificate provisioned with the cluster to connect to Kafka. If you’d like to create and use a different user, create a KafkaUser CR, for details, see the SSL documentation.

    kafkacat -C -b kafka-headless:29092 -t my-topic \
    -X security.protocol=SSL \
    -X ssl.key.location=/ssl/certs/tls.key \
    -X ssl.certificate.location=/ssl/certs/tls.crt \
    -X ssl.ca.location=/ssl/certs/ca.crt
    

    You should see the messages you have created.

Send and receive messages outside a cluster 🔗︎

Prerequisites 🔗︎

  1. Producers and consumers that are not in the same Kubernetes cluster can access the Kafka cluster only if an external listener is configured in your KafkaCluster CR. Check that the listenersConfig.externalListeners section exists in the KafkaCluster CR.

  2. Obtain the external address and port number of the cluster by running the following commands.

    • If the external listener uses a LoadBalancer:

      export SERVICE_IP=$(kubectl get svc --namespace kafka -o jsonpath="{.status.loadBalancer.ingress[0].hostname}" envoy-loadbalancer-external-kafka); echo $SERVICE_IP
      
      export SERVICE_PORTS=($(kubectl get svc --namespace kafka -o jsonpath="{.spec.ports[*].port}" envoy-loadbalancer-external-kafka)); echo ${SERVICE_PORTS[@]}
      
      # depending on the shell you are using, arrays may be indexed starting from 0 or 1
      export SERVICE_PORT=${SERVICE_PORTS[@]:0:1}; echo $SERVICE_PORT
      
  3. If the external listener of your Kafka cluster accepts encrypted connections, proceed to SSL enabled. Otherwise, proceed to SSL disabled.

SSL disabled 🔗︎

  1. Produce some test messages on the the external client.

    • If you have Kafkacat installed, run:

      kafkacat -P -b $SERVICE_IP:$SERVICE_PORT -t my-topic
      
    • If you have the Java Kafka client installed, run:

      kafka-console-producer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic
      

    And type some test messages.

  2. Consume some messages.

    • If you have Kafkacat installed, run:

      kafkacat -C -b $SERVICE_IP:$SERVICE_PORT -t my-topic
      
    • If you have the Java Kafka client installed, run:

      kafka-console-consumer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic --from-beginning
      

    You should see the messages you have created.

SSL enabled 🔗︎

You can use the following procedure to send and receive messages from an external host that is outside a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.

To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.

  1. Install Kafkacat.

    • MacOS:

      brew install kafkacat
      
    • Ubuntu:

      apt-get update
      apt-get install kafkacat
      
  2. Connect to the Kubernetes cluster that runs your Kafka deployment.

  3. Create a Kafka user. The client will use this user account to access Kafka. You can use the KafkaUser custom resource to customize the access rights as needed. For example:

    kubectl create -n kafka -f - <<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaUser
    metadata:
      name: example-kafkauser
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      secretName: example-kafkauser-secret
    EOF
  4. Download the certificate and the key of the user, and the CA certificate used to verify the certificate of the Kafka server. These are available in the Kubernetes Secret created for the KafkaUser.

    kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['tls\.crt']}" | base64 -D > client.crt.pem
    kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['tls\.key']}" | base64 -D > client.key.pem
    kubectl get secrets -n kafka <name-of-the-user-secret> -o jsonpath="{['data']['ca\.crt']}" | base64 -D > ca.crt.pem
    
  5. Copy the downloaded certificates to a location that is accessible to the external host.

  6. If you haven’t done so already, obtain the external address and port number of the cluster.

  7. Produce some test messages on the host that is outside your cluster.

    kafkacat -b $SERVICE_IP:$SERVICE_PORT -P -X security.protocol=SSL \
    -X ssl.key.location=client.key.pem \
    -X ssl.certificate.location=client.crt.pem \
    -X ssl.ca.location=ca.crt.pem \
    -t my-topic
    

    And type some test messages.

  8. Consume some messages.

    kafkacat -b $SERVICE_IP:$SERVICE_PORT -C -X security.protocol=SSL \
    -X ssl.key.location=client.key.pem \
    -X ssl.certificate.location=client.crt.pem \
    -X ssl.ca.location=ca.crt.pem \
    -t my-topic
    

    You should see the messages you have created.