mlrun.model#

class mlrun.model.DataSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None)[source]#

Bases: ModelObj

online or offline data source spec

class mlrun.model.DataTarget(kind: str | None = None, name: str = '', path=None, online=None)[source]#

Bases: DataTargetBase

data target with extra status information (used in the feature-set/vector status)

class mlrun.model.DataTargetBase(kind: str | None = None, name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: ModelObj

data target spec, specify a destination for the feature set data

classmethod from_dict(struct=None, fields=None, deprecated_fields: dict | None = None)[source]#

create an object from a python dictionary

class mlrun.model.FeatureSetProducer(kind=None, name=None, uri=None, owner=None, sources=None)[source]#

Bases: ModelObj

information about the task/job which produced the feature set data

class mlrun.model.HyperParamOptions(param_file=None, strategy: HyperParamStrategies | None = None, selector=None, stop_condition=None, parallel_runs=None, dask_cluster_uri=None, max_iterations=None, max_errors=None, teardown_dask=None)[source]#

Bases: ModelObj

Hyper Parameter Options

Parameters:
  • param_file (str) -- hyper params input file path/url, instead of inline

  • strategy (HyperParamStrategies) -- hyper param strategy - grid, list or random

  • selector (str) -- selection criteria for best result ([min|max.]<result>), e.g. max.accuracy

  • stop_condition (str) -- early stop condition e.g. "accuracy > 0.9"

  • parallel_runs (int) -- number of param combinations to run in parallel (over Dask)

  • dask_cluster_uri (str) -- db uri for a deployed dask cluster function, e.g. db://myproject/dask

  • max_iterations (int) -- max number of runs (in random strategy)

  • max_errors (int) -- max number of child run errors before the overall job fails

  • teardown_dask (bool) -- kill the dask cluster pods after the runs
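The selector format ([min|max.]<result>) can be illustrated with a small standalone sketch. This is not MLRun's implementation: the helper names and the shape of the iteration records are hypothetical, and falling back to "max" when no prefix is given is an assumption made for this example.

```python
def parse_selector(selector: str) -> tuple[str, str]:
    """Split a selector such as 'max.accuracy' into (operator, result key)."""
    operator, sep, key = selector.partition(".")
    if sep and operator in ("min", "max"):
        return operator, key
    # No explicit min/max prefix: this sketch assumes "max" (an assumption).
    return "max", selector


def select_best(iterations: list[dict], selector: str) -> dict:
    """Return the iteration whose result best satisfies the selector."""
    operator, key = parse_selector(selector)
    # Ignore iterations that did not report the selected result.
    candidates = [it for it in iterations if key in it["results"]]
    pick = max if operator == "max" else min
    return pick(candidates, key=lambda it: it["results"][key])


# Hypothetical child-run records, one per hyper parameter combination.
runs = [
    {"iter": 1, "results": {"accuracy": 0.81}},
    {"iter": 2, "results": {"accuracy": 0.93}},
    {"iter": 3, "results": {"accuracy": 0.88}},
]
best = select_best(runs, "max.accuracy")
```

Here best is the record of the second iteration, since it has the highest accuracy.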

class mlrun.model.Notification(kind: NotificationKind = NotificationKind.slack, name=None, message=None, severity: NotificationSeverity = NotificationSeverity.INFO, when=None, condition=None, secret_params=None, params=None, status=None, sent_time=None, reason=None)[source]#

Bases: ModelObj

Notification object

Parameters:
  • kind -- notification implementation kind - slack, webhook, etc. See mlrun.common.schemas.notification.NotificationKind

  • name -- for logging and identification

  • message -- message content in the notification

  • severity -- severity to display in the notification

  • when -- list of statuses to trigger the notification: 'running', 'completed', 'error'

  • condition -- optional condition to trigger the notification, a jinja2 expression that can use run data to evaluate if the notification should be sent in addition to the 'when' statuses. e.g.: '{{ run["status"]["results"]["accuracy"] < 0.9}}'

  • params -- Implementation specific parameters for the notification implementation (e.g. slack webhook url, git repository details, etc.)

  • secret_params -- secret parameters for the notification implementation, same as params but will be stored in a k8s secret and passed as a secret reference to the implementation.

  • status -- notification status - pending, sent, error

  • sent_time -- time the notification was sent

  • reason -- failure reason if the notification failed to send

enrich_unmasked_secret_params_from_project_secret()[source]#

Fill the notification secret params from the project secret. This function is used instead of unmask_secret_params_from_project_secret when running inside the workflow runner pod, which does not have access to the k8s secrets but does have access to the project secret.

static validate_notification_uniqueness(notifications: list['Notification'])[source]#

Validate that all notifications in the list are unique by name
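A uniqueness-by-name check of this kind can be sketched in plain Python. The helper names below are hypothetical and work on a list of names rather than Notification objects; raising ValueError on duplicates is an assumption, since the behavior on failure is not stated above.

```python
from collections import Counter


def find_duplicate_names(names: list[str]) -> list[str]:
    """Return the names that appear more than once, sorted for stable output."""
    counts = Counter(names)
    return sorted(name for name, count in counts.items() if count > 1)


def validate_unique_names(names: list[str]) -> None:
    """Raise if any name is repeated (the error type is an assumption)."""
    duplicates = find_duplicate_names(names)
    if duplicates:
        raise ValueError(f"notification names must be unique, duplicates: {duplicates}")


duplicates = find_duplicate_names(["on-error", "on-complete", "on-error"])
```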

class mlrun.model.RunMetadata(uid=None, name=None, project=None, labels=None, annotations=None, iteration=None)[source]#

Bases: ModelObj

Run metadata

class mlrun.model.RunObject(spec: RunSpec | None = None, metadata: RunMetadata | None = None, status: RunStatus | None = None)[source]#

Bases: RunTemplate

A run

abort()[source]#

abort the run

artifact(key: str) DataItem[source]#

Return artifact DataItem by key.

This method waits for the outputs to complete, searches for the artifact matching the given key, and returns a DataItem if the artifact is found.

Parameters:

key -- The key of the artifact to find.

Returns:

A DataItem corresponding to the artifact with the given key, or None if no such artifact is found.

property error: str#

error string if failed

logs(watch=True, db=None, offset=0)[source]#

return or watch the run logs

output(key: str)[source]#

Return the value of a specific result or artifact by key.

This method waits for the outputs to complete and retrieves the value corresponding to the provided key. If the key exists in the results, it returns the corresponding result value. If not found in results, it attempts to fetch the artifact by key (cached in the run status). If the artifact is not found, it tries to fetch the artifact URI by key. If no artifact or result is found for the key, returns None.

Parameters:

key -- The key of the result or artifact to retrieve.

Returns:

The value of the result or the artifact URI corresponding to the key, or None if not found.

property outputs#

Return a dictionary of outputs, including result values and artifact URIs.

This method waits for the outputs to complete and combines result values and artifact URIs into a single dictionary. If there are multiple artifacts for the same key, only include the artifact that does not have the "latest" tag. If there is no other tag, include the "latest" tag as a fallback.

Returns:

Dictionary containing result values and artifact URIs.
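The tag-preference rule can be sketched as follows. This is an illustration only: the function name and the artifact record shape (dicts with "key", "tag" and "uri") are hypothetical and are not how MLRun stores artifacts.

```python
def merge_outputs(results: dict, artifacts: list[dict]) -> dict:
    """Combine result values and artifact URIs, preferring non-'latest' tags."""
    outputs = dict(results)
    chosen_tags: dict[str, str] = {}
    for artifact in artifacts:
        key, tag = artifact["key"], artifact.get("tag")
        # Take the first artifact seen for a key, and replace it only when the
        # stored one is tagged "latest" and the new one carries another tag.
        if key not in chosen_tags or (chosen_tags[key] == "latest" and tag != "latest"):
            chosen_tags[key] = tag
            outputs[key] = artifact["uri"]
    return outputs


# Placeholder URIs, not real MLRun store paths.
artifacts = [
    {"key": "model", "tag": "latest", "uri": "uri-model-latest"},
    {"key": "model", "tag": "v1", "uri": "uri-model-v1"},
    {"key": "plot", "tag": "latest", "uri": "uri-plot-latest"},
]
outputs = merge_outputs({"accuracy": 0.9}, artifacts)
```

In this sketch "model" resolves to the "v1" artifact, while "plot" falls back to its only tag, "latest".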

static parse_uri(uri: str) tuple[str, str, str, str][source]#

Parse the run's uri

Parameters:

uri -- run uri in the format of <project>@<uid>#<iteration>[:tag]

Returns:

project, uid, iteration, tag
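The URI format above can be illustrated with a regular expression. This is a standalone sketch, not the code behind parse_uri: the pattern, the function name, and returning an empty string for a missing tag are assumptions.

```python
import re

# <project>@<uid>#<iteration>[:tag]
RUN_URI_PATTERN = re.compile(
    r"^(?P<project>[^@]+)@(?P<uid>[^#]+)#(?P<iteration>[^:]+)(?::(?P<tag>.+))?$"
)


def parse_run_uri(uri: str) -> tuple[str, str, str, str]:
    """Split a run URI into (project, uid, iteration, tag)."""
    match = RUN_URI_PATTERN.match(uri)
    if match is None:
        raise ValueError(f"not a valid run uri: {uri!r}")
    # The tag group is None when the optional ":tag" suffix is absent.
    return match["project"], match["uid"], match["iteration"], match["tag"] or ""


with_tag = parse_run_uri("my-project@abc123#0:latest")
without_tag = parse_run_uri("my-project@abc123#2")
```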

refresh()[source]#

refresh run state from the db

show()[source]#

show the current status widget, in jupyter notebook

state()[source]#

current run state

to_json(exclude=None, **kwargs)[source]#

convert the object to json

Parameters:
  • exclude -- list of fields to exclude from the json

  • strip -- if True, strip fields that are not required to define the object

property ui_url: str#

UI URL (for relevant runtimes)

uid()[source]#

run unique id

wait_for_completion(sleep=3, timeout=0, raise_on_failure=True, show_logs=None, logs_interval=None)[source]#

Wait for a remote run to complete. By default, wait until the run reaches a terminal state or the timeout passes; if timeout is 0, wait forever. The run status is pulled from the db every sleep seconds.

Log printing depends on show_logs and logs_interval:

  • If show_logs is not False and logs_interval is None, the logs are printed when the run reaches a terminal state.

  • If show_logs is not False and logs_interval is defined, the logs are printed every logs_interval seconds.

  • If show_logs is False, the logs are not printed, but the run state is still pulled until it reaches a terminal state.
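The polling behavior controlled by sleep and timeout can be sketched with a generic loop. This is not the MLRun implementation: the function name, the get_state callback, the set of terminal states, and raising TimeoutError on timeout are all assumptions made for the example.

```python
import time

# Assumed terminal states for this sketch only.
TERMINAL_STATES = {"completed", "error", "aborted"}


def wait_until_terminal(get_state, sleep=3, timeout=0):
    """Poll get_state() every `sleep` seconds until a terminal state is reached.

    A timeout of 0 means wait forever.
    """
    start = time.monotonic()
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if timeout and time.monotonic() - start >= timeout:
            raise TimeoutError(f"run did not finish within {timeout} seconds")
        time.sleep(sleep)


# Simulated state source: two polls return "running", the third "completed".
states = iter(["running", "running", "completed"])
final_state = wait_until_terminal(lambda: next(states), sleep=0)
```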

class mlrun.model.RunSpec(parameters=None, hyperparams=None, param_file=None, selector=None, handler=None, inputs=None, outputs=None, input_path=None, output_path=None, function=None, secret_sources=None, data_stores=None, strategy=None, verbose=None, scrape_metrics=None, hyper_param_options=None, allow_empty_resources=None, inputs_type_hints=None, returns=None, notifications=None, state_thresholds=None, reset_on_run=None, node_selector=None)[source]#

Bases: ModelObj

Run specification

extract_type_hints_from_inputs()[source]#

This method extracts the type hints from the input keys in the input dictionary.

After the method runs, the type hints are removed from the inputs dictionary (parameter names as keys, paths as values), and the extracted type hints are saved in the spec's inputs type hints dictionary (parameter names as keys, type hints as values). If a parameter is not in the type hints dictionary, its type hint defaults to mlrun.DataItem.
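The separation of type hints from input keys can be sketched as below. The "<name> : <type>" key format is an assumption for this example (it mirrors the "<key> : <type>" structure documented for returns), and the helper names are hypothetical.

```python
DEFAULT_TYPE_HINT = "mlrun.DataItem"


def split_type_hints(inputs: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
    """Return (inputs without type hints, extracted type hints)."""
    clean_inputs: dict[str, str] = {}
    type_hints: dict[str, str] = {}
    for raw_key, path in inputs.items():
        name, sep, hint = raw_key.partition(":")
        name = name.strip()
        clean_inputs[name] = path
        if sep:
            # Only explicitly hinted parameters are stored.
            type_hints[name] = hint.strip()
    return clean_inputs, type_hints


def type_hint_for(name: str, type_hints: dict[str, str]) -> str:
    """Look up a type hint, falling back to the default for unhinted parameters."""
    return type_hints.get(name, DEFAULT_TYPE_HINT)


clean, hints = split_type_hints(
    {"data : pandas.DataFrame": "s3://<bucket>/data.csv", "labels": "/path/to/labels.csv"}
)
```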

property inputs: dict[str, str]#

Get the inputs dictionary. A dictionary of parameter names as keys and paths as values.

Returns:

The inputs dictionary.

property inputs_type_hints: dict[str, str]#

Get the input type hints. A dictionary of parameter names as keys and their type hints as values.

Returns:

The input type hints dictionary.

static join_outputs_and_returns(outputs: list[str], returns: list[Union[str, dict[str, str]]]) list[str][source]#

Get the outputs set in the spec. The outputs are constructed from both the 'outputs' and 'returns' properties that were set by the user.

Parameters:
  • outputs -- A spec outputs property - list of output keys.

  • returns -- A spec returns property - list of key and configuration of how to log returning values.

Returns:

The joined 'outputs' and 'returns' list.
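Joining the two properties can be sketched as extracting the key from each log hint and appending it to the outputs list. This standalone function is hypothetical: skipping keys that are already present is an assumption, and "artifact_type" is an illustrative dictionary field, not a confirmed one.

```python
from typing import Union


def join_keys(outputs: list[str], returns: list[Union[str, dict[str, str]]]) -> list[str]:
    """Return output keys followed by the keys named in the returns log hints."""
    joined = list(outputs)
    for hint in returns:
        if isinstance(hint, dict):
            key = hint["key"]
        else:
            # String hints may carry a type: "<key> : <type>".
            key = hint.partition(":")[0].strip()
        if key not in joined:
            joined.append(key)
    return joined


joined = join_keys(
    ["model"],
    ["accuracy", "dataset : dataset", {"key": "plot", "artifact_type": "plot"}],
)
```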

property outputs: list[str]#

Get the expected outputs. The list is constructed from keys of both the outputs and returns properties.

Returns:

The expected outputs list.

property returns#

Get the returns list. A list of log hints for returning values.

Returns:

The returns list.

class mlrun.model.RunStatus(state=None, error=None, host=None, commit=None, status_text=None, results=None, artifacts=None, start_time=None, last_update=None, iterations=None, ui_url=None, reason: str | None = None, notifications: dict[str, mlrun.model.Notification] | None = None, artifact_uris: dict[str, str] | None = None)[source]#

Bases: ModelObj

Run status

is_failed() bool | None[source]#

Return whether the run has failed. Returns None if the state has not been defined yet; the caller is responsible for handling None (e.g. by waiting for the state to be defined).
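Because the return value has three possible states, a plain truthiness check would treat "not defined yet" the same as "did not fail". The sketch below shows one way a caller might handle this; the functions and the set of failed states are hypothetical stand-ins, not MLRun code.

```python
from typing import Optional

# Assumed failed states for this sketch only.
FAILED_STATES = {"error", "aborted"}


def is_failed(state: Optional[str]) -> Optional[bool]:
    """Tri-state check: None means the state is not defined yet."""
    if state is None:
        return None
    return state in FAILED_STATES


def describe(state: Optional[str]) -> str:
    """Handle None explicitly instead of relying on truthiness."""
    failed = is_failed(state)
    if failed is None:
        return "state not defined yet"
    return "failed" if failed else "not failed"


summary = [describe(s) for s in (None, "running", "error")]
```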

class mlrun.model.RunTemplate(spec: RunSpec | None = None, metadata: RunMetadata | None = None)[source]#

Bases: ModelObj

Run template

set_label(key, value)[source]#

set a key/value label for the task

with_hyper_params(hyperparams, selector=None, strategy: HyperParamStrategies | None = None, **options)[source]#

set hyper param values and configurations, see parameters in: HyperParamOptions

example:

grid_params = {"p1": [2, 4, 1], "p2": [10, 20]}
task = mlrun.new_task("grid-search")
task.with_hyper_params(grid_params, selector="max.accuracy")
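To see how many child runs a grid such as the one above produces, the combinations can be enumerated with itertools.product. This is a standalone sketch of grid expansion, not MLRun's internal code; expand_grid is a hypothetical helper.

```python
from itertools import product


def expand_grid(hyperparams: dict[str, list]) -> list[dict]:
    """Return one parameter dict per combination of the listed values."""
    names = list(hyperparams)
    value_lists = (hyperparams[name] for name in names)
    return [dict(zip(names, values)) for values in product(*value_lists)]


grid_params = {"p1": [2, 4, 1], "p2": [10, 20]}
combinations = expand_grid(grid_params)
```

With three values for p1 and two for p2, the grid contains 3 x 2 = 6 combinations.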

with_input(key, path)[source]#

set task data input, path is an MLRun global DataItem uri

examples:

task.with_input("data", "/file-dir/path/to/file")
task.with_input("data", "s3://<bucket>/path/to/file")
task.with_input("data", "v3io://<data-container>/path/to/file")

with_param_file(param_file, selector=None, strategy: HyperParamStrategies | None = None, **options)[source]#

set hyper param values (from a file url) and configurations, see parameters in: HyperParamOptions

example:

grid_params = "s3://<my-bucket>/path/to/params.json"
task = mlrun.new_task("grid-search")
task.with_param_file(grid_params, selector="max.accuracy")

with_params(**kwargs)[source]#

set task parameters using key=value, key2=value2, ..

with_secrets(kind, source)[source]#

register a secrets source (file, env or dict)

read secrets from a source provider to be used in workflows, example:

task.with_secrets('file', 'file.txt')
task.with_secrets('inline', {'key': 'val'})
task.with_secrets('env', 'ENV1,ENV2')

task.with_secrets('vault', ['secret1', 'secret2'...])

# If using with k8s secrets, the k8s secret is managed by MLRun, through the project-secrets
# mechanism. The secrets will be attached to the running pod as environment variables.
task.with_secrets('kubernetes', ['secret1', 'secret2'])

# If using an empty secrets list [] then all accessible secrets will be available.
task.with_secrets('vault', [])

# To use with Azure key vault, a k8s secret must be created with the following keys:
# kubectl -n <namespace> create secret generic azure-key-vault-secret \
#     --from-literal=tenant_id=<service principal tenant ID> \
#     --from-literal=client_id=<service principal client ID> \
#     --from-literal=secret=<service principal secret key>

task.with_secrets('azure_vault', {
    'name': 'my-vault-name',
    'k8s_secret': 'azure-key-vault-secret',
    # An empty secrets list may be passed ('secrets': []) to access all vault secrets.
    'secrets': ['secret1', 'secret2'...]
})

Parameters:
  • kind -- secret type (file, inline, env, vault, kubernetes or azure_vault; see example)

  • source -- secret data or link (see example)

Returns:

The RunTemplate object

class mlrun.model.TargetPathObject(base_path=None, run_id=None, is_single_file=False)[source]#

Bases: object

Class configuring the target path. This class takes a few parameters into consideration to create the correct final path:

  • run_id:

    if run_id is provided, the target is considered to be in run_id mode, which requires the path to contain a {run_id} placeholder.

  • is_single_file:

    if true, run_id must be the directory containing the output file, or be generated before the file name (run_id/output.file).

  • base_path:

    if it contains the run_id placeholder, run_id must not be None. If run_id is passed and the placeholder doesn't exist, the placeholder is generated in the correct place.
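One possible reading of these rules is sketched below. It is an interpretation for illustration only, not the TargetPathObject implementation: the function name is hypothetical, and appending run_id as a trailing directory when the target is not a single file is an assumption.

```python
from typing import Optional

PLACEHOLDER = "{run_id}"


def resolve_target_path(
    base_path: str, run_id: Optional[str] = None, is_single_file: bool = False
) -> str:
    """Resolve a final path from base_path and an optional run_id."""
    if PLACEHOLDER in base_path:
        if run_id is None:
            raise ValueError("base_path contains {run_id} but no run_id was given")
        return base_path.replace(PLACEHOLDER, run_id)
    if run_id is None:
        return base_path
    if is_single_file:
        # Place run_id directly before the file name: <dir>/<run_id>/<file>.
        directory, sep, filename = base_path.rpartition("/")
        if not sep:
            return f"{run_id}/{filename}"
        return f"{directory}/{run_id}/{filename}"
    # Assumed behavior for directory targets: run_id becomes the last directory.
    return base_path.rstrip("/") + "/" + run_id


single_file = resolve_target_path("/data/out.parquet", run_id="abc", is_single_file=True)
templated = resolve_target_path("/data/{run_id}/out", run_id="abc")
```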

mlrun.model.new_task(name=None, project=None, handler=None, params=None, hyper_params=None, param_file=None, selector=None, hyper_param_options=None, inputs=None, outputs=None, in_path=None, out_path=None, artifact_path=None, secrets=None, base=None, returns=None) RunTemplate[source]#

Creates a new task

Parameters:
  • name -- task name

  • project -- task project

  • handler -- code entry-point/handler name

  • params -- input parameters (dict)

  • hyper_params -- dictionary mapping each hyper parameter to a list of values; the run is executed for every parameter combination (GridSearch)

  • param_file -- a csv file with parameter combinations; the first row holds the parameter names and the following rows hold the param values

  • selector -- selection criteria for hyper params e.g. "max.accuracy"

  • hyper_param_options -- hyper parameter options, see: HyperParamOptions

  • inputs -- dictionary of input objects + optional paths (if path is omitted the path will be the in_path/key)

  • outputs -- dictionary of output objects + optional paths (if path is omitted the path will be the out_path/key)

  • in_path -- default input path/url (prefix) for inputs

  • out_path -- default output path/url (prefix) for artifacts

  • artifact_path -- default artifact output path

  • secrets -- extra secrets specs, will be injected into the runtime e.g. ['file=<filename>', 'env=ENV_KEY1,ENV_KEY2']

  • base -- task instance to use as a base instead of a fresh new task instance

  • returns --

    List of log hints - configurations for how to log the values returned from the handler's run (as artifacts or results). The list's length must equal the number of returned objects. A log hint may be given as:

    • A string of the key to use to log the returned value as a result or as an artifact. To specify the artifact type, pass a string in the following structure: "<key> : <type>". Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object's default artifact type will be used.

    • A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as "key": "the_key".
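The pairing of returned values with log hints, including the length requirement, can be sketched as follows. The function is a hypothetical stand-in rather than MLRun's logging code, and "artifact_type" is an illustrative dictionary field; only the "key" entry is documented above.

```python
def pair_returns_with_hints(values: tuple, hints: list) -> dict:
    """Map each log hint key to (returned value, artifact type or None)."""
    if len(values) != len(hints):
        raise ValueError(
            f"expected {len(hints)} returned objects, got {len(values)}"
        )
    logged = {}
    for value, hint in zip(values, hints):
        if isinstance(hint, dict):
            key, artifact_type = hint["key"], hint.get("artifact_type")
        else:
            # String hints use the "<key> : <type>" structure; the type is optional.
            key, _, artifact_type = (part.strip() for part in hint.partition(":"))
            artifact_type = artifact_type or None
        logged[key] = (value, artifact_type)
    return logged


# A handler returning two objects needs exactly two log hints.
logged = pair_returns_with_hints((0.9, [1, 2]), ["accuracy", "data : dataset"])
```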