Source code for mlrun.execution

# Copyright 2018 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid
from copy import deepcopy
from datetime import datetime
from typing import List, Union

import numpy as np

import mlrun
from mlrun.artifacts import ModelArtifact
from mlrun.datastore.store_resources import get_store_resource
from mlrun.errors import MLRunInvalidArgumentError

from .artifacts import DatasetArtifact
from .artifacts.manager import ArtifactManager, extend_artifact_path
from .datastore import store_manager
from .db import get_run_db
from .features import Feature
from .model import HyperParamOptions
from .secrets import SecretsStore
from .utils import (
    dict_to_json,
    dict_to_yaml,
    get_in,
    is_relative_path,
    logger,
    now_date,
    run_keys,
    to_date_str,
    update_in,
)


class MLClientCtx(object):
    """ML Execution Client Context

    the context is generated and injected into the function using ``function.run()``
    or manually using the :py:func:`~mlrun.run.get_or_create_ctx` call,
    and provides an interface to use run params, metadata, inputs, and outputs

    base metadata includes: uid, name, project, and iteration (for hyper params)
    users can set labels and annotations using :py:func:`~set_label`, :py:func:`~set_annotation`
    access parameters and secrets using :py:func:`~get_param`, :py:func:`~get_secret`
    access input data objects using :py:func:`~get_input`
    store results, artifacts, and real-time metrics using the :py:func:`~log_result`,
    :py:func:`~log_artifact`, :py:func:`~log_dataset` and :py:func:`~log_model` methods

    see doc for the individual params and methods
    """

    kind = "run"

    def __init__(self, autocommit=False, tmp="", log_stream=None):
        self._uid = ""
        self.name = ""
        self._iteration = 0
        self._project = ""
        self._tag = ""
        self._secrets_manager = SecretsStore()

        # runtime db service interfaces
        self._rundb = None
        self._tmpfile = tmp
        self._logger = log_stream or logger
        self._log_level = "info"
        self._metrics_db = None
        self._autocommit = autocommit

        self._labels = {}
        self._annotations = {}

        self._function = ""
        self._parameters = {}
        self._hyperparams = {}
        self._hyper_param_options = HyperParamOptions()
        self._in_path = ""
        self.artifact_path = ""
        self._inputs = {}
        self._outputs = []

        self._results = {}
        self._state = "created"
        self._error = None
        self._commit = ""
        self._host = None
        self._start_time = now_date()
        self._last_update = now_date()
        self._iteration_results = None
        self._children = []
        self._parent = None
        self._handler = None
        self._project_object = None
        self._allow_empty_resources = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if exc_value:
            self.set_state(error=exc_value, commit=False)
        self.commit()
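    # Minimal usage sketch (a comment, not part of the class): the context is
    # normally injected by function.run(), but it can also be created explicitly
    # in a standalone script via mlrun.get_or_create_ctx:
    #
    #     import mlrun
    #
    #     context = mlrun.get_or_create_ctx("my-run")
    #     p1 = context.get_param("p1", 1)
    #     context.log_result("accuracy", p1 * 2)
    #     context.commit(completed=True)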
    def get_child_context(self, with_parent_params=False, **params):
        """get child context (iteration)

        allow sub experiments (epochs, hyper-param, ..) under a parent
        will create a new iteration, log_xx will update the child only
        use update_child_iterations(commit_children=True) to save all the
        children and specify the best run

        example::

            def handler(context: mlrun.MLClientCtx, data: mlrun.DataItem):
                df = data.as_df()
                best_accuracy = accuracy_sum = 0
                for param in param_list:
                    with context.get_child_context(myparam=param) as child:
                        accuracy = child_handler(child, df, **child.parameters)
                        accuracy_sum += accuracy
                        child.log_result('accuracy', accuracy)
                        if accuracy > best_accuracy:
                            child.mark_as_best()
                            best_accuracy = accuracy

                context.log_result('avg_accuracy', accuracy_sum / len(param_list))

        :param params:             extra (or override) params to parent context
        :param with_parent_params: child will copy the parent parameters and add to them

        :return: child context
        """
        if self.iteration != 0:
            raise MLRunInvalidArgumentError(
                "cannot create child from a child iteration!"
            )
        ctx = deepcopy(self.to_dict())
        if not with_parent_params:
            update_in(ctx, ["spec", "parameters"], {})
        if params:
            for key, val in params.items():
                update_in(ctx, ["spec", "parameters", key], val)
        update_in(ctx, ["metadata", "iteration"], len(self._children) + 1)
        ctx["status"] = {}
        ctx = MLClientCtx.from_dict(
            ctx, self._rundb, self._autocommit, log_stream=self._logger
        )
        ctx._parent = self
        self._children.append(ctx)
        return ctx
    def update_child_iterations(
        self, best_run=0, commit_children=False, completed=True
    ):
        """update children results in the parent, and optionally mark the best

        :param best_run:        marks the child iteration number (starts from 1)
        :param commit_children: commit all child runs to the db
        :param completed:       mark children as completed
        """
        if not self._children:
            return
        if commit_children:
            for child in self._children:
                child.commit(completed=completed)
        results = [child.to_dict() for child in self._children]
        summary, df = mlrun.runtimes.utils.results_to_iter(results, None, self)
        task = results[best_run - 1] if best_run else None
        self.log_iteration_results(best_run, summary, task)
        mlrun.runtimes.utils.log_iter_artifacts(self, df, summary[0])
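    # Hedged example: a parent run that iterated with get_child_context() can
    # commit all children and mark, say, the second iteration as the best:
    #
    #     context.update_child_iterations(best_run=2, commit_children=True)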
    def mark_as_best(self):
        """mark a child as the best iteration result, see .get_child_context()"""
        if not self._parent or not self._iteration:
            raise MLRunInvalidArgumentError(
                "can only mark a child run as best iteration"
            )
        self._parent.log_iteration_results(self._iteration, None, self.to_dict())

    def get_store_resource(self, url, secrets: dict = None):
        """get mlrun data resource (feature set/vector, artifact, item) from url

        example::

            feature_vector = context.get_store_resource("store://feature-vectors/default/myvec")
            dataset = context.get_store_resource("store://artifacts/default/mydata")

        :param url:     store resource uri/path, store://<type>/<project>/<name>:<version>
                        types: artifacts | feature-sets | feature-vectors
        :param secrets: additional secrets to use when accessing the store resource
        """
        return get_store_resource(
            url,
            db=self._rundb,
            secrets=self._secrets_manager,
            data_store_secrets=secrets,
        )

    def get_dataitem(self, url, secrets: dict = None):
        """get mlrun dataitem from url

        example::

            data = context.get_dataitem("s3://my-bucket/file.csv").as_df()

        :param url:     data-item uri/path
        :param secrets: additional secrets to use when accessing the data-item
        """
        return self._data_stores.object(url=url, secrets=secrets)

    def set_logger_stream(self, stream):
        self._logger.replace_handler_stream("default", stream)

    def _init_dbs(self, rundb):
        if rundb:
            if isinstance(rundb, str):
                self._rundb = get_run_db(rundb, secrets=self._secrets_manager)
            else:
                self._rundb = rundb
        else:
            self._rundb = mlrun.get_run_db()
        self._data_stores = store_manager.set(self._secrets_manager, db=self._rundb)
        self._artifacts_manager = ArtifactManager(db=self._rundb)

    def get_meta(self) -> dict:
        """Reserved for internal use"""
        uri = f"{self._project}/{self.uid}" if self._project else self.uid
        resp = {
            "name": self.name,
            "kind": "run",
            "uri": uri,
            "owner": get_in(self._labels, "owner"),
        }
        if "workflow" in self._labels:
            resp["workflow"] = self._labels["workflow"]
        return resp
    @classmethod
    def from_dict(
        cls,
        attrs: dict,
        rundb="",
        autocommit=False,
        tmp="",
        host=None,
        log_stream=None,
        is_api=False,
        update_db=True,
    ):
        """create execution context from dict"""
        self = cls(autocommit=autocommit, tmp=tmp, log_stream=log_stream)

        meta = attrs.get("metadata")
        if meta:
            self._uid = meta.get("uid", self._uid or uuid.uuid4().hex)
            self._iteration = meta.get("iteration", self._iteration)
            self.name = meta.get("name", self.name)
            self._project = meta.get("project", self._project)
            self._annotations = meta.get("annotations", self._annotations)
            self._labels = meta.get("labels", self._labels)

        spec = attrs.get("spec")
        if spec:
            self._secrets_manager = SecretsStore.from_list(spec.get(run_keys.secrets))
            self._log_level = spec.get("log_level", self._log_level)
            self._function = spec.get("function", self._function)
            self._parameters = spec.get("parameters", self._parameters)
            self._handler = spec.get("handler", self._handler)
            if not self._iteration:
                self._hyperparams = spec.get("hyperparams", self._hyperparams)
                self._hyper_param_options = spec.get(
                    "hyper_param_options", self._hyper_param_options
                )
                if isinstance(self._hyper_param_options, dict):
                    self._hyper_param_options = HyperParamOptions.from_dict(
                        self._hyper_param_options
                    )
            self._outputs = spec.get("outputs", self._outputs)
            self._allow_empty_resources = spec.get(
                "allow_empty_resources", self._allow_empty_resources
            )
            self.artifact_path = spec.get(run_keys.output_path, self.artifact_path)
            self._in_path = spec.get(run_keys.input_path, self._in_path)
            inputs = spec.get(run_keys.inputs)

        self._init_dbs(rundb)

        if spec and not is_api:
            # init data related objects (require DB & Secrets to be set first),
            # skip when running in the api service
            self._data_stores.from_dict(spec)
            if inputs and isinstance(inputs, dict):
                for k, v in inputs.items():
                    if v:
                        self._set_input(k, v)

        if host and not is_api:
            self.set_label("host", host)

        start = get_in(attrs, "status.start_time")
        if start:
            self._start_time = start
        self._state = "running"
        if update_db:
            self._update_db(commit=True)
        return self
    @property
    def uid(self):
        """Unique run id"""
        if self._iteration:
            return f"{self._uid}-{self._iteration}"
        return self._uid

    @property
    def tag(self):
        """run tag (uid or workflow id if exists)"""
        return self._labels.get("workflow") or self._uid

    @property
    def iteration(self):
        """child iteration index, for hyper parameters"""
        return self._iteration

    @property
    def project(self):
        """project name, runs can be categorized by projects"""
        return self._project

    @property
    def logger(self):
        """built-in logger interface

        example::

            context.logger.info("started experiment..", param=5)
        """
        return self._logger

    @property
    def log_level(self):
        """get the logging level, e.g. 'debug', 'info', 'error'"""
        return self._log_level

    @log_level.setter
    def log_level(self, value: str):
        """set the logging level, e.g. 'debug', 'info', 'error'"""
        self._log_level = value
        print(f"changed log level to: {value}")

    @property
    def parameters(self):
        """dictionary of run parameters (read-only)"""
        return deepcopy(self._parameters)

    @property
    def inputs(self):
        """dictionary of input data items (read-only)"""
        return self._inputs

    @property
    def results(self):
        """dictionary of results (read-only)"""
        return deepcopy(self._results)

    @property
    def artifacts(self):
        """dictionary of artifacts (read-only)"""
        return deepcopy(self._artifacts_manager.artifact_list())

    @property
    def in_path(self):
        """default input path for data objects"""
        return self._in_path

    @property
    def out_path(self):
        """default output path for artifacts"""
        logger.info(".out_path will soon be deprecated, use .artifact_path")
        return self.artifact_path
    def artifact_subpath(self, *subpaths):
        """join sub-paths under the run artifact path

        example::

            data_path = context.artifact_subpath('data')
        """
        return os.path.join(self.artifact_path, *subpaths)
    @property
    def labels(self):
        """dictionary with labels (read-only)"""
        return deepcopy(self._labels)

    def set_label(self, key: str, value, replace: bool = True):
        """set/record a specific label

        example::

            context.set_label("framework", "sklearn")
        """
        if replace or not self._labels.get(key):
            self._labels[key] = str(value)

    @property
    def annotations(self):
        """dictionary with annotations (read-only)"""
        return deepcopy(self._annotations)

    def set_annotation(self, key: str, value, replace: bool = True):
        """set/record a specific annotation

        example::

            context.set_annotation("comment", "some text")
        """
        if replace or not self._annotations.get(key):
            self._annotations[key] = str(value)

    def get_param(self, key: str, default=None):
        """get a run parameter, or use the provided default if not set

        example::

            p1 = context.get_param("p1", 0)
        """
        if key not in self._parameters:
            self._parameters[key] = default
            if default:
                self._update_db()
            return default
        return self._parameters[key]

    def _load_project_object(self):
        if not self._project_object:
            if not self._project:
                self.logger.warning("get_project_param called without a project name")
                return None
            if not self._rundb:
                self.logger.warning(
                    "cannot retrieve project parameters - MLRun DB is not accessible"
                )
                return None
            self._project_object = self._rundb.get_project(self._project)
        return self._project_object
    def get_project_param(self, key: str, default=None):
        """get a parameter from the parameters of the run's project"""
        if not self._load_project_object():
            return default
        return self._project_object.get_param(key, default)
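    # Illustrative sketch: assuming the run's project defines a parameter named
    # "source_url" (a hypothetical key), read it with a local fallback:
    #
    #     url = context.get_project_param("source_url", default="data.csv")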
    def get_secret(self, key: str):
        """get a key-based secret, e.g. a DB password, from the context

        secrets can be specified when invoking a run through vault, files, env, ..

        example::

            access_key = context.get_secret("ACCESS_KEY")
        """
        return mlrun.get_secret_or_env(key, secret_provider=self._secrets_manager)
    def _set_input(self, key, url=""):
        if url is None:
            return
        if not url:
            url = key
        if self.in_path and is_relative_path(url):
            url = os.path.join(self._in_path, url)
        obj = self._data_stores.object(
            url,
            key,
            project=self._project,
            allow_empty_resources=self._allow_empty_resources,
        )
        self._inputs[key] = obj
        return obj

    def get_input(self, key: str, url: str = ""):
        """get an input :py:class:`~mlrun.DataItem` object, data objects have
        methods such as .get(), .download(), .url, .. to access the actual data

        example::

            data = context.get_input("my_data").get()
        """
        if key not in self._inputs:
            return self._set_input(key, url)
        return self._inputs[key]

    def log_result(self, key: str, value, commit=False):
        """log a scalar result value

        example::

            context.log_result('accuracy', 0.85)

        :param key:    result key
        :param value:  result value
        :param commit: commit (write to DB now vs wait for the end of the run)
        """
        self._results[str(key)] = _cast_result(value)
        self._update_db(commit=commit)
    def log_results(self, results: dict, commit=False):
        """log a set of scalar result values

        example::

            context.log_results({'accuracy': 0.85, 'loss': 0.2})

        :param results: key/value dict of results
        :param commit:  commit (write to DB now vs wait for the end of the run)
        """
        if not isinstance(results, dict):
            raise MLRunInvalidArgumentError(
                "(multiple) results must be in the form of dict"
            )

        for p in results.keys():
            self._results[str(p)] = _cast_result(results[p])
        self._update_db(commit=commit)
    def log_iteration_results(self, best, summary: list, task: dict, commit=False):
        """Reserved for internal use"""

        if best:
            self._results["best_iteration"] = best
            for k, v in get_in(task, ["status", "results"], {}).items():
                self._results[k] = v
            for artifact in get_in(task, ["status", run_keys.artifacts], []):
                self._artifacts_manager.artifacts[
                    artifact["metadata"]["key"]
                ] = artifact
                self._artifacts_manager.link_artifact(
                    self.project,
                    self.name,
                    self.tag,
                    artifact["metadata"]["key"],
                    self.iteration,
                    artifact["spec"]["target_path"],
                    link_iteration=best,
                    db_key=artifact["spec"]["db_key"],
                )

        if summary is not None:
            self._iteration_results = summary
        if commit:
            self._update_db(commit=True)

    def log_metric(self, key: str, value, timestamp=None, labels=None):
        """TBD, log a real-time time-series metric"""
        labels = {} if labels is None else labels
        if not timestamp:
            timestamp = datetime.now()
        if self._rundb:
            self._rundb.store_metric({key: value}, timestamp, labels)

    def log_metrics(self, keyvals: dict, timestamp=None, labels=None):
        """TBD, log a set of real-time time-series metrics"""
        labels = {} if labels is None else labels
        if not timestamp:
            timestamp = datetime.now()
        if self._rundb:
            self._rundb.store_metric(keyvals, timestamp, labels)
    def log_artifact(
        self,
        item,
        body=None,
        local_path=None,
        artifact_path=None,
        tag="",
        viewer=None,
        target_path="",
        src_path=None,
        upload=None,
        labels=None,
        format=None,
        db_key=None,
        **kwargs,
    ):
        """log an output artifact and optionally upload it to datastore

        example::

            context.log_artifact(
                "some-data",
                body=b"abc is 123",
                local_path="model.txt",
                labels={"framework": "xgboost"},
            )

        :param item:          artifact key or artifact class ()
        :param body:          will use the body as the artifact content
        :param local_path:    path to the local file we upload, will also be used as
                              the destination subpath (under "artifact_path")
        :param artifact_path: target artifact path (when not using the default)
                              to define a subpath under the default location use:
                              `artifact_path=context.artifact_subpath('data')`
        :param tag:           version tag
        :param viewer:        kubeflow viewer type
        :param target_path:   absolute target path (instead of using artifact_path + local_path)
        :param src_path:      deprecated, use local_path
        :param upload:        upload to datastore (default is True)
        :param labels:        a set of key/value labels to tag the artifact with
        :param format:        optional, format to use (e.g. csv, parquet, ..)
        :param db_key:        the key to use in the artifact DB table, by default
                              it is the run name + '_' + key
                              db_key=False will not register it in the artifacts table

        :returns: artifact object
        """
        local_path = src_path or local_path
        item = self._artifacts_manager.log_artifact(
            self,
            item,
            body=body,
            local_path=local_path,
            artifact_path=extend_artifact_path(artifact_path, self.artifact_path),
            target_path=target_path,
            tag=tag,
            viewer=viewer,
            upload=upload,
            labels=labels,
            db_key=db_key,
            format=format,
            **kwargs,
        )
        self._update_db()
        return item
    def log_dataset(
        self,
        key,
        df,
        tag="",
        local_path=None,
        artifact_path=None,
        upload=True,
        labels=None,
        format="",
        preview=None,
        stats=None,
        db_key=None,
        target_path="",
        extra_data=None,
        label_column: str = None,
        **kwargs,
    ):
        """log a dataset artifact and optionally upload it to datastore

        example::

            raw_data = {
                "first_name": ["Jason", "Molly", "Tina", "Jake", "Amy"],
                "last_name": ["Miller", "Jacobson", "Ali", "Milner", "Cooze"],
                "age": [42, 52, 36, 24, 73],
                "testScore": [25, 94, 57, 62, 70],
            }
            df = pd.DataFrame(raw_data, columns=["first_name", "last_name", "age", "testScore"])
            context.log_dataset("mydf", df=df, stats=True)

        :param key:           artifact key
        :param df:            dataframe object
        :param label_column:  name of the label column (the one holding the target (y) values)
        :param local_path:    path to a local dataframe file; its extension determines the
                              save format, and if the file exists it is uploaded to the
                              datastore instead of the given df
        :param artifact_path: target artifact path (when not using the default)
                              to define a subpath under the default location use:
                              `artifact_path=context.artifact_subpath('data')`
        :param tag:           version tag
        :param format:        optional, format to use (e.g. csv, parquet, ..)
        :param target_path:   absolute target path (instead of using artifact_path + local_path)
        :param preview:       number of lines to store as preview in the artifact metadata
        :param stats:         calculate and store dataset stats in the artifact metadata
        :param extra_data:    key/value list of extra files/charts to link with this dataset
        :param upload:        upload to datastore (default is True)
        :param labels:        a set of key/value labels to tag the artifact with
        :param db_key:        the key to use in the artifact DB table, by default
                              it is the run name + '_' + key
                              db_key=False will not register it in the artifacts table

        :returns: artifact object
        """
        ds = DatasetArtifact(
            key,
            df,
            preview=preview,
            extra_data=extra_data,
            format=format,
            stats=stats,
            label_column=label_column,
            **kwargs,
        )

        item = self._artifacts_manager.log_artifact(
            self,
            ds,
            local_path=local_path,
            artifact_path=extend_artifact_path(artifact_path, self.artifact_path),
            target_path=target_path,
            tag=tag,
            upload=upload,
            db_key=db_key,
            labels=labels,
        )
        self._update_db()
        return item
    def log_model(
        self,
        key,
        body=None,
        framework="",
        tag="",
        model_dir=None,
        model_file=None,
        algorithm=None,
        metrics=None,
        parameters=None,
        artifact_path=None,
        upload=True,
        labels=None,
        inputs: List[Feature] = None,
        outputs: List[Feature] = None,
        feature_vector: str = None,
        feature_weights: list = None,
        training_set=None,
        label_column: Union[str, list] = None,
        extra_data=None,
        db_key=None,
        **kwargs,
    ):
        """log a model artifact and optionally upload it to datastore

        example::

            context.log_model("model", body=dumps(model),
                              model_file="model.pkl",
                              metrics=context.results,
                              training_set=training_df,
                              label_column='label',
                              feature_vector=feature_vector_uri,
                              labels={"app": "fraud"})

        :param key:             artifact key or artifact class ()
        :param body:            will use the body as the artifact content
        :param model_file:      path to the local model file we upload (see also model_dir)
                                or to a model file data url (e.g. http://host/path/model.pkl)
        :param model_dir:       path to the local dir holding the model file and extra files
        :param artifact_path:   target artifact path (when not using the default)
                                to define a subpath under the default location use:
                                `artifact_path=context.artifact_subpath('data')`
        :param framework:       name of the ML framework
        :param algorithm:       training algorithm name
        :param tag:             version tag
        :param metrics:         key/value dict of model metrics
        :param parameters:      key/value dict of model parameters
        :param inputs:          ordered list of model input features (name, type, ..)
        :param outputs:         ordered list of model output/result elements (name, type, ..)
        :param upload:          upload to datastore (default is True)
        :param labels:          a set of key/value labels to tag the artifact with
        :param feature_vector:  feature store feature vector uri
                                (store://feature-vectors/<project>/<name>[:tag])
        :param feature_weights: list of feature weights, one per input column
        :param training_set:    training set dataframe, used to infer inputs & outputs
        :param label_column:    which columns in the training set are the label (target) columns
        :param extra_data:      key/value list of extra files/charts to link with this dataset,
                                value can be absolute path | relative path (to model dir) |
                                bytes | artifact object
        :param db_key:          the key to use in the artifact DB table, by default
                                it is the run name + '_' + key
                                db_key=False will not register it in the artifacts table

        :returns: artifact object
        """
        if training_set is not None and inputs:
            raise MLRunInvalidArgumentError(
                "cannot specify inputs and training set together"
            )

        model = ModelArtifact(
            key,
            body,
            model_file=model_file,
            model_dir=model_dir,
            metrics=metrics,
            parameters=parameters,
            inputs=inputs,
            outputs=outputs,
            framework=framework,
            algorithm=algorithm,
            feature_vector=feature_vector,
            feature_weights=feature_weights,
            extra_data=extra_data,
            **kwargs,
        )
        if training_set is not None:
            model.infer_from_df(training_set, label_column)

        item = self._artifacts_manager.log_artifact(
            self,
            model,
            artifact_path=extend_artifact_path(artifact_path, self.artifact_path),
            tag=tag,
            upload=upload,
            db_key=db_key,
            labels=labels,
        )
        self._update_db()
        return item
    def get_cached_artifact(self, key):
        """return a logged artifact from cache (for potential updates)"""
        return self._artifacts_manager.artifacts[key]
    def update_artifact(self, artifact_object):
        """update an artifact object in the cache and the DB"""
        self._artifacts_manager.update_artifact(self, artifact_object)
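    # Hedged example for the cache/update pair above ("mydf" is an assumed key
    # from an earlier log_dataset call; the labels layout follows the
    # metadata/spec artifact structure used elsewhere in this module):
    #
    #     artifact = context.get_cached_artifact("mydf")
    #     artifact.metadata.labels["validated"] = "true"
    #     context.update_artifact(artifact)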
    def commit(self, message: str = "", completed=True):
        """save run state and optionally add a commit message

        :param message:   commit message to save in the run
        :param completed: mark run as completed
        """
        completed = completed and self._state == "running"
        if message:
            self._annotations["message"] = message
        if completed:
            self._state = "completed"

        if self._parent:
            self._parent.update_child_iterations()
            self._parent._last_update = now_date()
            self._parent._update_db(commit=True, message=message)
        if self._children:
            self.update_child_iterations(commit_children=True, completed=completed)

        self._last_update = now_date()
        self._update_db(commit=True, message=message)

        if completed and not self.iteration:
            mlrun.runtimes.utils.global_context.set(None)
    def set_state(self, state: str = None, error: str = None, commit=True):
        """modify and store the run state or mark an error

        :param state:  set run state
        :param error:  error message (if set, the state will change to 'error')
        :param commit: will immediately update the state in the DB
        """
        updates = {"status.last_update": now_date().isoformat()}

        if error:
            self._state = "error"
            self._error = str(error)
            updates["status.state"] = "error"
            updates["status.error"] = error
        elif state and state != self._state and self._state != "error":
            self._state = state
            updates["status.state"] = state

        self._last_update = now_date()

        if self._rundb and commit:
            self._rundb.update_run(
                updates, self._uid, self.project, iter=self._iteration
            )
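    # Hedged lifecycle sketch: inside a handler one might persist failures
    # immediately and rely on commit() for the success path (`train` is a
    # hypothetical user function):
    #
    #     try:
    #         train()
    #         context.commit("training finished", completed=True)
    #     except Exception as exc:
    #         context.set_state(error=str(exc))
    #         raise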
    def set_hostname(self, host: str):
        """update the hostname, for internal use"""
        self._host = host
        if self._rundb:
            updates = {"status.host": host}
            self._rundb.update_run(
                updates, self._uid, self.project, iter=self._iteration
            )
    def to_dict(self):
        """convert the run context to a dictionary"""

        def set_if_valid(struct, key, val):
            if val:
                struct[key] = val

        struct = {
            "kind": "run",
            "metadata": {
                "name": self.name,
                "uid": self._uid,
                "iteration": self._iteration,
                "project": self._project,
                "labels": self._labels,
                "annotations": self._annotations,
            },
            "spec": {
                "function": self._function,
                "log_level": self._log_level,
                "parameters": self._parameters,
                "handler": self._handler,
                "outputs": self._outputs,
                run_keys.output_path: self.artifact_path,
                run_keys.inputs: {
                    k: v.artifact_url for k, v in self._inputs.items()
                },
            },
            "status": {
                "state": self._state,
                "results": self._results,
                "start_time": to_date_str(self._start_time),
                "last_update": to_date_str(self._last_update),
            },
        }

        if not self._iteration:
            struct["spec"]["hyperparams"] = self._hyperparams
            struct["spec"]["hyper_param_options"] = self._hyper_param_options.to_dict()

        set_if_valid(struct["status"], "error", self._error)
        set_if_valid(struct["status"], "commit", self._commit)

        if self._iteration_results:
            struct["status"]["iterations"] = self._iteration_results

        struct["status"][run_keys.artifacts] = self._artifacts_manager.artifact_list()
        self._data_stores.to_dict(struct["spec"])
        return struct

    def to_yaml(self):
        """convert the run context to a yaml buffer"""
        return dict_to_yaml(self.to_dict())

    def to_json(self):
        """convert the run context to a json buffer"""
        return dict_to_json(self.to_dict())
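    # Conversion sketch: to_yaml and to_json above both render the to_dict
    # structure, so either can be used to snapshot a run, e.g.:
    #
    #     run_doc = context.to_dict()
    #     print(context.to_yaml())
    #     with open("run.json", "w") as f:  # hypothetical output file
    #         f.write(context.to_json())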
    def _update_db(self, commit=False, message=""):
        self._last_update = now_date()

        if self._tmpfile:
            data = self.to_json()
            with open(self._tmpfile, "w") as fp:
                fp.write(data)

        if commit or self._autocommit:
            self._commit = message
            if self._rundb:
                self._rundb.store_run(
                    self.to_dict(), self._uid, self.project, iter=self._iteration
                )
def _cast_result(value):
    # cast a result value to a plain, JSON-serializable Python type
    if isinstance(value, (int, str, float)):
        return value
    if isinstance(value, list):
        return [_cast_result(v) for v in value]
    if isinstance(value, dict):
        return {k: _cast_result(v) for k, v in value.items()}
    if isinstance(value, (np.int64, np.integer)):
        return int(value)
    if isinstance(value, (np.floating, np.float64)):
        return float(value)
    if isinstance(value, np.ndarray):
        return value.tolist()
    return str(value)
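# Behavior sketch for _cast_result, following the branches above: numpy scalars
# and arrays become plain Python types so the run document stays JSON-serializable,
# and any unrecognized object falls back to str():
#
#     _cast_result(np.int64(7))             # -> 7 (int)
#     _cast_result(np.array([1.5, 2.5]))    # -> [1.5, 2.5] (list)
#     _cast_result({"a": np.float64(0.1)})  # -> {"a": 0.1}
#     _cast_result(datetime.now())          # -> string via str()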