At Banzai Cloud we are building a feature-rich enterprise-grade application platform, built for containers on top of Kubernetes, called Pipeline. Applications deployed to Pipeline automatically inherit the platform’s features: enterprise-grade security, observability (centralized log collection, monitoring and tracing), discovery, high availability and resiliency, just to name a few - encapsulated in spotguides.
One of the most popular spotguides we deploy is Spark. In the past few months we've pushed many pull requests to make Spark a first-class citizen on Kubernetes and to make it resilient. We've already put out a good collection of posts (and contributions), so if you are interested in any of these features, read:
Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud
Spark resilience on Kubernetes
Update: The solution described below, which uses ReadWriteOnce storage for the checkpointing directory, works. However, there may be use cases that require ReadWriteMany or object storage for the checkpointing directory. In such instances the solution described is imperfect. One possible solution is to use the Spark Operator. Another possible solution would be to use StatefulJob, a new Kubernetes feature.
Today's post focuses on Spark driver resiliency on Kubernetes, illustrated by simulating a node failure. Note that, at this point, not all Spark components are resilient upstream; we highlighted these in an earlier post, Apache Spark application resilience on Kubernetes. We have since contributed all of our changes as pull requests, and in the meantime you can use our Spark fork as you see fit.
Earlier we proposed a change (as a PR) to Apache Spark that enables driver resiliency by using a Kubernetes Job instead of a Pod. In this blog post we will not go into implementation details, as we have already published two blog posts on that subject. Instead, we will focus on one question: is it a good idea to use Kubernetes Jobs to make Spark resilient to failures?
Let's start by keeping in mind that Spark batch processing applications are stateless. If the driver pod dies, all previous computation is lost, and the newly created driver restarts the computation from the beginning.
This is not a limitation of Spark on Kubernetes; Spark on YARN behaves the same way.
Spark Streaming applications differ from batch processing in that they are stateful when checkpointing is enabled. With checkpointing enabled, Spark persists its state so that it can recover after a failure.
Correctness is vital here, since we cannot afford checkpoint file corruptions. Corruption may occur if two Spark driver pods run simultaneously while working on the same Spark Job.
Correctness is not an issue when spark-submit creates a bare Pod: in case of a node failure the Pod simply dies, and no Kubernetes controller reschedules it. With a Job, the Job controller creates a new Pod whenever the previous one dies without completing the job. The question is: can two Spark driver pods executing the same Spark job end up scheduled simultaneously? Well, yes, because the Job controller is greedy and, in the event of a network failure, will schedule a new driver. However, based on our observations, such instances do not affect correctness at all.
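To make the difference between a bare Pod and a Job more concrete, here is a minimal sketch of a Job that wraps a single driver container. This is not the manifest our Spark fork generates; the object name, container name and file name are hypothetical and chosen only for illustration. The two fields that matter for the argument are backoffLimit and restartPolicy.

```shell
# Write an illustrative Job manifest to a local file (nothing is applied yet).
# Assumption: names below are made up for this sketch, not taken from the fork.
cat > driver-job-sketch.yaml <<'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: spark-pi-driver-sketch            # hypothetical Job name
spec:
  backoffLimit: 1                          # number of retries before the Job is marked failed
  template:
    spec:
      restartPolicy: Never                 # a failed driver is replaced by a new Pod, not restarted in place
      containers:
        - name: driver                     # hypothetical container name
          image: banzaicloud/spark:master_job-dev_0.5_blog
EOF

# To try it on a test cluster:
#   kubectl create -f driver-job-sketch.yaml
```

With a bare Pod there is no controller owning the driver, so nothing recreates it; with a Job, the replacement Pod is created by the Job controller without any action from the submitter.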
Simulate the scenario
We will simulate the following scenario:
We are going to use a GKE cluster, which can be created easily, either by using Pipeline or by using the Google Cloud Console UI.
We are going to use iptables, so please use Ubuntu as the image type. Make sure you create a cluster with at least 2 nodes.
Next, add your ssh-key, as we will need to get inside the node to simulate a network failure.
Generate the Kubernetes config:
gcloud container clusters get-credentials <your cluster name> --zone <your zone>
To simulate a network partition we are going to ssh into the node and add some iptables rules, so that the kubelet cannot access the Kubernetes API server. Before that, we need to submit a Spark Job which runs long enough to encounter the network partition. For the sake of simplicity we are going to use Spark Pi with a large precision and only two executors. To check what happens with the checkpoint dir we are going to attach a PVC to the Pod. (We know it is not a Spark Streaming App, but, as we will see, that does not really matter.)
kubectl create -f - <<EOF
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: spark-pvc-claim
spec:
  accessModes:
    - "ReadWriteOnce"
  resources:
    requests:
      storage: "1Gi"
EOF
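Before submitting the application it can be useful to confirm that the claim is actually bound. The helper below is a small sketch: the function names and the POLL_INTERVAL variable are our own, and it assumes the cluster's default storage class binds volumes immediately. With a storage class that waits for the first consumer, the claim stays Pending until a pod uses it, and the helper simply gives up after its retry budget.

```shell
# Print the phase of a PersistentVolumeClaim (for example Pending or Bound).
pvc_phase() {
  kubectl get pvc "$1" -o jsonpath='{.status.phase}'
}

# Poll until the claim is Bound or the retry budget is exhausted.
# $1 = claim name, $2 = number of attempts (default 30).
# POLL_INTERVAL (hypothetical knob) sets the sleep between attempts in seconds.
wait_for_pvc_bound() {
  claim="$1"
  retries="${2:-30}"
  while [ "$retries" -gt 0 ]; do
    [ "$(pvc_phase "$claim")" = "Bound" ] && return 0
    retries=$((retries - 1))
    sleep "${POLL_INTERVAL:-2}"
  done
  return 1
}

# Usage:
#   wait_for_pvc_bound spark-pvc-claim && echo "claim is bound"
```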
Submit the Spark Pi application:
bin/spark-submit --verbose \
  --master k8s://https://220.127.116.11 \
  --deploy-mode cluster \
  --name spark-pi \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.driver.job.backofflimit=1 \
  --conf spark.kubernetes.container.image=banzaicloud/spark:master_job-dev_0.5_blog \
  local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 200000
Check where the driver pod is scheduled and look up the external IP address of the node in the Google Cloud Console UI.
kubectl describe pod spark-pi-1531855837017-driver-sx77q
Name:         spark-pi-1531855837017-driver-sx77q
Namespace:    default
Node:         gke-blog-cluster-1-default-pool-7484150c-z6wz/10.154.0.3
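As an alternative to the Cloud Console UI, the node name and its external address can usually be read with kubectl alone. The two helper functions below are a sketch with names of our own choosing; they assume the node reports an address of type ExternalIP in its status, which may not be the case on private clusters.

```shell
# Print the name of the node a pod is scheduled on.
pod_node() {
  kubectl get pod "$1" -o jsonpath='{.spec.nodeName}'
}

# Print the ExternalIP address from a node's status, if it reports one.
node_external_ip() {
  kubectl get node "$1" \
    -o jsonpath='{.status.addresses[?(@.type=="ExternalIP")].address}'
}

# Usage (pod name taken from the describe output above):
#   node="$(pod_node spark-pi-1531855837017-driver-sx77q)"
#   node_external_ip "$node"
```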
SSH into that Node and apply the following:
# First, save the iptables rules so they can be reused later
sudo iptables-save > iptables.backup

# Check which port is used by kubelet
sudo netstat -ntp | grep kubelet
tcp        0      0 10.154.0.3:55288    18.104.22.168:443     ESTABLISHED 2191/kubelet
tcp        0      0 10.154.0.3:55282    22.214.171.124:443    ESTABLISHED 2191/kubelet
tcp        0      0 10.154.0.3:35414    169.254.169.254:80    ESTABLISHED 2191/kubelet
tcp6       0      0 10.154.0.3:10255    10.52.0.6:34778       ESTABLISHED 2191/kubelet
tcp6       0      0 10.154.0.3:10255    10.52.1.4:57318       ESTABLISHED 2191/kubelet
tcp6       0      0 10.154.0.3:10250    10.154.0.2:53980      ESTABLISHED 2191/kubelet

# Our Kubernetes API external IP is 126.96.36.199 and
# the internal IP is 169.254.169.254. We need to block those
# to simulate network partition
sudo iptables -A OUTPUT -p tcp -d 188.8.131.52 -j DROP
sudo iptables -A OUTPUT -p tcp -d 169.254.169.254 -j DROP
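The remote addresses the kubelet talks to on port 443 can also be extracted from the netstat output rather than read by eye. The filter below is a sketch; the function name is ours, and it assumes the column layout shown above (foreign address in the fifth column, PID/program in the seventh). It only covers the port 443 endpoints, not the 169.254.169.254 address.

```shell
# Read `netstat -ntp` lines on stdin and print the distinct remote IPs
# that the kubelet is connected to on port 443.
kubelet_api_ips() {
  awk '$7 ~ /kubelet$/ && $5 ~ /:443$/ { sub(/:443$/, "", $5); print $5 }' | sort -u
}

# Usage on the node:
#   sudo netstat -ntp | kubelet_api_ips
```

Each address printed can then be passed to the `iptables -A OUTPUT -p tcp -d <address> -j DROP` rule used above.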
Check the Kubernetes nodes. After a while, because of the iptables rules, the state of one node will change to NotReady.
kubectl get nodes -w
NAME                                            STATUS     ROLES     AGE       VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready      <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready      <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready      <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   NotReady   <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready      <none>    4h        v1.10.4-gke.2
If we also check the Kubernetes pods, we will see the following:
kubectl get pods -w
NAME                                     READY     STATUS    RESTARTS   AGE
spark-pi-1531855837017-driver-sx77q      1/1       Running   0          1m
spark-application-1531855852300-exec-1   1/1       Running   0          1m
spark-application-1531855852300-exec-2   1/1       Running   0          1m
spark-pi-1531855837017-driver-sx77q      1/1       Unknown   0          7m
After a short time the driver pod will go from Running to Unknown. Remember, that pod is running on a Node which is separated from the cluster.
As you know, the Spark job was submitted as a Kubernetes Job, so the Job controller creates a new Pod as soon as the old one goes into the Unknown state. This could cause correctness issues: the separated driver pod is still running, since it does not need a connection to the Kubernetes API to keep going, and two driver pods running side by side would be a problem. Let's check back on the pods.
kubectl get pods -w
NAME                                  READY     STATUS              RESTARTS   AGE
spark-pi-1531855837017-driver-p6cbm   0/1       ContainerCreating   0          7s
spark-pi-1531855837017-driver-sx77q   1/1       Unknown             0          7m
If we investigate the newly created driver pod we can see that it will never start successfully as long as the other pod is not terminated, thanks to the ReadWriteOnce PVC, so there will be no correctness issues at all.
Warning FailedMount 19s kubelet, gke-blog-cluster-1-default-pool-7484150c-qhwh Unable to mount volumes for pod "spark-pi-1531902241419-driver-p6cbm_default(c00c5519-8a64-11e8-b81d-42010a9a00fa)": timeout expired waiting for volumes to attach or mount for pod "default"/"spark-pi-1531902241419-driver-p6cbm". list of unmounted volumes=[checkpointpvc]. list of unattached volumes=[spark-local-dir-1 checkpointpvc spark-conf-volume default-token-nbpnb]
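Events like the one above normally appear at the bottom of the `kubectl describe pod` output. They can also be listed on their own; the helper below is a sketch with a function name of our own, using the event field selectors that kubectl supports.

```shell
# List only the Warning events recorded for a given pod.
pod_warnings() {
  kubectl get events \
    --field-selector "involvedObject.kind=Pod,involvedObject.name=$1,type=Warning"
}

# Usage (name of the newly created driver pod):
#   pod_warnings spark-pi-1531855837017-driver-p6cbm
```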
Now let's restore the iptables rules to normal, so the node can rejoin the cluster.
sudo iptables-restore < iptables.backup
The lost node will rejoin the cluster:
kubectl get nodes -w
NAME                                            STATUS    ROLES     AGE       VERSION
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-qhwh   Ready     <none>    4h        v1.10.4-gke.2
gke-blog-cluster-1-default-pool-7484150c-z6wz   Ready     <none>    4h        v1.10.4-gke.2
The pod status shows that once the old driver has been terminated (it will be, since Kubernetes terminates every pod on the rejoined node), the new one starts up and finally runs properly.
kubectl get pods -w
spark-pi-1531855837017-driver-p6cbm      1/1       Running             0          2m
spark-pi-1531855837017-driver-sx77q      0/1       Terminating         0          9m
spark-application-1531856426866-exec-1   0/1       Pending             0          0s
spark-application-1531856426866-exec-1   0/1       Pending             0          0s
spark-application-1531856426866-exec-1   0/1       ContainerCreating   0          0s
spark-application-1531856426866-exec-2   0/1       Pending             0          0s
spark-application-1531856426866-exec-2   0/1       Pending             0          0s
spark-application-1531856426866-exec-2   0/1       ContainerCreating   0          0s
spark-application-1531856426866-exec-2   1/1       Running             0          1s
spark-application-1531856426866-exec-1   1/1       Running             0          1s
To summarize, let’s take a look at the image below. It shows what happens during a network separation.