mlrun.runtimes#

class mlrun.runtimes.ApplicationRuntime(**kwargs)[source]#

Bases: RemoteRuntime

property api_gateway#
create_api_gateway(name: str | None = None, path: str | None = None, direct_port_access: bool = False, authentication_mode: APIGatewayAuthenticationMode | None = None, authentication_creds: tuple[str, str] | None = None, ssl_redirect: bool | None = None, set_as_default: bool = False, gateway_timeout: int | None = None)[source]#

Create the application API gateway. Once the application is deployed, the API gateway can be created. An application without an API gateway is not accessible.

Parameters:
  • name -- The name of the API gateway

  • path -- Optional path of the API gateway, default value is "/". The given path should be supported by the deployed application

  • direct_port_access -- Set True to allow direct port access to the application sidecar

  • authentication_mode -- API Gateway authentication mode

  • authentication_creds -- API Gateway basic authentication credentials as a tuple (username, password)

  • ssl_redirect -- Set True to force SSL redirect, False to disable. Defaults to mlrun.mlconf.force_api_gateway_ssl_redirect()

  • set_as_default -- Set the API gateway as the default for the application (status.api_gateway)

  • gateway_timeout -- nginx ingress timeout in sec (request timeout, when will the gateway return an error)

Returns:

The API gateway URL

delete_api_gateway(name: str)[source]#

Delete API gateway by name. Refreshes the application status to update api gateway and invocation URLs. :param name: The API gateway name

deploy(project='', tag='', verbose=False, auth_info: AuthInfo | None = None, builder_env: dict | None = None, force_build: bool = False, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, show_on_failure: bool = False, create_default_api_gateway: bool = True)[source]#

Deploy function, builds the application image if required (self.requires_build()) or force_build is True, Once the image is built, the function is deployed.

Parameters:
  • project -- Project name

  • tag -- Function tag

  • verbose -- Set True for verbose logging

  • auth_info -- Service AuthInfo (deprecated and ignored)

  • builder_env -- Env vars dict for source archive config/credentials e.g. builder_env={"GIT_TOKEN": token}

  • force_build -- Set True for force building the application image

  • with_mlrun -- Add the current mlrun package to the container build

  • skip_deployed -- Skip the build if we already have an image for the function

  • is_kfp -- Deploy as part of a kfp pipeline

  • mlrun_version_specifier -- Which mlrun package version to include (if not current)

  • show_on_failure -- Show logs only in case of build failure

  • create_default_api_gateway -- When deploy finishes the default API gateway will be created for the application. Disabling this flag means that the application will not be accessible until an API gateway is created for it.

Returns:

The default API gateway URL if created or True if the function is ready (deployed)

classmethod deploy_reverse_proxy_image()[source]#

Build the reverse proxy image and save it. The reverse proxy image is used to route requests to the application sidecar. This is useful when you want to decrease build time by building the application image only once.

Parameters:

use_cache -- Use the cache when building the image

disable_default_http_trigger(**kwargs)#
enable_default_http_trigger(**kwargs)#
from_image(image)[source]#

Deploy the function with an existing nuclio processor image. This applies only for the reverse proxy and not the application image.

Parameters:

image -- image name

static get_filename_and_handler() -> (<class 'str'>, <class 'str'>)[source]#
invoke(path: str = '', body: str | bytes | dict | None = None, method: str | None = None, headers: dict | None = None, dashboard: str = '', force_external_address: bool = False, auth_info: AuthInfo | None = None, mock: bool | None = None, credentials: tuple[str, str] | None = None, **http_client_kwargs)[source]#

Invoke the remote (live) function and return the results

example:

function.invoke("/api", body={"inputs": x})
Parameters:
  • path -- request sub path (e.g. /images)

  • body -- request body (str, bytes or a dict for json requests)

  • method -- HTTP method (GET, PUT, ..)

  • headers -- key/value dict with http headers

  • dashboard -- nuclio dashboard address (deprecated)

  • force_external_address -- use the external ingress URL

  • auth_info -- service AuthInfo

  • mock -- use mock server vs a real Nuclio function (for local simulations)

  • http_client_kwargs -- allow the user to pass any parameter supported in requests.request method see this link for more information: https://requests.readthedocs.io/en/latest/api/#requests.request

kind = 'application'#
pre_deploy_validation()[source]#
prepare_image_for_deploy()[source]#

if a function has a 'spec.image' it is considered to be deployed, but because we allow the user to set 'spec.image' for usability purposes, we need to check whether this is a built image or it requires to be built on top.

resolve_default_api_gateway_name()[source]#
reverse_proxy_image = None#
set_internal_application_port(port: int)[source]#
property spec: ApplicationSpec#
property status: ApplicationStatus#
property url#
with_source_archive(source, workdir=None, pull_at_runtime: bool = False, target_dir: str | None = None)[source]#

load the code from git/tar/zip archive at build

Parameters:
  • source -- valid absolute path or URL to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip note path source must exist on the image or exist locally when run is local (it is recommended to use 'workdir' when source is a filepath instead)

  • workdir -- working dir relative to the archive root (e.g. './subdir') or absolute to the image root

  • pull_at_runtime -- currently not supported, source must be loaded into the image during the build process

  • target_dir -- target dir on runtime pod or repo clone / archive extraction

class mlrun.runtimes.BaseRuntime(metadata=None, spec=None)[source]#

Bases: ModelObj

as_step(runspec: RunObject | None = None, handler=None, name: str = '', project: str = '', params: dict | None = None, hyperparams=None, selector='', hyper_param_options: HyperParamOptions | None = None, inputs: dict | None = None, outputs: list | None = None, workdir: str = '', artifact_path: str = '', image: str = '', labels: dict | None = None, use_db=True, verbose=None, scrape_metrics=False, returns: list[Union[str, dict[str, str]]] | None = None, auto_build: bool = False)[source]#

Run a local or remote task.

Parameters:
  • runspec -- run template object or dict (see RunTemplate)

  • handler -- name of the function handler

  • name -- execution name

  • project -- project name

  • params -- input parameters (dict)

  • hyperparams -- hyper parameters

  • selector -- selection criteria for hyper params

  • hyper_param_options -- hyper param options (selector, early stop, strategy, ..) see: HyperParamOptions

  • inputs -- Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g: "<key> : <type_hint>".

  • outputs -- list of outputs which can pass in the workflow

  • artifact_path -- default artifact output path (replace out_path)

  • workdir -- default input artifacts path

  • image -- container image to use

  • labels -- labels to tag the job/run with ({key:val, ..})

  • use_db -- save function spec in the db (vs the workflow file)

  • verbose -- add verbose prints/logs

  • scrape_metrics -- whether to add the mlrun/scrape-metrics label to this run's resources

  • returns --

    List of configurations for how to log the returning values from the handler's run (as artifacts or results). The list's length must be equal to the amount of returning objects. A configuration may be given as:

    • A string of the key to use to log the returning value as result or as an artifact. To specify The artifact type, it is possible to pass a string in the following structure: "<key> : <type>". Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object's default artifact type will be used.

    • A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as "key": "the_key".

  • auto_build -- when set to True and the function require build it will be built on the first function run, use only if you dont plan on changing the build config between runs

Returns:

mlrun_pipelines.models.PipelineNodeWrapper

clean_build_params()[source]#
doc()[source]#
enrich_runtime_spec(project_node_selector: dict[str, str])[source]#
export(target='', format='.yaml', secrets=None, strip=True)[source]#

save function spec to a local/remote path (default to./function.yaml)

Parameters:
  • target -- target path/url

  • format -- .yaml (default) or .json

  • secrets -- optional secrets dict/object for target path (e.g. s3)

  • strip -- strip status data

Returns:

self

full_image_path(image=None, client_version: str | None = None, client_python_version: str | None = None)[source]#
generate_runtime_k8s_env(runobj: RunObject | None = None) list[dict][source]#

Prepares a runtime environment as it's expected by kubernetes.models.V1Container

Parameters:

runobj -- Run context object (RunObject) with run metadata and status

Returns:

List of dicts with the structure {"name": "var_name", "value": "var_value"}

is_deployed()[source]#
is_model_monitoring_function()[source]#
kind = 'base'#
property metadata: BaseMetadata#
prepare_image_for_deploy()[source]#

