Author Toader Sebastian

Apache Spark application resilience on Kubernetes


Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Apache Spark CI/CD workflow howto
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes
Collecting Spark History Server event logs in the cloud

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive
CI/CD flow for Zeppelin notebooks

Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd

Over the last three months at Banzai Cloud, we’ve been running and supporting Spark deployments on our open source PaaS, Pipeline, which is built on Kubernetes. We’ve been pushing the limits of Spark on Kubernetes and we’ve seen some interesting situations: checkpointing throughput failures, all kinds of pod and infrastructure failures, and, most recently, node failures and node terminations on spot instances. Luckily, most of these were monitored, brought to our attention, and automatically addressed by our open source advanced monitoring solutions and our resilient cloud management alert-react system, Hollowtrees. Some of them, however, required fixes in Apache Spark itself.

In this Apache Spark on Kubernetes series blogpost, we’ll discuss how Spark applications can be made resilient to node failures on Kubernetes. As presented in previous blogposts, the following Spark components are involved when the user submits a Spark application:

  • Spark driver
  • Spark executor
  • External shuffle service
  • Spark resource staging server

To recap, here is a Spark flow that contains the above components. For a comprehensive guide to Spark on Kubernetes, please make sure to read our Apache Spark on Kubernetes series.

Spark Flow

Let’s look at each of these components separately.

Spark executor

Spark executors, in combination with an External shuffle service, are already resilient to failure. The shuffle data generated by Spark executors is stored in an external service, the External shuffle service, so it is not lost if an executor crashes. The Spark driver also re-schedules any tasks that were in flight or unfinished when the executor failed.
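As a minimal sketch of how this is switched on from the submitting side, the snippet below renders the standard Spark properties `spark.shuffle.service.enabled` and `spark.dynamicAllocation.enabled` as `spark-submit` arguments. The helper name `to_submit_args` is hypothetical, and pairing the shuffle service with dynamic allocation is an assumption about the setup, not something stated above.

```python
from typing import Dict, List

# Standard Spark properties that enable the external shuffle service.
# Dynamic allocation is commonly paired with it; whether the setup
# described in this post used it is an assumption.
SHUFFLE_CONF: Dict[str, str] = {
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a conf mapping as spark-submit `--conf key=value` arguments."""
    args: List[str] = []
    for key in sorted(conf):  # sorted for a stable, reproducible order
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(SHUFFLE_CONF)))
```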

External shuffle service

This component is deployed as a Kubernetes daemonset. Since this is a daemonset, Kubernetes ensures that the External shuffle service runs on each node of the cluster. If a node is lost, when it comes back (or a new replacement node is added to the cluster), Kubernetes will take care of starting an instance of External shuffle service on it. Of course, when a node dies and cannot be recovered, the shuffle data stored by the External shuffle service on that node is lost. In such cases the Spark driver re-schedules the necessary tasks to be re-executed to obtain any missing interim results.
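To make the daemonset idea concrete, here is a rough sketch that builds such a manifest as a plain Python dictionary. The image name and label are hypothetical placeholders, and `apps/v1` is the API group used by current Kubernetes releases rather than the v1.8.2 cluster shown in this post.

```python
import json
from typing import Dict


def shuffle_daemonset(image: str, labels: Dict[str, str]) -> dict:
    """Build a DaemonSet manifest that runs one shuffle-service pod per node."""
    return {
        "apiVersion": "apps/v1",
        "kind": "DaemonSet",
        "metadata": {"name": "shuffle", "labels": dict(labels)},
        "spec": {
            # The selector has to match the labels of the pod template.
            "selector": {"matchLabels": dict(labels)},
            "template": {
                "metadata": {"labels": dict(labels)},
                "spec": {
                    "containers": [{"name": "shuffle", "image": image}],
                },
            },
        },
    }


if __name__ == "__main__":
    manifest = shuffle_daemonset(
        image="example.invalid/spark-shuffle:latest",  # hypothetical image
        labels={"app": "spark-shuffle-service"},       # hypothetical label
    )
    print(json.dumps(manifest, indent=2))
```

Because the controller is a DaemonSet, no replica count is given: Kubernetes creates one pod per eligible node and adds one whenever a node joins the cluster.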

Spark driver

We have opened the following Apache Spark Jira ticket and related GitHub pull request to make the Spark Driver resilient to failures.

The current implementation of the Spark driver on Kubernetes is not resilient to node failures, since it’s implemented as a Pod. However, Kubernetes would offer us more resiliency if the Spark driver were implemented as a Job. Let me share the details we collected while working on enabling Spark checkpointing for Spark clusters deployed with our PaaS, Pipeline. These led us to the conclusion that the Spark driver should be a Kubernetes Job instead of a Pod.
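The difference between the two can be sketched as follows: a bare Pod is a single object that nothing recreates, whereas a Job is a controller that keeps creating pods from a template until one completes. The helper `driver_job` and the image name are hypothetical and only illustrate the shape of a `batch/v1` Job; this is not the code from the pull request.

```python
import json


def driver_job(name: str, pod_spec: dict, backoff_limit: int = 6) -> dict:
    """Wrap a driver pod spec in a batch/v1 Job so that the Job controller
    replaces the driver pod when it is lost."""
    spec = dict(pod_spec)  # shallow copy, the caller's dict is left untouched
    # Pod templates of a Job only accept "Never" or "OnFailure".
    spec.setdefault("restartPolicy", "OnFailure")
    if spec["restartPolicy"] not in ("Never", "OnFailure"):
        raise ValueError("a Job pod template needs restartPolicy Never or OnFailure")
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "backoffLimit": backoff_limit,
            "template": {
                "metadata": {"labels": {"spark-role": "driver"}},
                "spec": spec,
            },
        },
    }


if __name__ == "__main__":
    job = driver_job(
        "networkwordcount-1519301591265-driver",
        {"containers": [{"name": "driver",
                         "image": "example.invalid/spark-driver:latest"}]},  # hypothetical image
    )
    print(json.dumps(job, indent=2))
```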

Create a cluster that consists of three nodes:

kubectl get nodes
NAME                        STATUS    ROLES     AGE       VERSION
aks-agentpool1-27255451-0   Ready     agent     2h        v1.8.2
aks-agentpool1-27255451-2   Ready     agent     53m       v1.8.2
aks-agentpool1-27255451-3   Ready     agent     19m       v1.8.2

Submit a Spark application using spark-submit. The driver pod and the executor pods are running, and no Kubernetes Job exists:

kubectl get jobs
No resources found.

kubectl get pods
NAME                                         READY     STATUS    RESTARTS   AGE
networkwordcount-1519305658702-driver        1/1       Running   0          2m
networkwordcount-1519305658702-exec-1        1/1       Running   0          2m
networkwordcount-1519305658702-exec-2        1/1       Running   0          2m
pipeline-traefik-86c8dc89fb-wbtq8            1/1       Running   0          2h
shuffle-6ljkv                                1/1       Running   0          1h
shuffle-knkbt                                1/1       Running   0          1h
shuffle-tnmk7                                1/1       Running   0          34m
zooming-lizzard-spark-rss-54858cdbb9-sksb6   1/1       Running   0          1h

Get the node the driver is running on:

kubectl describe pod networkwordcount-1519305658702-driver
Name:         networkwordcount-1519305658702-driver
Namespace:    default
Node:         aks-agentpool1-27255451-3/10.240.0.5
Start Time:   Thu, 22 Feb 2018 14:21:01 +0100
Labels:       spark-app-selector=spark-cd7a98b964a04272b48fc619a501d1cd
              spark-role=driver
Annotations:  spark-app-name=NetworkWordCount
Status:       Running

Shut down node aks-agentpool1-27255451-3 (VM) and observe what happens to the pods that were scheduled to this node:

kubectl get nodes
NAME                        STATUS    ROLES     AGE       VERSION
aks-agentpool1-27255451-0   Ready     agent     2h        v1.8.2
aks-agentpool1-27255451-2   Ready     agent     1h        v1.8.2

All pods that were scheduled to this node are terminated:

kubectl get pods -w
NAME                                         READY     STATUS    RESTARTS   AGE
networkwordcount-1519305658702-driver        1/1       Running   0          5m
networkwordcount-1519305658702-exec-1        1/1       Running   0          5m
networkwordcount-1519305658702-exec-2        1/1       Running   0          5m
pipeline-traefik-86c8dc89fb-wbtq8            1/1       Running   0          2h
shuffle-6ljkv                                1/1       Running   0          1h
shuffle-knkbt                                1/1       Running   0          1h
shuffle-tnmk7                                1/1       Running   0          37m
zooming-lizzard-spark-rss-54858cdbb9-sksb6   1/1       Running   0          1h
networkwordcount-1519305658702-driver   1/1       Terminating   0         5m
networkwordcount-1519305658702-driver   1/1       Terminating   0         5m
networkwordcount-1519305658702-exec-1   1/1       Terminating   0         5m
networkwordcount-1519305658702-exec-2   1/1       Terminating   0         5m
networkwordcount-1519305658702-exec-1   1/1       Terminating   0         5m
shuffle-tnmk7   1/1       Terminating   0         38m
shuffle-tnmk7   1/1       Terminating   0         38m



kubectl get pods
NAME                                         READY     STATUS    RESTARTS   AGE
pipeline-traefik-86c8dc89fb-wbtq8            1/1       Running   0          2h
shuffle-6ljkv                                1/1       Running   0          1h
shuffle-knkbt                                1/1       Running   0          1h
zooming-lizzard-spark-rss-54858cdbb9-sksb6   1/1       Running   0          1h

We can see that the driver pod networkwordcount-1519305658702-driver was terminated because the node hosting it was shut down, and that Kubernetes did not reschedule it to any of the other nodes in the cluster.
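The comparison above can also be done mechanically. The following sketch, with hypothetical helper names `pod_names` and `lost_pods`, parses the plain-text output of `kubectl get pods` captured before and after the node shutdown and lists the pods that disappeared and were never recreated.

```python
from typing import List, Set

# Output captured before and after the node shutdown, copied from above.
BEFORE = """
NAME                                         READY     STATUS    RESTARTS   AGE
networkwordcount-1519305658702-driver        1/1       Running   0          2m
networkwordcount-1519305658702-exec-1        1/1       Running   0          2m
networkwordcount-1519305658702-exec-2        1/1       Running   0          2m
pipeline-traefik-86c8dc89fb-wbtq8            1/1       Running   0          2h
shuffle-6ljkv                                1/1       Running   0          1h
shuffle-knkbt                                1/1       Running   0          1h
shuffle-tnmk7                                1/1       Running   0          34m
zooming-lizzard-spark-rss-54858cdbb9-sksb6   1/1       Running   0          1h
"""

AFTER = """
NAME                                         READY     STATUS    RESTARTS   AGE
pipeline-traefik-86c8dc89fb-wbtq8            1/1       Running   0          2h
shuffle-6ljkv                                1/1       Running   0          1h
shuffle-knkbt                                1/1       Running   0          1h
zooming-lizzard-spark-rss-54858cdbb9-sksb6   1/1       Running   0          1h
"""


def pod_names(kubectl_output: str) -> Set[str]:
    """Return the NAME column of `kubectl get pods` text output."""
    names: Set[str] = set()
    for line in kubectl_output.strip().splitlines():
        fields = line.split()
        if not fields or fields[0] == "NAME":  # skip blanks and the header
            continue
        names.add(fields[0])
    return names


def lost_pods(before: str, after: str) -> List[str]:
    """Pods present in `before` that no longer exist in `after`."""
    return sorted(pod_names(before) - pod_names(after))


if __name__ == "__main__":
    for name in lost_pods(BEFORE, AFTER):
        print(name)
```

For the two listings above, the result is the driver, both executors, and the shuffle pod that ran on the lost node.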

So, we modified Spark, creating a version in which the Spark driver is a Kubernetes Job, instead of a Pod. Let’s repeat the experiment.

Create a cluster that consists of three nodes:

kubectl get nodes
NAME                        STATUS    ROLES     AGE       VERSION
aks-agentpool1-27255451-0   Ready     agent     1h        v1.8.2
aks-agentpool1-27255451-1   Ready     agent     1h        v1.8.2
aks-agentpool1-27255451-2   Ready     agent     1m        v1.8.2

Submit a Spark application:

kubectl get jobs
NAME                                    DESIRED   SUCCESSFUL   AGE
networkwordcount-1519301591265-driver   1         0            3m

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519301591265-driver-mszl2   1/1       Running   0          3m
networkwordcount-1519301591265-usmj-exec-1    1/1       Running   0          1m
networkwordcount-1519301591265-usmj-exec-2    1/1       Running   0          1m
pipeline-traefik-86c8dc89fb-wbtq8             1/1       Running   0          1h
shuffle-6ljkv                                 1/1       Running   0          9m
shuffle-crg9q                                 1/1       Running   0          9m
shuffle-knkbt                                 1/1       Running   0          2m
zooming-lizzard-spark-rss-54858cdbb9-fx6ll    1/1       Running   0          9m

Notice that the Spark driver networkwordcount-1519301591265-driver is now a Kubernetes Job that manages the networkwordcount-1519301591265-driver-mszl2 pod.

Get the node the driver is running on:

kubectl describe pod networkwordcount-1519301591265-driver-mszl2
Name:           networkwordcount-1519301591265-driver-mszl2
Namespace:      default
Node:           aks-agentpool1-27255451-1/10.240.0.5
Start Time:     Thu, 22 Feb 2018 13:14:18 +0100
Labels:         controller-uid=c2c89fa1-17c9-11e8-868d-0a58ac1f0240
                job-name=networkwordcount-1519301591265-driver
                spark-app-selector=spark-dac6c2ba946a41d4b1fafe0e077b36ef
                spark-role=driver
Annotations:    kubernetes.io/created-by={"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"Job","namespace":"default","name":"networkwordcount-1519301591265-driver","uid":"c2c89fa1-17c9-11e8-868d-0...
                spark-app-name=NetworkWordCount
Status:         Running

Shut down node aks-agentpool1-27255451-1 (VM) and observe what happens to the pods that were scheduled to this node:

kubectl get nodes
NAME                        STATUS    ROLES     AGE       VERSION
aks-agentpool1-27255451-0   Ready     agent     1h        v1.8.2
aks-agentpool1-27255451-2   Ready     agent     7m        v1.8.2

kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
networkwordcount-1519301591265-driver-dwvkf   1/1       Running   0          3m
networkwordcount-1519301591265-rmes-exec-1    1/1       Running   0          1m
networkwordcount-1519301591265-rmes-exec-2    1/1       Running   0          1m
pipeline-traefik-86c8dc89fb-wbtq8             1/1       Running   0          1h
shuffle-6ljkv                                 1/1       Running   0          17m
shuffle-knkbt                                 1/1       Running   0          9m
zooming-lizzard-spark-rss-54858cdbb9-sksb6    1/1       Running   0          1m

kubectl describe pod networkwordcount-1519301591265-driver-dwvkf
Name:           networkwordcount-1519301591265-driver-dwvkf
Namespace:      default
Node:           aks-agentpool1-27255451-2/10.240.0.6
Start Time:     Thu, 22 Feb 2018 13:22:56 +0100
Labels:         controller-uid=c2c89fa1-17c9-11e8-868d-0a58ac1f0240
                job-name=networkwordcount-1519301591265-driver
                spark-app-selector=spark-dac6c2ba946a41d4b1fafe0e077b36ef
                spark-role=driver
Annotations:    kubernetes.io/created-by={"kind":"SerializedReference","apiVersion":"v1","reference":{"kind":"Job","namespace":"default","name":"networkwordcount-1519301591265-driver","uid":"c2c89fa1-17c9-11e8-868d-0...
                spark-app-name=NetworkWordCount
Status:         Running

Voila! The driver networkwordcount-1519301591265-driver-mszl2 that was originally running on node aks-agentpool1-27255451-1 has been rescheduled to node aks-agentpool1-27255451-2 as networkwordcount-1519301591265-driver-dwvkf.

Not so fast, though: there’s a subtle problem that has to be solved here. When the driver is terminated, its executors (which may run on other nodes) are terminated by Kubernetes with a delay. If you are interested in how this works on Kubernetes, please read: Kubernetes: Garbage Collection

This leads to concurrency issues, in which the re-spawned driver tries to create new executors with the same names as the executors that are still being cleaned up by Kubernetes garbage collection.

We overcame this issue by making the executor names unique for each driver instance. Notice that, before the change, the driver and executor pod names looked like:

  • networkwordcount-1519305658702-driver <- driver pod
  • networkwordcount-1519305658702-exec-1 <- executor pod
  • networkwordcount-1519305658702-exec-2 <- executor pod

By making the driver a Kubernetes Job, and the executor names unique for each driver, these now look like:

  • networkwordcount-1519301591265-driver <- driver job
  • networkwordcount-1519301591265-driver-mszl2 <- driver pod
  • networkwordcount-1519301591265-usmj-exec-1 <- executor pod
  • networkwordcount-1519301591265-usmj-exec-2 <- executor pod

And after the node shutdown:

  • networkwordcount-1519301591265-driver <- driver job
  • networkwordcount-1519301591265-driver-dwvkf <- re-spawned driver pod
  • networkwordcount-1519301591265-rmes-exec-1 <- executor pod
  • networkwordcount-1519301591265-rmes-exec-2 <- executor pod
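The naming scheme in the lists above can be sketched in a few lines. How the modified Spark actually derives the four-letter part is not described here, so the random `driver_instance_id` below is an assumption, as are all function names. The `collisions` check shows why the old scheme clashes with executors that are still being garbage collected while the new one does not.

```python
import random
import string
from typing import Iterable, List


def driver_instance_id(rng: random.Random, length: int = 4) -> str:
    """Random lowercase id for one driver instance (assumed scheme)."""
    return "".join(rng.choice(string.ascii_lowercase) for _ in range(length))


def old_executor_names(app_prefix: str, count: int) -> List[str]:
    """Original scheme: identical names for every driver instance."""
    return [f"{app_prefix}-exec-{i}" for i in range(1, count + 1)]


def executor_names(app_prefix: str, instance_id: str, count: int) -> List[str]:
    """New scheme: names include the id of the driver instance."""
    return [f"{app_prefix}-{instance_id}-exec-{i}" for i in range(1, count + 1)]


def collisions(new_names: Iterable[str], existing: Iterable[str]) -> List[str]:
    """Names the new driver wants that are still held by terminating pods."""
    return sorted(set(new_names) & set(existing))


if __name__ == "__main__":
    prefix = "networkwordcount-1519301591265"
    # Old scheme: the re-spawned driver asks for names that still exist.
    print(collisions(old_executor_names(prefix, 2), old_executor_names(prefix, 2)))
    # New scheme: ids taken from the example above, so no name is reused.
    print(collisions(executor_names(prefix, "rmes", 2),
                     executor_names(prefix, "usmj", 2)))
    # A fresh id for a further driver instance.
    print(executor_names(prefix, driver_instance_id(random.Random()), 2))
```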

Spark Resource Staging Server

The Spark resource staging server is only used when the artifacts (jars, files) needed by the Spark application reside on the local machine where the spark-submit command is issued.

At the moment, this component stores uploaded files in directories that are local to the pod. If the resource staging server is lost and then restarted, the files stored by the server are lost as well. This means that a Spark driver that was rescheduled to a different node during initialization won’t be able to download the necessary files from the resource staging server.

The resource staging server is therefore not resilient to failures. We’re working on making it resilient, and our next blogpost will cover the details.

As usual, we are contributing all the changes that make Spark resilient to failures on Kubernetes back to the community.

If you are interested in our technology and open source projects, follow us on GitHub, LinkedIn or Twitter.
