Create Topic 🔗︎

  1. Topic creation by default is enabled in Kafka, but if it is configured otherwise, you’ll need to create a topic first.

    You can use the KafkaTopic CRD to make a topic like this:

    cat << EOF | kubectl apply -n kafka -f -
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: my-topic
    spec:
      clusterRef:
        name: kafka
      name: my-topic
      partitions: 1
      replicationFactor: 1
    EOF
    

    Note: The previous command will fail if the cluster has not finished provisioning.

  2. To create a sample topic from the CLI you can run the following:

    kubectl -n kafka run kafka-topics -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-client.zookeeper:2181 --topic my-topic --create --partitions 1 --replication-factor 1
    

Send and receive messages without SSL within a cluster 🔗︎

You can use the following commands to send and receive messages within a Kubernetes cluster when SSL encryption is disabled for Kafka.

  • Produce messages:

    kubectl -n kafka run kafka-producer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-headless:29092 --topic my-topic
    
  • Consume messages:

    kubectl -n kafka run kafka-consumer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-headless:29092 --topic my-topic --from-beginning
    

Send and receive messages with SSL within a cluster 🔗︎

You can use the following procedure to send and receive messages within a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.

To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.

  1. To use Kafka inside the cluster, create a Pod which contains Kafkacat.

  2. Create a kafka-test pod in the kafka namespace.

    kubectl create -n kafka -f - <<EOF
    apiVersion: v1
    kind: Pod
    metadata:
      name: kafka-test
    spec:
      containers:
      - name: kafka-test
        image: solsson/kafkacat
        # Just spin & wait forever
        command: [ "/bin/bash", "-c", "--" ]
        args: [ "while true; do sleep 3000; done;" ]
        volumeMounts:
        - name: sslcerts
          mountPath: "/ssl/certs"
      volumes:
      - name: sslcerts
        secret:
          secretName: kafka-operator-server-cert
    EOF
    
  3. Exec into the container:

    kubectl exec -it -n kafka kafka-test bash
    
  4. Produce some test messages.

    kafkacat -P -b kafka-headless:29092 -t my-topic \
    -X security.protocol=SSL \
    -X ssl.key.location=/ssl/certs/tls.key \
    -X ssl.certificate.location=/ssl/certs/tls.crt \
    -X ssl.ca.location=/ssl/certs/ca.crt
    
  5. Consume some messages. The following command will use the certificate provisioned with the cluster to connect to Kafka. If you’d like to create and use a different user, create a KafkaUser CR, for details, see the SSL documentation.

    kafkacat -C -b kafka-headless:29092 -t my-topic \
    -X security.protocol=SSL \
    -X ssl.key.location=/ssl/certs/tls.key \
    -X ssl.certificate.location=/ssl/certs/tls.crt \
    -X ssl.ca.location=/ssl/certs/ca.crt
    

Send and receive messages outside a cluster 🔗︎

Obtain the LoadBalancer address first by running the following command.

export SERVICE_IP=$(kubectl get svc --namespace kafka -o jsonpath="{.status.loadBalancer.ingress[0].ip}" envoy-loadbalancer)

echo $SERVICE_IP

export SERVICE_PORTS=($(kubectl get svc --namespace kafka -o jsonpath="{.spec.ports[*].port}" envoy-loadbalancer))

echo ${SERVICE_PORTS[@]}

# depending on the shell of your choice, arrays may be indexed starting from 0 or 1
export SERVICE_PORT=${SERVICE_PORTS[@]:0:1}

echo $SERVICE_PORT

SSL Disabled 🔗︎

  • Produce messages:

    kafka-console-producer.sh --broker-list $SERVICE_IP:$SERVICE_PORT --topic my-topic
    
  • Consume messages

    kafka-console-consumer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic --from-beginning
    

SSL Enabled 🔗︎

You can use the following procedure to send and receive messages outside a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.

To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.

  1. Install Kafkacat.

    • MacOS:

      brew install kafkacat
      
    • Ubuntu:

      apt-get update
      apt-get install kafkacat
      
  2. Extract secrets from the given Kubernetes Secret:

    kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\tls.crt']}" | base64 -D > client.crt.pem
    kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\tls.key']}" | base64 -D > client.key.pem
    kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\ca.crt']}" | base64 -D > ca.crt.pem
    
  3. Produce some test messages.

    kafkacat -b $SERVICE_IP:$SERVICE_PORT -P -X security.protocol=SSL \
    -X ssl.key.location=client.key.pem \
    -X ssl.certificate.location=client.crt.pem \
    -X ssl.ca.location=ca.crt.pem \
    -t my-topic
    
  4. Consume some messages.

    kafkacat -b $SERVICE_IP:$SERVICE_PORT -C -X security.protocol=SSL \
    -X ssl.key.location=client.key.pem \
    -X ssl.certificate.location=client.crt.pem \
    -X ssl.ca.location=ca.crt.pem \
    -t my-topic