Source code for mlrun.runtimes.remotesparkjob

# Copyright 2021 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import typing
from subprocess import run

from mlrun.config import config

from ..model import RunObject
from ..platforms.iguazio import mount_v3io_extended, mount_v3iod
from .kubejob import KubejobRuntime, KubeRuntimeHandler
from .pod import KubeResourceSpec


class RemoteSparkSpec(KubeResourceSpec):
    _dict_fields = KubeResourceSpec._dict_fields + ["provider"]

    def __init__(
        self,
        command=None,
        args=None,
        image=None,
        mode=None,
        volumes=None,
        volume_mounts=None,
        env=None,
        resources=None,
        default_handler=None,
        entry_points=None,
        description=None,
        workdir=None,
        replicas=None,
        image_pull_policy=None,
        service_account=None,
        build=None,
        image_pull_secret=None,
        provider=None,
        node_name=None,
        node_selector=None,
        affinity=None,
        priority_class_name=None,
        disable_auto_mount=False,
        pythonpath=None,
        tolerations=None,
        preemption_mode=None,
    ):
        super().__init__(
            command=command,
            args=args,
            image=image,
            mode=mode,
            volumes=volumes,
            volume_mounts=volume_mounts,
            env=env,
            resources=resources,
            default_handler=default_handler,
            entry_points=entry_points,
            description=description,
            workdir=workdir,
            replicas=replicas,
            image_pull_policy=image_pull_policy,
            service_account=service_account,
            build=build,
            image_pull_secret=image_pull_secret,
            node_name=node_name,
            node_selector=node_selector,
            affinity=affinity,
            priority_class_name=priority_class_name,
            disable_auto_mount=disable_auto_mount,
            pythonpath=pythonpath,
            tolerations=tolerations,
            preemption_mode=preemption_mode,
        )
        self.provider = provider


class RemoteSparkProviders(object):
    iguazio = "iguazio"


class RemoteSparkRuntime(KubejobRuntime):
    kind = "remote-spark"
    default_image = ".remote-spark-default-image"

    @classmethod
    def deploy_default_image(cls):
        from mlrun import get_run_db
        from mlrun.run import new_function

        sj = new_function(
            kind="remote-spark", name="remote-spark-default-image-deploy-temp"
        )
        sj.spec.build.image = cls.default_image
        sj.with_spark_service(spark_service="dummy-spark")
        sj.deploy()
        get_run_db().delete_function(name=sj.metadata.name)

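    # Usage sketch (an illustration added to this listing, not part of the
    # original source): the default image is typically built once per cluster
    # before running remote-spark functions:
    #
    #     from mlrun.runtimes import RemoteSparkRuntime
    #     RemoteSparkRuntime.deploy_default_image()
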
    def is_deployed(self):
        if (
            not self.spec.build.source
            and not self.spec.build.commands
            and not self.spec.build.extra
        ):
            return True
        return super().is_deployed()

    def _run(self, runobj: RunObject, execution):
        self.spec.image = self.spec.image or self.default_image
        super()._run(runobj=runobj, execution=execution)

    @property
    def spec(self) -> RemoteSparkSpec:
        return self._spec

    @spec.setter
    def spec(self, spec):
        self._spec = self._verify_dict(spec, "spec", RemoteSparkSpec)

    def with_spark_service(self, spark_service, provider=RemoteSparkProviders.iguazio):
        """Attach spark service to function"""
        self.spec.provider = provider
        if provider == RemoteSparkProviders.iguazio:
            self.spec.env.append(
                {"name": "MLRUN_SPARK_CLIENT_IGZ_SPARK", "value": "true"}
            )
            self.apply(mount_v3io_extended())
            self.apply(
                mount_v3iod(
                    namespace=config.namespace,
                    v3io_config_configmap=spark_service + "-submit",
                )
            )

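    # Usage sketch (an illustration added to this listing; "my-func" and
    # "my-spark" are placeholder names, where "my-spark" stands for an
    # Iguazio spark service):
    #
    #     import mlrun
    #     fn = mlrun.new_function(kind="remote-spark", name="my-func")
    #     fn.with_spark_service(spark_service="my-spark")
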
    @property
    def _resolve_default_base_image(self):
        if (
            self.spec.provider == RemoteSparkProviders.iguazio
            and config.spark_app_image
            and config.spark_app_image_tag
        ):
            app_image = re.sub("spark-app", "shell", config.spark_app_image)
            # this is temporary until we get the image name from external config
            return app_image + ":" + config.spark_app_image_tag
        return None

    def deploy(
        self,
        watch=True,
        with_mlrun=None,
        skip_deployed=False,
        is_kfp=False,
        mlrun_version_specifier=None,
        show_on_failure: bool = False,
    ):
        """Deploy function, build container with dependencies

        :param watch:                   wait for the deploy to complete (and print build logs)
        :param with_mlrun:              add the current mlrun package to the container build
        :param skip_deployed:           skip the build if we already have an image for the function
        :param is_kfp:                  whether the deploy is triggered from a Kubeflow Pipelines step
        :param mlrun_version_specifier: which mlrun package version to include (if not current)
        :param show_on_failure:         show logs only in case of build failure

        :return: True if the function is ready (deployed)
        """
        # connect will populate the config from the server config
        if not self.spec.build.base_image:
            self.spec.build.base_image = self._resolve_default_base_image
        return super().deploy(
            watch=watch,
            with_mlrun=with_mlrun,
            skip_deployed=skip_deployed,
            is_kfp=is_kfp,
            mlrun_version_specifier=mlrun_version_specifier,
            show_on_failure=show_on_failure,
        )

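# Usage sketch (an illustration added to this listing; function and service
# names are placeholders): adding extra dependencies on top of the default
# base image, then deploying:
#
#     fn = mlrun.new_function(kind="remote-spark", name="my-func")
#     fn.spec.build.commands = ["pip install pandas"]
#     fn.with_spark_service(spark_service="my-spark")
#     fn.deploy()
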
class RemoteSparkRuntimeHandler(KubeRuntimeHandler):
    @staticmethod
    def _are_resources_coupled_to_run_object() -> bool:
        return True

    @staticmethod
    def _get_object_label_selector(object_id: str) -> str:
        return f"mlrun/uid={object_id}"

    @staticmethod
    def _get_possible_mlrun_class_label_values() -> typing.List[str]:
        return ["remote-spark"]


def igz_spark_pre_hook():
    run(["/bin/bash", "/etc/config/v3io/spark-job-init.sh"])