mlrun.runtimes

class mlrun.runtimes.BaseRuntime(metadata=None, spec=None)[source]

Bases: mlrun.model.ModelObj

as_step(runspec: Optional[mlrun.model.RunObject] = None, handler=None, name: str = '', project: str = '', params: Optional[dict] = None, hyperparams=None, selector='', hyper_param_options: Optional[mlrun.model.HyperParamOptions] = None, inputs: Optional[dict] = None, outputs: Optional[dict] = None, workdir: str = '', artifact_path: str = '', image: str = '', labels: Optional[dict] = None, use_db=True, verbose=None, scrape_metrics=False)[source]

Run a local or remote task as a Kubeflow pipeline step.

Parameters
  • runspec – run template object or dict (see RunTemplate)

  • handler – name of the function handler

  • name – execution name

  • project – project name

  • params – input parameters (dict)

  • hyperparams – hyper parameters

  • selector – selection criteria for hyper params

  • inputs – input objects (dict of key: path)

  • outputs – list of outputs which can pass in the workflow

  • artifact_path – default artifact output path (replace out_path)

  • workdir – default input artifacts path

  • image – container image to use

  • labels – labels to tag the job/run with ({key:val, ..})

  • use_db – save function spec in the db (vs the workflow file)

  • verbose – add verbose prints/logs

  • scrape_metrics – whether to add the mlrun/scrape-metrics label to this run’s resources

Returns

KubeFlow containerOp

doc()[source]
export(target='', format='.yaml', secrets=None, strip=True)[source]

save function spec to a local/remote path (defaults to ./function.yaml)

Parameters
  • target – target path/url

  • format – .yaml (default) or .json

  • secrets – optional secrets dict/object for target path (e.g. s3)

  • strip – strip status data

Returns

self

fill_credentials()[source]
full_image_path(image=None)[source]
property is_deployed
kind = 'base'
property metadata
run(runspec: Optional[mlrun.model.RunObject] = None, handler=None, name: str = '', project: str = '', params: Optional[dict] = None, inputs: Optional[Dict[str, str]] = None, out_path: str = '', workdir: str = '', artifact_path: str = '', watch: bool = True, schedule: Optional[Union[str, mlrun.api.schemas.schedule.ScheduleCronTrigger]] = None, hyperparams: Optional[Dict[str, list]] = None, hyper_param_options: Optional[mlrun.model.HyperParamOptions] = None, verbose=None, scrape_metrics: Optional[bool] = None, local=False, local_code_path=None, auto_build=None) → mlrun.model.RunObject[source]

Run a local or remote task.

Parameters
  • runspec – run template object or dict (see RunTemplate)

  • handler – pointer or name of a function handler

  • name – execution name

  • project – project name

  • params – input parameters (dict)

  • inputs – input objects (dict of key: path)

  • out_path – default artifact output path

  • artifact_path – default artifact output path (will replace out_path)

  • workdir – default input artifacts path

  • watch – watch/follow run log

  • schedule – ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor). For help, see: https://apscheduler.readthedocs.io/en/v3.6.3/modules/triggers/cron.html#module-apscheduler.triggers.cron

  • hyperparams – dict of param name and list of values to be enumerated, e.g. {"p1": [1,2,3]}. The default strategy is grid search; you can specify the strategy (grid, list, random) and other options in the hyper_param_options parameter

  • hyper_param_options – dict or HyperParamOptions struct of hyper parameter options

  • verbose – add verbose prints/logs

  • scrape_metrics – whether to add the mlrun/scrape-metrics label to this run’s resources

  • local – run the function locally vs on the runtime/cluster

  • local_code_path – path of the code for local runs & debug

  • auto_build – when set to True and the function requires a build, it will be built on the first function run. Use only if you don't plan on changing the build config between runs

Returns

run context object (RunObject) with run metadata, results and status
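
The grid strategy described for hyperparams can be pictured with a small standard-library sketch. This is an illustration of how a dict of value lists expands into parameter combinations, not MLRun's own implementation; expand_grid is a hypothetical helper name.

```python
from itertools import product


def expand_grid(hyperparams):
    """Enumerate every combination of the listed values (grid strategy).

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    names = list(hyperparams)
    combos = product(*(hyperparams[name] for name in names))
    return [dict(zip(names, values)) for values in combos]


# Two parameters with 3 and 2 candidate values respectively.
grid = expand_grid({"p1": [1, 2, 3], "p2": [10, 20]})
print(len(grid))
print(grid[0])
```

Each dict in the result corresponds to the params of one child run under a grid search; the list and random strategies would select combinations differently.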

save(tag='', versioned=False, refresh=False) → str[source]
set_db_connection(conn, is_api=False)[source]
set_label(key, value)[source]
property spec
property status
store_run(runobj: mlrun.model.RunObject)[source]
to_dict(fields=None, exclude=None, strip=False)[source]

convert the object to a python dictionary

try_auto_mount_based_on_config()[source]
property uri
validate_and_enrich_service_account(allowed_service_account, default_service_account)[source]
verify_base_image()[source]
with_code(from_file='', body=None, with_doc=True)[source]

Update the function code. This eliminates the need to build container images every time the code is edited.

Parameters
  • from_file – blank for current notebook, or path to .py/.ipynb file

  • body – will use the body as the function code

  • with_doc – update the document of the function parameters

Returns

function object

with_requirements(requirements: Union[str, List[str]])[source]

add package requirements from file or list to build spec.

Parameters

requirements – python requirements file path or list of packages

Returns

function object
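
Since requirements accepts either a file path or a list of packages, the two forms can be thought of as normalizing to the same list. The sketch below assumes a pip-style file with one specifier per line and "#" comments; normalize_requirements is a hypothetical name, and MLRun's actual parsing may differ.

```python
import os
import tempfile


def normalize_requirements(requirements):
    """Return a list of package specifiers from a file path or a list.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    if isinstance(requirements, str):
        with open(requirements) as fp:
            lines = [line.strip() for line in fp]
        # Assumption: blank lines and '#' comment lines carry no packages.
        return [line for line in lines if line and not line.startswith("#")]
    return list(requirements)


# Write a throwaway requirements file to compare both input forms.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("# demo file\npandas>=1.0\n\nscikit-learn\n")

from_file = normalize_requirements(tmp.name)
from_list = normalize_requirements(["pandas>=1.0", "scikit-learn"])
os.remove(tmp.name)
```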

class mlrun.runtimes.DaskCluster(spec=None, metadata=None)[source]

Bases: mlrun.runtimes.kubejob.KubejobRuntime

property client
close(running=True)[source]
cluster()[source]
deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None)[source]

deploy function, build container with dependencies

get_status()[source]
gpus(gpus, gpu_type='nvidia.com/gpu')[source]
property initialized
property is_deployed

check if the function is deployed (has a valid container)

kind = 'dask'
property spec
property status
with_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu')[source]

set pod cpu/memory/gpu limits

with_requests(mem=None, cpu=None)[source]

set requested (desired) pod cpu/memory resources
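
The limits and requests set by these methods correspond to the two sections of a Kubernetes container resources mapping, with GPUs keyed by the gpu_type resource name. The sketch below shows the shape of that mapping under this assumption; build_resources is a hypothetical helper and does not reproduce how MLRun stores the values internally.

```python
def build_resources(mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
    """Collect only the values that were set into a resource mapping.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    resources = {}
    if mem is not None:
        resources["memory"] = mem
    if cpu is not None:
        resources["cpu"] = cpu
    if gpus is not None:
        # GPUs are requested under the device resource name, e.g. nvidia.com/gpu.
        resources[gpu_type] = gpus
    return resources


pod_resources = {
    "limits": build_resources(mem="4G", cpu=2, gpus=1),
    "requests": build_resources(mem="1G", cpu="500m"),
}
```

Note that with_requests takes no gpus argument in the signatures above, which is why only the limits section carries a GPU entry here.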

with_scheduler_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu')[source]

set scheduler pod resources limits

with_scheduler_requests(mem=None, cpu=None)[source]

set scheduler pod resources requests

with_worker_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu')[source]

set worker pod resources limits

with_worker_requests(mem=None, cpu=None)[source]

set worker pod resources requests

class mlrun.runtimes.HandlerRuntime(metadata=None, spec=None)[source]

Bases: mlrun.runtimes.base.BaseRuntime, mlrun.runtimes.local.ParallelRunner

kind = 'handler'
class mlrun.runtimes.KubejobRuntime(spec=None, metadata=None)[source]

Bases: mlrun.runtimes.pod.KubeResource

build_config(image='', base_image=None, commands: Optional[list] = None, secret=None, source=None, extra=None, load_source_on_run=None, with_mlrun=None, auto_build=None)[source]

specify builder configuration for the deploy operation

Parameters
  • image – target image name/path

  • base_image – base image name/path

  • commands – list of docker build (RUN) commands e.g. ['pip install pandas']

  • secret – k8s secret for accessing the docker registry

  • source – source git/tar archive to load code from into the context/workdir, e.g. git://github.com/mlrun/something.git#development

  • extra – extra Dockerfile lines

  • load_source_on_run – load the archive code into the container at runtime vs at build time

  • with_mlrun – add the current mlrun package to the container build

  • auto_build – when set to True and the function requires a build, it will be built on the first function run. Use only if you don't plan on changing the build config between runs
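
To make the roles of base_image, commands, and extra concrete, the sketch below renders them as Dockerfile lines: each command becomes a RUN instruction and extra is appended verbatim. This is an assumption about what the parameters mean, not the builder's actual output; render_dockerfile and the ENV line are hypothetical.

```python
def render_dockerfile(base_image, commands=None, extra=None):
    """Render build settings as Dockerfile text.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    lines = [f"FROM {base_image}"]
    # Each entry in `commands` is treated as one docker build RUN command.
    lines += [f"RUN {command}" for command in (commands or [])]
    if extra:
        # `extra` is assumed to hold raw Dockerfile lines.
        lines.append(extra)
    return "\n".join(lines)


dockerfile = render_dockerfile(
    "mlrun/mlrun",
    commands=["pip install pandas"],
    extra="ENV MODE=demo",  # hypothetical extra line
)
print(dockerfile)
```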

builder_status(watch=True, logs=True)[source]
deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: Optional[dict] = None) → bool[source]

deploy function, build container with dependencies

Parameters
  • watch – wait for the deploy to complete (and print build logs)

  • with_mlrun – add the current mlrun package to the container build

  • skip_deployed – skip the build if we already have an image for the function

  • mlrun_version_specifier – which mlrun package version to include (if not current)

  • builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={"GIT_TOKEN": token}

Returns

True if the function is ready (deployed)

deploy_step(image=None, base_image=None, commands: Optional[list] = None, secret_name='', with_mlrun=True, skip_deployed=False)[source]
property is_deployed

check if the function is deployed (has a valid container)

kind = 'job'
with_source_archive(source, pythonpath=None, pull_at_runtime=True)[source]

load the code from git/tar/zip archive at runtime or build

Parameters
  • source – valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip

  • pythonpath – python search path relative to the archive root or absolute (e.g. './subdir')

  • pull_at_runtime – load the archive into the container at job runtime vs on build/deploy
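
The source values shown above follow URL conventions, so the standard library can split them into an archive kind and an optional git ref (the part after "#"). The classification rules below are assumptions inferred from the examples, and describe_source is a hypothetical helper rather than MLRun's own parser.

```python
from urllib.parse import urlparse


def describe_source(source):
    """Classify a source archive URL and extract an optional '#ref'.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    parsed = urlparse(source)
    path = parsed.path.lower()
    if parsed.scheme == "git":
        kind = "git"
    elif path.endswith(".zip"):
        kind = "zip"
    elif path.endswith((".tar", ".tar.gz", ".tgz")):
        kind = "tar"
    else:
        kind = "unknown"
    # The URL fragment is assumed to carry the branch/ref for git sources.
    return {"kind": kind, "ref": parsed.fragment or None}


git_info = describe_source("git://github.com/mlrun/something.git#development")
zip_info = describe_source("http://some/url/file.zip")
```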

class mlrun.runtimes.LocalRuntime(metadata=None, spec=None)[source]

Bases: mlrun.runtimes.base.BaseRuntime, mlrun.runtimes.local.ParallelRunner

property is_deployed
kind = 'local'
to_job(image='')[source]
with_source_archive(source, pythonpath=None)[source]

load the code from git/tar/zip archive at runtime or build

Parameters
  • source – valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip

  • pythonpath – python search path relative to the archive root or absolute (e.g. './subdir')

class mlrun.runtimes.RemoteRuntime(spec=None, metadata=None)[source]

Bases: mlrun.runtimes.pod.KubeResource

add_model(name, model_path, **kw)[source]
add_secrets_config_to_spec()[source]
add_trigger(name, spec)[source]

add a nuclio trigger object/dict

Parameters
  • name – trigger name

  • spec – trigger object or dict

add_v3io_stream_trigger(stream_path, name='stream', group='serving', seek_to='earliest', shards=1, extra_attributes=None, ack_window_size=None, **kwargs)[source]

add v3io stream trigger to the function

Parameters
  • stream_path – v3io stream path (e.g. 'v3io:///projects/myproj/stream1')

  • name – trigger name

  • group – consumer group

  • seek_to – start seek from: "earliest", "latest", "time", "sequence"

  • shards – number of shards (used to set number of replicas)

  • extra_attributes – key/value dict with extra trigger attributes

  • ack_window_size – stream ack window size (the consumer group will be updated with the event id - ack_window_size, on failure the events in the window will be retransmitted)

  • kwargs – extra V3IOStreamTrigger class attributes
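
The ack_window_size description boils down to simple arithmetic: the position recorded for the consumer group trails the current event id by the window size. The sketch below shows only that subtraction; clamping at zero is an assumption, and committed_event_id is a hypothetical name.

```python
def committed_event_id(event_id, ack_window_size):
    """Position recorded for the consumer group after handling event_id.

    Hypothetical helper for illustration; not part of the mlrun API.
    Assumption: the recorded position never drops below zero.
    """
    return max(event_id - ack_window_size, 0)


# With a window of 10, handling event 100 records position 90, so the
# events inside the window are the ones retransmitted after a failure.
position = committed_event_id(100, 10)
early_position = committed_event_id(3, 10)
```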

add_volume(local, remote, name='fs', access_key='', user='')[source]
deploy(dashboard='', project='', tag='', verbose=False, auth_info: Optional[mlrun.api.schemas.auth.AuthInfo] = None)[source]

Deploy the nuclio function to the cluster

Parameters
  • dashboard – address of the nuclio dashboard service (keep blank for current cluster)

  • project – project name

  • tag – function tag

  • verbose – set True for verbose logging

  • auth_info – service AuthInfo

deploy_step(dashboard='', project='', models=None, env=None, tag=None, verbose=None, use_function_from_db=True)[source]

Return the deployment as a Kubeflow pipeline step (ContainerOp). Using mlrun.deploy_function() instead is recommended.

from_image(image)[source]
get_nuclio_config_spec_env()[source]
invoke(path: str, body: Optional[Union[str, bytes, dict]] = None, method: Optional[str] = None, headers: Optional[dict] = None, dashboard: str = '', force_external_address: bool = False, auth_info: Optional[mlrun.api.schemas.auth.AuthInfo] = None)[source]

Invoke the remote (live) function and return the results

example:

function.invoke("/api", body={"inputs": x})
Parameters
  • path – request sub path (e.g. /images)

  • body – request body (str, bytes or a dict for json requests)

  • method – HTTP method (GET, PUT, ..)

  • headers – key/value dict with http headers

  • dashboard – nuclio dashboard address

  • force_external_address – use the external ingress URL

  • auth_info – service AuthInfo
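
Because body may be a str, bytes, or a dict for JSON requests, a client has to turn each form into bytes before sending. The sketch below shows one plausible encoding using the standard library; the Content-Type handling is an assumption and encode_body is a hypothetical helper, not the code behind invoke().

```python
import json


def encode_body(body=None, headers=None):
    """Convert a request body to bytes and adjust headers for JSON.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    headers = dict(headers or {})
    if isinstance(body, dict):
        # Assumption: dict bodies are serialized as JSON.
        headers.setdefault("Content-Type", "application/json")
        return json.dumps(body).encode("utf-8"), headers
    if isinstance(body, str):
        return body.encode("utf-8"), headers
    # bytes (or None) pass through unchanged.
    return body, headers


payload, request_headers = encode_body({"inputs": [1, 2, 3]})
text_payload, _ = encode_body("hi")
```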

kind = 'remote'
serving(models: Optional[dict] = None, model_class='', protocol='', image='', endpoint='', explainer=False, workers=8, canary=None)[source]
set_config(key, value)[source]
property spec
property status
with_http(workers=8, port=0, host=None, paths=None, canary=None, secret=None, worker_timeout: Optional[int] = None, gateway_timeout: Optional[int] = None, trigger_name=None, annotations=None, extra_attributes=None)[source]

update/add nuclio HTTP trigger settings

Note: gateway timeout is the maximum request time before an error is returned, while the worker timeout is the maximum time a request waits before it starts processing. gateway_timeout must be greater than worker_timeout.

Parameters
  • workers – number of worker processes (default=8)

  • port – TCP port

  • host – hostname

  • paths – list of sub paths

  • canary – k8s ingress canary (% traffic value between 0 to 100)

  • secret – k8s secret name for SSL certificate

  • worker_timeout – worker wait timeout in sec (how long a message should wait in the worker queue before an error is returned)

  • gateway_timeout – nginx ingress timeout in sec (request timeout, when will the gateway return an error)

  • trigger_name – alternative nuclio trigger name

  • annotations – key/value dict of ingress annotations

  • extra_attributes – key/value dict of extra nuclio trigger attributes

Returns

function object (self)
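
The rule that gateway_timeout must be greater than worker_timeout can be checked before configuring the trigger. The sketch below is a standalone pre-check under that stated rule; check_timeouts is a hypothetical helper, and whether with_http raises the same error type is not confirmed by this reference.

```python
def check_timeouts(worker_timeout=None, gateway_timeout=None):
    """Validate the documented relationship between the two timeouts.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    if worker_timeout is not None and gateway_timeout is not None:
        if gateway_timeout <= worker_timeout:
            raise ValueError("gateway_timeout must be greater than worker_timeout")
    return worker_timeout, gateway_timeout


accepted = check_timeouts(worker_timeout=10, gateway_timeout=30)

try:
    check_timeouts(worker_timeout=30, gateway_timeout=30)
    rejected = False
except ValueError:
    rejected = True
```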

with_node_selection(**kwargs)[source]

Controls which k8s node the job will run on

Parameters
with_priority_class(**kwargs)[source]

Controls the priority of the pod. If not passed, defaults to mlrun.mlconf.default_function_priority_class_name

Parameters

name – The name of the priority class

with_source_archive(source, handler='', runtime='', secrets=None)[source]

Load nuclio function from remote source

Parameters
  • source – a full path to the nuclio function source (code entry) to load the function from

  • handler – a path to the function's handler, including path inside archive/git repo

  • runtime – (optional) the runtime of the function (defaults to python:3.7)

  • secrets – a dictionary of secrets to be used to fetch the function from the source (can also be passed using env vars). options: ["V3IO_ACCESS_KEY", "GIT_USERNAME", "GIT_PASSWORD", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"]

Examples:

git:

("git://github.com/org/repo#my-branch",
 handler="path/inside/repo#main:handler",
 secrets={"GIT_PASSWORD": "my-access-token"})

s3:

("s3://my-bucket/path/in/bucket/my-functions-archive",
 handler="path/inside/functions/archive#main:Handler",
 runtime="golang",
 secrets={"AWS_ACCESS_KEY_ID": "some-id", "AWS_SECRET_ACCESS_KEY": "some-secret"})
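
The handler strings in the examples above appear to combine a path inside the archive, a module, and a function name. The sketch below splits a handler under that reading; the "path#module:function" convention is inferred from the examples only, and split_handler is a hypothetical helper.

```python
def split_handler(handler):
    """Split 'path/inside/repo#module:function' into its three parts.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    # Everything before the last '#' is treated as the path inside the archive.
    workdir, _, entry = handler.rpartition("#")
    module, _, function = entry.partition(":")
    return {"workdir": workdir, "module": module, "function": function}


git_handler = split_handler("path/inside/repo#main:handler")
s3_handler = split_handler("path/inside/functions/archive#main:Handler")
bare_handler = split_handler("main:handler")
```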

with_v3io(local='', remote='')[source]

Add v3io volume to the function

Parameters
  • local – local path (mount path inside the function container)

  • remote – v3io path

class mlrun.runtimes.RemoteSparkRuntime(spec=None, metadata=None)[source]

Bases: mlrun.runtimes.kubejob.KubejobRuntime

default_image = '.remote-spark-default-image'
deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None)[source]

deploy function, build container with dependencies

classmethod deploy_default_image()[source]
property is_deployed

check if the function is deployed (has a valid container)

kind = 'remote-spark'
property spec
with_spark_service(spark_service, provider='iguazio')[source]

Attach spark service to function

class mlrun.runtimes.ServingRuntime(spec=None, metadata=None)[source]

Bases: mlrun.runtimes.function.RemoteRuntime

MLRun Serving Runtime

add_child_function(name, url=None, image=None, requirements=None, kind=None)[source]

in a multi-function pipeline add child function

example:

fn.add_child_function('enrich', './enrich.ipynb', 'mlrun/mlrun')
Parameters
  • name – child function name

  • url – function/code url, support .py, .ipynb, .yaml extensions

  • image – base docker image for the function

  • requirements – py package requirements file path OR list of packages

  • kind – mlrun function/runtime kind

Returns

function object

add_model(key, model_path=None, class_name=None, model_url=None, handler=None, router_step=None, child_function=None, **class_args)[source]

add ml model and/or route to the function.

Example, create a function (from the notebook), add a model class, and deploy:

fn = code_to_function(kind='serving')
fn.add_model('boost', model_path, class_name='MyClass', my_arg=5)
fn.deploy()

This only works with the router topology. For nested topologies (model under router under flow), add a router to the flow and use router.add_route().

Parameters
  • key – model api key (or name:version), will determine the relative url/path

  • model_path – path to mlrun model artifact or model directory file/object path

  • class_name – V2 Model python class name or a model class instance (can also be module.submodule.class, which will be imported automatically)

  • model_url – url of a remote model serving endpoint (cannot be used with model_path)

  • handler – for advanced users: override the default class handler name (do_event)

  • router_step – router step name (to determine which router we add the model to in graphs with multiple router steps)

  • child_function – child function name, when the model runs in a child function

  • class_args – extra kwargs to pass to the model serving class __init__ (can be read in the model using .get_param(key) method)
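
Two constraints in the parameter list can be checked up front: key may carry a version as name:version, and model_url cannot be combined with model_path. The sketch below expresses both under those stated rules; check_model_args is a hypothetical helper, and the error raised by add_model itself may differ.

```python
def check_model_args(key, model_path=None, model_url=None):
    """Validate add_model-style arguments and split the key.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    if model_path and model_url:
        raise ValueError("model_path and model_url cannot be used together")
    name, _, version = key.partition(":")
    return {"name": name, "version": version or None}


versioned = check_model_args("boost:v1", model_path="./model")
unversioned = check_model_args("boost", model_url="http://some/url")

try:
    check_model_args("boost", model_path="./model", model_url="http://some/url")
    conflict_detected = False
except ValueError:
    conflict_detected = True
```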

add_secrets_config_to_spec()[source]
deploy(dashboard='', project='', tag='', verbose=False, auth_info: Optional[mlrun.api.schemas.auth.AuthInfo] = None)[source]

deploy model serving function to a local/remote cluster

Parameters
  • dashboard – remote nuclio dashboard url (blank for local or auto detection)

  • project – optional, override function specified project name

  • tag – specify unique function tag (a different function service is created for every tag)

  • verbose – verbose logging

  • auth_info – The auth info to use to communicate with the Nuclio dashboard, required only when providing dashboard

kind = 'serving'
remove_states(keys: list)[source]

remove one, multiple, or all states/models from the spec (blank list for all)

set_topology(topology=None, class_name=None, engine=None, exist_ok=False, **class_args) → Union[mlrun.serving.states.RootFlowStep, mlrun.serving.states.RouterStep][source]

set the serving graph topology (router/flow) and root class or params

examples:

# simple model router topology
graph = fn.set_topology("router")
fn.add_model(name, class_name="ClassifierModel", model_path=model_uri)

# async flow topology
graph = fn.set_topology("flow", engine="async")
graph.to("MyClass").to(name="to_json", handler="json.dumps").respond()

topology options are:

router - root router + multiple child route states/models
         route is usually determined by the path (route key/name)
         can specify special router class and router arguments

flow   - workflow (DAG) with a chain of states
         flow support "sync" and "async" engines, branches are not allowed in sync mode
         when using async mode calling state.respond() will mark the state as the
         one which generates the (REST) call response
Parameters
  • topology – graph topology, router or flow

  • class_name – optional for router, router class name/path or router object

  • engine – optional for flow, sync or async engine (default to async)

  • exist_ok – allow overriding existing topology

  • class_args – optional, router/flow class init args

Returns

graph object (fn.spec.graph)
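
A flow topology with a chain of states can be pictured as an event passing through each step in order, with the last result serving as the response. The sketch below models only that sequential chain in plain Python; it is not the MLRun graph engine, and run_flow and double_inputs (standing in for "MyClass") are hypothetical.

```python
import json
from functools import reduce


def run_flow(steps, event):
    """Pass the event through each step in order and return the final result.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    return reduce(lambda value, step: step(value), steps, event)


def double_inputs(event):
    # Hypothetical processing step standing in for a user class such as "MyClass".
    return {"outputs": [x * 2 for x in event["inputs"]]}


# Mirrors the shape of the async flow example: a custom step followed by json.dumps.
response = run_flow([double_inputs, json.dumps], {"inputs": [1, 2]})
print(response)
```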

set_tracking(stream_path: Optional[str] = None, batch: Optional[int] = None, sample: Optional[int] = None, stream_args: Optional[dict] = None)[source]

set tracking stream parameters:

Parameters
  • stream_path – path/url of the tracking stream, e.g. v3io:///users/mike/mystream. You can use the "dummy://" path for test/simulation

  • batch – micro batch size (send micro batches of N records at a time)

  • sample – sample size (send only one of N records)

  • stream_args – stream initialization parameters, e.g. shards, retention_in_hours, ..
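
The sample and batch settings can be combined: first keep one of every N records, then group what remains into micro batches. The sketch below assumes the first record of each group of N is the one kept, which this reference does not specify; sample_and_batch is a hypothetical helper.

```python
def sample_and_batch(records, sample=None, batch=None):
    """Keep one of every `sample` records, then group them into batches.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    # Assumption: sampling keeps the first record of each group of N.
    kept = list(records)[::sample] if sample else list(records)
    size = batch or 1
    return [kept[i:i + size] for i in range(0, len(kept), size)]


# 10 records, keep every 2nd one, send in micro batches of 3.
batches = sample_and_batch(range(10), sample=2, batch=3)
print(batches)
```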

property spec
to_mock_server(namespace=None, current_function='*', track_models=False, **kwargs) → mlrun.serving.server.GraphServer[source]

create mock server object for local testing/emulation

Parameters
  • namespace – classes search namespace, use globals() for current notebook

  • log_level – log level (error | info | debug)

  • current_function – specify if you want to simulate a child function, * for all functions

  • track_models – allow model tracking (disabled by default in the mock server)

with_secrets(kind, source)[source]

register a secrets source (file, env or dict)

read secrets from a source provider to be used in workflows, example:

task.with_secrets('file', 'file.txt')
task.with_secrets('inline', {'key': 'val'})
task.with_secrets('env', 'ENV1,ENV2')
task.with_secrets('vault', ['secret1', 'secret2'...])

# If using an empty secrets list [] then all accessible secrets will be available.
task.with_secrets('vault', [])

# To use with Azure key vault, a k8s secret must be created with the following keys:
# kubectl -n <namespace> create secret generic azure-key-vault-secret \
#     --from-literal=tenant_id=<service principal tenant ID> \
#     --from-literal=client_id=<service principal client ID> \
#     --from-literal=secret=<service principal secret key>

task.with_secrets('azure_vault', {
    'name': 'my-vault-name',
    'k8s_secret': 'azure-key-vault-secret',
    # An empty secrets list may be passed ('secrets': []) to access all vault secrets.
    'secrets': ['secret1', 'secret2'...]
})
Parameters
  • kind – secret type (file, inline, env, vault, azure_vault)

  • source – secret data or link (see example)

Returns

The Runtime (function) object
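
For the 'inline' and 'env' kinds shown in the example, the source argument resolves to a plain key/value mapping. The sketch below illustrates that resolution with the standard library only; skipping unset environment variables is an assumption, and resolve_secrets is a hypothetical helper that does not cover the file or vault kinds.

```python
import os


def resolve_secrets(kind, source):
    """Turn an 'inline' dict or an 'env' name list into a key/value mapping.

    Hypothetical helper for illustration; not part of the mlrun API.
    """
    if kind == "inline":
        return dict(source)
    if kind == "env":
        names = [name.strip() for name in source.split(",") if name.strip()]
        # Assumption: variables that are not set are skipped silently.
        return {name: os.environ[name] for name in names if name in os.environ}
    raise ValueError(f"unsupported kind in this sketch: {kind}")


os.environ["ENV1"] = "one"      # demo value
os.environ.pop("ENV2", None)    # ensure ENV2 is unset for the demo

inline_secrets = resolve_secrets("inline", {"key": "val"})
env_secrets = resolve_secrets("env", "ENV1,ENV2")
```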