# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from subprocess import run

import kubernetes.client
from mlrun_pipelines.mounts import mount_v3io, mount_v3iod

import mlrun.errors
from mlrun.config import config

from .kubejob import KubejobRuntime
from .pod import KubeResourceSpec


class RemoteSparkSpec(KubeResourceSpec):
    _dict_fields = KubeResourceSpec._dict_fields + ["provider"]

    def __init__(
        self,
        command=None,
        args=None,
        image=None,
        mode=None,
        volumes=None,
        volume_mounts=None,
        env=None,
        resources=None,
        default_handler=None,
        entry_points=None,
        description=None,
        workdir=None,
        replicas=None,
        image_pull_policy=None,
        service_account=None,
        build=None,
        image_pull_secret=None,
        provider=None,
        node_name=None,
        node_selector=None,
        affinity=None,
        priority_class_name=None,
        disable_auto_mount=False,
        pythonpath=None,
        tolerations=None,
        preemption_mode=None,
        security_context=None,
        clone_target_dir=None,
        state_thresholds=None,
    ):
        super().__init__(
            command=command,
            args=args,
            image=image,
            mode=mode,
            volumes=volumes,
            volume_mounts=volume_mounts,
            env=env,
            resources=resources,
            default_handler=default_handler,
            entry_points=entry_points,
            description=description,
            workdir=workdir,
            replicas=replicas,
            image_pull_policy=image_pull_policy,
            service_account=service_account,
            build=build,
            image_pull_secret=image_pull_secret,
            node_name=node_name,
            node_selector=node_selector,
            affinity=affinity,
            priority_class_name=priority_class_name,
            disable_auto_mount=disable_auto_mount,
            pythonpath=pythonpath,
            tolerations=tolerations,
            preemption_mode=preemption_mode,
            security_context=security_context,
            clone_target_dir=clone_target_dir,
            state_thresholds=state_thresholds,
        )
        self.provider = provider


class RemoteSparkProviders:
    iguazio = "iguazio"


class RemoteSparkRuntime(KubejobRuntime):
    kind = "remote-spark"
    default_image = ".remote-spark-default-image"

    @classmethod
    def deploy_default_image(cls):
        # build the shared default image via a temporary function, then
        # remove the temporary function record from the DB
        sj = mlrun.new_function(
            kind="remote-spark", name="remote-spark-default-image-deploy-temp"
        )
        sj.spec.build.image = cls.default_image
        sj.with_spark_service(spark_service="dummy-spark")
        sj.deploy()
        mlrun.get_run_db().delete_function(name=sj.metadata.name)
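
    # A possible invocation (a sketch; whether this is called manually or by
    # the platform is not shown here):
    #
    #     RemoteSparkRuntime.deploy_default_image()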

    def is_deployed(self):
        # consider the function deployed when there is nothing to add to the image
        if (
            not self.spec.build.source
            and not self.spec.build.commands
            and not self.spec.build.extra
        ):
            return True
        return super().is_deployed()

    @property
    def spec(self) -> RemoteSparkSpec:
        return self._spec

    @spec.setter
    def spec(self, spec):
        self._spec = self._verify_dict(spec, "spec", RemoteSparkSpec)

    def with_spark_service(
        self,
        spark_service,
        provider=RemoteSparkProviders.iguazio,
        with_v3io_mount=True,
    ):
        """Attach a Spark service to the function

        :param spark_service:   name of the Spark service to attach
        :param provider:        remote Spark provider (defaults to "iguazio")
        :param with_v3io_mount: also mount v3io volumes (iguazio provider only)
        """
        self.spec.provider = provider
        if provider == RemoteSparkProviders.iguazio:
            self.spec.env.append(
                {"name": "MLRUN_SPARK_CLIENT_IGZ_SPARK", "value": "true"}
            )
            if with_v3io_mount:
                self.apply(mount_v3io())
            self.apply(
                mount_v3iod(
                    namespace=config.namespace,
                    v3io_config_configmap=spark_service + "-submit",
                )
            )
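
    # Example usage (a minimal sketch; the function and service names are
    # illustrative, not part of this module):
    #
    #     fn = mlrun.new_function(name="my-func", kind="remote-spark")
    #     fn.with_spark_service(spark_service="my-spark-service")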

    def with_security_context(
        self, security_context: kubernetes.client.V1SecurityContext
    ):
        """
        Setting a security context is not supported for the remote Spark runtime.
        Driver / executor processes run with uid / gid 1000 as long as no security context is defined.
        If setting a security context is supported in the future, it will only work from Spark version 3.2 onwards.
        """
        raise mlrun.errors.MLRunInvalidArgumentTypeError(
            "with_security_context is not supported with remote spark"
        )

    @property
    def _resolve_default_base_image(self):
        if (
            self.spec.provider == RemoteSparkProviders.iguazio
            and config.spark_app_image
            and config.spark_app_image_tag
        ):
            app_image = re.sub("spark-app", "shell", config.spark_app_image)
            # this is temporary until we get the image name from external config
            return app_image + ":" + config.spark_app_image_tag
        return None
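
    # For illustration (hypothetical values): with config.spark_app_image set
    # to "iguazio/spark-app" and config.spark_app_image_tag set to "2.10.0",
    # the property resolves to "iguazio/shell:2.10.0".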

    def deploy(
        self,
        watch=True,
        with_mlrun=None,
        skip_deployed=False,
        is_kfp=False,
        mlrun_version_specifier=None,
        builder_env: dict = None,
        show_on_failure: bool = False,
        force_build: bool = False,
    ):
        """Deploy function, build container with dependencies

        :param watch:                   wait for the deploy to complete (and print build logs)
        :param with_mlrun:              add the current mlrun package to the container build
        :param skip_deployed:           skip the build if we already have an image for the function
        :param is_kfp:                  deploy as part of a kfp pipeline
        :param mlrun_version_specifier: which mlrun package version to include (if not current)
        :param builder_env:             Kaniko builder pod env vars dict (for config/credentials),
                                        e.g. builder_env={"GIT_TOKEN": token}
        :param show_on_failure:         show logs only in case of build failure
        :param force_build:             force building the image, even when no changes were made

        :return: True if the function is ready (deployed)
        """
        # connect will populate the config from the server config
        if not self.spec.build.base_image:
            self.spec.build.base_image = self._resolve_default_base_image
        return super().deploy(
            watch=watch,
            with_mlrun=with_mlrun,
            skip_deployed=skip_deployed,
            is_kfp=is_kfp,
            mlrun_version_specifier=mlrun_version_specifier,
            builder_env=builder_env,
            show_on_failure=show_on_failure,
            force_build=force_build,
        )
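
    # Example (a sketch; the function name and token value are illustrative):
    #
    #     fn = mlrun.new_function(name="my-func", kind="remote-spark")
    #     fn.with_spark_service(spark_service="my-spark-service")
    #     fn.deploy(builder_env={"GIT_TOKEN": "<my-token>"})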


def igz_spark_pre_hook():
    # run the v3io spark-job initialization script inside the pod
    run(["/bin/bash", "/etc/config/v3io/spark-job-init.sh"])