mlrun.run#

class mlrun.run.ArtifactType(value)[source]#

Bases: enum.Enum

Possible artifact types to log using the MLRun context decorator.

DATASET = 'dataset'#
DEFAULT = 'result'#
DIRECTORY = 'directory'#
FILE = 'file'#
OBJECT = 'object'#
PLOT = 'plot'#
RESULT = 'result'#
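
For example, a minimal sketch of choosing an artifact type when logging a handler's outputs (the function name, keys and returned values here are illustrative):

import pandas as pd

import mlrun
from mlrun.run import ArtifactType

@mlrun.handler(outputs=[("my_table", ArtifactType.DATASET), ("accuracy", "result")])
def prepare_data():
    df = pd.DataFrame({"a": [1, 2, 3]})
    return df, 0.9
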
class mlrun.run.ContextHandler[source]#

Bases: object

Private class for handling an MLRun context of a function that is wrapped in MLRun’s handler decorator.

The context handler has three duties:
  1. Check if the user used MLRun to run the wrapped function and if so, get the MLRun context.

  2. Parse the user’s inputs (MLRun DataItem) to the function.

  3. Log the function’s outputs to MLRun.

The context handler uses dictionaries to map objects to their logging / parsing functions. The maps can be edited using the relevant update_X class methods. If additional artifact types are needed, the ArtifactType class can be inherited and replaced using the update_artifact_type_class class method.

Initialize a context handler.

is_context_available() bool[source]#

Check if a context was found by the method look_for_context.

Returns

True if a context was found and False otherwise.

log_outputs(outputs: list, logging_instructions: List[Optional[Union[Tuple[str, mlrun.run.ArtifactType], Tuple[str, str], Tuple[str, mlrun.run.ArtifactType, Dict[str, Any]], Tuple[str, str, Dict[str, Any]], str]]])[source]#

Log the given outputs as artifacts with the stored context.

Parameters
  • outputs – List of outputs to log.

  • logging_instructions – List of logging instructions to use.

look_for_context(args: tuple, kwargs: dict)[source]#
Look for an MLRun context (mlrun.MLClientCtx). The handler will look for a context in the following order:
  1. The given arguments.

  2. The given keyword arguments.

  3. If an MLRun RunTime was used, the context will be located via the mlrun.get_or_create_ctx method.

Parameters
  • args – The arguments tuple passed to the function.

  • kwargs – The keyword arguments dictionary passed to the function.

parse_inputs(args: tuple, kwargs: dict, expected_arguments_types: collections.OrderedDict) tuple[source]#

Parse the given arguments and keyword arguments data items to the expected types.

Parameters
  • args – The arguments tuple passed to the function.

  • kwargs – The keyword arguments dictionary passed to the function.

  • expected_arguments_types – An ordered dictionary of the expected types of arguments.

Returns

The parsed args (kwargs are parsed in place).

set_labels(labels: Dict[str, str])[source]#

Set the given labels with the stored context.

Parameters

labels – The labels to set.

classmethod update_artifact_type_class(artifact_type_class: Type[mlrun.run.ArtifactType])[source]#

Update the artifact type enum class that the handler will use to specify new artifact types to log and parse.

Parameters

artifact_type_class – An enum inheriting from the ArtifactType enum.

classmethod update_default_objects_artifact_types_map(updates: Dict[type, mlrun.run.ArtifactType])[source]#

Enrich the default objects artifact types map with new object types to support.

Parameters

updates – A dictionary of new object types to the artifact types to log them as.

classmethod update_inputs_parsing_map(updates: Dict[type, Callable[[mlrun.datastore.base.DataItem], Any]])[source]#

Enrich the inputs parsing map with new object types to support. The inputs parsing map is a dictionary with object types as keys and, as values, a function that will handle the given input. The function must accept one keyword argument (data_item: mlrun.DataItem) and return the relevant parsed object.

Parameters

updates – New object types to support - a dictionary with an object type as key and a function that will handle the given input as value, used to update the current map.
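
For example, a minimal sketch of registering a new input type (the parsing function is illustrative and assumes DataItem.local(), which downloads the item and returns a local file path):

from pathlib import Path

from mlrun.run import ContextHandler

# Illustrative: parse an input DataItem into a pathlib.Path of a local copy of the file
ContextHandler.update_inputs_parsing_map(
    {Path: lambda data_item: Path(data_item.local())}
)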

classmethod update_outputs_logging_map(updates: Dict[mlrun.run.ArtifactType, Callable[[mlrun.execution.MLClientCtx, Any, str, dict], None]])[source]#

Enrich the outputs logging map with new artifact types to support. The outputs logging map is a dictionary with artifact type enums as keys and, as values, a function that will handle the given output. The function must accept 4 keyword arguments:

  • ctx: mlrun.MLClientCtx - The MLRun context to log with.

  • obj: Any - The value / object to log.

  • key: str - The key of the artifact.

  • logging_kwargs: dict - Keyword arguments the user can pass in the instructions tuple.

Parameters

updates – New artifact types to support - a dictionary of artifact type enum as key, and a function that will handle the given output to update the current map.
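
For example, a minimal sketch of a logging function with the required signature (the rounding logic is illustrative; registering it overrides how RESULT outputs are logged):

from mlrun.run import ArtifactType, ContextHandler

def log_rounded_result(ctx, obj, key, logging_kwargs):
    # Illustrative: round numeric results to 4 digits before logging them
    ctx.log_result(key, round(obj, 4), **logging_kwargs)

ContextHandler.update_outputs_logging_map({ArtifactType.RESULT: log_rounded_result})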

class mlrun.run.InputsParser[source]#

Bases: object

A static class to hold all the common parsing functions - functions for parsing an MLRun DataItem to the user's desired type.

static parse_dict(data_item: mlrun.datastore.base.DataItem) dict[source]#

Parse an MLRun DataItem to a dict.

Parameters

data_item – The DataItem to parse.

Returns

The DataItem as a dict.

static parse_list(data_item: mlrun.datastore.base.DataItem) list[source]#

Parse an MLRun DataItem to a list.

Parameters

data_item – The DataItem to parse.

Returns

The DataItem as a list.

static parse_numpy_array(data_item: mlrun.datastore.base.DataItem) numpy.ndarray[source]#

Parse an MLRun DataItem to a numpy.ndarray.

Parameters

data_item – The DataItem to parse.

Returns

The DataItem as a numpy.ndarray.

static parse_object(data_item: mlrun.datastore.base.DataItem) object[source]#

Parse an MLRun DataItem to its unpickled object. The pickle file will be downloaded to a local temp directory and then loaded.

Parameters

data_item – The DataItem to parse.

Returns

The DataItem as the original object that was pickled once it was logged.

static parse_pandas_dataframe(data_item: mlrun.datastore.base.DataItem) pandas.core.frame.DataFrame[source]#

Parse an MLRun DataItem to a pandas.DataFrame.

Parameters

data_item – The DataItem to parse.

Returns

The DataItem as a pandas.DataFrame.
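
These parsers are used internally by the handler decorator, but they can also be called directly on a DataItem; a minimal sketch (the url is illustrative):

import mlrun
from mlrun.run import InputsParser

data_item = mlrun.run.get_dataitem("https://example.com/data.csv")  # illustrative url
df = InputsParser.parse_pandas_dataframe(data_item)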

class mlrun.run.OutputsLogger[source]#

Bases: object

A static class to hold all the common logging functions - functions for logging different objects by artifact type to MLRun.

static log_dataset(ctx: mlrun.execution.MLClientCtx, obj: Union[pandas.core.frame.DataFrame, numpy.ndarray, pandas.core.series.Series, dict, list], key: str, logging_kwargs: dict)[source]#

Log an object as a dataset. The dataset will be cast to a pandas.DataFrame. Supported casts are from pandas.Series, numpy.ndarray, dict and list.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The data to log.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_dataset method.

Raises

MLRunInvalidArgumentError – If the type is not supported for being cast to pandas.DataFrame.

static log_directory(ctx: mlrun.execution.MLClientCtx, obj: Union[str, pathlib.Path], key: str, logging_kwargs: dict)[source]#

Log a directory as a zip file. The zip file will be created at the current working directory. Once logged, it will be deleted.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The path of the directory to zip.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_artifact method.

Raises

MLRunInvalidArgumentError – In case the given path is not a directory or does not exist.

static log_file(ctx: mlrun.execution.MLClientCtx, obj: Union[str, pathlib.Path], key: str, logging_kwargs: dict)[source]#

Log a file to MLRun.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The path of the file to log.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_artifact method.

Raises

MLRunInvalidArgumentError – In case the given path is not a file or does not exist.

static log_object(ctx: mlrun.execution.MLClientCtx, obj, key: str, logging_kwargs: dict)[source]#

Log an object as a pickle.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The object to log.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_artifact method.

static log_plot(ctx: mlrun.execution.MLClientCtx, obj, key: str, logging_kwargs: dict)[source]#

Log an object as a plot. Currently supported are plots produced by one of the following modules: matplotlib, seaborn, plotly and bokeh.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The plot to log.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_artifact.

Raises

MLRunInvalidArgumentError – If the object type is not supported (meaning the plot was not produced by one of the supported modules).

static log_result(ctx: mlrun.execution.MLClientCtx, obj: Union[int, float, str, list, tuple, dict, numpy.ndarray], key: str, logging_kwargs: dict)[source]#

Log an object as a result. The object's value will be cast to a serializable version of itself. Supported types: int, float, str, list, tuple, dict, numpy.ndarray.

Parameters
  • ctx – The MLRun context to log with.

  • obj – The value to log.

  • key – The key of the artifact.

  • logging_kwargs – Additional keyword arguments to pass to the context.log_result method.
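
These logging helpers are normally invoked by the handler decorator, but a minimal sketch of calling one directly with an existing context could look like this (the run name, key and value are illustrative):

import mlrun
from mlrun.run import OutputsLogger

context = mlrun.get_or_create_ctx("manual-logging")
OutputsLogger.log_result(ctx=context, obj=0.98, key="accuracy", logging_kwargs={})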

class mlrun.run.RunStatuses[source]#

Bases: object

static all()[source]#
error = 'Error'#
failed = 'Failed'#
running = 'Running'#
skipped = 'Skipped'#
static stable_statuses()[source]#
succeeded = 'Succeeded'#
static transient_statuses()[source]#
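
For example, a minimal sketch of using the status groups when checking a pipeline run (the status value is illustrative):

from mlrun.run import RunStatuses

print(RunStatuses.all())                 # every known status
print(RunStatuses.transient_statuses())  # statuses that may still change

status = "Succeeded"  # illustrative value, e.g. taken from a kfp run dict
if status in RunStatuses.stable_statuses():
    print("pipeline run has completed")
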
mlrun.run.code_to_function(name: str = '', project: str = '', tag: str = '', filename: str = '', handler: str = '', kind: str = '', image: Optional[str] = None, code_output: str = '', embed_code: bool = True, description: str = '', requirements: Optional[Union[str, List[str]]] = None, categories: Optional[List[str]] = None, labels: Optional[Dict[str, str]] = None, with_doc: bool = True, ignored_tags=None) Union[mlrun.runtimes.mpijob.v1alpha1.MpiRuntimeV1Alpha1, mlrun.runtimes.mpijob.v1.MpiRuntimeV1, mlrun.runtimes.function.RemoteRuntime, mlrun.runtimes.serving.ServingRuntime, mlrun.runtimes.daskjob.DaskCluster, mlrun.runtimes.kubejob.KubejobRuntime, mlrun.runtimes.local.LocalRuntime, mlrun.runtimes.sparkjob.spark2job.Spark2Runtime, mlrun.runtimes.sparkjob.spark3job.Spark3Runtime, mlrun.runtimes.remotesparkjob.RemoteSparkRuntime][source]#

Convenience function to insert code and configure an mlrun runtime.

Easiest way to construct a runtime type object. Provides the most often used configuration options for all runtimes as parameters.

Instantiated runtimes are considered ‘functions’ in mlrun, but they are anything from nuclio functions to generic kubernetes pods to spark jobs. Functions are meant to be focused, and as such limited in scope and size. Typically a function can be expressed in a single python module with added support from custom docker images and commands for the environment. The returned runtime object can be further configured if more customization is required.

One of the most important parameters is ‘kind’. It is used to specify the chosen runtime. The options are:

  • local: execute a local python or shell script

  • job: insert the code into a Kubernetes pod and execute it

  • nuclio: insert the code into a real-time serverless nuclio function

  • serving: insert code into orchestrated nuclio function(s) forming a DAG

  • dask: run the specified python code / script as Dask Distributed job

  • mpijob: run distributed Horovod jobs over the MPI job operator

  • spark: run distributed Spark job using Spark Kubernetes Operator

  • remote-spark: run distributed Spark job on remote Spark service

Learn more about function runtimes here: https://docs.mlrun.org/en/latest/runtimes/functions.html#function-runtimes

Parameters
  • name – function name, typically best to use hyphen-case

  • project – project used to namespace the function, defaults to ‘default’

  • tag – function tag to track multiple versions of the same function, defaults to ‘latest’

  • filename – path to .py/.ipynb file, defaults to current jupyter notebook

  • handler – The default function handler to call for the job or nuclio function. In batch functions (job, mpijob, ..) the handler can also be specified in the .run() command; when not specified, the entire file will be executed (as main). For nuclio functions the handler is in the form of module:function, defaults to ‘main:handler’

  • kind – function runtime type string - nuclio, job, etc. (see docstring for all options)

  • image – base docker image to use for building the function container, defaults to None

  • code_output – specify ‘.’ to generate python module from the current jupyter notebook

  • embed_code – indicates whether or not to inject the code directly into the function runtime spec, defaults to True

  • description – short function description, defaults to ‘’

  • requirements – list of python packages or pip requirements file path, defaults to None

  • categories – list of categories for mlrun function marketplace, defaults to None

  • labels – immutable name/value pairs to tag the function with useful metadata, defaults to None

  • with_doc – indicates whether to document the function parameters, defaults to True

  • ignored_tags – notebook cells to ignore when converting notebooks to py code (separated by ‘;’)

Returns

pre-configured function object from a mlrun runtime class

example:

import mlrun

# create job function object from notebook code and add doc/metadata
fn = mlrun.code_to_function("file_utils", kind="job",
                            handler="open_archive", image="mlrun/mlrun",
                            description = "this function opens a zip archive into a local/mounted folder",
                            categories = ["fileutils"],
                            labels = {"author": "me"})

example:

import mlrun
from pathlib import Path

# create file
Path("mover.py").touch()

# create nuclio function object from python module call mover.py
fn = mlrun.code_to_function("nuclio-mover", kind="nuclio",
                            filename="mover.py", image="python:3.7",
                            description = "this function moves files from one system to another",
                            requirements = ["pandas"],
                            labels = {"author": "me"})
mlrun.run.download_object(url, target, secrets=None)[source]#

download mlrun dataitem (from path/url to target path)
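
example (the source url and local target path are illustrative):

import mlrun

mlrun.run.download_object("s3://my-bucket/data/iris.csv", "/tmp/iris.csv")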

mlrun.run.function_to_module(code='', workdir=None, secrets=None, silent=False)[source]#

Load code, a notebook or an mlrun function as a .py module. This function can import a local/remote py file or notebook, or load an mlrun function object as a module. You can use this from your code, notebook, or another function (for common libs).

Note: the function may have package requirements which must be satisfied

example:

mod = mlrun.function_to_module('./examples/training.py')
task = mlrun.new_task(inputs={'infile.txt': '../examples/infile.txt'})
context = mlrun.get_or_create_ctx('myfunc', spec=task)
mod.my_job(context, p1=1, p2='x')
print(context.to_yaml())

fn = mlrun.import_function('hub://open_archive')
mod = mlrun.function_to_module(fn)
data = mlrun.run.get_dataitem("https://fpsignals-public.s3.amazonaws.com/catsndogs.tar.gz")
context = mlrun.get_or_create_ctx('myfunc')
mod.open_archive(context, archive_url=data)
print(context.to_yaml())
Parameters
  • code – path/url to function (.py or .ipynb or .yaml) OR function object

  • workdir – code workdir

  • secrets – secrets needed to access the URL (e.g. s3, v3io, ..)

  • silent – do not raise on errors

Returns

python module

mlrun.run.get_dataitem(url, secrets=None, db=None) mlrun.datastore.base.DataItem[source]#

get mlrun dataitem object (from path/url)

mlrun.run.get_object(url, secrets=None, size=None, offset=0, db=None)[source]#

get mlrun dataitem body (from path/url)
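
A minimal sketch of both helpers (the url is illustrative):

import mlrun

url = "https://example.com/data.csv"     # illustrative url
data_item = mlrun.run.get_dataitem(url)  # DataItem object for lazy access
body = mlrun.run.get_object(url)         # object body read directly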

mlrun.run.get_or_create_ctx(name: str, event=None, spec=None, with_env: bool = True, rundb: str = '', project: str = '', upload_artifacts=False)[source]#

called from within the user program to obtain a run context

the run context is an interface for receiving parameters, data and logging run results. The run context is read from the event, spec, or environment (in that order). The user can also work without a context (local defaults mode).

all results are automatically stored in the “rundb” or artifact store; the path to the rundb can be specified in the call or obtained from the environment.

Parameters
  • name – run name (will be overridden by context)

  • event – function (nuclio Event object)

  • spec – dictionary holding run spec

  • with_env – look for context in environment vars, default True

  • rundb – path/url to the metadata and artifact database

  • project – project to initiate the context in (by default mlrun.mlctx.default_project)

  • upload_artifacts – when using local context (not as part of a job/run), upload artifacts to the system default artifact path location

Returns

execution context

Examples:

# load MLRUN runtime context (will be set by the runtime framework e.g. KubeFlow)
context = get_or_create_ctx('train')

# get parameters from the runtime context (or use defaults)
p1 = context.get_param('p1', 1)
p2 = context.get_param('p2', 'a-string')

# access input metadata, values, files, and secrets (passwords)
print(f'Run: {context.name} (uid={context.uid})')
print(f'Params: p1={p1}, p2={p2}')
print(f'accesskey = {context.get_secret("ACCESS_KEY")}')
input_str = context.get_input('infile.txt').get()
print(f'file: {input_str}')

# RUN some useful code e.g. ML training, data prep, etc.

# log scalar result values (job result metrics)
context.log_result('accuracy', p1 * 2)
context.log_result('loss', p1 * 3)
context.set_label('framework', 'sklearn')

# log various types of artifacts (file, web page, table), will be versioned and visible in the UI
context.log_artifact('model.txt', body=b'abc is 123', labels={'framework': 'xgboost'})
context.log_artifact('results.html', body=b'<b> Some HTML <b>', viewer='web-app')
mlrun.run.get_pipeline(run_id, namespace=None, format_: Union[str, mlrun.api.schemas.pipeline.PipelinesFormat] = PipelinesFormat.summary, project: Optional[str] = None, remote: bool = True)[source]#

Get Pipeline status

Parameters
  • run_id – id of pipelines run

  • namespace – k8s namespace if not default

  • format – Format of the results. Possible values are: summary (default) - return a summary of the object data; full - return the full pipeline object.

  • project – the project of the pipeline run

  • remote – read kfp data from mlrun service (default=True)

Returns

kfp run dict
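
example (the run id and project name are illustrative):

import mlrun

pipeline_run = mlrun.run.get_pipeline("my-pipeline-run-id", project="my-project")
print(pipeline_run)  # kfp run dict (summary format by default)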

mlrun.run.handler(labels: Optional[Dict[str, str]] = None, outputs: Optional[List[Optional[Union[Tuple[str, mlrun.run.ArtifactType], Tuple[str, str], Tuple[str, mlrun.run.ArtifactType, Dict[str, Any]], Tuple[str, str, Dict[str, Any]], str]]]] = None, inputs: Union[bool, Dict[str, Type]] = True)[source]#

MLRun’s handler is a decorator to wrap a function and enable setting labels, automatic mlrun.DataItem parsing and outputs logging.

Parameters
  • labels – Labels to add to the run. Expecting a dictionary with the labels names as keys. Default: None.

  • outputs

    Logging configurations for the function’s returned values. Expecting a list of strings, tuples and None values:

    • str - A string in the format of ‘{key}:{artifact_type}’. If a string was given without ‘:’ it will indicate the key, and the artifact type will be defaulted according to the returned value type.

    • tuple - A tuple of:

      • [0]: str - The key (name) of the artifact to use for the logged output.

      • [1]: Union[ArtifactType, str] = “result” - An ArtifactType enum or an equivalent string, that indicates how to log the returned value. The artifact types can be one of:

        • DATASET = “dataset”

        • DIRECTORY = “directory”

        • FILE = “file”

        • OBJECT = “object”

        • PLOT = “plot”

        • RESULT = “result”.

      • [2]: Optional[Dict[str, Any]] - A keyword arguments dictionary with the properties to pass to the relevant logging function (one of context.log_artifact, context.log_result, context.log_dataset).

    • None - Do not log the output.

    The list length must be equal to the total number of values returned from the function. Defaults to None - meaning no outputs will be logged.

  • inputs

    Parsing configurations for the arguments passed as inputs via the run method of an MLRun function. Can be passed as a boolean value or a dictionary:

    • True - Parse all found inputs to the assigned type hint in the function’s signature. If there is no type hint assigned, the value will remain an mlrun.DataItem.

    • False - Do not parse inputs, leaving the inputs as mlrun.DataItem.

    • Dict[str, Type] - A dictionary with the argument name as key and the expected type to parse the mlrun.DataItem to.

    Defaults to True.

Example:

import mlrun
import numpy as np

@mlrun.handler(outputs=["my_array", None, "my_multiplier"])
def my_handler(array: np.ndarray, m: int):
    array = array * m
    m += 1
    return array, "I won't be logged", m

>>> mlrun_function = mlrun.code_to_function(filename="my_code.py", kind="job")
>>> run_object = mlrun_function.run(
...     handler="my_handler",
...     inputs={"array": "store://my_array_Artifact"},
...     params={"m": 2}
... )
>>> run_object.outputs
{'my_multiplier': 3, 'my_array': 'store://...'}
mlrun.run.import_function(url='', secrets=None, db='', project=None, new_name=None)[source]#

Create function object from DB or local/remote YAML file

Function can be imported from function repositories (mlrun marketplace or local db), or be read from a remote URL (http(s), s3, git, v3io, ..) containing the function YAML

special URLs:

function marketplace: hub://{name}[:{tag}]
local mlrun db:       db://{project-name}/{name}[:{tag}]

examples:

function = mlrun.import_function("hub://sklearn_classifier")
function = mlrun.import_function("./func.yaml")
function = mlrun.import_function("https://raw.githubusercontent.com/org/repo/func.yaml")
Parameters
  • url – path/url to marketplace, db or function YAML file

  • secrets – optional, credentials dict for DB or URL (s3, v3io, …)

  • db – optional, mlrun api/db path

  • project – optional, target project for the function

  • new_name – optional, override the imported function name

Returns

function object

mlrun.run.import_function_to_dict(url, secrets=None)[source]#

Load function spec from local/remote YAML file
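
example (the path is illustrative):

import mlrun

function_spec = mlrun.run.import_function_to_dict("./function.yaml")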

mlrun.run.list_pipelines(full=False, page_token='', page_size=None, sort_by='', filter_='', namespace=None, project='*', format_: mlrun.api.schemas.pipeline.PipelinesFormat = PipelinesFormat.metadata_only) Tuple[int, Optional[int], List[dict]][source]#

List pipelines

Parameters
  • full – Deprecated, use format_ instead. If True, format_ will be set to full; otherwise format_ will be used.

  • page_token – A page token to request the next page of results. The token is acquired from the nextPageToken field of the response from the previous call or can be omitted when fetching the first page.

  • page_size – The number of pipelines to be listed per page. If there are more pipelines than this number, the response message will contain a nextPageToken field you can use to fetch the next page.

  • sort_by – Can be format of “field_name”, “field_name asc” or “field_name desc” (Example, “name asc” or “id desc”). Ascending by default.

  • filter – A url-encoded, JSON-serialized Filter protocol buffer, see: [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto).

  • namespace – Kubernetes namespace if other than default

  • project – Can be used to retrieve only specific project pipelines. “*” for all projects. Note that filtering by project can’t be used together with pagination, sorting, or custom filter.

  • format – Control what will be returned (full/metadata_only/name_only)
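
A minimal sketch of listing a project’s pipeline runs (the project name is illustrative):

import mlrun

total, next_page, runs = mlrun.run.list_pipelines(project="my-project")
for run in runs:
    print(run)  # each entry is a kfp run dict (metadata_only format by default)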

mlrun.run.load_func_code(command='', workdir=None, secrets=None, name='name')[source]#
mlrun.run.new_function(name: str = '', project: str = '', tag: str = '', kind: str = '', command: str = '', image: str = '', args: Optional[list] = None, runtime=None, mode=None, handler: Optional[str] = None, source: Optional[str] = None, requirements: Optional[Union[str, List[str]]] = None, kfp=None)[source]#

Create a new ML function from base properties

example:

# define a container based function (the `training.py` must exist in the container workdir)
f = new_function(command='training.py -x {x}', image='myrepo/image:latest', kind='job')
f.run(params={"x": 5})

# define a container based function which reads its source from a git archive
f = new_function(command='training.py -x {x}', image='myrepo/image:latest', kind='job',
                 source='git://github.com/mlrun/something.git')
f.run(params={"x": 5})

# define a local handler function (execute a local function handler)
f = new_function().run(task, handler=myfunction)
Parameters
  • name – function name

  • project – function project (none for ‘default’)

  • tag – function version tag (none for ‘latest’)

  • kind – runtime type (local, job, nuclio, spark, mpijob, dask, ..)

  • command – command/url + args (e.g.: training.py --verbose)

  • image – container image (start with ‘.’ for default registry)

  • args – command line arguments (override the ones in command)

  • runtime – runtime (job, nuclio, spark, dask ..) object/dict that stores runtime-specific details and preferences

  • mode

    runtime mode, “args” mode will push params into command template, example:

    command=`mycode.py -x {xparam}` will substitute the {xparam} with the value of the xparam param

    “pass” mode will run the command as is in the container (not wrapped by mlrun), the command can use {} for parameters like in the “args” mode

  • handler – The default function handler to call for the job or nuclio function. In batch functions (job, mpijob, ..) the handler can also be specified in the .run() command; when not specified, the entire file will be executed (as main). For nuclio functions the handler is in the form of module:function, defaults to “main:handler”

  • source – valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git, http://some/url/file.zip

  • requirements – list of python packages or pip requirements file path, defaults to None

  • kfp – reserved, flag indicating running within kubeflow pipeline

Returns

function object

mlrun.run.run_local(task=None, command='', name: str = '', args: Optional[list] = None, workdir=None, project: str = '', tag: str = '', secrets=None, handler=None, params: Optional[dict] = None, inputs: Optional[dict] = None, artifact_path: str = '', mode: Optional[str] = None, allow_empty_resources=None)[source]#

Run a task on function/code (.py, .ipynb or .yaml) locally.

example:

# define a task
task = new_task(params={'p1': 8}, out_path=out_path)
# run
run = run_local(task, command='src/training.py', workdir='src')

or specify base task parameters (handler, params, ..) in the call:

run = run_local(handler=my_function, params={'x': 5})
Parameters
  • task – task template object or dict (see RunTemplate)

  • command – command/url/function

  • name – ad hoc function name

  • args – command line arguments (override the ones in command)

  • workdir – working dir to exec in

  • project – function project (none for ‘default’)

  • tag – function version tag (none for ‘latest’)

  • secrets – secrets dict if the function source is remote (s3, v3io, ..)

  • handler – pointer or name of a function handler

  • params – input parameters (dict)

  • inputs – input objects (dict of key: path)

  • artifact_path – default artifact output path

Returns

run object

mlrun.run.run_pipeline(pipeline, arguments=None, project=None, experiment=None, run=None, namespace=None, artifact_path=None, ops=None, url=None, ttl=None, remote: bool = True)[source]#

remote KubeFlow pipeline execution

Submit a workflow task to KFP via mlrun API service

Parameters
  • pipeline – KFP pipeline function or path to .yaml/.zip pipeline file

  • arguments – pipeline arguments

  • project – name of project

  • experiment – experiment name

  • run – optional, run name

  • namespace – Kubernetes namespace (if not using default)

  • url – optional, url to mlrun API service

  • artifact_path – target location/url for mlrun artifacts

  • ops – additional operators (.apply() to all pipeline functions)

  • ttl – pipeline ttl in secs (after that the pods will be removed)

  • remote – read kfp data from mlrun service (default=True)

Returns

kubeflow pipeline id
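
example (the pipeline file, arguments and names are illustrative):

import mlrun

run_id = mlrun.run.run_pipeline(
    "./pipeline.yaml",                     # illustrative compiled pipeline file
    arguments={"model_name": "my-model"},  # illustrative pipeline arguments
    project="my-project",
    experiment="my-experiment",
)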

mlrun.run.wait_for_pipeline_completion(run_id, timeout=3600, expected_statuses: Optional[List[str]] = None, namespace=None, remote=True, project: Optional[str] = None)[source]#

Wait for Pipeline status, timeout in sec

Parameters
  • run_id – id of pipelines run

  • timeout – wait timeout in sec

  • expected_statuses – list of expected statuses, one of [ Succeeded | Failed | Skipped | Error ], by default [ Succeeded ]

  • namespace – k8s namespace if not default

  • remote – read kfp data from mlrun service (default=True)

  • project – the project of the pipeline

Returns

kfp run dict
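
example, continuing from a run_pipeline call (the run id and project are illustrative):

import mlrun

run_id = "my-pipeline-run-id"  # e.g. the value returned by run_pipeline
run_info = mlrun.run.wait_for_pipeline_completion(
    run_id, timeout=60 * 60, expected_statuses=["Succeeded"], project="my-project"
)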

mlrun.run.wait_for_runs_completion(runs: list, sleep=3, timeout=0, silent=False)[source]#

wait for multiple runs to complete

Note: need to use watch=False in .run() so the run will not wait for completion

example:

# run two training functions in parallel and wait for the results
inputs = {'dataset': cleaned_data}
run1 = train.run(name='train_lr', inputs=inputs, watch=False,
                 params={'model_pkg_class': 'sklearn.linear_model.LogisticRegression',
                         'label_column': 'label'})
run2 = train.run(name='train_rf', inputs=inputs, watch=False,
                 params={'model_pkg_class': 'sklearn.ensemble.RandomForestClassifier',
                         'label_column': 'label'})
completed = wait_for_runs_completion([run1, run2])
Parameters
  • runs – list of run objects (the returned values of function.run())

  • sleep – time to sleep between checks (in seconds)

  • timeout – maximum time to wait in seconds (0 for unlimited)

  • silent – set to True for silent exit on timeout

Returns

list of completed runs