if a function has a 'spec.image' it is considered to be deployed, but because we allow the user to set 'spec.image' for usability purposes, we need to check whether this is a built image or it requires to be built on top.

requires_build() bool[source]#
run(runspec: RunTemplate | RunObject | dict | None = None, handler: str | Callable | None = None, name: str | None = '', project: str | None = '', params: dict | None = None, inputs: dict[str, str] | None = None, out_path: str | None = '', workdir: str | None = '', artifact_path: str | None = '', watch: bool | None = True, schedule: str | ScheduleCronTrigger | None = None, hyperparams: dict[str, list] | None = None, hyper_param_options: HyperParamOptions | None = None, verbose: bool | None = None, scrape_metrics: bool | None = None, local: bool | None = False, local_code_path: str | None = None, auto_build: bool | None = None, param_file_secrets: dict[str, str] | None = None, notifications: list[mlrun.model.Notification] | None = None, returns: list[Union[str, dict[str, str]]] | None = None, state_thresholds: dict[str, int] | None = None, reset_on_run: bool | None = None, **launcher_kwargs) RunObject[source]#

Run a local or remote task.

Parameters:
  • runspec -- The run spec to generate the RunObject from. Can be RunTemplate | RunObject | dict.

  • handler -- Pointer or name of a function handler.

  • name -- Execution name.

  • project -- Project name.

  • params -- Input parameters (dict).

  • inputs -- Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g: "<key> : <type_hint>".

  • out_path -- Default artifact output path.

  • artifact_path -- Default artifact output path (will replace out_path).

  • workdir -- Default input artifacts path.

  • watch -- Watch/follow run log.

  • schedule -- ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor), see this link for help: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html#module-apscheduler.triggers.cron

  • hyperparams -- Dict of param name and list of values to be enumerated. The default strategy is grid search and uses e.g. {"p1": [1,2,3]}. (Can be specified as a JSON file) For list, lists must be of equal length, e.g. {"p1": [1], "p2": [2]}. (Can be specified as JSON file or as a CSV file listing the parameter values per iteration.) You can specify strategy of type grid, list, random, and other options in the hyper_param_options parameter.

  • hyper_param_options -- Dict or HyperParamOptions struct of hyperparameter options.

  • verbose -- Add verbose prints/logs.

  • scrape_metrics -- Whether to add the mlrun/scrape-metrics label to this run's resources.

  • local -- Run the function locally vs on the runtime/cluster.

  • local_code_path -- Path of the code for local runs & debug.

  • auto_build -- When set to True and the function require build it will be built on the first function run, use only if you don't plan on changing the build config between runs.

  • param_file_secrets -- Dictionary of secrets to be used only for accessing the hyper-param parameter file. These secrets are only used locally and will not be stored anywhere

  • notifications -- List of notifications to push when the run is completed

  • returns --

    List of log hints - configurations for how to log the returning values from the handler's run (as artifacts or results). The list's length must be equal to the amount of returning objects. A log hint may be given as:

    • A string of the key to use to log the returning value as result or as an artifact. To specify The artifact type, it is possible to pass a string in the following structure: "<key> : <type>". Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object's default artifact type will be used.

    • A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as "key": "the_key".

  • state_thresholds -- Dictionary of states to time thresholds. The state will be matched against the k8s resource's status. The threshold should be a time string that conforms to timelength python package standards and is at least 1 minute (-1 for infinite). If the phase is active for longer than the threshold, the run will be aborted. See mlconf.function.spec.state_thresholds for the state options and default values.

  • reset_on_run -- When True, function python modules would reload prior to code execution. This ensures latest code changes are executed. This argument must be used in conjunction with the local=True argument.

Returns:

Run context object (RunObject) with run metadata, results and status

save(tag='', versioned=False, refresh=False) str[source]#
set_categories(categories: list[str])[source]#
set_db_connection(conn)[source]#
set_label(key, value)[source]#
skip_image_enrichment()[source]#
property spec: FunctionSpec#
property status: FunctionStatus#
store_run(runobj: RunObject)[source]#
try_auto_mount_based_on_config()[source]#
property uri#
validate_and_enrich_service_account(allowed_service_account, default_service_account)[source]#
with_code(from_file='', body=None, with_doc=True)[source]#

Update the function code This function eliminates the need to build container images every time we edit the code

Parameters:
  • from_file -- blank for current notebook, or path to .py/.ipynb file

  • body -- will use the body as the function code

  • with_doc -- update the document of the function parameters

Returns:

function object

with_commands(commands: list[str], overwrite: bool = False, prepare_image_for_deploy: bool = True)[source]#

add commands to build spec.

Parameters:
  • commands -- list of commands to run during build

  • overwrite -- overwrite existing commands

  • prepare_image_for_deploy -- prepare the image/base_image spec for deployment

Returns:

function object

with_requirements(requirements: list[str] | None = None, overwrite: bool = False, prepare_image_for_deploy: bool = True, requirements_file: str | None = '')[source]#

add package requirements from file or list to build spec.

Parameters:
  • requirements -- a list of python packages

  • requirements_file -- a local python requirements file path

  • overwrite -- overwrite existing requirements

  • prepare_image_for_deploy -- prepare the image/base_image spec for deployment

Returns:

function object

class mlrun.runtimes.DaskCluster(spec=None, metadata=None)[source]#

Bases: KubejobRuntime

property client#
close(running=True)[source]#
cluster()[source]#
deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: dict | None = None, show_on_failure: bool = False, force_build: bool = False)[source]#

deploy function, build container with dependencies

Parameters:
  • watch -- wait for the deploy to complete (and print build logs)

  • with_mlrun -- add the current mlrun package to the container build

  • skip_deployed -- skip the build if we already have an image for the function

  • is_kfp -- deploy as part of a kfp pipeline

  • mlrun_version_specifier -- which mlrun package version to include (if not current)

  • builder_env -- Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

  • show_on_failure -- show logs only in case of build failure

  • force_build -- force building the image, even when no changes were made

Returns:

True if the function is ready (deployed)

get_status()[source]#
property initialized#
is_deployed()[source]#

check if the function is deployed (has a valid container)

kind = 'dask'#
run(runspec: RunTemplate | RunObject | dict | None = None, handler: str | Callable | None = None, name: str | None = '', project: str | None = '', params: dict | None = None, inputs: dict[str, str] | None = None, out_path: str | None = '', workdir: str | None = '', artifact_path: str | None = '', watch: bool | None = True, schedule: str | ScheduleCronTrigger | None = None, hyperparams: dict[str, list] | None = None, hyper_param_options: HyperParamOptions | None = None, verbose: bool | None = None, scrape_metrics: bool | None = None, local: bool | None = False, local_code_path: str | None = None, auto_build: bool | None = None, param_file_secrets: dict[str, str] | None = None, notifications: list[mlrun.model.Notification] | None = None, returns: list[Union[str, dict[str, str]]] | None = None, state_thresholds: dict[str, int] | None = None, reset_on_run: bool | None = None, **launcher_kwargs) RunObject[source]#

Run a local or remote task.

Parameters:
  • runspec -- The run spec to generate the RunObject from. Can be RunTemplate | RunObject | dict.

  • handler -- Pointer or name of a function handler.

  • name -- Execution name.

  • project -- Project name.

  • params -- Input parameters (dict).

  • inputs -- Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g: "<key> : <type_hint>".

  • out_path -- Default artifact output path.

  • artifact_path -- Default artifact output path (will replace out_path).

  • workdir -- Default input artifacts path.

  • watch -- Watch/follow run log.

  • schedule -- ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor), see this link for help: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html#module-apscheduler.triggers.cron

  • hyperparams -- Dict of param name and list of values to be enumerated. The default strategy is grid search and uses e.g. {"p1": [1,2,3]}. (Can be specified as a JSON file) For list, lists must be of equal length, e.g. {"p1": [1], "p2": [2]}. (Can be specified as JSON file or as a CSV file listing the parameter values per iteration.) You can specify strategy of type grid, list, random, and other options in the hyper_param_options parameter.

  • hyper_param_options -- Dict or HyperParamOptions struct of hyperparameter options.

  • verbose -- Add verbose prints/logs.

  • scrape_metrics -- Whether to add the mlrun/scrape-metrics label to this run's resources.

  • local -- Run the function locally vs on the runtime/cluster.

  • local_code_path -- Path of the code for local runs & debug.

  • auto_build -- When set to True and the function require build it will be built on the first function run, use only if you don't plan on changing the build config between runs.

  • param_file_secrets -- Dictionary of secrets to be used only for accessing the hyper-param parameter file. These secrets are only used locally and will not be stored anywhere

  • notifications -- List of notifications to push when the run is completed

  • returns --

    List of log hints - configurations for how to log the returning values from the handler's run (as artifacts or results). The list's length must be equal to the amount of returning objects. A log hint may be given as:

    • A string of the key to use to log the returning value as result or as an artifact. To specify The artifact type, it is possible to pass a string in the following structure: "<key> : <type>". Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object's default artifact type will be used.

    • A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as "key": "the_key".

  • state_thresholds -- Dictionary of states to time thresholds. The state will be matched against the k8s resource's status. The threshold should be a time string that conforms to timelength python package standards and is at least 1 minute (-1 for infinite). If the phase is active for longer than the threshold, the run will be aborted. See mlconf.function.spec.state_thresholds for the state options and default values.

  • reset_on_run -- When True, function python modules would reload prior to code execution. This ensures latest code changes are executed. This argument must be used in conjunction with the local=True argument.

