This post was written by Keith Tenzer, Dan Zilberman, Pieter Malan, Louis Santillan, Kyle Bader and Guillaume Moutier.

Overview

Running Apache Spark for large data analytics workloads has typically been done in on-premise data centers using distributions like Cloudera that are not very flexible, do not extend well to the cloud, and can be quite expensive. More recently, Spark is being offered as a service in various clouds, such as AWS EMR and Databricks. These environments often also run Apache Spark on traditional infrastructure and virtual machines with fast local disks using a specialized Hadoop Distributed File System (HDFS), but they are also starting to offer Spark on Kubernetes.


Running Spark on Kubernetes has a lot of advantages over the traditional Spark stack. It is simpler to administer, dependency management is much easier, it provides flexibility and portability for deploying on any infrastructure platform, and it is much more cost effective due to better isolation, utilization, and scaling of resources. The challenge with Kubernetes is that most Spark stacks rely on HDFS, and since HDFS is a locally attached file system, it does not work well with a cloud-native container platform like Kubernetes. Running HDFS on Kubernetes complicates things dramatically, reducing the value of Kubernetes. S3 is a much better fit for Kubernetes, but getting the necessary performance out of S3 can be a challenge. Enter Red Hat OpenShift with OpenShift Data Foundation.

Red Hat OpenShift and OpenShift Data Foundation (ODF) provide an enterprise Kubernetes platform with an extremely fast, cloud-native, S3 API-compatible storage layer backed by Ceph. The best thing is that OpenShift and ODF can run anywhere, on-premise or in the cloud. The storage layer can be configured through the Ceph RADOS Gateway (RGW) to use extremely fast NVMe disks or Intel Optane disks and PMem while providing an S3 interface to access data on those disks. The result is a standardized, highly performant Kubernetes platform for Spark workloads that runs anywhere your workloads need to run. While certain cloud providers offer great platforms for running Spark, what makes this solution unique and beneficial is that it can run identically (complete feature/capability parity) on-premise or in the cloud, which is not the case with other solutions. This is important, because if your workload runs both on-premise and in the cloud, you only have one platform to test, validate, maintain, and evolve instead of two or three or more.

OpenShift with ODF adds a lot of value to the Apache Spark ecosystem:

  • Ephemeral (provisioned on demand) Spark worker nodes simplify configuration
  • On-demand Spark worker nodes increase performance due to faster container start-up times while remaining cost efficient
  • Portability and choice between on-premise, public cloud, or both
  • Replaces the old YARN scheduler, adding intelligence and infrastructure awareness
  • Allows the greatest flexibility in choosing the exact Spark version, drivers (executors), application runtime configuration, and dependencies
  • Single cloud-native platform providing compute, networking, and storage resource configuration and management

This article will focus on a recommended architecture for running Apache Spark on ODF, how to build a Spark container image, how to run Spark batch jobs, and, finally, how to evaluate performance. While this article describes an architecture in AWS, similar architectures have been built by customers in other cloud providers, on-prem, or on bare metal hardware.

Architecture

OpenShift Cluster Architecture

  1. Start with an “opinionated” OCP cluster deployment via Installer Provisioned Infrastructure (IPI) or User Provisioned Infrastructure (UPI)
  2. Provision the number of worker nodes called for by the specification
  3. Create a MachineSet for ODF-dedicated worker nodes (‘ocs-worker’), per the specification
  4. Scale the Machines from the ODF MachineSet to the number required by the specification
  5. Create a MachinePoolSet to include all OCS/ODF-dedicated worker nodes
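Steps 3 and 4 can be sketched with the `oc` CLI. The MachineSet name `ocs-worker` and the replica count of 6 are assumptions taken from the sample specification; substitute your own names and numbers:

```shell
# List existing MachineSets (machine-api objects live in this namespace)
oc get machinesets -n openshift-machine-api

# Scale the ODF-dedicated MachineSet to the count required by the sizing spec
# ("ocs-worker" is an assumed MachineSet name)
oc scale machineset ocs-worker --replicas=6 -n openshift-machine-api

# Verify the Machines are provisioned and the Nodes carry the ODF node label
oc get machines -n openshift-machine-api
oc get nodes -l cluster.ocs.openshift.io/openshift-storage
```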

A sample specification of an OCP cluster with ODF that assumes parallel processing of Spark jobs (and therefore a large number of active worker nodes) is shown below:

 

     

| Nodes | Qty | Instance type | vCPU (per instance) | RAM, GB (per instance) | vCPU (total) | RAM, GB (total) | EBS storage, GB (per instance) | EBS storage, GB (total) | EBS type |
|---|---|---|---|---|---|---|---|---|---|
| Master Nodes | 3 | m4.2xlarge | 4 | 32 | 12 | 96 | 50 | 150 | gp2 |
| ODF Infra Nodes | 3 | m5.2xlarge | 8 | 32 | 24 | 96 | 50 | 150 | gp2 |
| ODF Nodes (each with NVMe storage attached) | 6 | i3en.2xlarge | 8 | 64 | 48 | 384 | 50 | 300 | gp2 |
| Worker Nodes | 17 | r5.12xlarge | 48 | 384 | 816 | 6528 | 50 | 850 | gp2 |

NOTES:

  1. The estimated number of worker nodes can be based on the assumption of parallel data processing (that is, all “stages” of the data processing pipeline running in parallel) or sequential processing.
  2. You may need to increase the AWS EC2 standard quota for VMs to enable provisioning of additional worker nodes per the sizing spec.
  3. For faster processing of “scratch data” generated during Spark job execution on large data volumes, consider using fewer worker nodes, each a “higher-end” AWS instance with two 500GB local disks and a faster storage class (io1) allowing up to 7,500 IOPS.

High-level architecture of an OpenShift cluster deployed into multiple Availability Zones (AZs) of AWS is shown below:

(source - AWS blog: https://aws.amazon.com/blogs/opensource/openshift-4-on-aws-quick-start/)

Storage Cluster Architecture

ODF delivers enterprise-level persistent, abstracted storage to maximize the benefits of a container environment. It is not only optimized for container environments, but also managed with Red Hat OpenShift under a single interface.

High level ODF cluster architecture deployed in an OpenShift cluster related to running persistent container workloads is shown below:


An OpenShift Data Foundation (ODF) cluster can be deployed from the Red Hat OpenShift Container Storage Operator available in the Operator Hub and configured to run on dedicated (“infrastructure”) nodes provisioned from a MachineSet, using their local storage capabilities via the Local Storage Operator.

Upon successful deployment of the ODF cluster, its storage capacity and other metrics will appear on the “Persistent Storage” tab in the OpenShift UI.
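The same health information can be checked from the CLI. This is a sketch; the last command assumes the optional rook-ceph toolbox deployment has been created:

```shell
# StorageCluster and CephCluster should report Ready / HEALTH_OK
oc get storagecluster -n openshift-storage
oc get cephcluster -n openshift-storage

# Detailed Ceph health via the rook-ceph toolbox (an optional component)
oc -n openshift-storage rsh deploy/rook-ceph-tools ceph status
```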

The Rook-Ceph operator runs as a container that bootstraps and monitors the storage cluster. It performs the following functions:

  • Automates the configuration of Storage components
  • Starts, monitors, and manages the Ceph monitor pods and Ceph OSD daemons to provide the RADOS storage cluster
  • Initializes the pods and other artifacts to run the services to manage:
    • CRDs for pools
    • Object stores (S3/Swift)
    • Filesystems
  • Monitors the Ceph mons and OSDs to ensure that storage remains available and healthy
  • Deploys and manages Ceph mons placement while adjusting the mon configuration based on cluster size
  • Watches the desired state changes requested by the API service and applies the changes
  • Initializes the Ceph-CSI drivers that are needed for consuming the storage
  • Automatically configures the Ceph-CSI driver to mount the storage to pods

 

NOTE: You may configure the RGW Ceph storage class, service, and route for external access and an S3-compatible storage bucket by following the instructions at https://red-hat-storage.github.io/ocs-training/training/ocs4/ocs4-enable-rgw.html up until step 3.1.

  • Use an ObjectBucketClaim file to deploy an RGW-based ObjectBucketClaim/Object Bucket:

apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: spark-demo
spec:
  generateBucketName: spark-demo
  # for RGW type storage:
  storageClassName: ocs-storagecluster-ceph-rgw

To access the S3 bucket from the CLI, retrieve its credentials from OpenShift.

NOTE: The Bucket Name, Access Key, and Secret Key can be used by “aws s3” CLI commands to access and modify the contents of the Object Bucket.
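A sketch of retrieving those values with `oc`, assuming the standard OBC contract in which the generated ConfigMap and Secret share the claim's name (spark-demo):

```shell
# The bucket name is published in the ConfigMap created for the OBC
BUCKET_NAME=$(oc get configmap spark-demo -o jsonpath='{.data.BUCKET_NAME}')

# The credentials are stored base64-encoded in the matching Secret
export AWS_ACCESS_KEY_ID=$(oc get secret spark-demo \
  -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 -d)
export AWS_SECRET_ACCESS_KEY=$(oc get secret spark-demo \
  -o jsonpath='{.data.AWS_SECRET_ACCESS_KEY}' | base64 -d)

echo "Bucket: ${BUCKET_NAME}"
```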

Spark Application Architecture

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

Below is a high-level diagram of a Spark application deployed in containerized form factor into a Kubernetes cluster:


(source: https://spark.apache.org/docs/latest/running-on-kubernetes.html)

The Spark job submission mechanism generally works as follows:

  • The Spark operator creates a Spark driver running within a Kubernetes pod.
  • The driver creates executors, also running within Kubernetes pods, connects to them, and executes application code (included as a ConfigMap or as a packaged JAR file available via a shared storage location).
  • When the Spark application completes, the executor pods terminate and are cleaned up, but the driver pod persists logs and remains in “completed” state in the Kubernetes API until it is eventually garbage collected or manually cleaned up.

NOTE: In the completed state, the driver pod does not use any computational or memory resources.
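Completed drivers can be listed and cleaned up using the spark-role label that the operator sets on driver and executor pods (a sketch, assuming standard operator labeling):

```shell
# List driver pods, including completed ones
oc get pods -l spark-role=driver

# Remove drivers that finished successfully, once their logs are no longer needed
oc delete pods -l spark-role=driver --field-selector=status.phase=Succeeded
```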

Driver and executor pod scheduling is handled by Kubernetes, and communication with the Kubernetes API is done via the fabric8 client. It is possible to schedule the driver and executor pods on specific nodes through a node selector, using the corresponding configuration property.
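A hypothetical sketch of that node-selector configuration: the label key/value (`disktype=nvme`) and node name are illustrative assumptions, while `spark.kubernetes.node.selector.<labelKey>` is the documented Spark-on-Kubernetes property:

```shell
# Label the target nodes (hypothetical node name and label)
oc label node my-worker-node disktype=nvme

# Write the corresponding Spark property; in a SparkApplication manifest the
# same key goes under spec.sparkConf
cat > node-selector.conf <<'EOF'
spark.kubernetes.node.selector.disktype nvme
EOF
```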

Configuring Google Spark Operator

Below we show a sample process for building custom images based on the Google Spark operator.

  1. Obtain the source code from the GitHub repository that contains Spark:

$ git clone https://github.com/guimou/spark-on-openshift.git

  2. Work from the /spark-images folder.

NOTE: You can modify the Docker files in the repo to change the Spark, Hadoop or other libraries versions.

To build the base Spark 3 image, run the following command:

$ docker build --file spark3.Dockerfile --tag spark-odh:s3.0.1-h3.3.0_v0.0.1 .

(Optional) Publish the image to designated image repo:

$ docker tag spark-odh:s3.0.1-h3.3.0_v0.0.1 <your_repo>/spark-odh:s3.0.1-h3.3.0_v0.0.1
$ docker push <your_repo>/spark-odh:s3.0.1-h3.3.0_v0.0.1

To build the simple test application PySpark image:

$ docker build --file pyspark.Dockerfile --tag pyspark-odh:s3.0.1-h3.3.0_v0.0.1 --build-arg base_img=spark-odh:s3.0.1-h3.3.0_v0.0.1 .

(Optional) To publish a custom  image to your repo:

$ docker tag pyspark-odh:s3.0.1-h3.3.0_v0.0.1 <your_repo>/pyspark-odh:s3.0.1-h3.3.0_v0.0.1
$ docker push <your_repo>/pyspark-odh:s3.0.1-h3.3.0_v0.0.1

NOTE: Due to some companies’ policy restrictions on container images, there is an option to use the internal OpenShift registry for images. To configure it to use filesystem resources for storage, a PVC needs to be created for the registry.
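One documented way to do this is to let the image registry operator claim a PVC by patching its configuration; an empty claim name makes the operator create a PVC named image-registry-storage (the backing StorageClass must support the required access mode, for example CephFS from ODF). A sketch:

```shell
# Put the registry operator under management and point it at PVC-backed storage
oc patch configs.imageregistry.operator.openshift.io cluster --type merge \
  -p '{"spec":{"managementState":"Managed","storage":{"pvc":{"claim":""}}}}'

# Verify the PVC is created and bound
oc get pvc -n openshift-image-registry
```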

The operator is installed in its own namespace (‘spark-operator’) but will be able to monitor all namespaces for jobs to be launched.

  • Create a namespace for K8s operator installation:
$ oc new-project spark-operator
 

NOTE: All subsequent oc CLI commands should run in the context of this project.

  • Service Account and Roles

The Operator needs a special Service Account and Role to create pods and services.

From the spark-operator folder:

Create the Service Account and Role (may be optional with recent Helm chart versions of the Spark operator):

$ oc apply -f spark-rbac.yaml
  • Deploy Google Spark Operator

Add the Spark operator Helm repo:

$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

Deploy operator via Helm chart:

$ helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace --set image.tag=<target tag> --set webhook.enable=true --set resourceQuotaEnforcement.enable=true
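To sanity-check the deployment (a sketch; the CRD name below matches the upstream operator):

```shell
# Chart release status and operator pod
helm status spark-operator -n spark-operator
oc get pods -n spark-operator

# The SparkApplication CRD should now be registered
oc get crd sparkapplications.sparkoperator.k8s.io
```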

3. Monitoring

There are options to monitor the Spark operator itself, as well as the applications it creates. From the /spark-operator folder:

NOTE: Prometheus and Grafana operators must be installed in a cluster environment. An instance of Grafana must be created so that the ServiceAccount is provisioned.

Create the Services that will expose the metrics:

$ oc apply -f spark-application-metrics_svc.yaml
$ oc apply -f spark-operator-metrics_svc.yaml

For Prometheus configuration, create the Spark Service Monitor:

$ oc apply -f spark-service-monitor.yaml

For Grafana configuration, create the Prometheus Datasource:

$ oc apply -f prometheus-datasource.yaml

NOTE: We will need another datasource to retrieve base CPU and RAM metrics. To do that we’ll connect to the "main" OpenShift Prometheus through the following procedure.

Grant the Grafana Service Account a cluster-monitoring-view cluster role:

$ oc adm policy add-cluster-role-to-user cluster-monitoring-view -z grafana-serviceaccount

Retrieve the bearer token used to authenticate to Prometheus:

$ export BEARER_TOKEN=$(oc serviceaccounts get-token grafana-serviceaccount)

Inject the BEARER_TOKEN value into the main-prometheus-datasource.yaml file and deploy the "main" Prometheus Datasource:

$ cat main-prometheus-datasource.yaml | sed -e "s/BEARER_TOKEN/$BEARER_TOKEN/g" | oc apply -f -

Create the Grafana dashboards:

$ oc apply -f spark-operator-dashboard.yaml
$ oc apply -f spark-application-dashboard.yaml

4. Deploy Spark History Server

The Spark History Server is a very helpful tool that persists metadata about Spark jobs executed on the OpenShift cluster and lets you monitor them visually.

NOTE: The following commands are executed from the spark-history-server source code folder.

  • Object Bucket Claim

We will use object storage to store the logs data from the Spark jobs, so first we need to create a bucket.

Create the OBC:

$ oc apply -f spark-hs-obc.yaml

NOTE: The Spark/Hadoop instances cannot send logs directly into a bucket. A "folder" must exist where the logs will be sent, so we will force creating this folder by uploading a hidden file into its location.

Retrieve the Access Key and Secret Key from the Secret named obc-spark-history-server, the bucket name from the ConfigMap named obc-spark-history-server, and the Route to the S3 storage (you may have to create the route to reach the RGW; the default S3 Route in ODF points to the MCG NooBaa endpoint).
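A sketch using `oc extract`, which prints decoded Secret values to stdout. The S3 route name `rgw` is an assumption based on the RGW-enablement instructions referenced earlier; adjust it to your environment:

```shell
export AWS_ACCESS_KEY_ID=$(oc extract secret/obc-spark-history-server \
  --keys=AWS_ACCESS_KEY_ID --to=-)
export AWS_SECRET_ACCESS_KEY=$(oc extract secret/obc-spark-history-server \
  --keys=AWS_SECRET_ACCESS_KEY --to=-)
BUCKET_NAME=$(oc get configmap obc-spark-history-server \
  -o jsonpath='{.data.BUCKET_NAME}')

# Assumed route name for the RGW S3 endpoint
ROUTE_TO_S3=https://$(oc get route rgw -n openshift-storage \
  -o jsonpath='{.spec.host}')
```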

Upload any file to the bucket using the AWS CLI, for example:

$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ aws --endpoint-url <ROUTE_TO_S3> s3 cp YOUR_FILE s3://BUCKET_NAME/logs-dir/.s3keep

Naming this file .s3keep marks it as hidden from the History Server and Spark logging mechanism perspective, while the "log folder" will appear as being present.

  • Deploy Spark History Server

We can now create the service account, Role, RoleBinding, Service, Route, and Deployment for the History Server.

Deploy the History Server:

$ oc apply -f spark-hs-deployment.yaml

The History Server UI is now accessible through the spark-history-server Route that was created.
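To find the URL from the CLI (a sketch):

```shell
# Print the History Server hostname exposed by the Route
oc get route spark-history-server -o jsonpath='{.spec.host}{"\n"}'
```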


5. Run Sample Spark App

Below is the process to configure and run a quick test of a Spark application that performs a standard word count on Shakespeare’s sonnets.

  • Configure and populate Object storage

We will create a bucket using an ObjectBucketClaim (OBC) and populate it with the data.

This OBC creates a bucket in the RGW from an OpenShift Data Foundation deployment. Adapt the instructions depending on your S3 provider.

From the /test folder:

Create the OBC:

$ oc apply -f obc.yaml

Retrieve the Access Key and Secret Key from the Secret named spark-demo, the name of the bucket from the ConfigMap named spark-demo, and the Route to the S3 storage (you may have to create the route to reach the RGW; the default S3 Route in ODF points to the MCG).

Upload the test data (the text file shakespeare.txt) to the bucket, here using the AWS CLI:

$ export AWS_ACCESS_KEY_ID=<ACCESS_KEY>
$ export AWS_SECRET_ACCESS_KEY=<SECRET_ACCESS_KEY>
$ aws --endpoint-url YOUR_ROUTE_TO_S3 s3 cp shakespeare.txt s3://BUCKET_NAME/shakespeare.txt

TIP: If the S3 endpoint uses a self-signed certificate, add --no-verify-ssl to the command.

Our application code is wordcount.py, located in the same folder. To make it accessible to the Spark Application, it is packaged as data inside a ConfigMap, which in turn will be mounted as a volume in our Spark job YAML definition.

To create the application Config Map:

$ oc apply -f wordcount_configmap.yaml
  • Basic Test

To launch our Spark job, we will use the SparkApplication CRD from the operator. Its YAML definition:

  • Mounts the application code file (wordcount.py) from the ConfigMap as a volume
  • Injects the S3 endpoint, bucket, access key, and secret key values into the containers’ definitions so that the driver and the workers can retrieve the data and process it.

Launch the Spark Job:

$ oc apply -f spark_app_shakespeare.yaml

Watch the creation and execution of the Spark Application pods in the OpenShift UI or with oc get po -w: you will see the Spark driver spawn, then the worker pods. They will execute the program, then terminate.


To retrieve the results of the Spark Application execution, list the bucket content:

$ aws --endpoint-url ROUTE_TO_S3 s3 ls s3://BUCKET_NAME/

The application run results should have been saved in a location called sorted_counts_timestamp.

Retrieve the results:

$ aws --endpoint-url YOUR_ROUTE_TO_S3 s3 cp s3://BUCKET_NAME/sorted_counts_timestamp ./ --recursive

Running TPC-DS Decision Support benchmark

To prepare the environment and data for the comprehensive TPC-DS decision support benchmark, ensure the following:

  1. The Spark History Server is deployed in the environment. See the section above for details.
  2. The Spark TPC-DS tooling is deployed in the environment. Full documentation on image creation and TPC-DS benchmarking with Spark on S3 is available at: https://github.com/guimou/spark-tpcds
  3. The RADOS Gateways (RGW) are scaled up from 1 to 4 instances to handle the projected workload properly.
  4. Data generation and benchmarks are run for 1TB generated datasets, using executor configurations of different sizes in the Spark Applications.


Cluster performance metrics for the ‘spark-operator’ project during benchmark runs on the 1TB dataset using smaller executor pods: this pass ran with 144 executors, each with 5 CPU and 40GB RAM (a balanced approach of filling the cluster with small-footprint pods).

Cluster performance metrics for the same project during benchmark runs on the 1TB dataset using larger executors: 36 executors, each with 20 CPU and 160GB RAM.

5. Spark History Server view of the 1TB TPC-DS dataset generation:

Spark History Server TPC-DS summary view:

Detailed view of the 1TB dataset processing benchmarks per executor:

6. Below are sample time metrics for the performed TPC-DS tests with 1GB and 1TB datasets (the latter using two different executor sizings):

Runtime in seconds per query:

| Query | 1GB Dataset | 1TB Dataset | 1TB Dataset, Big Executors |
|---|---|---|---|
| q1-v2.4 | 6 | 23 | 18 |
| q10-v2.4 | 5 | 32 | 30 |
| q11-v2.4 | 4 | 52 | 51 |
| q12-v2.4 | 2 | 11 | 9 |
| q13-v2.4 | 3 | 62 | 66 |
| q14a-v2.4 | 13 | 176 | 178 |
| q14b-v2.4 | 10 | 152 | 149 |
| q15-v2.4 | 1 | 23 | 24 |
| q16-v2.4 | 2 | 48 | 49 |
| q17-v2.4 | 3 | 77 | 74 |
| q18-v2.4 | 3 | 40 | 39 |
| q19-v2.4 | 1 | 33 | 32 |
| q2-v2.4 | 2 | 20 | 20 |
| q20-v2.4 | 1 | 18 | 17 |
| q21-v2.4 | 1 | 3 | 3 |
| q22-v2.4 | 2 | 15 | 14 |
| q23a-v2.4 | 7 | 132 | 129 |
| q23b-v2.4 | 9 | 138 | 133 |
| q24a-v2.4 | 3 | 110 | 106 |
| q24b-v2.4 | 2 | 102 | 107 |
| q25-v2.4 | 2 | 86 | 83 |
| q26-v2.4 | 2 | 30 | 31 |
| q27-v2.4 | 2 | 48 | 47 |
| Total Test Time | | 1:31:59 | 1:29:28 |

As can be seen from the total TPC-DS benchmark execution times for executors sized at 5 CPU/40GB RAM (third column) and 20 CPU/160GB RAM (fourth column), the total test execution times are very close to each other (both around 1.5 hours), which validates both approaches to scaling Spark job executor pods.


Building Spark Runtime Images

  • To accommodate different supported combinations of Spark, Hadoop, Kubernetes client, and other versions, and options to inject custom data processing code, you may need to build and use a new base image that accommodates configuration changes via build arguments and config settings.

Below is an example Dockerfile (for Spark 2.4.6, Hadoop 3.3.0, Kubernetes client 4.7.2, etc.) from which such a custom image can be built:

-------------------------------------------------------------------------------------

FROM openjdk:8-jdk-alpine AS builder
# set desired Spark, hadoop and kubernetes client versions
ARG spark_version=2.4.6
ARG hadoop_version=3.3.0
ARG kubernetes_client_version=4.7.2
ARG jmx_prometheus_javaagent_version=0.15.0
ARG aws_java_sdk_version=1.11.682
ARG spark_uid=185
# Copy the Spark distribution from the build context
COPY spark-${spark_version}-bin-without-hadoop.tgz .
# Extract Spark
RUN tar -xvzf spark-${spark_version}-bin-without-hadoop.tgz
RUN mv spark-${spark_version}-bin-without-hadoop spark
# Copy the Hadoop distribution from the build context
COPY hadoop-${hadoop_version}.tar.gz .
# Extract Hadoop
RUN tar -xvzf hadoop-${hadoop_version}.tar.gz
RUN mv hadoop-${hadoop_version} hadoop
# Delete unnecessary hadoop documentation
RUN rm -rf hadoop/share/doc
# Download JMX Prometheus javaagent jar
ADD https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${jmx_prometheus_javaagent_version}/jmx_prometheus_javaagent-${jmx_prometheus_javaagent_version}.jar /prometheus/
RUN chmod 0644 prometheus/jmx_prometheus_javaagent*.jar
# Delete old spark kubernetes client jars and replace them with newer version
WORKDIR /spark
RUN rm ./jars/kubernetes-*.jar
ADD https://repo1.maven.org/maven2/io/fabric8/kubernetes-model-common/${kubernetes_client_version}/kubernetes-model-common-${kubernetes_client_version}.jar jars/
ADD https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/${kubernetes_client_version}/kubernetes-client-${kubernetes_client_version}.jar jars/
ADD https://repo1.maven.org/maven2/io/fabric8/kubernetes-model/${kubernetes_client_version}/kubernetes-model-${kubernetes_client_version}.jar jars/
RUN chmod 0644 jars/kubernetes-*.jar
# Install aws-java-sdk
WORKDIR /hadoop/share/hadoop/tools/lib
ADD https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${aws_java_sdk_version}/aws-java-sdk-bundle-${aws_java_sdk_version}.jar .
RUN chmod 0644 aws-java-sdk-bundle*.jar
FROM openjdk:8-jdk-alpine as final
WORKDIR /opt/spark
# Copy Spark from builder stage
COPY --from=builder /spark /opt/spark
COPY --from=builder /spark/kubernetes/dockerfiles/spark/entrypoint.sh /opt
# Copy Hadoop from builder stage
COPY --from=builder /hadoop /opt/hadoop
# Copy Prometheus jars from builder stage
COPY --from=builder /prometheus /prometheus
RUN set -ex && \
  apk upgrade --no-cache && \
  ln -s /lib /lib64 && \
  apk add --no-cache bash tini libc6-compat linux-pam nss && \
  mkdir -p /opt/spark && \
  mkdir -p /opt/spark/work-dir && \
  touch /opt/spark/RELEASE && \
  rm /bin/sh && \
  ln -sv /bin/bash /bin/sh && \
  echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
  chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
  rm -rf /var/cache/apt/*
# Configure environment variables for spark
ENV SPARK_HOME /opt/spark
ENV HADOOP_HOME /opt/hadoop
ENV SPARK_DIST_CLASSPATH="$HADOOP_HOME/etc/hadoop:$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs:$HADOOP_HOME/share/hadoop/hdfs/lib/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/yarn:$HADOOP_HOME/share/hadoop/yarn/lib/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar:$HADOOP_HOME/share/hadoop/tools/lib/*"
ENV SPARK_EXTRA_CLASSPATH="$SPARK_DIST_CLASSPATH"
ENV LD_LIBRARY_PATH /lib64
# Set spark workdir
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN mkdir -p /etc/metrics/conf
COPY conf/metrics.properties /etc/metrics/conf
COPY conf/prometheus.yaml /etc/metrics/conf
COPY entrypoint.sh /opt/entrypoint.sh
RUN chmod +x /opt/entrypoint.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]
# ARG values are stage-scoped, so redeclare spark_uid in this stage before USER
ARG spark_uid=185
USER ${spark_uid}

----------

Sample Spark, Kubernetes, Prometheus Java agent and AWS Java SDK Version combinations:

| Spark Version | Kubernetes Client Version | Prometheus Java Agent Version | AWS Java SDK Version |
|---|---|---|---|
| 2.4.6 | 4.7.2 | 0.15.0 | 1.11.852 |
| 2.4.7 | 4.7.2 | 0.15.0 | 1.11.890 |
| 3.0.0 | 4.7.2 | 0.15.0 | 1.11.828 |
| 3.0.1 | 4.7.2 | 0.15.0 | 1.11.880 |

  • For added flexibility in configuring Spark applications, we can create an Object Bucket that holds all of an application's components: application code (e.g., JAR files), application data (text files), logs for the Spark History Server, and more.

Create an OBC such as in the example below:

apiVersion: objectbucket.io/v1alpha1
kind: ObjectBucketClaim
metadata:
  name: obc-spark
spec:
  generateBucketName: obc-spark
  storageClassName: ocs-storagecluster-ceph-rgw

---

  • Deploy the History Server as explained in the previous section (do not forget to adjust it to point to the proper bucket).

Transfer Spark application jar files to the Object Bucket. Example:

---

aws --endpoint-url ROUTE_TO_S3 s3 cp spark-examples_2.11-2.4.6.jar s3://YOUR_BUCKET_NAME/application/spark-examples_2.11-2.4.6.jar
  • Now we can create a Spark application (for example, a Scala one) using a manifest like the one shown below, with credentials corresponding to the S3 bucket. The Access and Secret Keys for the S3 bucket are passed to the pods as environment variables (the ‘env’ sections):

---

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
spec:
  type: Scala
  mode: cluster
  image: "<registry/image:tag>"
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "s3a://APP_BUCKET_NAME/application/spark-examples_2.11-2.4.6.jar"
  arguments:
    - "1000"
  sparkConf:
    "spark.kubernetes.local.dirs.tmpfs": "true"
    # History Server
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://HISTORY_SERVER_BUCKET/logs-dir/"
    # S3 configuration for the application bucket
    "spark.hadoop.fs.s3a.bucket.APP_BUCKET.endpoint": "rook-ceph-rgw-ocs-storagecluster-cephobjectstore.openshift-storage.svc"
    # S3 configuration for the History Server bucket
    "spark.hadoop.fs.s3a.bucket.HISTORY_SERVER_BUCKET.access.key": "YOUR_ACCESS_KEY"
    "spark.hadoop.fs.s3a.bucket.HISTORY_SERVER_BUCKET.secret.key": "YOUR_SECRET_KEY"
    "spark.hadoop.fs.s3a.bucket.HISTORY_SERVER_BUCKET.path.style.access": "true"
    "spark.hadoop.fs.s3a.bucket.HISTORY_SERVER_BUCKET.connection.ssl.enabled": "false"
    # Default S3 configuration
    "spark.hadoop.fs.s3a.endpoint": "rook-ceph-rgw-ocs-storagecluster-cephobjectstore.openshift-storage.svc"
    "spark.hadoop.fs.s3a.path.style.access": "true"
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false"
  sparkVersion: "2.4.6"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    coreLimit: "1"
    memory: "512m"
    serviceAccount: spark
    env:
      - name: BUCKET_NAME
        valueFrom:
          configMapKeyRef:
            name: obc-spark
            key: BUCKET_NAME
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: obc-spark
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: obc-spark
            key: AWS_SECRET_ACCESS_KEY
  executor:
    cores: 1
    coreLimit: "1"
    instances: 2
    memory: "1000m"
    env:
      - name: BUCKET_NAME
        valueFrom:
          configMapKeyRef:
            name: obc-spark
            key: BUCKET_NAME
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: obc-spark
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: obc-spark
            key: AWS_SECRET_ACCESS_KEY
  • If the Spark application has data to consume, it can be placed inside the same Object Bucket (under a /data subfolder, for example), and this location is then referenced from the application code. The S3 connection will already have been initialized from the hadoop configuration in the YAML file. We can also pass other information to the application code (like the name of the bucket) by creating environment variables pointing to the bucket-associated ConfigMap and Secret.

To use different buckets for data, application, and logs, we must create specific per-bucket configurations in the Spark Application configuration, as shown in the example below:

"spark.hadoop.fs.s3a.bucket.YOUR_BUCKET.endpoint": "rook-ceph-rgw-ocs-storagecluster-cephobjectstore.openshift-storage.svc"
"spark.hadoop.fs.s3a.bucket.YOUR_BUCKET.access.key": "YOUR_ACCESS_KEY"
"spark.hadoop.fs.s3a.bucket.YOUR_BUCKET.secret.key": "YOUR_SECRET_KEY"
"spark.hadoop.fs.s3a.bucket.YOUR_BUCKET.path.style.access": "true"
"spark.hadoop.fs.s3a.bucket.YOUR_BUCKET.connection.ssl.enabled": "false"

---

(This must be done for each one of the different buckets used).


Running Spark Jobs on OpenShift

Finally, after all the configurations of platform and application descriptors, we are ready to take our Spark application solution for a test drive! How can we do that?

  • As described above, datasets can be populated into S3 object storage buckets (based on ODF Object Bucket Claims in our architecture, or provided by a hyperscaler such as AWS S3).

For example, to populate data for any stage, we can use AWS S3 commands like these below:

aws --endpoint-url ROUTE_TO_S3 s3 cp <data files> s3://YOUR_BUCKET_NAME/data/<data files>
  • To kick off a Spark job defined as described in the previous section, we initiate execution of the driver by deploying it into OpenShift the same way we deploy any other containerized application, with the CLI command:
oc create -f <Spark Job descriptor.yaml> 

This will start a driver pod (which creates an ephemeral Spark cluster running as containers on OpenShift) and then initiate the executor pods, which run the application code connected to that cluster for data processing.

  • To ensure that executor pods do not run out of JVM memory during large dataset tests (and that no jobs are lost as a result), we can apply the following configurations to all Spark job manifests:

Use the spark.cleaner.* properties to configure dynamic JVM garbage collection (GC) so unused memory can be reclaimed by the JVM, preventing out-of-memory (OOM) situations.

Use explicit _JAVA_OPTIONS values for Java runtime settings inside the executor containers, passed in as environment variables, to enforce memory utilization and garbage collection settings.

Memory allocated to pods and JVMs can be sized via the spark.memory.fraction and spark.memory.storageFraction properties (for example, 0.3 and 0.7 respectively), which control the memory overhead fraction for executor pods.
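The settings just described can be sketched together as Spark properties. The values are illustrative assumptions, not tuned recommendations; in a SparkApplication manifest the same keys go under spec.sparkConf, and _JAVA_OPTIONS can equally be set through the driver/executor env sections:

```shell
cat > spark-memory-tuning.conf <<'EOF'
# Trigger the periodic context-cleaner GC more often than the 30min default (assumed value)
spark.cleaner.periodicGC.interval        5min
# Fraction of heap for execution+storage, and the storage share within it
spark.memory.fraction                    0.3
spark.memory.storageFraction             0.7
# Explicit JVM options for executor and driver containers (illustrative flags)
spark.executorEnv._JAVA_OPTIONS          -XX:+UseG1GC -XX:MaxRAMPercentage=75.0
spark.kubernetes.driverEnv._JAVA_OPTIONS -XX:+UseG1GC -XX:MaxRAMPercentage=75.0
EOF
```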

NOTE: At runtime, these and other Spark job configuration settings can be seen in the Spark History Server details under “Spark properties”.


Another important parameter impacting the performance of Spark jobs is spark.default.parallelism, which tunes the number of partitions at runtime and addresses default-parallelism and shuffle-partition problems in both RDD- and DataFrame-API-based applications.
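As an illustrative sketch: the rule of thumb of 2-3 tasks per available core comes from the Spark tuning guide, and the numbers below are assumptions for a cluster with roughly 720 total executor cores (for the DataFrame/SQL path, the analogous setting is spark.sql.shuffle.partitions):

```shell
cat > spark-parallelism.conf <<'EOF'
# ~2x total executor cores (assumed 144 executors x 5 cores = 720)
spark.default.parallelism     1500
# DataFrame/SQL shuffles use their own partition setting
spark.sql.shuffle.partitions  1500
EOF
```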

In our experience, using the parallelism setting properly can significantly improve the performance of Spark job execution, but on the flip side it might cause sporadic failures of executor pods.

Evaluating Performance

Of course, after investing effort into cluster architecture and application configuration work to containerize Spark job applications, we would love to see all that work pay off via good performance!

The following end-to-end data processing tests were performed with 43GB of dataset files stored in S3 buckets provided by ODF, executing all job “stages” in sequence. Each stage of the overall data processing pipeline (Preparation, Validation, Acceptance, and others) was run by deploying the corresponding Spark job YAML files to OpenShift, as shown in the previous section.

Results were monitored via the Spark History Server (elapsed time) and via the multiple cluster infrastructure and application-level metrics available in OpenShift, both for overall cluster resources and for ODF storage cluster utilization at peak data processing loads (which showed a high IOPS rate with acceptable latency).

By performing systematic load-testing runs and optimizing the performance of both the OpenShift/ODF clusters and the application configuration based on observed metrics, you should be able to achieve noticeable improvements in the time it takes containerized Spark jobs to process large (43GB) datasets.


Summary

This article outlines recommended architectures for running Spark jobs on OpenShift clusters and highlights the following how-tos that we hope will be useful for developing and running Spark job workloads at large scale:

  • Provision a properly sized OCP 4.7.x cluster in the AWS cloud
  • Scale the cluster resources to match the sizing specifications (designated OpenShift Data Foundation storage cluster nodes and worker nodes)
  • Deploy and configure the OpenShift Data Foundation storage cluster on dedicated cluster nodes and verify its capacity
  • Configure S3 object store buckets for storage of data, application runtime code, and Spark History Server logs
  • Build and deploy the Google Spark operator via container images, and test a simple Spark application to validate the operator configuration
  • Containerize existing production-scale Scala implementations of various data processing Spark job “stages” using OpenShift Kubernetes object descriptors
  • Run Spark jobs on the OpenShift platform while monitoring performance metrics and history data for optimization purposes
  • Optimize the configuration of the OpenShift infrastructure (OpenShift Data Foundation cluster, number and configuration of worker nodes) and Spark job manifests to further improve the performance of Spark jobs
