Using the Spark execution engine¶
The feature store supports using Spark for ingesting, transforming, and writing results to data targets. When using Spark, the internal execution graph is executed synchronously, utilizing a Spark session to perform read and write operations, as well as any transformations on the data. Executing synchronously means that the source data is fully read into a dataframe, processed, and the output is then written to the defined targets.
Spark execution can be done locally, utilizing a local Spark session provided to the ingestion call. To use Spark as the transformation engine in ingestion, follow these steps:
1. When constructing the `FeatureSet` object, pass an `engine` parameter and set it to `spark`. For example:

   feature_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")], engine="spark")

2. To use a local Spark session, pass the Spark session context as the `spark_context` parameter when calling the `ingest()` function. This session is used for data operations and transformations.

3. To use a remote execution engine (remote Spark or Spark operator), pass a `RunConfig` object as the `run_config` parameter of the `ingest` API. The actual remote function to execute depends on the object passed:

   - A default `RunConfig`, in which case the ingestion code either generates a new MLRun function runtime of type `remote-spark`, or uses the function specified in `feature_set.spec.function` (which must then be of runtime type `remote-spark` or `spark`).
   - A `RunConfig` that has a function configured within it. As mentioned, the function runtime must be of type `remote-spark` or `spark`.
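The runtime-selection rules above can be sketched in plain Python. This is an illustrative model only, not MLRun's actual implementation; all names in it are hypothetical:

```python
# Illustrative sketch of the runtime-selection rules above -- not MLRun's
# actual code. All names here are hypothetical.
SPARK_KINDS = {"remote-spark", "spark"}

def resolve_runtime(run_config_function=None, feature_set_function=None):
    """Return the kind of function that would execute the ingestion,
    given the runtime kinds configured on the RunConfig / FeatureSet."""
    if run_config_function is not None:
        # A RunConfig with a function configured: its kind must be spark-based.
        if run_config_function not in SPARK_KINDS:
            raise ValueError("function runtime must be 'remote-spark' or 'spark'")
        return run_config_function
    if feature_set_function is not None:
        # Default RunConfig, but feature_set.spec.function is set.
        if feature_set_function not in SPARK_KINDS:
            raise ValueError("function runtime must be 'remote-spark' or 'spark'")
        return feature_set_function
    # Default RunConfig and no function: a new remote-spark runtime is generated.
    return "remote-spark"
```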
Full examples follow below.
Local Spark ingestion example¶
The following code executes data ingestion using a local Spark session.
When using a local Spark session, the `ingest` API waits for its completion.
import mlrun
from mlrun.datastore.sources import CSVSource
import mlrun.feature_store as fstore
from pyspark.sql import SparkSession
mlrun.set_environment(project="stocks")
feature_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")], engine="spark")
# add_aggregation can be used in conjunction with Spark
feature_set.add_aggregation("price", ["min", "max"], ["1h"], "10m")
source = CSVSource("mycsv", path="v3io:///projects/stocks.csv")
# Execution using a local Spark session
spark = SparkSession.builder.appName("Spark function").getOrCreate()
fstore.ingest(feature_set, source, spark_context=spark)
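The add_aggregation call in the example computes the min and max of price over a trailing 1-hour window, re-evaluated every 10 minutes. Conceptually, the windowing semantics look like the following pure-Python sketch (times in minutes; this is an illustration only, not how MLRun implements aggregations):

```python
# Pure-Python sketch of what add_aggregation("price", ["min", "max"],
# ["1h"], "10m") computes: a trailing 60-minute window, evaluated
# every 10 minutes. Illustrative only -- not MLRun code.
def window_min_max(events, window=60, period=10):
    """events: list of (timestamp_minutes, price) tuples.
    Returns one (eval_time, min_price, max_price) per evaluation period."""
    results = []
    if not events:
        return results
    end = max(t for t, _ in events)
    for eval_time in range(period, end + period, period):
        # Prices whose timestamps fall inside the trailing window.
        in_window = [p for t, p in events if eval_time - window < t <= eval_time]
        if in_window:
            results.append((eval_time, min(in_window), max(in_window)))
    return results
```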
Remote Spark ingestion example¶
When using remote execution, the MLRun run execution details are returned, allowing you to track the job's status and results.
The following code should be executed only once, to build the remote Spark image before running the first ingest. It may take a few minutes to prepare the image.
from mlrun.runtimes import RemoteSparkRuntime
RemoteSparkRuntime.deploy_default_image()
Remote ingestion:
# mlrun: start-code
from mlrun.feature_store.api import ingest
def ingest_handler(context):
    ingest(mlrun_context=context)  # The handler function must call ingest with the mlrun_context
You can run your PySpark code for ingesting data into the feature store by adding:
def my_spark_func(df, context=None):
    return df.filter("bid>55")  # PySpark code
# mlrun: end-code
from mlrun.datastore.sources import CSVSource
from mlrun import code_to_function
import mlrun.feature_store as fstore
feature_set = fstore.FeatureSet("stock-quotes", entities=[fstore.Entity("ticker")], engine="spark")
source = CSVSource("mycsv", path="v3io:///projects/quotes.csv")
spark_service_name = "iguazio-spark-service" # As configured & shown in the Iguazio dashboard
feature_set.graph.to(name="s1", handler="my_spark_func")
my_func = code_to_function("func", kind="remote-spark")
config = fstore.RunConfig(local=False, function=my_func, handler="ingest_handler")
fstore.ingest(feature_set, source, run_config=config, spark_context=spark_service_name)
Spark operator ingestion example¶
When running with a Spark operator, the MLRun execution details are returned, allowing tracking of the job’s status and results.
The following code should be executed only once, to build the Spark job image before running the first ingest. It may take a few minutes to prepare the image.
from mlrun.runtimes import Spark3Runtime
Spark3Runtime.deploy_default_image()
Spark operator ingestion:
# mlrun: start-code
from mlrun.feature_store.api import ingest
def ingest_handler(context):
    ingest(mlrun_context=context)  # The handler function must call ingest with the mlrun_context
# You can add your own PySpark code as a graph step:
def my_spark_func(df, context=None):
    return df.filter("bid>55")  # PySpark code
# mlrun: end-code
from mlrun.datastore.sources import CSVSource
from mlrun import code_to_function
import mlrun.feature_store as fstore
feature_set = fstore.FeatureSet("stock-quotes", entities=[fstore.Entity("ticker")], engine="spark")
source = CSVSource("mycsv", path="v3io:///projects/quotes.csv")
feature_set.graph.to(name="s1", handler="my_spark_func")
my_func = code_to_function("func", kind="spark")
my_func.with_driver_requests(cpu="200m", mem="1G")
my_func.with_executor_requests(cpu="200m", mem="1G")
my_func.with_igz_spark()
# Enables using the default image (can be replaced by specifying an image with .spec.image)
my_func.spec.use_default_image = True
# Optional; the default is 1
my_func.spec.replicas = 2
# If needed, sparkConf can be modified like this:
# my_func.spec.spark_conf['spark.specific.config.key'] = 'value'
config = fstore.RunConfig(local=False, function=my_func, handler="ingest_handler")
fstore.ingest(feature_set, source, run_config=config)
Spark execution engine over S3 - full flow example¶
For Spark to work with S3, several properties must be set. The following example writes a feature set to S3 in Parquet format, in a remote Kubernetes job:
One-time setup:
Deploy the default image for your job (this takes several minutes, but needs to be executed only once per cluster per MLRun/Iguazio upgrade):
from mlrun.runtimes import RemoteSparkRuntime
RemoteSparkRuntime.deploy_default_image()
Store your S3 credentials in a k8s secret:
import mlrun
secrets = {'s3_access_key': AWS_ACCESS_KEY, 's3_secret_key': AWS_SECRET_KEY}
mlrun.get_run_db().create_project_secrets(
    project="uhuh-proj",
    provider=mlrun.api.schemas.SecretProviderName.kubernetes,
    secrets=secrets
)
Ingestion job code (to be executed in the remote pod):
# mlrun: start-code
from pyspark import SparkConf
from pyspark.sql import SparkSession
from mlrun.feature_store.api import ingest
def ingest_handler(context):
    conf = (SparkConf()
            .set("spark.hadoop.fs.s3a.path.style.access", True)
            .set("spark.hadoop.fs.s3a.access.key", context.get_secret('s3_access_key'))
            .set("spark.hadoop.fs.s3a.secret.key", context.get_secret('s3_secret_key'))
            .set("spark.hadoop.fs.s3a.endpoint", context.get_param("s3_endpoint"))
            .set("spark.hadoop.fs.s3a.region", context.get_param("s3_region"))
            .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .set("com.amazonaws.services.s3.enableV4", True)
            .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true"))
    spark = (
        SparkSession.builder.config(conf=conf).appName("S3 app").getOrCreate()
    )
    ingest(mlrun_context=context, spark_context=spark)
# mlrun: end-code
Ingestion invocation:
from mlrun.datastore.sources import CSVSource
from mlrun.datastore.targets import ParquetTarget
from mlrun import code_to_function
import mlrun.feature_store as fstore
feature_set = fstore.FeatureSet("stock-quotes", entities=[fstore.Entity("ticker")], engine="spark")
source = CSVSource("mycsv", path="v3io:///projects/quotes.csv")
spark_service_name = "spark" # As configured & shown in the Iguazio dashboard
fn = code_to_function(kind='remote-spark', name='func')
run_config = fstore.RunConfig(local=False, function=fn, handler="ingest_handler")
run_config.with_secret('kubernetes', ['s3_access_key', 's3_secret_key'])
run_config.parameters = {
"s3_endpoint" : "s3.us-east-2.amazonaws.com",
"s3_region" : "us-east-2"
}
target = ParquetTarget(
path = "s3://my-s3-bucket/some/path",
partitioned = False,
)
fstore.ingest(feature_set, source, targets=[target], run_config=run_config, spark_context=spark_service_name)
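To make the wiring in this flow explicit: the parameters set on the RunConfig at invocation time surface inside the remote pod via context.get_param, and the secrets registered with with_secret surface via context.get_secret; the handler then maps them onto the s3a configuration keys. A pure-Python sketch of that mapping (illustrative only; the helper name is hypothetical, not an MLRun or Spark API):

```python
# Illustrative sketch: how run_config parameters and project secrets map
# onto the spark.hadoop.fs.s3a.* keys set in ingest_handler above.
# Pure Python -- build_s3a_conf is a hypothetical helper, not MLRun/Spark API.
def build_s3a_conf(params, secrets):
    """params: values passed via run_config.parameters (context.get_param);
    secrets: values exposed via run_config.with_secret (context.get_secret)."""
    return {
        "spark.hadoop.fs.s3a.access.key": secrets["s3_access_key"],
        "spark.hadoop.fs.s3a.secret.key": secrets["s3_secret_key"],
        "spark.hadoop.fs.s3a.endpoint": params["s3_endpoint"],
        "spark.hadoop.fs.s3a.region": params["s3_region"],
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }
```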