Returns:

Run context object (RunObject) with run metadata, results and status

set_state_thresholds(state_thresholds: dict[str, str], patch: bool = True)[source]#

Set the threshold for a specific state of the runtime. The threshold is the amount of time that the runtime will wait before aborting the run if the job is in the matching state. The threshold time string must conform to timelength python package standards and be at least 1 minute (e.g. 1000s, 1 hour 30m, 1h etc. or -1 for infinite). If the threshold is not set for a state, the default threshold will be used.

Parameters:
  • state_thresholds --

    A dictionary of state to threshold. The supported states are:

    • pending_scheduled - The pod/crd is scheduled on a node but not yet running

    • pending_not_scheduled - The pod/crd is not yet scheduled on a node

    • executing - The pod/crd started and is running

    • image_pull_backoff - The pod/crd is in image pull backoff

    See mlrun.mlconf.function.spec.state_thresholds for the default thresholds.

  • patch -- Whether to merge the given thresholds with the existing thresholds (True, default) or override them (False)

property spec: DaskSpec#
property status: DaskStatus#
with_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu', patch: bool = False)[source]#

Set pod cpu/memory/gpu limits (max values)

Parameters:
  • mem -- set limit for memory e.g. '500M', '2G', etc.

  • cpu -- set limit for cpu e.g. '0.5', '2', etc.

  • gpus -- set limit for gpu

  • gpu_type -- set gpu type e.g. "nvidia.com/gpu"

  • patch -- by default it overrides the whole limits section, if you wish to patch specific resources use patch=True

with_requests(mem=None, cpu=None, patch: bool = False)[source]#

Set requested (desired) pod cpu/memory resources

Parameters:
  • mem -- set request for memory e.g. '200M', '1G', etc.

  • cpu -- set request for cpu e.g. '0.1', '1', etc.

  • patch -- by default it overrides the whole requests section, if you wish to patch specific resources use patch=True

