Source code for mlrun.runtimes.mounts

# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import typing
import warnings
from collections import namedtuple

import mlrun.common.secrets
import mlrun.errors
from mlrun.config import config
from mlrun.config import config as mlconf
from mlrun.errors import MLRunInvalidArgumentError
from mlrun.platforms.iguazio import v3io_to_vol
from mlrun.utils import logger

if typing.TYPE_CHECKING:
    from mlrun.runtimes import KubeResource


# One mount of the v3io volume into the container: ``path`` is the mountPath inside
# the container and ``sub_path`` becomes the k8s ``subPath`` within the volume
# ("" mounts the volume root). The typename matches the module attribute name so
# that instances can be pickled (pickle resolves classes by module + qualified name).
VolumeMount = namedtuple("VolumeMount", ["path", "sub_path"])


def v3io_cred(
    api: str = "",
    user: str = "",
    access_key: str = "",
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to copy local v3io env vars to container

    Values are resolved at the moment the returned modifier is applied:

    * ``V3IO_API``: ``api``, else the ``V3IO_API`` env var, else ``mlconf.v3io_api``
    * ``V3IO_USERNAME``: ``user``, else the ``V3IO_USERNAME`` env var
    * ``V3IO_ACCESS_KEY``: ``access_key``, else the ``V3IO_ACCESS_KEY`` env var
    * ``V3IO_FRAMESD``: ``mlconf.v3io_framesd``, else the ``V3IO_FRAMESD`` env var

    Usage::

        train = train_op(...)
        train.apply(v3io_cred())

    :param api:        v3io web API address override
    :param user:       v3io username override
    :param access_key: v3io access key override
    :return: a modifier that sets the four env vars above on the runtime and returns it
    """

    def _apply_v3io_cred(runtime: "KubeResource"):
        env = os.environ
        # Dict literal keeps the original resolution order (api, user, key, framesd)
        resolved = {
            "V3IO_API": api or env.get("V3IO_API") or mlconf.v3io_api,
            "V3IO_USERNAME": user or env.get("V3IO_USERNAME"),
            "V3IO_ACCESS_KEY": access_key or env.get("V3IO_ACCESS_KEY"),
            # NOTE: unlike the others, config takes precedence over the env var here
            "V3IO_FRAMESD": mlconf.v3io_framesd or env.get("V3IO_FRAMESD"),
        }
        runtime.set_envs(resolved)
        return runtime

    return _apply_v3io_cred
def mount_v3io(
    name: str = "v3io",
    remote: str = "",
    access_key: str = "",
    user: str = "",
    secret: str | None = None,
    volume_mounts: list[VolumeMount] | None = None,
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """Modifier function to apply to a Container Op to volume mount a v3io path

    The mount list is validated (and defaulted) immediately, when this function is
    called; the volume, its mounts and, unless ``secret`` is given, the v3io
    credential env vars (via ``v3io_cred``) are attached when the returned modifier
    is applied.

    :param name:          the volume name
    :param remote:        the v3io path to use for the volume (~/ prefix will be replaced with /users/<username>/)
    :param access_key:    the access key used to auth against v3io (default: V3IO_ACCESS_KEY env var)
    :param user:          the username used to auth against v3io (default: V3IO_USERNAME env var)
    :param secret:        k8s secret name for the username and access key
    :param volume_mounts: list of VolumeMount; if empty, defaults to mounting /v3io and /User
    :raises MLRunInvalidArgumentError: if ``remote`` is given without ``volume_mounts``, or if
        both are empty and no user name can be resolved
    """
    resolved_mounts, resolved_user = _enrich_and_validate_v3io_mounts(
        remote=remote,
        volume_mounts=volume_mounts,
        user=user,
    )

    def _apply_v3io_mount(runtime: "KubeResource"):
        volume = v3io_to_vol(name, remote, access_key, resolved_user, secret=secret)
        runtime.spec.with_volumes(volume)
        for mount in resolved_mounts:
            runtime.spec.with_volume_mounts(
                {"mountPath": mount.path, "name": name, "subPath": mount.sub_path}
            )
        if secret:
            # Credentials come from the k8s secret; no plain env vars needed
            return runtime
        return v3io_cred(access_key=access_key, user=resolved_user)(runtime)

    return _apply_v3io_mount
def mount_spark_conf() -> typing.Callable[["KubeResource"], "KubeResource"]: """Modifier function to mount Spark configuration.""" def _mount_spark(runtime: "KubeResource"): runtime.spec.with_volume_mounts( { "mountPath": "/etc/config/spark", "name": "spark-master-config", } ) return runtime return _mount_spark def mount_v3iod( namespace: str, v3io_config_configmap: str ) -> typing.Callable[["KubeResource"], "KubeResource"]: """Modifier function to mount v3iod configuration.""" def _mount_v3iod(runtime: "KubeResource"): def add_vol(name, mount_path, host_path): runtime.spec.with_volumes( { "name": name, "hostPath": { "path": host_path, "type": "", }, } ) runtime.spec.with_volume_mounts( { "mountPath": mount_path, "name": name, } ) add_vol( name="shm", mount_path="/dev/shm", host_path=f"/var/run/iguazio/dayman-shm/{namespace}", ) add_vol( name="v3iod-comm", mount_path="/var/run/iguazio/dayman", host_path="/var/run/iguazio/dayman/" + namespace, ) # Add daemon-health and v3io-config volumes runtime.spec.with_volumes( [ { "name": "daemon-health", "emptyDir": {}, }, { "name": "v3io-config", "configMap": { "name": v3io_config_configmap, "defaultMode": 420, }, }, ] ) # Add volume mounts runtime.spec.with_volume_mounts( [ { "mountPath": "/var/run/iguazio/daemon_health", "name": "daemon-health", }, { "mountPath": "/etc/config/v3io", "name": "v3io-config", }, ] ) # Add environment variables runtime.set_envs( { "CURRENT_NODE_IP": { "valueFrom": { "fieldRef": { "apiVersion": "v1", "fieldPath": "status.hostIP", } }, }, "IGZ_DATA_CONFIG_FILE": "/igz/java/conf/v3io.conf", } ) return runtime return _mount_v3iod def mount_s3( secret_name: str | None = None, aws_access_key: str = "", aws_secret_key: str = "", endpoint_url: str | None = None, prefix: str = "", aws_region: str | None = None, non_anonymous: bool = False, ) -> typing.Callable[["KubeResource"], "KubeResource"]: """Modifier function to add s3 env vars or secrets to container :param secret_name: Kubernetes secret name for 
credentials :param aws_access_key: AWS_ACCESS_KEY_ID value (default: env variable) :param aws_secret_key: AWS_SECRET_ACCESS_KEY value (default: env variable) :param endpoint_url: s3 endpoint address (for non-AWS s3) :param prefix: prefix to add before the env var name (for multiple s3 data stores) :param aws_region: Amazon region :param non_anonymous: use non-anonymous connection even if no credentials are provided (for authenticating externally, such as through IAM instance-roles) """ if secret_name and (aws_access_key or aws_secret_key): raise MLRunInvalidArgumentError( "Can use k8s_secret for credentials or specify them (aws_access_key, aws_secret_key) not both." ) if not secret_name and ( aws_access_key or os.environ.get(prefix + "AWS_ACCESS_KEY_ID") or aws_secret_key or os.environ.get(prefix + "AWS_SECRET_ACCESS_KEY") ): logger.warning( "It is recommended to use k8s secret (specify secret_name), " "specifying aws_access_key/aws_secret_key directly is unsafe." ) def _use_s3_cred(runtime: "KubeResource"): _access_key = aws_access_key or os.environ.get(prefix + "AWS_ACCESS_KEY_ID") _secret_key = aws_secret_key or os.environ.get(prefix + "AWS_SECRET_ACCESS_KEY") # Check for endpoint URL with backward compatibility _endpoint_url = endpoint_url or os.environ.get(prefix + "AWS_ENDPOINT_URL_S3") if not _endpoint_url: # Check for deprecated environment variable _endpoint_url = os.environ.get(prefix + "S3_ENDPOINT_URL") if _endpoint_url: warnings.warn( "S3_ENDPOINT_URL is deprecated in 1.10.0 and will be removed in 1.12.0, " "use AWS_ENDPOINT_URL_S3 instead.", # TODO: Remove this in 1.12.0 FutureWarning, ) # Auto-mount fills only env vars the user did not already set as a plain value, # so explicit user input (e.g. from the UI batch-run wizard) survives enrichment # (ML-12330). 
Plain values we *do* write get flagged so server-side project-secret # injection can later override them (ML-12572); value_from writes don't need the # flag — has_user_set_plain_env already returns False for them. def _set_if_not_user_set(name, value=None, value_from=None): if runtime.has_user_set_plain_env(name): return runtime.set_env(name, value=value, value_from=value_from) if value_from is None: runtime.mark_env_auto_mount_injected(name) if _endpoint_url: _set_if_not_user_set(prefix + "AWS_ENDPOINT_URL_S3", _endpoint_url) if aws_region: _set_if_not_user_set(prefix + "AWS_REGION", aws_region) if non_anonymous: _set_if_not_user_set(prefix + "S3_NON_ANONYMOUS", "true") if secret_name: _set_if_not_user_set( f"{prefix}AWS_ACCESS_KEY_ID", value_from={ "secretKeyRef": { "name": secret_name, "key": "AWS_ACCESS_KEY_ID", } }, ) _set_if_not_user_set( f"{prefix}AWS_SECRET_ACCESS_KEY", value_from={ "secretKeyRef": { "name": secret_name, "key": "AWS_SECRET_ACCESS_KEY", } }, ) else: _set_if_not_user_set(f"{prefix}AWS_ACCESS_KEY_ID", _access_key) _set_if_not_user_set(f"{prefix}AWS_SECRET_ACCESS_KEY", _secret_key) return runtime return _use_s3_cred def mount_pvc( pvc_name: str | None = None, volume_name: str = "pipeline", volume_mount_path: str = "/mnt/pipeline", ) -> typing.Callable[["KubeResource"], "KubeResource"]: """ Modifier function to mount a PVC volume in the container, simplifying volume and volume mount addition. Usage:: train = train_op(...) 
train.apply(mount_pvc("claim-name", "pipeline", "/mnt/pipeline")) """ if not pvc_name: # Try to get the PVC mount configuration from the environment variable if "MLRUN_PVC_MOUNT" in os.environ: mount = os.environ.get("MLRUN_PVC_MOUNT") items = mount.split(":") if len(items) != 2: raise MLRunInvalidArgumentError( "MLRUN_PVC_MOUNT should include <pvc-name>:<mount-path>" ) pvc_name = items[0] volume_mount_path = items[1] if not pvc_name: # The PVC name is still not set, raise an error raise MLRunInvalidArgumentError( "No PVC name: use the pvc_name parameter or configure the MLRUN_PVC_MOUNT environment variable" ) def _mount_pvc(runtime: "KubeResource"): local_pvc = {"claimName": pvc_name} runtime.spec.with_volumes( [ { "name": volume_name, "persistentVolumeClaim": local_pvc, } ] ) runtime.spec.with_volume_mounts( { "mountPath": volume_mount_path, "name": volume_name, } ) return runtime return _mount_pvc
def auto_mount(
    pvc_name: str = "",
    volume_mount_path: str = "",
    volume_name: str | None = None,
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """Choose the mount based on env variables and params

    The first matching rule wins, checked in this order:

    - k8s PVC volume when both ``pvc_name`` and ``volume_mount_path`` are set
    - the modifier of ``config.storage.auto_mount_type`` when that type is not
      ``"auto"`` and provides a modifier (e.g. PVC, S3 credentials or secret-based
      env vars); it is called with ``config.get_storage_auto_mount_params()``
      after filtering through ``_filter_modifier_params``
    - k8s PVC volume when env var is set: MLRUN_PVC_MOUNT=<pvc-name>:<mount-path>
    - iguazio v3io volume when the V3IO_ACCESS_KEY env var is set (``mount_v3io``
      also needs a user name, e.g. from V3IO_USERNAME)

    :param pvc_name:          PVC claim name (used only together with ``volume_mount_path``)
    :param volume_mount_path: container mount path for the PVC
    :param volume_name:       volume name; defaults to "shared-persistency" for PVC
                              and "v3io" for v3io mounts
    :raises ValueError: when none of the rules above applies
    """
    if pvc_name and volume_mount_path:
        return mount_pvc(
            pvc_name=pvc_name,
            volume_mount_path=volume_mount_path,
            volume_name=volume_name or "shared-persistency",
        )

    # An explicitly configured auto_mount_type (anything but the default "auto") is
    # honoured before env variables such as MLRUN_PVC_MOUNT, so an admin-configured
    # S3 or secret_env mount type wins over a PVC env var that may be set on the
    # client machine (e.g., external Jupyter).
    # Lazy import to avoid circular dependency (pod.py imports mounts.py at module level).
    from mlrun.runtimes.pod import AutoMountType, _filter_modifier_params

    configured_type = AutoMountType(config.storage.auto_mount_type)
    modifier = (
        configured_type.get_modifier()
        if configured_type != AutoMountType.auto
        else None
    )
    if modifier:
        modifier_params = _filter_modifier_params(
            modifier, config.get_storage_auto_mount_params()
        )
        return modifier(**modifier_params)

    if "MLRUN_PVC_MOUNT" in os.environ:
        return mount_pvc(volume_name=volume_name or "shared-persistency")
    if "V3IO_ACCESS_KEY" in os.environ:
        return mount_v3io(name=volume_name or "v3io")
    raise ValueError("Failed to auto mount, need to set env vars")
def mount_secret(
    secret_name: str,
    mount_path: str,
    volume_name: str = "secret",
    items: list[dict] | None = None,
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to mount a Kubernetes secret as file(s).

    :param secret_name: Kubernetes secret name
    :param mount_path:  Path inside the container to mount
    :param volume_name: Unique volume name
    :param items:       If unspecified, each key-value pair in the Data field of the referenced
                        Secret will be projected into the volume as a file whose name is the
                        key and content is the value.
                        If specified, the listed keys will be projected into the specified paths,
                        and unlisted keys will not be present.
    """
    if secret_name:
        # Validate the name up front, before any runtime is modified
        mlrun.common.secrets.validate_not_forbidden_secret(secret_name.strip())

    def _mount_secret(runtime: "KubeResource"):
        # Define the secret volume source
        secret_volume_source = {
            "secretName": secret_name,
            "items": items,
        }

        # Add the secret volume
        runtime.spec.with_volumes(
            {
                "name": volume_name,
                "secret": secret_volume_source,
            }
        )

        # Add the volume mount
        runtime.spec.with_volume_mounts(
            {
                "mountPath": mount_path,
                "name": volume_name,
            }
        )

        return runtime

    return _mount_secret


def mount_configmap(
    configmap_name: str,
    mount_path: str,
    volume_name: str = "configmap",
    items: list[dict] | None = None,
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to mount a Kubernetes ConfigMap as file(s).

    :param configmap_name: Kubernetes ConfigMap name
    :param mount_path:     Path inside the container to mount
    :param volume_name:    Unique volume name
    :param items:          If unspecified, each key-value pair in the Data field of the referenced
                           Configmap will be projected into the volume as a file whose name is the
                           key and content is the value.
                           If specified, the listed keys will be projected into the specified paths,
                           and unlisted keys will not be present.
    """

    def _mount_configmap(runtime: "KubeResource"):
        # Construct the configMap dictionary; "items" is only included when given
        config_map_dict = {
            "name": configmap_name,
        }

        if items is not None:
            config_map_dict["items"] = items

        vol = {
            "name": volume_name,
            "configMap": config_map_dict,
        }

        runtime.spec.with_volumes(vol)
        runtime.spec.with_volume_mounts(
            {
                "mountPath": mount_path,
                "name": volume_name,
            }
        )
        return runtime

    return _mount_configmap


def mount_hostpath(
    host_path: str,
    mount_path: str,
    volume_name: str = "hostpath",
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to mount a host path inside a Kubernetes container.

    :param host_path:   Host path on the node to be mounted.
    :param mount_path:  Path inside the container where the volume will be mounted.
    :param volume_name: Unique name for the volume.
    """

    def _mount_hostpath(runtime: "KubeResource") -> "KubeResource":
        runtime.spec.with_volumes(
            {
                "name": volume_name,
                "hostPath": {
                    "path": host_path,
                    "type": "",
                },
            }
        )
        runtime.spec.with_volume_mounts(
            {
                "mountPath": mount_path,
                "name": volume_name,
            }
        )
        return runtime

    return _mount_hostpath


def set_env_variables(
    env_vars_dict: dict[str, str] | None = None, **kwargs
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to apply a set of environment variables to a runtime. Variables may be passed
    as either a dictionary of name-value pairs, or as arguments to the function.
    See `KubeResource.apply` for more information on modifiers.

    Usage::

        function.apply(set_env_variables({"ENV1": "value1", "ENV2": "value2"}))
        or
        function.apply(set_env_variables(ENV1=value1, ENV2=value2))

    :param env_vars_dict: dictionary of environment variables
    :param kwargs: environment variables passed as arguments (override same-named
                   entries of ``env_vars_dict``)
    """

    # Copy so later changes to the caller's dict do not leak into the modifier
    env_data = env_vars_dict.copy() if env_vars_dict else {}
    for key, value in kwargs.items():
        env_data[key] = value

    def _set_env_variables(runtime: "KubeResource"):
        runtime.set_envs(env_data)
        return runtime

    return _set_env_variables


def set_env_vars_from_secret(
    secret_name: str | None = None,
    keys: typing.Union[str, list[str], None] = None,
    cleartext_env: typing.Union[str, dict[str, str], None] = None,
) -> typing.Callable[["KubeResource"], "KubeResource"]:
    """
    Modifier function to set environment variables from a Kubernetes Secret.

    If keys are given, each key is exposed as an environment variable with the same name.
    If no keys are given, all keys in the secret are mounted as env vars (via envFrom).

    ``secret_name`` is optional: when it is omitted only the ``cleartext_env`` variables are
    injected (no secret is mounted). This supports identity-based auth (e.g. Azure workload
    identity), where the credentials arrive via a federated token rather than a Kubernetes
    secret, but a plain config value such as the storage account name still has to be set.

    Performs the same secret-name validation as other secret-mount functions
    (validate_not_forbidden_secret); when using specific keys this is done via
    set_env_from_secret(), and when mounting all keys via set_env_from_secret_ref().

    Supports auto-mount-params in two forms:
      - Semicolon-delimited string: e.g. keys="key1;key2;key3" (used when passing keys via
        ``storage.auto_mount_params``, where commas separate top-level parameters).
      - List of strings: e.g. keys=["key1", "key2", "key3"] (when params are base64-encoded JSON).

    Usage::

        function.apply(set_env_vars_from_secret("my-secret"))  # mount all keys
        function.apply(set_env_vars_from_secret("my-secret", keys=["KEY1", "KEY2"]))
        function.apply(set_env_vars_from_secret("my-secret", keys="KEY1;KEY2;KEY3"))
        function.apply(
            set_env_vars_from_secret("my-secret", cleartext_env={"ACCT": "name"})
        )
        function.apply(
            set_env_vars_from_secret(
                "my-secret", cleartext_env="ACCT:name;REGION:eastus"
            )
        )

    :param secret_name:   Optional. Kubernetes secret name. When omitted, no secret is mounted
                          and only ``cleartext_env`` is injected (used for identity-based auth).
    :param keys:          Optional. Secret data keys to expose as env vars. Either a
                          semicolon-delimited string (e.g. "key1;key2;key3") or a list of strings.
                          If omitted, all keys in the secret are mounted as environment variables.
    :param cleartext_env: Optional. Plain (non-secret) key=value environment variables to inject
                          alongside the secret-backed vars. Accepts either a ``dict[str, str]``
                          (recommended for the base64-JSON params path, supports colons in
                          values) or a semicolon-delimited ``key:value`` string (for the
                          plain-string params path, e.g. ``"ACCT:name;REGION:eastus"``). In
                          string form the first colon in each token is the delimiter; colons in
                          values are not supported. Defaults to None (no cleartext vars injected).
    :raises MLRunInvalidArgumentError: if a cleartext_env token has no ':' or an empty variable
                          name, if keys are given without a secret_name, or if neither a
                          secret_name nor cleartext_env is given.
    """
    if isinstance(keys, str):
        keys_list = [k.strip() for k in keys.split(";") if k.strip()]
    elif keys is not None:
        keys_list = [k if isinstance(k, str) else str(k) for k in keys]
    else:
        keys_list = []

    if isinstance(cleartext_env, str):
        cleartext_env_dict: dict[str, str] = {}
        for token in cleartext_env.split(";"):
            token = token.strip()
            if not token:
                continue
            # Split on the first ':' only; everything after it is the value
            env_name, sep, env_value = token.partition(":")
            if not sep:
                raise mlrun.errors.MLRunInvalidArgumentError(
                    f"cleartext_env token missing ':' separator: {token!r}"
                )
            env_name = env_name.strip()
            if not env_name:
                # e.g. ":value" - an env var with an empty name is never valid
                raise mlrun.errors.MLRunInvalidArgumentError(
                    f"cleartext_env token has an empty variable name: {token!r}"
                )
            cleartext_env_dict[env_name] = env_value.strip()
    elif cleartext_env is not None:
        cleartext_env_dict = dict(cleartext_env)
    else:
        cleartext_env_dict = {}

    if secret_name:
        mlrun.common.secrets.validate_not_forbidden_secret(secret_name.strip())
    elif keys_list:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "set_env_vars_from_secret requires a secret_name when keys are specified"
        )
    elif not cleartext_env_dict:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "set_env_vars_from_secret requires either a secret_name or cleartext_env"
        )

    def _set_env_vars_from_secret(runtime: "KubeResource"):
        if secret_name:
            if not keys_list:
                # No explicit keys: expose every key of the secret (envFrom)
                runtime.set_env_from_secret_ref(secret_name)
            else:
                for key in keys_list:
                    runtime.set_env_from_secret(
                        name=key, secret=secret_name, secret_key=key
                    )
        for k, v in cleartext_env_dict.items():
            runtime.set_env(k, v)
        return runtime

    return _set_env_vars_from_secret


def _enrich_and_validate_v3io_mounts(
    remote: str = "",
    volume_mounts: list[VolumeMount] | None = None,
    user: str = "",
) -> tuple[list[VolumeMount], str]:
    """Fill in the default v3io mounts and validate the mount list.

    :param remote:        v3io path for the volume; requires explicit ``volume_mounts``
    :param volume_mounts: list (or tuple) of VolumeMount; when both this and ``remote``
                          are empty, defaults to ``/v3io`` (volume root) and ``/User``
                          (``users/<user>``)
    :param user:          user name; only resolved (falling back to V3IO_USERNAME) when
                          the default mounts are used
    :return: ``(volume_mounts as a list, user)``
    :raises MLRunInvalidArgumentError: if ``remote`` is given without ``volume_mounts``,
        or if the default mounts are needed but no user name can be resolved
    :raises TypeError: if ``volume_mounts`` is not a list/tuple of VolumeMount
    """
    if volume_mounts is None:
        volume_mounts = []

    if remote and not volume_mounts:
        raise MLRunInvalidArgumentError(
            "volume_mounts must be specified when remote is given"
        )

    # Empty remote & volume_mounts defaults are volume mounts of /v3io and /User
    if not remote and not volume_mounts:
        user = _resolve_mount_user(user)
        if not user:
            raise MLRunInvalidArgumentError(
                "user name/env must be specified when using empty remote and volume_mounts"
            )
        volume_mounts = [
            VolumeMount(path="/v3io", sub_path=""),
            VolumeMount(path="/User", sub_path="users/" + user),
        ]

    # Either condition alone is invalid: a non-sequence, or any element that is not
    # a VolumeMount (which would otherwise fail later on .path / .sub_path access).
    # Tuples are accepted for backward compatibility and normalized to a list.
    if not isinstance(volume_mounts, (list, tuple)) or any(
        not isinstance(mount, VolumeMount) for mount in volume_mounts
    ):
        raise TypeError("mounts should be a list of Mount")

    return list(volume_mounts), user


def _resolve_mount_user(user: str | None = None):
    """Return ``user`` if truthy, otherwise the V3IO_USERNAME env var (None if unset)."""
    return user or os.environ.get("V3IO_USERNAME")