
Balint Molnar

Wed, Feb 21, 2018


Spark Streaming Checkpointing on Kubernetes

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Apache Spark CI/CD workflow howto
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive
CI/CD flow for Zeppelin notebooks

Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd

Spark Streaming applications are special Spark applications that are capable of processing data continuously, which lets you reuse the same code for batch processing, join streams against historical data, or run ad-hoc queries on stream data. These streaming scenarios require special care when the applications have to run for a long time and without any interruption. Achieving that requires two things:

  • A good scheduler
  • Checkpointing enabled in the Spark Streaming app

For the scheduler (and for Spark in general) we use Spark on Kubernetes. If you need a Kubernetes cluster in the cloud, you can let Pipeline do all the heavy lifting for you across several cloud providers. By default, Kubernetes takes care of failing Spark executors and the driver by simply restarting the failing pods. While this is enough for the executors, for the driver it is necessary but not sufficient: for the driver to become resilient to failures, Spark checkpointing has to be enabled as well.

When we speak about Spark checkpointing we need to distinguish two different options:

  • Metadata checkpointing
  • Data checkpointing

This blogpost focuses on metadata checkpointing, because that is what is needed to recover from driver failures; if you are interested in the scenarios where data checkpointing is required, you can check the Spark documentation.

Metadata checkpointing saves the required metadata, so in case of a failure the application can continue where it left off. The most common storage layers for the checkpoint are HDFS and S3. On Kubernetes and in the cloud you usually don’t want to manage your own HDFS cluster, since you can use S3 instead. On the other hand, S3 is slow, and with large Spark Streaming applications we have seen bottlenecks and issues caused by the slow throughput of persisting the checkpoint data. A better option is to use EFS on Amazon, AzureFile on Azure or GCE-PD on Google.

In this blogpost we will use Azure Files, showcasing Spark Streaming with Azure Files on Azure AKS, using the latest 0.2.0 version of Pipeline.

Azure Files offers fully managed file shares in the cloud that are accessible via the Server Message Block (SMB) protocol and can be used concurrently by multiple cloud VMs.

It turned out that simply enabling Spark checkpointing is not enough with the current Spark on Kubernetes implementation. In order to support it we needed the following changes:

  • The driver should be a Job instead of a bare Pod - a Pod is not the most failure-proof Kubernetes resource, because it is scheduled to a single node; if that node fails, the Pod will never be rescheduled.
  • The driver should bind a PersistentVolume for the checkpoint dir; this also required some related configs: spark.kubernetes.checkpointdir.enable, spark.kubernetes.checkpointdir.path, spark.kubernetes.checkpointdir.size (illustrated right after this list).
  • SPARK-22294 - this is required so the driver binds to the right IP address in case of a restart.
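
To make the checkpoint dir point concrete, these options end up as plain spark-submit configuration flags. A minimal sketch is shown below; enabling the checkpoint dir is exactly what we do later in this post, while the path and size values here are only illustrative:

  --conf spark.kubernetes.checkpointdir.enable=true \
  --conf spark.kubernetes.checkpointdir.path=/checkpointdir \
  --conf spark.kubernetes.checkpointdir.size=1Gi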

We built our own Spark version with the above changes, which is available on DockerHub (banzaicloud/spark-driver:v2.2.0-k8s-1.0.207). We are going to contribute these changes back in a PR soon.

Next we need a Spark Streaming application that is properly configured to enable checkpointing. For simplicity we are going to use a slightly modified version of the Spark Streaming example NetworkWordCount. This application consumes data from a netcat server which, for simplicity’s sake, we run on our local machine. Since the Kubernetes cluster runs on AKS, we need to make sure that the netcat server running on our host is reachable from the outside; we use ngrok for that.

nc -lk 9999
ngrok tcp 9999
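
For reference, here is a rough Scala sketch of what such a checkpoint-enabled word count looks like. The real code lives in the example repository mentioned above, so treat the class layout, batch interval and argument handling below as an approximation rather than the exact implementation:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkNetworkWordCount {
  def main(args: Array[String]): Unit = {
    // args: <netcat/ngrok host> <port> <checkpoint dir>, matching the spark-submit arguments later in this post
    val Array(host, port, checkpointDir) = args

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("NetworkWordCount")
      val ssc = new StreamingContext(conf, Seconds(5)) // batch interval is illustrative
      ssc.checkpoint(checkpointDir)                    // metadata checkpoints are written here

      val lines = ssc.socketTextStream(host.stripPrefix("tcp://"), port.toInt)
      lines.flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .print()
      ssc
    }

    // On a restart the driver finds an existing checkpoint and rebuilds the context
    // from it instead of calling createContext, so processing continues where it left off.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

The important part for this post is StreamingContext.getOrCreate combined with ssc.checkpoint: as long as the checkpoint directory survives the driver pod (which is what the PersistentVolume gives us), a restarted driver picks up the stream where the previous one stopped.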

After creating an AKS cluster with Pipeline, we need to replace the default storageclass created by Azure, because its type is AzureDisk and what we want is AzureFile. With KUBECONFIG pointing to the right config (e.g. export KUBECONFIG=<path to your kubeconfig>), the new storage class can be created with kubectl create using the following YAML:

kubectl create -f - <<EOF
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"
  name: azurefile
provisioner: kubernetes.io/azure-file
parameters:
  skuName: Standard_LRS
  location: eastus
  storageAccount: banzaicloud
EOF
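
Once the storage class is created, a quick check confirms that it exists and that it is marked as the default:

kubectl get storageclass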

The old storageclass can be deleted with kubectl delete storageclass default. This step can be skipped if AzureDisk storage is suitable for you; we picked Azure Files to leverage its benefits. Azure Files also requires a new StorageAccount, which should be created in the same resource group as the cluster, and if you are using the example above the account name needs to be banzaicloud. The next thing to do is to deploy the Spark Helm chart, which creates all the required subsystems for your Spark job. To do that, please use our Postman collection and create a deployment called banzaicloud-stable/spark.
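
If you prefer to create that storage account from the command line, with the Azure CLI it looks roughly like this (replace the resource group placeholder with your cluster’s resource group; the location should match the one used in the storage class above):

az storage account create \
  --name banzaicloud \
  --resource-group <your AKS cluster resource group> \
  --location eastus \
  --sku Standard_LRS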

To submit a Spark application from your computer to a Kubernetes cluster you need a spark-submit that supports the above mentioned configs, so we created a tar.tgz archive which can be downloaded and used to submit your Spark application. Please download this file and untar it into some directory. Once that is done, clone or download the above mentioned example and build it:

ls -lah
total 80
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 15:07 .
drwxr-xr-x  24 baluchicken  staff   768B Feb  5 16:30 ..
drwxr-xr-x  14 baluchicken  staff   448B Feb 21 19:07 .git
-rw-r--r--   1 baluchicken  staff    43B Feb  5 16:55 .gitignore
drwxr-xr-x  11 baluchicken  staff   352B Feb 21 19:07 .idea
-rw-r--r--   1 baluchicken  staff    11K Feb 20 14:57 LICENSE
-rw-r--r--   1 baluchicken  staff   2.9K Feb 20 14:57 pom.xml
-rw-r--r--   1 baluchicken  staff   241B Jan 31 14:55 settings.xml
-rw-r--r--   1 baluchicken  staff    14K Feb  5 16:54 spark-network-word-count.iml
drwxr-xr-x   3 baluchicken  staff    96B Jan 31 14:55 src
mvn clean package

To submit your Spark app from your local machine, you need to use the resource staging server (if you would like to learn more about spark-submit and the resource staging server, read this blogpost) and port forwarding to upload your jar to the Kubernetes cluster. To do that, first find your resource staging server pod and use kubectl’s port-forwarding feature:

kubectl get pods
NAME                                        READY     STATUS    RESTARTS   AGE
shuffle-ncmw9                               1/1       Running   0          30m
falling-monkey-spark-rss-867c7c855d-h82nz   1/1       Running   0          30m

kubectl port-forward falling-monkey-spark-rss-867c7c855d-h82nz 31000:10000
Forwarding from 127.0.0.1:31000 -> 10000

Please note that the port-forward sometimes fails; simply restart it to make it work again.

Once the port-forward is running, you can submit your Spark app with the following command:

  bin/spark-submit --verbose \
  --deploy-mode cluster \
  --class com.banzaicloud.SparkNetworkWordCount \
  --master k8s://<replace this with the kubernetes endpoint> \
  --kubernetes-namespace default \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.app.name=NetworkWordCount \
  --conf spark.kubernetes.driver.docker.image=banzaicloud/spark-driver:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.executor.docker.image=banzaicloud/spark-executor:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.initcontainer.docker.image=banzaicloud/spark-init:v2.2.0-k8s-1.0.207 \
  --conf spark.kubernetes.checkpointdir.enable=true \
  --conf spark.driver.cores="300m" \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.shuffle.namespace=default \
  --conf spark.kubernetes.resourceStagingServer.uri=http://localhost:31000 \
  --conf spark.kubernetes.resourceStagingServer.internal.uri=http://spark-rss:10000 \
  --conf spark.local.dir=/tmp/spark-local \
  file:///<your path to the jar>spark-network-word-count-1.0-SNAPSHOT.jar tcp://0.tcp.ngrok.io <your ngrok port> file:///checkpointdir
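
If you are unsure what to use as the Kubernetes endpoint in the --master flag, the master URL reported by kubectl is what goes there, prefixed with k8s://:

kubectl cluster-info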

Now check that your application is up and running:

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-6frsx   1/1       Running   0          4m
networkwordcount-1519234651100-v2wj-exec-1    1/1       Running   0          3m
networkwordcount-1519234651100-v2wj-exec-2    1/1       Running   0          3m
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h

To check whether checkpointing works, kill the driver pod:

kubectl delete pod networkwordcount-1519234651100-driver-6frsx
pod "networkwordcount-1519234651100-driver-6frsx" deleted

The Job will automatically restart the pod for you:

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519234651100-driver-lj7pr   1/1       Running   0          1m
networkwordcount-1519234651100-fbd4-exec-1    1/1       Running   0          22s
networkwordcount-1519234651100-fbd4-exec-2    1/1       Running   0          22s
shuffle-ncmw9                                 1/1       Running   0          1h
falling-monkey-spark-rss-867c7c855d-h82nz     1/1       Running   0          1h

Check the driver logs and look for entries about checkpoint directory usage:

kubectl logs networkwordcount-1519234651100-driver-lj7pr

CheckpointReader:54 - Attempting to load checkpoint from file file:/checkpointdir/checkpoint-1519236375000
Checkpoint:54 - Checkpoint for time 1519236375000 ms validated
CheckpointReader:54 - Checkpoint successfully loaded from file file:/checkpointdir/checkpoint-1519236375000

That’s it for now - you can replay the same scenario on AWS by swapping Azure Files for EFS; see our EFS blog for details.

If you are interested in our technology and open source projects, follow us on GitHub, LinkedIn or Twitter.
