
Balint Molnar

Mon, Dec 18, 2017


Monitoring Apache Spark with Prometheus on Kubernetes

Apache Spark on Kubernetes series:
Introduction to Spark on Kubernetes
Scaling Spark made simple on Kubernetes
The anatomy of Spark applications on Kubernetes
Monitoring Apache Spark with Prometheus
Apache Spark CI/CD workflow howto
Spark History Server on Kubernetes
Spark scheduling on Kubernetes demystified
Spark Streaming Checkpointing on Kubernetes
Deep dive into monitoring Spark and Zeppelin with Prometheus
Apache Spark application resilience on Kubernetes

Apache Zeppelin on Kubernetes series:
Running Zeppelin Spark notebooks on Kubernetes
Running Zeppelin Spark notebooks on Kubernetes - deep dive
CI/CD flow for Zeppelin notebooks

Apache Kafka on Kubernetes series:
Kafka on Kubernetes - using etcd

This is the third post in the Spark on Kubernetes series - if you missed the first and second ones, you can find them in the series list above.

Our PaaS, Pipeline, deploys cloud native applications to Kubernetes using a CI/CD pipeline and, as a complementary feature, monitors those applications as well - with out of the box dashboards and alerts preconfigured based on application language, type, cluster size and predefined SLAs (these values can be changed). Centralized log collection, tracing and visualization are also part of the platform.

Pipeline monitors multiple federated clusters with Prometheus - the secure way

To monitor these applications - in this particular example, Spark - we needed a robust, open source monitoring system, and we chose Prometheus. Prometheus is an open source monitoring and alerting system that was open sourced in 2012. Since then it has become a standard for monitoring in the enterprise. It is also an official Cloud Native Computing Foundation project, so most Kubernetes-related components use, or will use, Prometheus for monitoring and alerting.

Prometheus uses a pull model over HTTP to scrape data from applications. For batch jobs it also supports a push model, but enabling this feature requires a special component called the pushgateway. Think of this gateway as a simple store that holds your application metrics until Prometheus scrapes them.
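As an illustration only (the pushgateway host name below is an assumption), a minimal Prometheus scrape configuration for a pushgateway could look like this:

    scrape_configs:
      - job_name: 'pushgateway'
        # keep the job/instance labels pushed by the clients
        honor_labels: true
        static_configs:
          - targets: ['pushgateway:9091']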

Once Prometheus has collected the application metrics, all we need is a good visualization tool that can work alongside it. The ultimate tool for that job is Grafana, an open source metric analytics and visualization tool with a nice UI and a rich set of default features.

If you search the internet for monitoring Spark with Prometheus, all you will find is an old blog post from 2015 that uses a Graphite sink to get metrics out of Spark and then maps them to the Prometheus format. We did not choose that approach, so to enable monitoring Spark with Prometheus a couple of changes had to be made in the Spark codebase.

Out of the box Spark supports only a handful of sinks (Graphite, CSV, Ganglia), and Prometheus is not one of them, so we introduced a new Prometheus sink (PR; the related Apache ticket is SPARK-22343). Spark uses a push model to send metrics data, so a Prometheus pushgateway is required. The metrics data published by Spark is based on Dropwizard, thus its format is not supported natively by Prometheus; the metrics are converted using DropwizardExports before being pushed to the pushgateway.
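To give a feel for that conversion, here is a minimal, self-contained sketch (not the actual sink code - the registry, metric name and pushgateway address are made up) using the Prometheus Java client:

    import com.codahale.metrics.MetricRegistry
    import io.prometheus.client.CollectorRegistry
    import io.prometheus.client.dropwizard.DropwizardExports
    import io.prometheus.client.exporter.PushGateway

    object DropwizardToPrometheusSketch {
      def main(args: Array[String]): Unit = {
        // A Dropwizard registry standing in for Spark's internal metric registry
        val dropwizardRegistry = new MetricRegistry()
        dropwizardRegistry.counter("processed_records").inc(42)

        // Convert the Dropwizard metrics into Prometheus collectors
        val prometheusRegistry = new CollectorRegistry()
        new DropwizardExports(dropwizardRegistry).register(prometheusRegistry)

        // Push the converted metrics to a pushgateway assumed to listen on 127.0.0.1:9091
        new PushGateway("127.0.0.1:9091").pushAdd(prometheusRegistry, "spark-prometheus-sketch")
      }
    }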

Initially we submitted these PRs to the Spark on K8s fork, as this is what gets deployed on our PaaS. However, based on the suggestions of the maintainers, we extracted the Kubernetes-specific features and re-submitted the PR to the upstream Apache repo as well, since this feature is useful for that community too.

The latest version of the pushgateway no longer accepts metrics that contain timestamps (see the related PR), thus we had to go with an earlier version that still supported them. We are planning to introduce our own version of the pushgateway, as Weaveworks did (see it on GitHub), to overcome these issues and also to support a few advanced scenarios.

The reason the timestamp is essential is the following: we want metrics data that can be represented as histograms, so that this historical data can be used for smart scaling of the Kubernetes cluster to meet the different SLAs defined by Pipeline spotguides. The Prometheus protocol supports timestamps by default, but if a metric coming from the pushgateway has no timestamp, Prometheus adds one itself - the time at which the data was scraped from the pushgateway. This is, of course, not the behavior we wanted.
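For reference, the Prometheus text exposition format allows an optional timestamp (in milliseconds since the epoch) after the sample value; the metric name and numbers below are made up for illustration:

    # TYPE spark_driver_memory_consumption gauge
    spark_driver_memory_consumption 123456789 1513591200000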

The default Prometheus pushgateway client API does not support metric timestamps, so this API has been enhanced to enrich the metrics data with a timestamp.

    // Overloaded pushAdd that also accepts a timestamp and passes it on to the request
    public void pushAdd(CollectorRegistry registry,
                        String job, String timestamp) throws IOException {
        doRequest(registry, job, null, "POST", timestamp);
    }

Spark by default does not include timestamps in its metrics either, so we inject a timestamp into every metric before reporting it to the pushgateway.

    pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
      s"${System.currentTimeMillis}")

Inside the Spark codebase there is a metrics.properties.template file where the user can fine-tune which metrics to collect and where to process them. In our case:

# Enable Prometheus for all instances by class name
*.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
# Prometheus pushgateway address
*.sink.prometheus.pushgateway-address-protocol=<prometheus pushgateway protocol> - defaults to http
*.sink.prometheus.pushgateway-address=<prometheus pushgateway address> - defaults to 127.0.0.1:9091
*.sink.prometheus.unit=<unit> - defaults to seconds (TimeUnit.SECONDS)
*.sink.prometheus.period=<period> - defaults to 10

# Enable JVM metrics source for all instances by class name
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Spark ships only the metrics.properties.template file, so to enable metrics a proper metrics.properties file has to be created, and during application submission the following configuration value has to be set to the path of that metrics.properties file. The file must be reachable by every Spark component.

--conf spark.metrics.conf=<path_to_the_file>/metrics.properties
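For example, a submission could look like this (the path, class and jar below are hypothetical placeholders - adjust them to your environment):

    spark-submit \
      --conf spark.metrics.conf=/opt/spark/conf/metrics.properties \
      --class com.example.SparkApp \
      spark-app.jar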

Spark by default also injects spark.app.id into the metrics so the data can be differentiated. Unfortunately, clustering by this field is hard because it is a randomly generated string created on the fly. Spark provides a way to change this behavior via the spark.metrics.namespace configuration property (for further details please check the official Spark page about metrics). To further sort the metrics, Spark names a couple of metrics sources, e.g. executor and driver, but the shuffle service is not named yet, so we created another PR for that.

    // Name shuffle service metrics under the metrics namespace,
    // as is already done for driver and executor metrics
    } else if (instance == "shuffleService") {
      MetricRegistry.name(metricsNamespace.getOrElse(""), defaultName)

To make it easier to slice and dice Spark metrics in Prometheus, we group them by the keys metricsnamespace/role/id, where:

  • metricsnamespace: the value passed in via the spark.metrics.namespace configuration
  • role: the Spark component the metric originates from (driver/executor/shuffle)
  • id: optional; set only for metrics coming from executors, and represents the identifier of the executor

The following table illustrates this grouping with a simple example:

Metrics                                    | App id | MetricsNamespace  | Role     | Id | Prometheus Grouping Key
spark-prometheus_sample-memory_consumption | job1   | prometheus_sample | driver   | -  | spark-prometheus_sample/prometheus_sample/driver/memory_consumption
spark-prometheus_sample-memory_consumption | job1   | prometheus_sample | executor | 1  | spark-prometheus_sample/prometheus_sample/executor/1/memory_consumption
spark-job2-cpu_usage                       | job2   | job2              | driver   | -  | spark-job2/job2/driver/cpu_usage
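The grouping key itself is simple to assemble; the following rough illustration (not the actual sink code) shows how the metricsnamespace/role/id parts can be combined:

    // Rough illustration (not the actual sink code): build the grouping key
    // from the metrics namespace, the role and the optional executor id.
    def groupingKey(metricsNamespace: String, role: String, id: Option[String]): String =
      (Seq(metricsNamespace, role) ++ id).mkString("/")

    // groupingKey("prometheus_sample", "executor", Some("1")) == "prometheus_sample/executor/1"
    // groupingKey("job2", "driver", None)                     == "job2/driver"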

The full implementation of our Prometheus sink can be found here.

Finally, here is the architecture of how Pipeline monitors the Spark cluster:

[Architecture diagram]

We will follow up with more information about how we use these metrics to scale our clusters or enforce custom SLAs. Meanwhile, if you have any questions or are interested in what we are doing, follow us on GitHub, LinkedIn or Twitter.



