mlrun.run#
- class mlrun.run.RunStatuses[source]#
Bases: object
- error = 'Error'#
- failed = 'Failed'#
- running = 'Running'#
- skipped = 'Skipped'#
- succeeded = 'Succeeded'#
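A minimal sketch of using these status constants together with wait_for_pipeline_completion (the run id below is hypothetical):
from mlrun.run import RunStatuses, wait_for_pipeline_completion

# accept either a successful or a skipped pipeline run
run_info = wait_for_pipeline_completion(
    "my-run-id",  # hypothetical pipeline run id
    expected_statuses=[RunStatuses.succeeded, RunStatuses.skipped],
)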
- mlrun.run.code_to_function(name: str = '', project: str = '', tag: str = '', filename: str = '', handler: str = '', kind: str = '', image: Optional[str] = None, code_output: str = '', embed_code: bool = True, description: str = '', requirements: Optional[Union[str, List[str]]] = None, categories: Optional[List[str]] = None, labels: Optional[Dict[str, str]] = None, with_doc: bool = True, ignored_tags=None) Union[mlrun.runtimes.mpijob.v1alpha1.MpiRuntimeV1Alpha1, mlrun.runtimes.mpijob.v1.MpiRuntimeV1, mlrun.runtimes.function.RemoteRuntime, mlrun.runtimes.serving.ServingRuntime, mlrun.runtimes.daskjob.DaskCluster, mlrun.runtimes.kubejob.KubejobRuntime, mlrun.runtimes.local.LocalRuntime, mlrun.runtimes.sparkjob.spark3job.Spark3Runtime, mlrun.runtimes.remotesparkjob.RemoteSparkRuntime] [source]#
Convenience function to insert code and configure an mlrun runtime.
The easiest way to construct a runtime type object. Provides the most commonly used configuration options for all runtimes as parameters.
Instantiated runtimes are considered ‘functions’ in mlrun, but they can be anything from nuclio functions to generic kubernetes pods to spark jobs. Functions are meant to be focused, and as such limited in scope and size. Typically a function can be expressed in a single python module with added support from custom docker images and commands for the environment. The returned runtime object can be further configured if more customization is required.
One of the most important parameters is ‘kind’. This is used to specify the chosen runtime type. The options are:
local: execute a local python or shell script
job: insert the code into a Kubernetes pod and execute it
nuclio: insert the code into a real-time serverless nuclio function
serving: insert code into orchestrated nuclio function(s) forming a DAG
dask: run the specified python code / script as Dask Distributed job
mpijob: run distributed Horovod jobs over the MPI job operator
spark: run distributed Spark job using Spark Kubernetes Operator
remote-spark: run distributed Spark job on remote Spark service
Learn more about function runtimes here: https://docs.mlrun.org/en/latest/runtimes/functions.html#function-runtimes
- Parameters
name – function name, typically best to use hyphen-case
project – project used to namespace the function, defaults to ‘default’
tag – function tag to track multiple versions of the same function, defaults to ‘latest’
filename – path to .py/.ipynb file, defaults to current jupyter notebook
handler – The default function handler to call for the job or nuclio function. In batch functions (job, mpijob, ..) the handler can also be specified in the .run() command; when not specified, the entire file is executed (as main). For nuclio functions the handler is in the form of module:function, defaults to ‘main:handler’
kind – function runtime type string - nuclio, job, etc. (see docstring for all options)
image – base docker image to use for building the function container, defaults to None
code_output – specify ‘.’ to generate python module from the current jupyter notebook
embed_code – indicates whether to inject the code directly into the function runtime spec, defaults to True
description – short function description, defaults to ‘’
requirements – list of python packages or pip requirements file path, defaults to None
categories – list of categories for mlrun Function Hub, defaults to None
labels – immutable name/value pairs to tag the function with useful metadata, defaults to None
with_doc – indicates whether to document the function parameters, defaults to True
ignored_tags – notebook cells to ignore when converting notebooks to py code (separated by ‘;’)
- Returns
pre-configured function object from an mlrun runtime class
example:
import mlrun

# create job function object from notebook code and add doc/metadata
fn = mlrun.code_to_function("file_utils", kind="job",
                            handler="open_archive", image="mlrun/mlrun",
                            description="this function opens a zip archive into a local/mounted folder",
                            categories=["fileutils"],
                            labels={"author": "me"})
example:
import mlrun
from pathlib import Path

# create file
Path("mover.py").touch()

# create nuclio function object from python module call mover.py
fn = mlrun.code_to_function("nuclio-mover", kind="nuclio",
                            filename="mover.py", image="python:3.7",
                            description="this function moves files from one system to another",
                            requirements=["pandas"],
                            labels={"author": "me"})
- mlrun.run.download_object(url, target, secrets=None)[source]#
download mlrun dataitem (from path/url to target path)
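A minimal usage sketch (the source url and target path below are hypothetical):
from mlrun.run import download_object

# download a remote object to a local file
download_object("https://example.com/data/iris.csv", "/tmp/iris.csv")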
- mlrun.run.function_to_module(code='', workdir=None, secrets=None, silent=False)[source]#
Load code, a notebook, or an mlrun function as a .py module. This function can import a local/remote py file or notebook, or load an mlrun function object as a module. You can use this from your code, a notebook, or another function (for common libs).
Note: the function may have package requirements which must be satisfied
example:
mod = mlrun.function_to_module('./examples/training.py')
task = mlrun.new_task(inputs={'infile.txt': '../examples/infile.txt'})
context = mlrun.get_or_create_ctx('myfunc', spec=task)
mod.my_job(context, p1=1, p2='x')
print(context.to_yaml())

fn = mlrun.import_function('hub://open-archive')
mod = mlrun.function_to_module(fn)
data = mlrun.run.get_dataitem("https://fpsignals-public.s3.amazonaws.com/catsndogs.tar.gz")
context = mlrun.get_or_create_ctx('myfunc')
mod.open_archive(context, archive_url=data)
print(context.to_yaml())
- Parameters
code – path/url to function (.py or .ipynb or .yaml) OR function object
workdir – code workdir
secrets – secrets needed to access the URL (e.g. s3, v3io, ..)
silent – do not raise on errors
- Returns
python module
- mlrun.run.get_dataitem(url, secrets=None, db=None) mlrun.datastore.base.DataItem [source]#
get mlrun dataitem object (from path/url)
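A minimal usage sketch (the url below is hypothetical), assuming the returned DataItem holds tabular data and is read with its as_df() helper:
from mlrun.run import get_dataitem

# get a DataItem handle for a remote csv and load it into a pandas DataFrame
item = get_dataitem("https://example.com/data/iris.csv")
df = item.as_df()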
- mlrun.run.get_object(url, secrets=None, size=None, offset=0, db=None)[source]#
get mlrun dataitem body (from path/url)
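A minimal usage sketch (the url below is hypothetical):
from mlrun.run import get_object

# read the first 1 KB of the object body (returned as bytes)
body = get_object("v3io:///projects/my-proj/data/file.bin", size=1024, offset=0)
print(len(body))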
- mlrun.run.get_or_create_ctx(name: str, event=None, spec=None, with_env: bool = True, rundb: str = '', project: str = '', upload_artifacts=False)[source]#
Called from within the user program to obtain a run context.
The run context is an interface for receiving parameters, data, and logging run results; the run context is read from the event, spec, or environment (in that order). The user can also work without a context (local defaults mode).
All results are automatically stored in the “rundb” or artifact store. The path to the rundb can be specified in the call or obtained from the environment.
- Parameters
name – run name (will be overridden by context)
event – function (nuclio Event object)
spec – dictionary holding run spec
with_env – look for context in environment vars, default True
rundb – path/url to the metadata and artifact database
project – project to initiate the context in (by default mlrun.mlctx.default_project)
upload_artifacts – when using local context (not as part of a job/run), upload artifacts to the system default artifact path location
- Returns
execution context
Examples:
# load MLRUN runtime context (will be set by the runtime framework e.g. KubeFlow)
context = get_or_create_ctx('train')

# get parameters from the runtime context (or use defaults)
p1 = context.get_param('p1', 1)
p2 = context.get_param('p2', 'a-string')

# access input metadata, values, files, and secrets (passwords)
print(f'Run: {context.name} (uid={context.uid})')
print(f'Params: p1={p1}, p2={p2}')
print(f'accesskey = {context.get_secret("ACCESS_KEY")}')
input_str = context.get_input('infile.txt').get()
print(f'file: {input_str}')

# RUN some useful code e.g. ML training, data prep, etc.

# log scalar result values (job result metrics)
context.log_result('accuracy', p1 * 2)
context.log_result('loss', p1 * 3)
context.set_label('framework', 'sklearn')

# log various types of artifacts (file, web page, table), will be versioned and visible in the UI
context.log_artifact('model.txt', body=b'abc is 123', labels={'framework': 'xgboost'})
context.log_artifact('results.html', body=b'<b> Some HTML <b>', viewer='web-app')
- mlrun.run.get_pipeline(run_id, namespace=None, format_: Union[str, mlrun.common.schemas.pipeline.PipelinesFormat] = PipelinesFormat.summary, project: Optional[str] = None, remote: bool = True)[source]#
Get Pipeline status
- Parameters
run_id – id of pipelines run
namespace – k8s namespace if not default
format_ – Format of the results. Possible values are: summary (default value) - return a summary of the object data; full - return the full pipeline object.
project – the project of the pipeline run
remote – read kfp data from mlrun service (default=True)
- Returns
kfp run dict
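A minimal usage sketch (the run id below is hypothetical):
from mlrun.run import get_pipeline

# fetch a summary of a pipeline run's status
run_info = get_pipeline("my-run-id", format_="summary")
print(run_info)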
- mlrun.run.handler(labels: Optional[Dict[str, str]] = None, outputs: Optional[List[Union[str, Dict[str, str]]]] = None, inputs: Union[bool, Dict[str, Union[str, Type]]] = True)[source]#
MLRun’s handler is a decorator to wrap a function and enable setting labels, automatic mlrun.DataItem parsing and outputs logging.
- Parameters
labels – Labels to add to the run. Expecting a dictionary with the labels names as keys. Default: None.
outputs –
Logging configurations for the function’s returned values. Expecting a list of strings, dictionaries, and None values:
str - A string in the format of ‘{key}:{artifact_type}’. If a string was given without ‘:’ it will indicate the key and the artifact type will be according to the returned value type. The artifact types can be one of: “dataset”, “directory”, “file”, “object”, “plot” and “result”.
Dict[str, str] - A dictionary of logging configuration. the key ‘key’ is mandatory for the logged artifact key.
None - Do not log the output.
The list length must be equal to the total amount of returned values from the function. Default is None - meaning no outputs will be logged.
inputs –
Parsing configurations for the arguments passed as inputs via the run method of an MLRun function. Can be passed as a boolean value or a dictionary:
True - Parse all found inputs to the assigned type hint in the function’s signature. If there is no type hint assigned, the value will remain an mlrun.DataItem.
False - Do not parse inputs, leaving the inputs as mlrun.DataItem.
Dict[str, Union[Type, str]] - A dictionary with the argument name as key and the expected type to parse the mlrun.DataItem to. The expected type can be a string as well, indicating the full module path.
Notice: Type hints from the typing module (e.g. typing.Optional, typing.Union, typing.List etc.) are currently not supported but will be in the future.
Default: True.
Example:
import mlrun

@mlrun.handler(outputs=["my_array", None, "my_multiplier"])
def my_handler(array: np.ndarray, m: int):
    array = array * m
    m += 1
    return array, "I won't be logged", m

>>> mlrun_function = mlrun.code_to_function("my_code.py", kind="job")
>>> run_object = mlrun_function.run(
...     handler="my_handler",
...     inputs={"array": "store://my_array_Artifact"},
...     params={"m": 2}
... )
>>> run_object.outputs
{'my_multiplier': 3, 'my_array': 'store://...'}
- mlrun.run.import_function(url='', secrets=None, db='', project=None, new_name=None)[source]#
Create function object from DB or local/remote YAML file
Functions can be imported from function repositories (mlrun Function Hub (formerly Marketplace) or local db), or be read from a remote URL (http(s), s3, git, v3io, ..) containing the function YAML
special URLs:
function hub: hub://{name}[:{tag}]
local mlrun db: db://{project-name}/{name}[:{tag}]
examples:
function = mlrun.import_function("hub://auto-trainer")
function = mlrun.import_function("./func.yaml")
function = mlrun.import_function("https://raw.githubusercontent.com/org/repo/func.yaml")
- Parameters
url – path/url to Function Hub, db or function YAML file
secrets – optional, credentials dict for DB or URL (s3, v3io, …)
db – optional, mlrun api/db path
project – optional, target project for the function
new_name – optional, override the imported function name
- Returns
function object
- mlrun.run.import_function_to_dict(url, secrets=None)[source]#
Load function spec from local/remote YAML file
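A minimal usage sketch (the file path is hypothetical, and the printed key assumes the usual function YAML layout):
from mlrun.run import import_function_to_dict

# load a function spec from a local YAML file into a plain dict
spec = import_function_to_dict("./function.yaml")
print(spec.get("kind"))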
- mlrun.run.list_pipelines(full=False, page_token='', page_size=None, sort_by='', filter_='', namespace=None, project='*', format_: mlrun.common.schemas.pipeline.PipelinesFormat = PipelinesFormat.metadata_only) Tuple[int, Optional[int], List[dict]] [source]#
List pipelines
- Parameters
full – Deprecated, use format_ instead. If True, format_ is set to full; otherwise format_ is used
page_token – A page token to request the next page of results. The token is acquired from the nextPageToken field of the response from the previous call or can be omitted when fetching the first page.
page_size – The number of pipelines to be listed per page. If there are more pipelines than this number, the response message will contain a nextPageToken field you can use to fetch the next page.
sort_by – Can be in the format “field_name”, “field_name asc” or “field_name desc” (for example, “name asc” or “id desc”). Ascending by default.
filter_ – A url-encoded, JSON-serialized Filter protocol buffer, see: filter.proto (https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto).
namespace – Kubernetes namespace if other than default
project – Can be used to retrieve only specific project pipelines. “*” for all projects. Note that filtering by project can’t be used together with pagination, sorting, or custom filter.
format_ – Controls what will be returned (full/metadata_only/name_only)
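A minimal usage sketch (the project name is hypothetical, and the printed keys assume the usual kfp run dict layout):
from mlrun.run import list_pipelines

# list the pipeline runs of a single project
total, next_page_token, runs = list_pipelines(project="my-project")
for run in runs:
    print(run.get("id"), run.get("name"))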
- mlrun.run.new_function(name: str = '', project: str = '', tag: str = '', kind: str = '', command: str = '', image: str = '', args: Optional[list] = None, runtime=None, mode=None, handler: Optional[str] = None, source: Optional[str] = None, requirements: Optional[Union[str, List[str]]] = None, kfp=None)[source]#
Create a new ML function from base properties
example:
# define a container based function (the `training.py` must exist in the container workdir)
f = new_function(command='training.py -x {x}', image='myrepo/image:latest', kind='job')
f.run(params={"x": 5})

# define a container based function which reads its source from a git archive
f = new_function(command='training.py -x {x}', image='myrepo/image:latest', kind='job',
                 source='git://github.com/mlrun/something.git')
f.run(params={"x": 5})

# define a local handler function (execute a local function handler)
f = new_function().run(task, handler=myfunction)
- Parameters
name – function name
project – function project (none for ‘default’)
tag – function version tag (none for ‘latest’)
kind – runtime type (local, job, nuclio, spark, mpijob, dask, ..)
command – command/url + args (e.g.: training.py --verbose)
image – container image (start with ‘.’ for default registry)
args – command line arguments (override the ones in command)
runtime – runtime (job, nuclio, spark, dask ..) object/dict that stores runtime-specific details and preferences
mode – runtime mode:
“args” mode will push params into the command template, e.g. command=`mycode.py --x {xparam}` will substitute {xparam} with the value of the xparam param
“pass” mode will run the command as-is in the container (not wrapped by mlrun); the command can use {} for parameters like in “args” mode
handler – The default function handler to call for the job or nuclio function. In batch functions (job, mpijob, ..) the handler can also be specified in the .run() command; when not specified, the entire file is executed (as main). For nuclio functions the handler is in the form of module:function, defaults to “main:handler”
source – valid absolute path or URL to git, zip, or tar file, e.g. git://github.com/mlrun/something.git, http://some/url/file.zip. Note: the source path must exist on the image or locally when the run is local (it is recommended to use ‘function.spec.workdir’ when the source is a file path instead)
requirements – list of python packages or pip requirements file path, defaults to None
kfp – reserved, flag indicating running within kubeflow pipeline
- Returns
function object
- mlrun.run.run_local(task=None, command='', name: str = '', args: Optional[list] = None, workdir=None, project: str = '', tag: str = '', secrets=None, handler=None, params: Optional[dict] = None, inputs: Optional[dict] = None, artifact_path: str = '', mode: Optional[str] = None, allow_empty_resources=None, notifications: Optional[List[mlrun.model.Notification]] = None, returns: Optional[list] = None)[source]#
Run a task on a function/code (.py, .ipynb or .yaml) locally.
example:
# define a task
task = new_task(params={'p1': 8}, out_path=out_path)

# run
run = run_local(task, command='src/training.py', workdir='src')
or specify base task parameters (handler, params, ..) in the call:
run = run_local(handler=my_function, params={'x': 5})
- Parameters
task – task template object or dict (see RunTemplate)
command – command/url/function
name – ad hoc function name
args – command line arguments (override the ones in command)
workdir – working dir to exec in
project – function project (none for ‘default’)
tag – function version tag (none for ‘latest’)
secrets – secrets dict if the function source is remote (s3, v3io, ..)
handler – pointer or name of a function handler
params – input parameters (dict)
inputs – Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g.: “<key> : <type_hint>”.
artifact_path – default artifact output path
mode – Runtime mode for more details head to mlrun.new_function
allow_empty_resources – Allow passing a non-materialized feature set/vector as input to jobs (allows having functions that don’t depend on having targets, e.g. a function which accepts a feature vector URI and generates the offline vector, e.g. parquet, for it if it doesn’t exist)
returns –
List of configurations for how to log the returning values from the handler’s run (as artifacts or results). The list’s length must be equal to the amount of returning objects. A configuration may be given as:
A string of the key to use to log the returning value as a result or as an artifact. To specify the artifact type, it is possible to pass a string in the following structure: “<key> : <type>”. Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object’s default artifact type will be used.
A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as “key”: “the_key”.
- Returns
run object
- mlrun.run.run_pipeline(pipeline, arguments=None, project=None, experiment=None, run=None, namespace=None, artifact_path=None, ops=None, url=None, ttl=None, remote: bool = True, cleanup_ttl=None)[source]#
Remote KubeFlow pipeline execution
Submit a workflow task to KFP via mlrun API service
- Parameters
pipeline – KFP pipeline function or path to .yaml/.zip pipeline file
arguments – pipeline arguments
project – name of project
experiment – experiment name
run – optional, run name
namespace – Kubernetes namespace (if not using default)
url – optional, url to mlrun API service
artifact_path – target location/url for mlrun artifacts
ops – additional operators (.apply() to all pipeline functions)
ttl – pipeline cleanup ttl in secs (time to wait after workflow completion, at which point the workflow and all its resources are deleted) (deprecated, use cleanup_ttl instead)
remote – read kfp data from the mlrun service (default=True). Running the pipeline from local kfp data (remote=False) is deprecated and should not be used
cleanup_ttl – pipeline cleanup ttl in secs (time to wait after workflow completion, at which point the workflow and all its resources are deleted)
- Returns
kubeflow pipeline id
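A minimal usage sketch submitting a pre-compiled pipeline file and waiting for it to finish (the file path, project and experiment names are hypothetical):
from mlrun.run import run_pipeline, wait_for_pipeline_completion

run_id = run_pipeline(
    "./pipeline.yaml",
    arguments={"model_name": "my-model"},
    project="my-project",
    experiment="my-experiment",
)
wait_for_pipeline_completion(run_id, project="my-project")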
- mlrun.run.wait_for_pipeline_completion(run_id, timeout=3600, expected_statuses: Optional[List[str]] = None, namespace=None, remote=True, project: Optional[str] = None)[source]#
Wait for Pipeline status, timeout in sec
- Parameters
run_id – id of pipelines run
timeout – wait timeout in sec
expected_statuses – list of expected statuses, one of [ Succeeded | Failed | Skipped | Error ], by default [ Succeeded ]
namespace – k8s namespace if not default
remote – read kfp data from mlrun service (default=True)
project – the project of the pipeline
- Returns
kfp run dict
- mlrun.run.wait_for_runs_completion(runs: list, sleep=3, timeout=0, silent=False)[source]#
Wait for multiple runs to complete.
Note: you need to use watch=False in .run() so the run will not wait for completion
example:
# run two training functions in parallel and wait for the results
inputs = {'dataset': cleaned_data}
run1 = train.run(name='train_lr', inputs=inputs, watch=False,
                 params={'model_pkg_class': 'sklearn.linear_model.LogisticRegression',
                         'label_column': 'label'})
run2 = train.run(name='train_lr', inputs=inputs, watch=False,
                 params={'model_pkg_class': 'sklearn.ensemble.RandomForestClassifier',
                         'label_column': 'label'})
completed = wait_for_runs_completion([run1, run2])
- Parameters
runs – list of run objects (the returned values of function.run())
sleep – time to sleep between checks (in seconds)
timeout – maximum time to wait in seconds (0 for unlimited)
silent – set to True for silent exit on timeout
- Returns
list of completed runs