Create Topic 🔗︎
-
Topic creation by default is enabled in Kafka, but if it is configured otherwise, you’ll need to create a topic first.
You can use the
KafkaTopic
CRD to make a topic like this:cat << EOF | kubectl apply -n kafka -f - apiVersion: kafka.banzaicloud.io/v1alpha1 kind: KafkaTopic metadata: name: my-topic spec: clusterRef: name: kafka name: my-topic partitions: 1 replicationFactor: 1 EOF
Note: The previous command will fail if the cluster has not finished provisioning.
-
To create a sample topic from the CLI you can run the following:
kubectl -n kafka run kafka-topics -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper-client.zookeeper:2181 --topic my-topic --create --partitions 1 --replication-factor 1
Send and receive messages without SSL within a cluster 🔗︎
You can use the following commands to send and receive messages within a Kubernetes cluster when SSL encryption is disabled for Kafka.
-
Produce messages:
kubectl -n kafka run kafka-producer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-producer.sh --broker-list kafka-headless:29092 --topic my-topic
-
Consume messages:
kubectl -n kafka run kafka-consumer -it --image=banzaicloud/kafka:2.13-2.4.0 --rm=true --restart=Never -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-headless:29092 --topic my-topic --from-beginning
Send and receive messages with SSL within a cluster 🔗︎
You can use the following procedure to send and receive messages within a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.
To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.
-
To use Kafka inside the cluster, create a Pod which contains
Kafkacat
. -
Create a
kafka-test
pod in thekafka
namespace.kubectl create -n kafka -f - <<EOF apiVersion: v1 kind: Pod metadata: name: kafka-test spec: containers: - name: kafka-test image: solsson/kafkacat # Just spin & wait forever command: [ "/bin/bash", "-c", "--" ] args: [ "while true; do sleep 3000; done;" ] volumeMounts: - name: sslcerts mountPath: "/ssl/certs" volumes: - name: sslcerts secret: secretName: kafka-operator-server-cert EOF
-
Exec into the container:
kubectl exec -it -n kafka kafka-test bash
-
Produce some test messages.
kafkacat -P -b kafka-headless:29092 -t my-topic \ -X security.protocol=SSL \ -X ssl.key.location=/ssl/certs/tls.key \ -X ssl.certificate.location=/ssl/certs/tls.crt \ -X ssl.ca.location=/ssl/certs/ca.crt
-
Consume some messages. The following command will use the certificate provisioned with the cluster to connect to Kafka. If you’d like to create and use a different user, create a
KafkaUser
CR, for details, see the SSL documentation.kafkacat -C -b kafka-headless:29092 -t my-topic \ -X security.protocol=SSL \ -X ssl.key.location=/ssl/certs/tls.key \ -X ssl.certificate.location=/ssl/certs/tls.crt \ -X ssl.ca.location=/ssl/certs/ca.crt
Send and receive messages outside a cluster 🔗︎
Obtain the LoadBalancer address first by running the following command.
export SERVICE_IP=$(kubectl get svc --namespace kafka -o jsonpath="{.status.loadBalancer.ingress[0].ip}" envoy-loadbalancer)
echo $SERVICE_IP
export SERVICE_PORTS=($(kubectl get svc --namespace kafka -o jsonpath="{.spec.ports[*].port}" envoy-loadbalancer))
echo ${SERVICE_PORTS[@]}
# depending on the shell of your choice, arrays may be indexed starting from 0 or 1
export SERVICE_PORT=${SERVICE_PORTS[@]:0:1}
echo $SERVICE_PORT
SSL Disabled 🔗︎
-
Produce messages:
kafka-console-producer.sh --broker-list $SERVICE_IP:$SERVICE_PORT --topic my-topic
-
Consume messages
kafka-console-consumer.sh --bootstrap-server $SERVICE_IP:$SERVICE_PORT --topic my-topic --from-beginning
SSL Enabled 🔗︎
You can use the following procedure to send and receive messages outside a Kubernetes cluster when SSL encryption is enabled for Kafka. To test a Kafka instance secured by SSL we recommend using Kafkacat.
To use the java client instead of Kafkacat, generate the proper truststore and keystore using the official docs.
-
Install Kafkacat.
-
MacOS:
brew install kafkacat
-
Ubuntu:
apt-get update apt-get install kafkacat
-
-
Extract secrets from the given Kubernetes Secret:
kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\tls.crt']}" | base64 -D > client.crt.pem kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\tls.key']}" | base64 -D > client.key.pem kubectl get secrets -n kafka kafka-operator-server-cert -o jsonpath="{['data']['\ca.crt']}" | base64 -D > ca.crt.pem
-
Produce some test messages.
kafkacat -b $SERVICE_IP:$SERVICE_PORT -P -X security.protocol=SSL \ -X ssl.key.location=client.key.pem \ -X ssl.certificate.location=client.crt.pem \ -X ssl.ca.location=ca.crt.pem \ -t my-topic
-
Consume some messages.
kafkacat -b $SERVICE_IP:$SERVICE_PORT -C -X security.protocol=SSL \ -X ssl.key.location=client.key.pem \ -X ssl.certificate.location=client.crt.pem \ -X ssl.ca.location=ca.crt.pem \ -t my-topic