with_scheduler_limits(mem: str | None = None, cpu: str | None = None, gpus: int | None = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#

set scheduler pod resources limits by default it overrides the whole limits section, if you wish to patch specific resources use patch=True.

with_scheduler_requests(mem: str | None = None, cpu: str | None = None, patch: bool = False)[source]#

set scheduler pod resources requests by default it overrides the whole requests section, if you wish to patch specific resources use patch=True.

with_worker_limits(mem: str | None = None, cpu: str | None = None, gpus: int | None = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#

set worker pod resources limits by default it overrides the whole limits section, if you wish to patch specific resources use patch=True.

with_worker_requests(mem: str | None = None, cpu: str | None = None, patch: bool = False)[source]#

set worker pod resources requests by default it overrides the whole requests section, if you wish to patch specific resources use patch=True.

class mlrun.runtimes.DatabricksRuntime(spec=None, metadata=None)[source]#

Bases: KubejobRuntime

get_internal_parameters(runobj: RunObject)[source]#

Return the internal function parameters + code.

kind = 'databricks'#
run(runspec: RunTemplate | RunObject | dict | None = None, handler: str | Callable | None = None, name: str | None = '', project: str | None = '', params: dict | None = None, inputs: dict[str, str] | None = None, out_path: str | None = '', workdir: str | None = '', artifact_path: str | None = '', watch: bool | None = True, schedule: str | ScheduleCronTrigger | None = None, hyperparams: dict[str, list] | None = None, hyper_param_options: HyperParamOptions | None = None, verbose: bool | None = None, scrape_metrics: bool | None = None, local: bool | None = False, local_code_path: str | None = None, auto_build: bool | None = None, param_file_secrets: dict[str, str] | None = None, notifications: list[mlrun.model.Notification] | None = None, returns: list[Union[str, dict[str, str]]] | None = None, state_thresholds: dict[str, int] | None = None, reset_on_run: bool | None = None, **launcher_kwargs) RunObject[source]#

Run a local or remote task.

Parameters:
  • runspec -- The run spec to generate the RunObject from. Can be RunTemplate | RunObject | dict.

  • handler -- Pointer or name of a function handler.

  • name -- Execution name.

  • project -- Project name.

  • params -- Input parameters (dict).

  • inputs -- Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g: "<key> : <type_hint>".

  • out_path -- Default artifact output path.

  • artifact_path -- Default artifact output path (will replace out_path).

  • workdir -- Default input artifacts path.

  • watch -- Watch/follow run log.

  • schedule -- ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor), see this link for help: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html#module-apscheduler.triggers.cron

  • hyperparams -- Dict of param name and list of values to be enumerated. The default strategy is grid search and uses e.g. {"p1": [1,2,3]}. (Can be specified as a JSON file) For list, lists must be of equal length, e.g. {"p1": [1], "p2": [2]}. (Can be specified as JSON file or as a CSV file listing the parameter values per iteration.) You can specify strategy of type grid, list, random, and other options in the hyper_param_options parameter.

  • hyper_param_options -- Dict or HyperParamOptions struct of hyperparameter options.

  • verbose -- Add verbose prints/logs.

  • scrape_metrics -- Whether to add the mlrun/scrape-metrics label to this run's resources.

  • local -- Run the function locally vs on the runtime/cluster.

  • local_code_path -- Path of the code for local runs & debug.

  • auto_build -- When set to True and the function require build it will be built on the first function run, use only if you don't plan on changing the build config between runs.

  • param_file_secrets -- Dictionary of secrets to be used only for accessing the hyper-param parameter file. These secrets are only used locally and will not be stored anywhere

  • notifications -- List of notifications to push when the run is completed

  • returns --

    List of log hints - configurations for how to log the returning values from the handler's run (as artifacts or results). The list's length must be equal to the amount of returning objects. A log hint may be given as:

    • A string of the key to use to log the returning value as result or as an artifact. To specify The artifact type, it is possible to pass a string in the following structure: "<key> : <type>". Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object's default artifact type will be used.

    • A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as "key": "the_key".

  • state_thresholds -- Dictionary of states to time thresholds. The state will be matched against the k8s resource's status. The threshold should be a time string that conforms to timelength python package standards and is at least 1 minute (-1 for infinite). If the phase is active for longer than the threshold, the run will be aborted. See mlconf.function.spec.state_thresholds for the state options and default values.

  • reset_on_run -- When True, function python modules would reload prior to code execution. This ensures latest code changes are executed. This argument must be used in conjunction with the local=True argument.

Returns:

Run context object (RunObject) with run metadata, results and status

property spec: DatabricksSpec#
class mlrun.runtimes.HandlerRuntime(metadata=None, spec=None)[source]#

Bases: BaseRuntime, ParallelRunner

kind = 'handler'#
class mlrun.runtimes.KubeResource(spec=None, metadata=None)[source]#

Bases: BaseRuntime, KfpAdapterMixin

A parent class for runtimes that generate k8s resources when executing.

get_default_priority_class_name()[source]#
get_env(name, default=None)[source]#

Get the pod environment variable for the given name, if not found return the default If it's a scalar value, will return it, if the value is from source, return the k8s struct (V1EnvVarSource)

is_env_exists(name)[source]#

Check whether there is an environment variable define for the given key

kind = 'job'#
list_valid_priority_class_names()[source]#
set_env(name, value=None, value_from=None)[source]#

set pod environment var from value

set_env_from_secret(name, secret=None, secret_key=None)[source]#

set pod environment var from secret

set_envs(env_vars: dict | None = None, file_path: str | None = None)[source]#

set pod environment var from key/value dict or .env file

Parameters:
  • env_vars -- dict with env key/values

  • file_path -- .env file with key=value lines

set_image_pull_configuration(image_pull_policy: str | None = None, image_pull_secret_name: str | None = None)[source]#

Configure the image pull parameters for the runtime.

Parameters:
  • image_pull_policy -- The policy to use when pulling. One of IfNotPresent, Always or Never

  • image_pull_secret_name -- Name of a k8s secret containing image repository's authentication credentials

set_state_thresholds(state_thresholds: dict[str, str], patch: bool = True)[source]#

Set the threshold for a specific state of the runtime. The threshold is the amount of time that the runtime will wait before aborting the run if the job is in the matching state. The threshold time string must conform to timelength python package standards and be at least 1 minute (e.g. 1000s, 1 hour 30m, 1h etc. or -1 for infinite). If the threshold is not set for a state, the default threshold will be used.

Parameters:
  • state_thresholds --

    A dictionary of state to threshold. The supported states are:

    • pending_scheduled - The pod/crd is scheduled on a node but not yet running

    • pending_not_scheduled - The pod/crd is not yet scheduled on a node

    • executing - The pod/crd started and is running

    • image_pull_backoff - The pod/crd is in image pull backoff

    See mlrun.mlconf.function.spec.state_thresholds for the default thresholds.

  • patch -- Whether to merge the given thresholds with the existing thresholds (True, default) or override them (False)

property spec: KubeResourceSpec#
try_auto_mount_based_on_config(override_params=None)[source]#
validate_and_enrich_service_account(allowed_service_accounts, default_service_account)[source]#
with_annotations(annotations: dict)[source]#

set a key/value annotations in the metadata of the pod

with_limits(mem: str | None = None, cpu: str | None = None, gpus: int | None = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#

Set pod cpu/memory/gpu limits (max values)

Parameters:
  • mem -- set limit for memory e.g. '500M', '2G', etc.

  • cpu -- set limit for cpu e.g. '0.5', '2', etc.

  • gpus -- set limit for gpu

  • gpu_type -- set gpu type e.g. "nvidia.com/gpu"

  • patch -- by default it overrides the whole limits section, if you wish to patch specific resources use patch=True

with_node_selection(node_name: str | None = None, node_selector: dict[str, str] | None = None, affinity: V1Affinity | None = None, tolerations: list[kubernetes.client.models.v1_toleration.V1Toleration] | None = None)[source]#

Enables to control on which k8s node the job will run

Parameters:
with_preemption_mode(mode: PreemptionModes | str)[source]#

Preemption mode controls whether pods can be scheduled on preemptible nodes. Tolerations, node selector, and affinity are populated on preemptible nodes corresponding to the function spec.

The supported modes are:

  • allow - The function can be scheduled on preemptible nodes

  • constrain - The function can only run on preemptible nodes

  • prevent - The function cannot be scheduled on preemptible nodes

  • none - No preemptible configuration will be applied on the function

The default preemption mode is configurable in mlrun.mlconf.function_defaults.preemption_mode, by default it's set to prevent

Parameters:

mode -- allow | constrain | prevent | none defined in PreemptionModes

with_priority_class(name: str | None = None)[source]#

Enables to control the priority of the pod If not passed - will default to mlrun.mlconf.default_function_priority_class_name

Parameters:

name -- The name of the priority class

with_requests(mem: str | None = None, cpu: str | None = None, patch: bool = False)[source]#

Set requested (desired) pod cpu/memory resources

Parameters:
  • mem -- set request for memory e.g. '200M', '1G', etc.

  • cpu -- set request for cpu e.g. '0.1', '1', etc.

  • patch -- by default it overrides the whole requests section, if you wish to patch specific resources use patch=True

with_security_context(security_context: V1SecurityContext)[source]#

Set security context for the pod. For Iguazio we handle security context internally - see mlrun.common.schemas.function.SecurityContextEnrichmentModes

Example:

from kubernetes import client as k8s_client

security_context = k8s_client.V1SecurityContext(
    run_as_user=1000,
    run_as_group=3000,
)
function.with_security_context(security_context)

More info: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod

Parameters:

security_context -- The security context for the pod

class mlrun.runtimes.KubejobRuntime(spec=None, metadata=None)[source]#

Bases: KubeResource

build_config(image='', base_image=None, commands: list | None = None, secret=None, source=None, extra=None, load_source_on_run=None, with_mlrun=None, auto_build=None, requirements=None, overwrite=False, prepare_image_for_deploy=True, requirements_file=None, builder_env=None, extra_args=None)[source]#

specify builder configuration for the deploy operation

Parameters:
  • image -- target image name/path

  • base_image -- base image name/path

  • commands -- list of docker build (RUN) commands e.g. ['pip install pandas']

  • secret -- k8s secret for accessing the docker registry

  • source -- source git/tar archive to load code from in to the context/workdir e.g. git://github.com/mlrun/something.git#development

  • extra -- extra Dockerfile lines

  • load_source_on_run -- load the archive code into the container at runtime vs at build time

  • with_mlrun -- add the current mlrun package to the container build

  • auto_build -- when set to True and the function require build it will be built on the first function run, use only if you dont plan on changing the build config between runs

  • requirements -- a list of packages to install

  • requirements_file -- requirements file to install

  • overwrite -- overwrite existing build configuration (currently applies to requirements and commands) * False: the new params are merged with the existing * True: the existing params are replaced by the new ones

  • prepare_image_for_deploy -- prepare the image/base_image spec for deployment

  • extra_args -- A string containing additional builder arguments in the format of command-line options, e.g. extra_args="--skip-tls-verify --build-arg A=val"

  • builder_env -- Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

deploy(watch: bool = True, with_mlrun: bool | None = None, skip_deployed: bool = False, is_kfp: bool = False, mlrun_version_specifier: bool | None = None, builder_env: dict | None = None, show_on_failure: bool = False, force_build: bool = False) bool[source]#

Deploy function, build container with dependencies

Parameters:
  • watch -- Wait for the deploy to complete (and print build logs)

  • with_mlrun -- Add the current mlrun package to the container build

  • skip_deployed -- Skip the build if we already have an image for the function

  • is_kfp -- Deploy as part of a kfp pipeline

  • mlrun_version_specifier -- Which mlrun package version to include (if not current)

  • builder_env -- Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

  • show_on_failure -- Show logs only in case of build failure

  • force_build -- Set True for force building the image, even when no changes were made

Returns:

True if the function is ready (deployed)

deploy_step(image=None, base_image=None, commands: list | None = None, secret_name='', with_mlrun=True, skip_deployed=False)[source]#
is_deployed()[source]#

check if the function is deployed (has a valid container)

kind = 'job'#
with_source_archive(source, workdir=None, handler=None, pull_at_runtime=True, target_dir=None)[source]#

load the code from git/tar/zip archive at runtime or build

Parameters:
  • source -- valid absolute path or URL to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip note path source must exist on the image or exist locally when run is local (it is recommended to use 'workdir' when source is a filepath instead)

  • handler -- default function handler

  • workdir -- working dir relative to the archive root (e.g. './subdir') or absolute to the image root

  • pull_at_runtime -- load the archive into the container at job runtime vs on build/deploy

  • target_dir -- target dir on runtime pod or repo clone / archive extraction

class mlrun.runtimes.LocalRuntime(metadata=None, spec=None)[source]#

Bases: BaseRuntime, ParallelRunner

is_deployed()[source]#
kind = 'local'#
to_job(image='')[source]#
with_source_archive(source, workdir=None, handler=None, target_dir=None)[source]#

load the code from git/tar/zip archive at runtime or build

Parameters:
  • source -- valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip

  • handler -- default function handler

  • workdir -- working dir relative to the archive root (e.g. './subdir') or absolute

  • target_dir -- local target dir for repo clone (by default its <current-dir>/code)

class mlrun.runtimes.MpiRuntimeV1(spec=None, metadata=None)[source]#

Bases: AbstractMPIJobRuntime

crd_group = 'kubeflow.org'#
crd_plural = 'mpijobs'#
crd_version = 'v1'#
property spec: MPIV1ResourceSpec#
class mlrun.runtimes.RemoteRuntime(spec=None, metadata=None)[source]#

Bases: KubeResource

add_trigger(name, spec)[source]#

add a nuclio trigger object/dict

Parameters:
  • name -- trigger name

  • spec -- trigger object or dict

add_v3io_stream_trigger(stream_path, name='stream', group='serving', seek_to='earliest', shards=1, extra_attributes=None, ack_window_size=None, **kwargs)[source]#

add v3io stream trigger to the function

Parameters:
  • stream_path -- v3io stream path (e.g. 'v3io:///projects/myproj/stream1')

  • name -- trigger name

  • group -- consumer group

  • seek_to -- start seek from: "earliest", "latest", "time", "sequence"

  • shards -- number of shards (used to set number of replicas)

  • extra_attributes -- key/value dict with extra trigger attributes

  • ack_window_size -- stream ack window size (the consumer group will be updated with the event id - ack_window_size, on failure the events in the window will be retransmitted)

  • kwargs -- extra V3IOStreamTrigger class attributes

deploy(project='', tag='', verbose=False, auth_info: AuthInfo | None = None, builder_env: dict | None = None, force_build: bool = False)[source]#

Deploy the nuclio function to the cluster

Parameters:
  • project -- project name

  • tag -- function tag

  • verbose -- set True for verbose logging

  • auth_info -- service AuthInfo (deprecated and ignored)

  • builder_env -- env vars dict for source archive config/credentials e.g. builder_env={"GIT_TOKEN": token}

  • force_build -- set True for force building the image

deploy_step(project='', models=None, env=None, tag=None, verbose=None, use_function_from_db=None)[source]#

return as a Kubeflow pipeline step (ContainerOp), recommended to use mlrun.deploy_function() instead

Parameters:
  • project -- project name, defaults to function project

  • models -- model name and paths

  • env -- dict of environment variables

  • tag -- version tag

  • verbose -- verbose output

  • use_function_from_db -- use the function from the DB instead of the local function object

disable_default_http_trigger(**kwargs)[source]#
enable_default_http_trigger(**kwargs)[source]#
from_image(image)[source]#

Deploy the function with an existing nuclio processor image.

Parameters:

image -- image name

get_url(force_external_address: bool = False, auth_info: AuthInfo | None = None)[source]#

This method returns function's url.

Parameters:
  • force_external_address -- use the external ingress URL

  • auth_info -- service AuthInfo

Returns:

returns function's url

invoke(path: str, body: str | bytes | dict | None = None, method: str | None = None, headers: dict | None = None, dashboard: str = '', force_external_address: bool = False, auth_info: AuthInfo | None = None, mock: bool | None = None, **http_client_kwargs)[source]#

Invoke the remote (live) function and return the results

example:

function.invoke("/api", body={"inputs": x})
Parameters:
  • path -- request sub path (e.g. /images)

  • body -- request body (str, bytes or a dict for json requests)

  • method -- HTTP method (GET, PUT, ..)

  • headers -- key/value dict with http headers

  • dashboard -- nuclio dashboard address (deprecated)

  • force_external_address -- use the external ingress URL

  • auth_info -- service AuthInfo

  • mock -- use mock server vs a real Nuclio function (for local simulations)

  • http_client_kwargs -- allow the user to pass any parameter supported in requests.request method see this link for more information: https://requests.readthedocs.io/en/latest/api/#requests.request

kind = 'remote'#
pre_deploy_validation()[source]#
set_config(key, value)[source]#
set_state_thresholds(state_thresholds: dict[str, int], patch: bool = True)[source]#

Set the threshold for a specific state of the runtime. The threshold is the amount of time that the runtime will wait before aborting the run if the job is in the matching state. The threshold time string must conform to timelength python package standards and be at least 1 minute (e.g. 1000s, 1 hour 30m, 1h etc. or -1 for infinite). If the threshold is not set for a state, the default threshold will be used.

Parameters:
  • state_thresholds --

    A dictionary of state to threshold. The supported states are:

    • pending_scheduled - The pod/crd is scheduled on a node but not yet running

    • pending_not_scheduled - The pod/crd is not yet scheduled on a node

    • executing - The pod/crd started and is running

    • image_pull_backoff - The pod/crd is in image pull backoff

    See mlrun.mlconf.function.spec.state_thresholds for the default thresholds.

  • patch -- Whether to merge the given thresholds with the existing thresholds (True, default) or override them (False)

skip_image_enrichment()[source]#
property spec: NuclioSpec#
property status: NuclioStatus#
with_annotations(annotations: dict)[source]#

set a key/value annotations for function

with_http(workers: int | None = 8, port: int | None = None, host: str | None = None, paths: list[str] | None = None, canary: float | None = None, secret: str | None = None, worker_timeout: int | None = None, gateway_timeout: int | None = None, trigger_name: str | None = None, annotations: Mapping[str, str] | None = None, extra_attributes: Mapping[str, str] | None = None)[source]#

update/add nuclio HTTP trigger settings

Note: gateway timeout is the maximum request time before an error is returned, while the worker timeout if the max time a request will wait for until it will start processing, gateway_timeout must be greater than the worker_timeout.

Parameters:
  • workers -- number of worker processes (default=8). set 0 to use Nuclio's default workers count

  • port -- TCP port to listen on. by default, nuclio will choose a random port as long as the function service is NodePort. if the function service is ClusterIP, the port is ignored.

  • host -- Ingress hostname

  • paths -- list of Ingress sub paths

  • canary -- k8s ingress canary (% traffic value between 0 and 100)

  • secret -- k8s secret name for SSL certificate

  • worker_timeout -- worker wait timeout in sec (how long a message should wait in the worker queue before an error is returned)

  • gateway_timeout -- nginx ingress timeout in sec (request timeout, when will the gateway return an error)

  • trigger_name -- alternative nuclio trigger name

  • annotations -- key/value dict of ingress annotations

  • extra_attributes -- key/value dict of extra nuclio trigger attributes

Returns:

function object (self)

with_node_selection(**kwargs)[source]#

Enables to control on which k8s node the job will run

Parameters:
with_preemption_mode(**kwargs)[source]#

Preemption mode controls whether pods can be scheduled on preemptible nodes. Tolerations, node selector, and affinity are populated on preemptible nodes corresponding to the function spec.

The supported modes are:

  • allow - The function can be scheduled on preemptible nodes

  • constrain - The function can only run on preemptible nodes

  • prevent - The function cannot be scheduled on preemptible nodes

  • none - No preemptible configuration will be applied on the function

The default preemption mode is configurable in mlrun.mlconf.function_defaults.preemption_mode, by default it's set to prevent

Parameters:

mode -- allow | constrain | prevent | none defined in PreemptionModes

with_priority_class(**kwargs)[source]#

Enables to control the priority of the pod If not passed - will default to mlrun.mlconf.default_function_priority_class_name

Parameters:

name -- The name of the priority class

with_service_type(service_type: str, add_templated_ingress_host_mode: str | None = None)[source]#

Enables to control the service type of the pod and the addition of templated ingress host

Parameters:
  • service_type -- service type (ClusterIP, NodePort), defaults to mlrun.mlconf.httpdb.nuclio.service_type

  • add_templated_ingress_host_mode -- add templated ingress host mode (never, always, onClusterIP), see mlrun.mlconf.httpdb.nuclio.add_templated_ingress_host_mode for the default and more information

with_sidecar(name: str | None = None, image: str | None = None, ports: int | list[int] | None = None, command: str | None = None, args: list[str] | None = None)[source]#

Add a sidecar container to the function pod :param name: Sidecar container name. :param image: Sidecar container image. :param ports: Sidecar container ports to expose. Can be a single port or a list of ports. :param command: Sidecar container command instead of the image entrypoint. :param args: Sidecar container command args (requires command to be set).

with_source_archive(source, workdir=None, handler=None, runtime='')[source]#

Load nuclio function from remote source

Note: remote source may require credentials, those can be stored in the project secrets or passed in the function.deploy() using the builder_env dict, see the required credentials per source:

  • v3io - "V3IO_ACCESS_KEY".

  • git - "GIT_USERNAME", "GIT_PASSWORD".

  • AWS S3 - "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" or "AWS_SESSION_TOKEN".

Parameters:
  • source -- a full path to the nuclio function source (code entry) to load the function from

  • handler -- a path to the function's handler, including path inside archive/git repo

  • workdir -- working dir relative to the archive root (e.g. 'subdir')

  • runtime -- (optional) the runtime of the function (defaults to mlrun.mlconf.default_nuclio_runtime)

Examples:

git:

fn.with_source_archive(
    "git://github.com/org/repo#my-branch",
    handler="main:handler",
    workdir="path/inside/repo",
)

s3:

fn.spec.nuclio_runtime = "golang"
fn.with_source_archive(
    "s3://my-bucket/path/in/bucket/my-functions-archive",
    handler="my_func:Handler",
    workdir="path/inside/functions/archive",
    runtime="golang",
)
with_v3io(local='', remote='')[source]#

Add v3io volume to the function

Parameters:
  • local -- local path (mount path inside the function container)

  • remote -- v3io path

class mlrun.runtimes.RemoteSparkRuntime(spec=None, metadata=None)[source]#

Bases: KubejobRuntime

default_image = '.remote-spark-default-image'#
deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: dict | None = None, show_on_failure: bool = False, force_build: bool = False)[source]#

deploy function, build container with dependencies

Parameters:
  • watch -- wait for the deploy to complete (and print build logs)

  • with_mlrun -- add the current mlrun package to the container build

  • skip_deployed -- skip the build if we already have an image for the function

  • is_kfp -- deploy as part of a kfp pipeline

  • mlrun_version_specifier -- which mlrun package version to include (if not current)

  • builder_env -- Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

  • show_on_failure -- show logs only in case of build failure

  • force_build -- force building the image, even when no changes were made

:return True if the function is ready (deployed)

classmethod deploy_default_image()[source]#
is_deployed()[source]#

check if the function is deployed (has a valid container)

kind = 'remote-spark'#
property spec: RemoteSparkSpec#
with_security_context(security_context: V1SecurityContext)[source]#

With security context is not supported for spark runtime. Driver / Executor processes run with uid / gid 1000 as long as security context is not defined. If in the future we want to support setting security context it will work only from spark version 3.2 onwards.

with_spark_service(spark_service, provider='iguazio', with_v3io_mount=True)[source]#

Attach spark service to function

class mlrun.runtimes.ServingRuntime(spec=None, metadata=None)[source]#

Bases: RemoteRuntime

MLRun Serving Runtime

add_child_function(name, url=None, image=None, requirements=None, kind=None)[source]#

in a multi-function pipeline add child function

example:

fn.add_child_function("enrich", "./enrich.ipynb", "mlrun/mlrun")
Parameters:
  • name -- child function name

  • url -- function/code url, support .py, .ipynb, .yaml extensions

  • image -- base docker image for the function

  • requirements -- py package requirements file path OR list of packages

  • kind -- mlrun function/runtime kind

:return function object

add_model(key: str, model_path: str | None = None, class_name: str | None = None, model_url: str | None = None, handler: str | None = None, router_step: str | None = None, child_function: str | None = None, **class_args)[source]#

add ml model and/or route to the function.

Example, create a function (from the notebook), add a model class, and deploy:

fn = code_to_function(kind="serving")
fn.add_model("boost", model_path, model_class="MyClass", my_arg=5)
fn.deploy()

only works with router topology, for nested topologies (model under router under flow) need to add router to flow and use router.add_route()

Parameters:
  • key -- model api key (or name:version), will determine the relative url/path

  • model_path -- path to mlrun model artifact or model directory file/object path

  • class_name -- V2 Model python class name or a model class instance (can also module.submodule.class and it will be imported automatically)

  • model_url -- url of a remote model serving endpoint (cannot be used with model_path)

  • handler -- for advanced users!, override default class handler name (do_event)

  • router_step -- router step name (to determine which router we add the model to in graphs with multiple router steps)

  • child_function -- child function name, when the model runs in a child function

  • class_args -- extra kwargs to pass to the model serving class __init__ (can be read in the model using .get_param(key) method)

deploy(project='', tag='', verbose=False, auth_info: AuthInfo | None = None, builder_env: dict | None = None, force_build: bool = False)[source]#

deploy model serving function to a local/remote cluster

Parameters:
  • project -- optional, override function specified project name

  • tag -- specify unique function tag (a different function service is created for every tag)

  • verbose -- verbose logging

  • auth_info -- The auth info to use to communicate with the Nuclio dashboard, required only when providing dashboard

  • builder_env -- env vars dict for source archive config/credentials e.g. builder_env={"GIT_TOKEN": token}

  • force_build -- set True for force building the image

kind = 'serving'#
plot(filename=None, format=None, source=None, **kw)[source]#

plot/save graph using graphviz

example:

serving_fn = mlrun.new_function("serving", image="mlrun/mlrun", kind="serving")
serving_fn.add_model(
    "my-classifier",
    model_path=model_path,
    class_name="mlrun.frameworks.sklearn.SklearnModelServer",
)
serving_fn.plot(rankdir="LR")
Parameters:
  • filename -- target filepath for the image (None for the notebook)

  • format -- The output format used for rendering ('pdf', 'png', etc.)

  • source -- source step to add to the graph

  • kw -- kwargs passed to graphviz, e.g. rankdir="LR" (see: https://graphviz.org/doc/info/attrs.html)

Returns:

graphviz graph object

remove_states(keys: list)[source]#

remove one, multiple, or all states/models from the spec (blank list for all)

set_topology(topology=None, class_name=None, engine=None, exist_ok=False, **class_args) RootFlowStep | RouterStep[source]#

set the serving graph topology (router/flow) and root class or params

examples:

# simple model router topology
graph = fn.set_topology("router")
fn.add_model(name, class_name="ClassifierModel", model_path=model_uri)

# async flow topology
graph = fn.set_topology("flow", engine="async")
graph.to("MyClass").to(name="to_json", handler="json.dumps").respond()

topology options are:

router - root router + multiple child route states/models
         route is usually determined by the path (route key/name)
         can specify special router class and router arguments

flow   - workflow (DAG) with a chain of states
         flow support "sync" and "async" engines, branches are not allowed in sync mode
         when using async mode calling state.respond() will mark the state as the
         one which generates the (REST) call response
Parameters:
  • topology --

    • graph topology, router or flow

  • class_name --

    • optional for router, router class name/path or router object

  • engine --

    • optional for flow, sync or async engine (default to async)

  • exist_ok --

    • allow overriding existing topology

  • class_args --

    • optional, router/flow class init args

:return graph object (fn.spec.graph)

set_tracking(stream_path: str | None = None, batch: int | None = None, sample: int | None = None, stream_args: dict | None = None, tracking_policy: TrackingPolicy | dict | None = None, enable_tracking: bool = True) None[source]#

Apply on your serving function to monitor a deployed model, including real-time dashboards to detect drift and analyze performance.

Parameters:
  • stream_path -- Path/url of the tracking stream e.g. v3io:///users/mike/mystream you can use the "dummy://" path for test/simulation.

  • batch -- Micro batch size (send micro batches of N records at a time).

  • sample -- Sample size (send only one of N records).

  • stream_args -- Stream initialization parameters, e.g. shards, retention_in_hours, ..

  • enable_tracking -- Enabled/Disable model-monitoring tracking. Default True (tracking enabled).

Example:

# initialize a new serving function
serving_fn = mlrun.import_function("hub://v2-model-server", new_name="serving")
# apply model monitoring
serving_fn.set_tracking()
property spec: ServingSpec#
to_mock_server(namespace=None, current_function='*', track_models=False, workdir=None, **kwargs) GraphServer[source]#

create mock server object for local testing/emulation

Parameters:
  • namespace -- one or list of namespaces/modules to search the steps classes/functions in

  • current_function -- specify if you want to simulate a child function, * for all functions

  • track_models -- allow model tracking (disabled by default in the mock server)

  • workdir -- working directory to locate the source code (if not the current one)

with_secrets(kind, source)[source]#

register a secrets source (file, env or dict)

read secrets from a source provider to be used in workflows, example:

task.with_secrets('file', 'file.txt')
task.with_secrets('inline', {'key': 'val'})
task.with_secrets('env', 'ENV1,ENV2')
task.with_secrets('vault', ['secret1', 'secret2'...])

# If using an empty secrets list [] then all accessible secrets will be available.
task.with_secrets('vault', [])

# To use with Azure key vault, a k8s secret must be created with the following keys:
# kubectl -n <namespace> create secret generic azure-key-vault-secret \
#     --from-literal=tenant_id=<service principal tenant ID> \
#     --from-literal=client_id=<service principal client ID> \
#     --from-literal=secret=<service principal secret key>

task.with_secrets('azure_vault', {
    'name': 'my-vault-name',
    'k8s_secret': 'azure-key-vault-secret',
    # An empty secrets list may be passed ('secrets': []) to access all vault secrets.
    'secrets': ['secret1', 'secret2'...]
})
Parameters:
  • kind -- secret type (file, inline, env)

  • source -- secret data or link (see example)

Returns:

The Runtime (function) object

class mlrun.runtimes.Spark3Runtime(spec=None, metadata=None)[source]#

Bases: KubejobRuntime

apiVersion = 'sparkoperator.k8s.io/v1beta2'#
code_path = '/etc/config/mlrun'#
code_script = 'spark-function-code.py'#
default_mlrun_image = '.spark-job-default-image'#
deploy(watch=True, with_mlrun=True, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: dict | None = None, show_on_failure: bool = False, force_build: bool = False)[source]#

deploy function, build container with dependencies

Parameters:
  • watch -- wait for the deploy to complete (and print build logs)

  • with_mlrun -- add the current mlrun package to the container build

  • skip_deployed -- skip the build if we already have an image for the function

  • is_kfp -- deploy as part of a kfp pipeline

  • mlrun_version_specifier -- which mlrun package version to include (if not current)

  • builder_env -- Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

  • show_on_failure -- show logs only in case of build failure

  • force_build -- set True for force building the image, even when no changes were made

Returns:

True if the function is ready (deployed)

classmethod deploy_default_image(with_gpu=False)[source]#
disable_monitoring()[source]#
gpu_suffix = '-cuda'#
gpus(gpus, gpu_type='nvidia.com/gpu')[source]#
group = 'sparkoperator.k8s.io'#
is_deployed()[source]#

check if the function is deployed (has a valid container)

kind = 'spark'#
plural = 'sparkapplications'#
property spec: Spark3JobSpec#
version = 'v1beta2'#
with_cores(executor_cores: int | None = None, driver_cores: int | None = None)[source]#

Allows to configure spark.executor.cores and spark.driver.cores parameters. The values must be integers greater than or equal to 1. If a parameter is not specified, it defaults to 1.

Spark operator has multiple options to control the number of cores available to the executor and driver. The .coreLimit and .coreRequest parameters can be set for both executor and driver, but they only control the k8s properties of the pods created to run the driver/executor. Spark itself uses the spec.[executor|driver].cores parameter to set the parallelism of tasks and cores assigned to each task within the pod. This function sets the .cores parameters for the job executed.

See GoogleCloudPlatform/spark-on-k8s-operator#581 for a discussion about those parameters and their meaning in Spark operator.

Parameters:
  • executor_cores -- Number of cores to use for executor (spark.executor.cores)

  • driver_cores -- Number of cores to use for driver (spark.driver.cores)

with_driver_host_path_volume(host_path: str, mount_path: str, type: str = '', volume_name: str = 'host-path-volume')[source]#

Add a host path volume and mounts it to the driver pod More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath

Parameters:
  • host_path -- Path of the directory on the host. If the path is a symlink, it follows the link to the real path

  • mount_path -- Path within the container at which the volume should be mounted. Must not contain ':'

  • type -- Type for HostPath Volume Defaults to ""

  • volume_name -- Volume's name. Must be a DNS_LABEL and unique within the pod

with_driver_limits(cpu: str | None = None, gpus: int | None = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#

set driver pod cpu limits by default it overrides the whole limits section, if you wish to patch specific resources use patch=True.

with_driver_node_selection(node_name: str | None = None, node_selector: dict[str, str] | None = None, affinity: V1Affinity | None = None, tolerations: list[kubernetes.client.models.v1_toleration.V1Toleration] | None = None)[source]#

Enables control of which k8s node the spark executor will run on.

Parameters:
with_driver_preemption_mode(mode: PreemptionModes | str)[source]#

Preemption mode controls whether the spark driver can be scheduled on preemptible nodes. Tolerations, node selector, and affinity are populated on preemptible nodes corresponding to the function spec.

The supported modes are:

  • allow - The function can be scheduled on preemptible nodes

  • constrain - The function can only run on preemptible nodes

  • prevent - The function cannot be scheduled on preemptible nodes

  • none - No preemptible configuration will be applied on the function

The default preemption mode is configurable in mlrun.mlconf.function_defaults.preemption_mode. By default it's set to prevent

Parameters:

mode -- allow | constrain | prevent | none defined in PreemptionModes

with_driver_requests(mem: str | None = None, cpu: str | None = None, patch: bool = False)[source]#

set driver pod required cpu/memory/gpu resources by default it overrides the whole requests section, if you wish to patch specific resources use patch=True.

with_dynamic_allocation(min_executors=None, max_executors=None, initial_executors=None)[source]#

Allows to configure spark's dynamic allocation

Parameters:
  • min_executors -- Min. number of executors

  • max_executors -- Max. number of executors

  • initial_executors -- Initial number of executors

with_executor_host_path_volume(host_path: str, mount_path: str, type: str = '', volume_name: str = 'host-path-volume')[source]#

Add an host path volume and mount it to the executor pod/s More info: https://kubernetes.io/docs/concepts/storage/volumes#hostpath

Parameters:
  • host_path -- Path of the directory on the host. If the path is a symlink, it follows the link to the real path

  • mount_path -- Path within the container at which the volume should be mounted. Must not contain ':'

  • type -- Type for HostPath Volume Defaults to ""

  • volume_name -- Volume's name. Must be a DNS_LABEL and unique within the pod

with_executor_limits(cpu: str | None = None, gpus: int | None = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#

set executor pod limits by default it overrides the whole limits section, if you wish to patch specific resources use patch=True.

with_executor_node_selection(node_name: str | None = None, node_selector: dict[str, str] | None = None, affinity: V1Affinity | None = None, tolerations: list[kubernetes.client.models.v1_toleration.V1Toleration] | None = None)[source]#

Enables control of which k8s node the spark executor will run on.

Parameters:
with_executor_preemption_mode(mode: PreemptionModes | str)[source]#

Preemption mode controls whether the spark executor can be scheduled on preemptible nodes. Tolerations, node selector, and affinity are populated on preemptible nodes corresponding to the function spec.

The supported modes are:

  • allow - The function can be scheduled on preemptible nodes

  • constrain - The function can only run on preemptible nodes

  • prevent - The function cannot be scheduled on preemptible nodes

  • none - No preemptible configuration will be applied on the function

The default preemption mode is configurable in mlrun.mlconf.function_defaults.preemption_mode, by default it's set to prevent

Parameters:

mode -- allow | constrain | prevent | none defined in PreemptionModes

with_executor_requests(mem: str | None = None, cpu: str | None = None, patch: bool = False)[source]#

set executor pod required cpu/memory/gpu resources by default it overrides the whole requests section, if you wish to patch specific resources use patch=True.

with_igz_spark(mount_v3io_to_executor=True)[source]#

Configures the pods (driver and executors) to have V3IO access (via file system and via Hadoop).

Parameters:

mount_v3io_to_executor -- When False, limits the file system mount to driver pod only. Default is True.

with_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu', patch: bool = False)[source]#

Set pod cpu/memory/gpu limits (max values)

Parameters:
  • mem -- set limit for memory e.g. '500M', '2G', etc.

  • cpu -- set limit for cpu e.g. '0.5', '2', etc.

  • gpus -- set limit for gpu

  • gpu_type -- set gpu type e.g. "nvidia.com/gpu"

  • patch -- by default it overrides the whole limits section, if you wish to patch specific resources use patch=True

with_node_selection(node_name: str | None = None, node_selector: dict[str, str] | None = None, affinity: V1Affinity | None = None, tolerations: list[kubernetes.client.models.v1_toleration.V1Toleration] | None = None)[source]#

Enables to control on which k8s node the job will run

Parameters:
with_preemption_mode(mode: PreemptionModes | str)[source]#

Use with_driver_preemption_mode / with_executor_preemption_mode to setup preemption_mode for spark operator

with_requests(mem=None, cpu=None, patch: bool = False)[source]#

Set requested (desired) pod cpu/memory resources

Parameters:
  • mem -- set request for memory e.g. '200M', '1G', etc.

  • cpu -- set request for cpu e.g. '0.1', '1', etc.

  • patch -- by default it overrides the whole requests section, if you wish to patch specific resources use patch=True

with_restart_policy(restart_type='OnFailure', retries=0, retry_interval=10, submission_retries=3, submission_retry_interval=20)[source]#

set restart policy restart_type=OnFailure/Never/Always

with_security_context(security_context: V1SecurityContext)[source]#

With security context is not supported for spark runtime. Driver / Executor processes run with uid / gid 1000 as long as security context is not defined. If in the future we want to support setting security context it will work only from spark version 3.2 onwards.

with_source_archive(source, workdir=None, handler=None, pull_at_runtime=True, target_dir=None)[source]#

load the code from git/tar/zip archive at runtime or build

Parameters:
  • source -- valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip

  • handler -- default function handler

  • workdir -- working dir relative to the archive root (e.g. './subdir') or absolute to the image root

  • pull_at_runtime -- not supported for spark runtime, must be False

  • target_dir -- target dir on runtime pod for repo clone / archive extraction

mlrun.runtimes.nuclio#

class mlrun.runtimes.nuclio.api_gateway.APIGateway(**kwargs)[source]#

Bases: ModelObj

property authentication#
property description#
classmethod from_scheme(api_gateway: APIGateway)[source]#
property host#
invoke(method='POST', headers: dict | None = None, credentials: tuple[str, str] | None = None, path: str | None = None, body: str | bytes | dict | None = None, **kwargs)[source]#

Invoke the API gateway.

Parameters:
  • method -- (str, optional) The HTTP method for the invocation.

  • headers -- (dict, optional) The HTTP headers for the invocation.

  • credentials -- (Optional[tuple[str, str]], optional) The (username,password) for the invocation if required can also be set by the environment variable (_, V3IO_ACCESS_KEY) for access key authentication.

  • path -- (str, optional) The sub-path for the invocation.

  • body -- (Optional[Union[str, bytes, dict]]) The body of the invocation.

  • kwargs -- (dict) Additional keyword arguments.

Returns:

The response from the API gateway invocation.

property invoke_url#

Get the invoke URL.

Returns:

(str) The invoke URL.

is_ready()[source]#
property metadata: APIGatewayMetadata#
property name#
property path#
property project#
property spec: APIGatewaySpec#
property status: APIGatewayStatus#
sync()[source]#

Synchronize the API gateway from the server.

to_scheme() APIGateway[source]#
wait_for_readiness(max_wait_time=90)[source]#

Wait for the API gateway to become ready within the maximum wait time.

Parameters:

max_wait_time -- int - Maximum time to wait in seconds (default is 90 seconds).

Returns:

True if the entity becomes ready within the maximum wait time, False otherwise

Return type:

bool

with_access_key_auth(**kwargs)#
with_annotations(annotations: dict)[source]#

set a key/value annotations in the metadata of the api gateway

with_basic_auth(username: str, password: str)[source]#

Set basic authentication for the API gateway.

Parameters:
  • username -- (str) The username for basic authentication.

  • password -- (str) The password for basic authentication.

with_canary(functions: list[Union[str, mlrun.runtimes.nuclio.function.RemoteRuntime, mlrun.runtimes.nuclio.serving.ServingRuntime, mlrun.runtimes.nuclio.application.application.ApplicationRuntime]] | RemoteRuntime | ServingRuntime | ApplicationRuntime, canary: list[int])[source]#

Set canary function for the API gateway

Parameters:
  • functions -- The list of functions associated with the API gateway Can be a list of function names (["my-func1", "my-func2"]) or a list of nuclio functions of types RemoteRuntime OR ServingRuntime OR ApplicationRuntime

  • canary -- The canary percents for the API gateway of type list[int]; for instance: [20,80]

with_force_ssl_redirect()[source]#

Set SSL redirect annotation for the API gateway.

with_gateway_timeout(gateway_timeout: int)[source]#

Set gateway proxy connect/read/send timeout annotations :param gateway_timeout: The timeout in seconds

with_ports(ports: list[int])[source]#

Set ports for the API gateway

Parameters:

ports -- The ports of the API gateway, as a list of integers that correspond to the functions in the functions list. for instance: [8050] or [8050, 8081]

class mlrun.runtimes.nuclio.api_gateway.APIGatewayAuthenticator(*args, **kwargs)[source]#

Bases: Authenticator, ModelObj

class mlrun.runtimes.nuclio.api_gateway.APIGatewayMetadata(name: str, namespace: str | None = None, labels: dict | None = None, annotations: dict | None = None, creation_timestamp: str | None = None)[source]#

Bases: ModelObj

Parameters:
  • name -- The name of the API gateway

  • namespace -- The namespace of the API gateway

  • labels -- The labels of the API gateway

  • annotations -- The annotations of the API gateway

  • creation_timestamp -- The creation timestamp of the API gateway

class mlrun.runtimes.nuclio.api_gateway.APIGatewaySpec(functions: list[typing.Union[str, mlrun.runtimes.nuclio.function.RemoteRuntime, mlrun.runtimes.nuclio.serving.ServingRuntime, mlrun.runtimes.nuclio.application.application.ApplicationRuntime]] | ~mlrun.runtimes.nuclio.function.RemoteRuntime | ~mlrun.runtimes.nuclio.serving.ServingRuntime | ~mlrun.runtimes.nuclio.application.application.ApplicationRuntime, project: str | None = None, description: str = '', host: str | None = None, path: str = '/', authentication: ~mlrun.runtimes.nuclio.api_gateway.APIGatewayAuthenticator | None = <mlrun.runtimes.nuclio.api_gateway.NoneAuth object>, canary: list[int] | None = None, ports: list[int] | None = None)[source]#

Bases: ModelObj

Parameters:
  • functions -- The list of functions associated with the API gateway Can be a list of function names (["my-func1", "my-func2"]) or a list or a single entity of RemoteRuntime OR ServingRuntime OR ApplicationRuntime

  • project -- The project name

  • description -- Optional description of the API gateway

  • path -- Optional path of the API gateway, default value is "/"

  • authentication -- The authentication for the API gateway of type BasicAuth

  • host -- The host of the API gateway (optional). If not set, it will be automatically generated

  • canary -- The canary percents for the API gateway of type list[int]; for instance: [20,80] (optional)

  • ports -- The ports of the API gateway, as a list of integers that correspond to the functions in the functions list. for instance: [8050] or [8050, 8081] (optional)

enrich()[source]#
validate(project: str, functions: list[Union[str, mlrun.runtimes.nuclio.function.RemoteRuntime, mlrun.runtimes.nuclio.serving.ServingRuntime, mlrun.runtimes.nuclio.application.application.ApplicationRuntime]] | RemoteRuntime | ServingRuntime | ApplicationRuntime, canary: list[int] | None = None, ports: list[int] | None = None)[source]#
class mlrun.runtimes.nuclio.api_gateway.APIGatewayStatus(state: APIGatewayState | None = None)[source]#

Bases: ModelObj

class mlrun.runtimes.nuclio.api_gateway.AccessKeyAuth(*args, **kwargs)[source]#

Bases: APIGatewayAuthenticator

An API gateway authenticator with access key authentication.

property authentication_mode: str#
class mlrun.runtimes.nuclio.api_gateway.Authenticator(*args, **kwargs)[source]#

Bases: Protocol

property authentication_mode: str#
classmethod from_scheme(api_gateway_spec: APIGatewaySpec)[source]#
to_scheme() dict[str, Optional[mlrun.common.schemas.api_gateway.APIGatewayBasicAuth]] | None[source]#
class mlrun.runtimes.nuclio.api_gateway.BasicAuth(username=None, password=None)[source]#

Bases: APIGatewayAuthenticator

An API gateway authenticator with basic authentication.

Parameters:
  • username -- (str) The username for basic authentication.

  • password -- (str) The password for basic authentication.

property authentication_mode: str#
to_scheme() dict[str, Optional[mlrun.common.schemas.api_gateway.APIGatewayBasicAuth]] | None[source]#
class mlrun.runtimes.nuclio.api_gateway.NoneAuth[source]#

Bases: APIGatewayAuthenticator

An API gateway authenticator with no authentication.