Spark Operator runtime


The Spark runtimes spark and remote-spark do not support dbfs, http, or memory data stores.

The spark-on-k8s-operator allows Spark applications to be defined in a declarative manner and supports one-time Spark applications with SparkApplication and cron-scheduled applications with ScheduledSparkApplication.

When you send a request to the Spark operator with MLRun, the request contains your full application configuration: the code and dependencies to run (packaged as a Docker image or specified via URIs), the infrastructure parameters (e.g., the memory, CPU, and storage volume specs to allocate to each Spark executor), and the Spark configuration.
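To make the shape of such a request concrete, here is a minimal sketch that builds a SparkApplication-style manifest as a plain Python dictionary. The field names follow the spark-on-k8s-operator v1beta2 resource as far as I know it; the helper name build_spark_application, the image name, and the file path are hypothetical placeholders, and this is not necessarily the exact payload MLRun generates.

```python
def build_spark_application(name, image, main_file, executors=2, schedule=None):
    """Return a dict shaped like a SparkApplication or ScheduledSparkApplication."""
    spec = {
        "type": "Python",
        "mode": "cluster",
        # code and dependencies: packaged as an image or referenced by URI
        "image": image,
        "mainApplicationFile": main_file,
        # infrastructure parameters
        "driver": {"cores": 1, "memory": "512m"},
        "executor": {"cores": 1, "instances": executors, "memory": "512m"},
        # Spark configuration
        "sparkConf": {"spark.eventLog.enabled": "true"},
    }
    manifest = {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": spec,
    }
    if schedule is not None:
        # cron-scheduled variant: the one-time spec becomes the template
        manifest["kind"] = "ScheduledSparkApplication"
        manifest["spec"] = {"schedule": schedule, "template": spec}
    return manifest


# hypothetical image and path, for illustration only
one_time = build_spark_application(
    "sparkreadcsv", "example/spark-image:latest", "local:///app/job.py"
)
nightly = build_spark_application(
    "sparkreadcsv-nightly",
    "example/spark-image:latest",
    "local:///app/job.py",
    schedule="0 2 * * *",
)
```

The one-time and scheduled variants share the same application spec; only the wrapping kind and the cron schedule differ.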

Kubernetes receives this request and starts the Spark driver in a Kubernetes pod (a k8s abstraction; in this case, just a Docker container). The Spark driver then communicates directly with the Kubernetes master to request executor pods, scaling them up and down at runtime according to the load if dynamic allocation is enabled. Kubernetes handles the bin-packing of the pods onto Kubernetes nodes (the underlying VMs), and dynamically scales the node pools to meet the requirements.
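As an illustration of the dynamic allocation mentioned above, the following sketch assembles the standard Spark configuration keys that enable it. The helper name dynamic_allocation_conf is hypothetical, and whether these keys are the preferred way to enable dynamic allocation in your MLRun version is an assumption; shuffle tracking is included because Kubernetes deployments typically have no external shuffle service.

```python
def dynamic_allocation_conf(min_executors, max_executors):
    """Build Spark conf entries that enable dynamic executor allocation."""
    if min_executors < 0 or max_executors < min_executors:
        raise ValueError("expected 0 <= min_executors <= max_executors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        # track shuffle data on executors instead of an external shuffle service
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


conf = dynamic_allocation_conf(0, 4)
```

Each entry could then be copied into func.spec.spark_conf, one key at a time.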

When using the Spark operator, resources are allocated per task, so the application scales down to zero when the task is done.

Memory limit

Spark calculates the memory limit internally from the memory request plus a memory overhead. The overhead is derived from spark.kubernetes.memoryOverheadFactor, which is set to 0.4 by default. (See Running Spark on Kubernetes.) As a result, the memory limit is higher than the memory you configure. To control the memory overhead, use:

func.spec.spark_conf["spark.driver.memoryOverhead"] = 0
func.spec.spark_conf["spark.executor.memoryOverhead"] = 0

where: 0 means no additional memory (in addition to what you configure); 100 means 100MiB of additional memory; "1g" means 1GiB of additional memory.
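The arithmetic can be sketched as follows. This is a rough estimate rather than Spark's actual code: the 0.4 factor comes from the text above, while the 384 MiB minimum overhead is an assumption based on Spark's documented default, and the helper names to_mib and pod_memory_limit_mib are hypothetical.

```python
MIN_OVERHEAD_MIB = 384  # assumed Spark minimum when no explicit overhead is set


def to_mib(value):
    """Convert 100 -> 100 MiB, "512m" -> 512 MiB, "1g" -> 1024 MiB."""
    if isinstance(value, int):
        return value
    text = value.strip().lower()
    units = {"m": 1, "g": 1024}
    if text[-1] in units:
        return int(text[:-1]) * units[text[-1]]
    return int(text)


def pod_memory_limit_mib(memory, overhead=None, overhead_factor=0.4):
    """Estimate the pod memory limit: configured memory plus overhead."""
    base = to_mib(memory)
    if overhead is None:
        # no explicit overhead: use the factor, but never less than the minimum
        extra = max(int(base * overhead_factor), MIN_OVERHEAD_MIB)
    else:
        # explicit memoryOverhead replaces the factor-based value
        extra = to_mib(overhead)
    return base + extra


default_limit = pod_memory_limit_mib("2g")             # factor-based overhead
no_overhead = pod_memory_limit_mib("2g", overhead=0)   # overhead disabled
one_gib_extra = pod_memory_limit_mib("2g", overhead="1g")
```

Under these assumptions, a 2 GiB request grows by roughly 40% by default, while setting the overhead to 0 keeps the limit equal to the configured memory.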

V3IO access

If your runtime needs to access V3IO, use with_igz_spark(). Calling func.with_igz_spark() sets the default spec and dependencies.


To avoid unexpected behavior, do not override these defaults.

The default spec is:

{'spark.eventLog.enabled': 'true',
'spark.eventLog.dir': 'file:///v3io/users/spark_history_server_logs'}  # only added if a Spark history server is configured

And the default dependencies are:

{'jars': ['local:///spark/v3io-libs/v3io-hcfs_2.12.jar'],
 'files': ['local:///igz/java/libs/']}
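One way to guard against accidentally overriding these defaults is to compare your own Spark configuration against them before applying it. The sketch below assumes the default spec shown above; the names IGZ_DEFAULT_CONF and overridden_defaults are hypothetical and not part of MLRun.

```python
# defaults copied from the spec listed above
IGZ_DEFAULT_CONF = {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "file:///v3io/users/spark_history_server_logs",
}


def overridden_defaults(user_conf, defaults=IGZ_DEFAULT_CONF):
    """Return the default keys that user_conf would change, sorted by name."""
    return sorted(
        key
        for key, value in user_conf.items()
        if key in defaults and str(value) != defaults[key]
    )


conflicts = overridden_defaults(
    {"spark.eventLog.enabled": "false", "spark.executor.memoryOverhead": 0}
)
```

An empty result means the user configuration leaves the defaults untouched.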

Example of Spark function with Spark operator

import mlrun
import os

# set up a new Spark function with the Spark operator
# command points to the Spark code, which must be located on the file system
# the name param can contain only lowercase letters (k8s convention)
read_csv_filepath = os.path.join(os.path.abspath("."), "")
sj = mlrun.new_function(kind="spark", command=read_csv_filepath, name="sparkreadcsv")

# set spark driver config (gpu_type & gpus=<number_of_gpus> are supported too)
sj.with_driver_requests(cpu=1, mem="512m")

# set spark executor config (gpu_type & gpus=<number_of_gpus> are supported too)
sj.with_executor_requests(cpu=1, mem="512m")

# adds fuse, daemon & iguazio's jars support
sj.with_igz_spark()

# Alternately, move volume_mounts to driver and executor-specific fields and leave
# v3io mounts out of executor mounts if mount_v3io_to_executor=False
# sj.with_igz_spark(mount_v3io_to_executor=False)

# set spark driver volume mount
# sj.function.with_driver_host_path_volume("/host/path", "/mount/path")

# set spark executor volume mount
# sj.function.with_executor_host_path_volume("/host/path", "/mount/path")

# add python module

# Number of executors
sj.spec.replicas = 2

# add jars
# sj.spec.deps["jars"] += ["local:///<path to jar>"]

# Rebuilds the image with MLRun - needed in order to support logging artifacts etc.
sj.deploy()

# Run task while setting the artifact path on which the run artifact (if any) will be saved
sj.run(artifact_path="/User")

Spark Code

from pyspark.sql import SparkSession
from mlrun import get_or_create_ctx

context = get_or_create_ctx("spark-function")

# build spark session
spark = SparkSession.builder.appName("Spark job").getOrCreate()

# read csv
df = spark.read.load('iris.csv', format="csv",
                     sep=",", header="true")

# sample for logging
df_to_log = df.describe().toPandas()

# log final report