mlrun.model_monitoring#

mlrun.model_monitoring.api.get_or_create_model_endpoint(project: str, model_endpoint_name: str, model_path: str = '', endpoint_id: str = '', function_name: str = '', function_tag: str = 'latest', context: MLClientCtx | None = None, sample_set_statistics: dict[str, Any] | None = None, monitoring_mode: ModelMonitoringMode = ModelMonitoringMode.enabled, db_session=None, feature_analysis: bool = False) ModelEndpoint[source]#

Get a single model endpoint object. If it does not exist, a new model endpoint is generated with the provided parameters. Note that when a new model endpoint is generated, the monitoring features are disabled by default. To enable these features, set monitoring_mode=enabled.

Parameters:
  • project -- Project name.

  • model_endpoint_name -- If a new model endpoint is created, it is registered under this name (applicable only to a new endpoint_id).

  • model_path -- The model store path (applicable only to new endpoint_id).

  • endpoint_id -- Model endpoint unique ID. If it does not exist in the DB, a new record is generated based on the provided endpoint_id.

  • function_name -- If a new model endpoint is created, use this function name.

  • function_tag -- If a new model endpoint is created, use this function tag.

  • context -- MLRun context. If function_name is not provided, the context is used to generate the full function hash.

  • sample_set_statistics -- Dictionary of sample set statistics that will be used as reference data for the new model endpoint (applicable only to a new endpoint_id).

  • monitoring_mode -- If enabled, apply model monitoring features on the provided endpoint id (applicable only to new endpoint_id).

  • db_session -- A runtime session that manages the current dialog with the database.

  • feature_analysis -- If True, the model endpoint will be retrieved with the feature analysis mode.

Returns:

A ModelEndpoint object
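
For example, a minimal sketch of fetching an existing model endpoint or creating a new one; the project, endpoint, and model names below are placeholders:

from mlrun.model_monitoring.api import get_or_create_model_endpoint

# Placeholder names -- replace with your own project, endpoint name, and model store path
model_endpoint = get_or_create_model_endpoint(
    project="my-project",
    model_endpoint_name="my-endpoint",
    model_path="store://models/my-project/my-model:latest",
    function_name="serving",
)
# The returned ModelEndpoint object can be passed to the other monitoring API calls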

mlrun.model_monitoring.api.get_sample_set_statistics(sample_set: DataItem | list | dict | DataFrame | Series | ndarray | Any = None, model_artifact_feature_stats: dict | None = None, sample_set_columns: list | None = None, sample_set_drop_columns: list | None = None, sample_set_label_columns: list | None = None) dict[source]#

Get the sample set statistics, either from the given sample set or from the statistics logged with the model, favoring the given sample set when both are available.

Parameters:
  • sample_set -- A sample dataset against which to compare the inputs in the drift analysis.

  • model_artifact_feature_stats -- The feature_stats attribute in the spec of the model artifact, which holds the original sample set statistics of the model.

  • sample_set_columns -- The column names of sample_set.

  • sample_set_drop_columns -- str / int or a list of str / int that represent the column names / indices to drop.

  • sample_set_label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.

Returns:

The sample set statistics.

Raises MLRunInvalidArgumentError: If no sample set or statistics were given.
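
For example, a minimal sketch of computing reference statistics from a training DataFrame; the column names are placeholders:

import pandas as pd

from mlrun.model_monitoring.api import get_sample_set_statistics

# Placeholder training data -- replace with your own reference dataset
train_df = pd.DataFrame(
    {"f0": [0.1, 0.2, 0.3], "f1": [1.0, 2.0, 3.0], "label": [0, 1, 0]}
)

reference_stats = get_sample_set_statistics(
    sample_set=train_df,
    sample_set_label_columns=["label"],
)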

mlrun.model_monitoring.api.read_dataset_as_dataframe(dataset: DataItem | list | dict | DataFrame | Series | ndarray | Any, feature_columns: str | list[str] | None = None, label_columns: str | list[str] | None = None, drop_columns: str | list[str] | int | list[int] | None = None) tuple[pandas.core.frame.DataFrame, list[str]][source]#

Parse the given dataset into a DataFrame and drop the columns accordingly. In addition, the label columns will be parsed and validated as well.

Parameters:
  • dataset -- A dataset that will be converted into a DataFrame. Can be either a list of lists, numpy.ndarray, dict, pd.Series, DataItem or a FeatureVector.

  • feature_columns -- List of feature columns that will be used to build the DataFrame when the dataset is of type list or numpy array.

  • label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.

  • drop_columns -- str / int or a list of str / int that represent the column names / indices to drop.

Returns:

A tuple of: [0] the parsed dataset as a DataFrame, and [1] the label columns.

Raises MLRunInvalidArgumentError: If the drop_columns do not match the dataset, or the dataset type is unsupported.
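
For example, a sketch of parsing a numpy array into a DataFrame with named columns, assuming the label column is included in the provided feature_columns:

import numpy as np

from mlrun.model_monitoring.api import read_dataset_as_dataframe

# Placeholder data: two rows of two features plus a label
data = np.array([[0.1, 1.0, 0.0], [0.2, 2.0, 1.0]])
df, label_columns = read_dataset_as_dataframe(
    dataset=data,
    feature_columns=["f0", "f1", "label"],  # placeholder column names
    label_columns="label",
)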

mlrun.model_monitoring.api.record_results(project: str, model_path: str, model_endpoint_name: str, endpoint_id: str = '', function_name: str = '', context: MLClientCtx | None = None, infer_results_df: DataFrame | None = None, sample_set_statistics: dict[str, Any] | None = None, monitoring_mode: ModelMonitoringMode = ModelMonitoringMode.enabled, drift_threshold: float | None = None, possible_drift_threshold: float | None = None, trigger_monitoring_job: bool = False, artifacts_tag: str = '', default_batch_image: str = 'mlrun/mlrun') ModelEndpoint[source]#

Write a provided inference dataset to the model endpoint parquet target. If the model endpoint does not exist, generate a new model endpoint record and use the provided sample set statistics as the feature stats to be used later for the drift analysis. To activate model monitoring, run project.enable_model_monitoring(). The model monitoring applications are triggered with the recorded data according to a periodic schedule.

Parameters:
  • project -- Project name.

  • model_path -- The model store path.

  • model_endpoint_name -- If a new model endpoint is generated, it is registered under this name.

  • endpoint_id -- Model endpoint unique ID. If it does not exist in the DB, a new record is generated based on the provided endpoint_id.

  • function_name -- If a new model endpoint is created, use this function name for generating the function URI.

  • context -- MLRun context. Note that the context is required for generating the model endpoint.

  • infer_results_df -- DataFrame that will be stored under the model endpoint parquet target and used for the drift analysis. Make sure that the DataFrame includes both feature names and label columns. If you are recording results for an existing model endpoint, the endpoint should be a batch endpoint.

  • sample_set_statistics -- Dictionary of sample set statistics that will be used as reference data for the current model endpoint.

  • monitoring_mode -- If enabled, apply model monitoring features on the provided endpoint id. Enabled by default.

  • drift_threshold -- (deprecated) The threshold at which to mark drifts.

  • possible_drift_threshold -- (deprecated) The threshold at which to mark possible drifts.

  • trigger_monitoring_job -- (deprecated) If True, run the batch drift job. If the job does not exist, the monitoring batch function is registered through the MLRun API with the provided image.

  • artifacts_tag -- (deprecated) Tag to use for all the artifacts resulting from the function. Relevant only if the monitoring batch job has been triggered.

  • default_batch_image -- (deprecated) The image that will be used when registering the model monitoring batch job.

Returns:

A ModelEndpoint object
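
For example, a minimal sketch of recording batch-inference results; the project, endpoint, and model names are placeholders, and the statistics are assumed to have been computed with get_sample_set_statistics() as shown above:

import pandas as pd

import mlrun
from mlrun.model_monitoring.api import record_results

# Placeholder inference results: feature columns plus the predicted label column
infer_results_df = pd.DataFrame({"f0": [0.1, 0.2], "f1": [1.0, 2.0], "label": [0, 1]})

model_endpoint = record_results(
    project="my-project",  # placeholder project name
    model_path="store://models/my-project/my-model:latest",  # placeholder store path
    model_endpoint_name="my-batch-endpoint",  # placeholder endpoint name
    context=mlrun.get_or_create_ctx("batch-infer"),
    infer_results_df=infer_results_df,
    sample_set_statistics=reference_stats,  # e.g. the output of get_sample_set_statistics()
)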

mlrun.model_monitoring.api.write_monitoring_df(endpoint_id: str, infer_results_df: DataFrame, infer_datetime: datetime, monitoring_feature_set: FeatureSet | None = None, feature_set_uri: str = '') None[source]#

Write the infer results DataFrame to the monitoring parquet target of the current model endpoint. The DataFrame is written using the feature set ingest process. Make sure that you provide either a valid monitoring feature set (with a parquet target) or a valid monitoring feature set URI.

Parameters:
  • endpoint_id -- Model endpoint unique ID.

  • infer_results_df -- DataFrame that will be stored under the model endpoint parquet target.

  • monitoring_feature_set -- A mlrun.feature_store.FeatureSet object corresponding to the provided endpoint_id.

  • feature_set_uri -- If monitoring_feature_set is not provided, use the feature_set_uri value to get the relevant mlrun.feature_store.FeatureSet.
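
For example, a sketch of writing inference results directly to the monitoring parquet target, assuming you already hold the monitoring feature set URI of the endpoint (the endpoint ID and URI below are placeholders):

from datetime import datetime, timezone

import pandas as pd

from mlrun.model_monitoring.api import write_monitoring_df

infer_results_df = pd.DataFrame({"f0": [0.1], "f1": [1.0], "label": [0]})

write_monitoring_df(
    endpoint_id="abc123",  # placeholder model endpoint UID
    infer_results_df=infer_results_df,
    infer_datetime=datetime.now(tz=timezone.utc),
    feature_set_uri="store://feature-sets/my-project/monitoring-my-endpoint:latest",  # placeholder URI
)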

class mlrun.model_monitoring.applications.ModelMonitoringApplicationResult(name: str, value: float, kind: ~mlrun.common.schemas.model_monitoring.constants.ResultKindApp, status: ~mlrun.common.schemas.model_monitoring.constants.ResultStatusApp, extra_data: dict = <factory>)[source]#

Class representing the result of a custom model monitoring application.

Parameters:
  • name -- (str) Name of the application result. This name must be unique for each metric in a single application (name must be of the format [a-zA-Z_][a-zA-Z0-9_]*).

  • value -- (float) Value of the application result.

  • kind -- (ResultKindApp) Kind of application result.

  • status -- (ResultStatusApp) Status of the application result.

  • extra_data -- (dict) Extra data associated with the application result. Note that if the extra data exceeds the maximum size of 998 characters, it is ignored and a message is logged. In this case, we recommend logging the extra data as a separate artifact or shortening it.

to_dict()[source]#

Convert the object to a dictionary format suitable for writing.

Returns:

(dict) Dictionary representation of the result.

classmethod validate_extra_data_len(result_extra_data: dict)[source]#

Ensure that the extra data does not exceed the maximum size, which is important to avoid possible storage issues.

class mlrun.model_monitoring.applications.ModelMonitoringApplicationMetric(name: str, value: float)[source]#

Class representing a single metric of a custom model monitoring application.

Parameters:
  • name -- (str) Name of the application metric. This name must be unique for each metric in a single application (name must be of the format [a-zA-Z_][a-zA-Z0-9_]*).

  • value -- (float) Value of the application metric.

to_dict()[source]#

Convert the object to a dictionary format suitable for writing.

Returns:

(dict) Dictionary representation of the metric.

class mlrun.model_monitoring.applications.MonitoringApplicationContext(*, application_name: str, event: dict[str, Any], project: MlrunProject, artifacts_logger: _ArtifactsLogger, logger: Logger, nuclio_logger: Logger, model_endpoint_dict: dict[str, mlrun.common.schemas.model_monitoring.model_endpoints.ModelEndpoint] | None = None, sample_df: DataFrame | None = None, feature_stats: FeatureStats | None = None, feature_sets_dict: dict[str, mlrun.feature_store.feature_set.FeatureSet] | None = None)[source]#

The MonitoringApplicationContext object holds all the relevant information for the model monitoring application, and can be used for logging artifacts and messages. The monitoring context has the following attributes:

Parameters:
  • application_name -- (str) The model monitoring application name.

  • project -- (MlrunProject) The current MLRun project object.

  • project_name -- (str) The project name.

  • logger -- (Logger) MLRun logger.

  • nuclio_logger -- (nuclio.request.Logger) Nuclio logger.

  • sample_df_stats -- (FeatureStats) The new sample distribution dictionary.

  • feature_stats -- (FeatureStats) The train sample distribution dictionary.

  • sample_df -- (pd.DataFrame) The new sample DataFrame.

  • start_infer_time -- (pd.Timestamp) Start time of the monitoring schedule.

  • end_infer_time -- (pd.Timestamp) End time of the monitoring schedule.

  • endpoint_id -- (str) ID of the monitored model endpoint

  • feature_set -- (FeatureSet) the model endpoint feature set

  • endpoint_name -- (str) Name of the monitored model endpoint

  • output_stream_uri -- (str) URI of the output stream for results

  • model_endpoint -- (ModelEndpoint) The model endpoint object.

  • feature_names -- (list[str]) List of the model's feature names.

  • label_names -- (list[str]) List of the model's label names.

  • model -- (tuple[str, ModelArtifact, dict]) The model file, model spec object, and a list of extra data items.

static dict_to_histogram(histogram_dict: FeatureStats) DataFrame[source]#

Convert histogram dictionary to pandas DataFrame with feature histograms as columns

Parameters:

histogram_dict -- Histogram dictionary

Returns:

Histogram dataframe
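
For example, inside a do_tracking() implementation you might convert both the reference and the current statistics into histogram DataFrames for comparison; a minimal sketch (monitoring_context is the MonitoringApplicationContext passed to do_tracking()):

# DataFrames with the feature histograms as columns
reference_histograms = monitoring_context.dict_to_histogram(
    monitoring_context.feature_stats
)
current_histograms = monitoring_context.dict_to_histogram(
    monitoring_context.sample_df_stats
)
# Compare the two DataFrames with your own drift logic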

log_artifact(item, body=None, tag: str = '', local_path: str = '', artifact_path: str | None = None, format: str | None = None, upload: bool | None = None, labels: dict[str, str] | None = None, target_path: str | None = None, unique_per_endpoint: bool = True, **kwargs) Artifact[source]#

Log an artifact.

Caution

Logging artifacts in every model monitoring window may cause scale issues. This method should be called on special occasions only.

See log_artifact() for the full documentation, except for one new argument:

Parameters:

unique_per_endpoint -- True by default, meaning a different artifact is logged for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.
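
For example, a hedged sketch of logging a small summary artifact from within do_tracking(); the artifact key and body are placeholders:

# Inside a do_tracking() implementation -- log sparingly to avoid scale issues
monitoring_context.log_artifact(
    item="drift-summary",  # placeholder artifact key
    body="drift value: 0.12",  # placeholder body
    unique_per_endpoint=True,  # one artifact per model endpoint (the default)
)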

log_dataset(key, df, tag='', local_path=None, artifact_path=None, upload=None, labels=None, format='', preview=None, stats=None, target_path='', extra_data=None, label_column: str | None = None, unique_per_endpoint: bool = True, **kwargs) DatasetArtifact[source]#

Log a dataset artifact.

Caution

Logging datasets in every model monitoring window may cause scale issues. This method should be called on special occasions only.

See log_dataset() for the full documentation, except for one new argument:

Parameters:

unique_per_endpoint -- True by default, meaning a different artifact is logged for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.

class mlrun.model_monitoring.applications.ModelMonitoringApplicationBase[source]#

The base class for a model monitoring application. Inherit from this class to create a custom model monitoring application.

For example, MyApp below is a simplistic custom application:

from mlrun.common.schemas.model_monitoring.constants import (
    ResultKindApp,
    ResultStatusApp,
)
from mlrun.model_monitoring.applications import (
    ModelMonitoringApplicationBase,
    ModelMonitoringApplicationResult,
    MonitoringApplicationContext,
)


class MyApp(ModelMonitoringApplicationBase):
    def do_tracking(
        self, monitoring_context: MonitoringApplicationContext
    ) -> ModelMonitoringApplicationResult:
        monitoring_context.logger.info(
            "Running application",
            application_name=monitoring_context.application_name,
        )
        return ModelMonitoringApplicationResult(
            name="data_drift_test",
            value=0.5,
            kind=ResultKindApp.data_drift,
            status=ResultStatusApp.detected,
        )
classmethod deploy(func_name: str, func_path: str | None = None, image: str | None = None, handler: str | None = None, with_repo: bool | None = False, tag: str | None = None, requirements: str | list[str] | None = None, requirements_file: str = '', **application_kwargs) None[source]#

Set the application to the current project and deploy it as a Nuclio serving function. Required for your model monitoring application to work as a part of the model monitoring framework.

Parameters:
  • func_name -- The name of the function.

  • func_path -- The path of the function, None refers to the current Jupyter notebook.

For the other arguments, refer to set_model_monitoring_function().
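
For example, a minimal sketch of deploying the MyApp application shown above (the function name and image are placeholders):

# Deploy the application defined in the current notebook as a Nuclio function
MyApp.deploy(
    func_name="my-monitoring-app",  # placeholder function name
    image="mlrun/mlrun",  # placeholder image
)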

abstract do_tracking(monitoring_context: MonitoringApplicationContext) ModelMonitoringApplicationResult | list[Union[mlrun.model_monitoring.applications.results.ModelMonitoringApplicationResult, mlrun.model_monitoring.applications.results.ModelMonitoringApplicationMetric]] | dict[str, Any][source]#

Implement this method with your custom monitoring logic.

Parameters:

monitoring_context -- (MonitoringApplicationContext) The monitoring context to process.

Returns:

(ModelMonitoringApplicationResult) or (list[Union[ModelMonitoringApplicationResult, ModelMonitoringApplicationMetric]]) or a dict that contains the application metrics only (in this case the name of each metric is the key and the metric value is the corresponding value).
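
For example, a sketch of an implementation that returns both a result and a metric, building on the imports from the example above plus ModelMonitoringApplicationMetric; the names and values are placeholders:

# Inside a ModelMonitoringApplicationBase subclass
def do_tracking(
    self, monitoring_context: MonitoringApplicationContext
) -> list:
    # Placeholder drift value computed from the current window
    drift_value = 0.3
    return [
        ModelMonitoringApplicationResult(
            name="my_drift_result",
            value=drift_value,
            kind=ResultKindApp.data_drift,
            status=ResultStatusApp.no_detection,
        ),
        ModelMonitoringApplicationMetric(name="my_drift_metric", value=drift_value),
    ]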

classmethod evaluate(func_path: str | None = None, func_name: str | None = None, *, tag: str | None = None, run_local: bool = True, auto_build: bool = True, sample_data: DataFrame | None = None, reference_data: DataFrame | None = None, image: str | None = None, with_repo: bool | None = False, class_handler: str | None = None, requirements: str | list[str] | None = None, requirements_file: str = '', endpoints: list[tuple[str, str]] | list[str] | str | None = None, start: datetime | None = None, end: datetime | None = None, base_period: int | None = None) RunObject[source]#

Call this function to run the application's do_tracking() model monitoring logic as a KubejobRuntime, which is an MLRun function.

This function has default values for all of its arguments. Change them when you want to pass data to the application.

Parameters:
  • func_path -- The path to the function. If None, the current notebook is used.

  • func_name -- The name of the function. If None, the class name is used.

  • tag -- Tag for the function.

  • run_local -- Whether to run the function locally or remotely.

  • auto_build -- Whether to auto build the function.

  • sample_data -- Pandas data-frame as the current dataset. When set, it replaces the data read from the model endpoint's offline source.

  • reference_data -- Pandas data-frame of the reference dataset. When set, its statistics override the model endpoint's feature statistics.

  • image -- Docker image to run the job on (when running remotely).

  • with_repo -- Whether to clone the current repo to the build source.

  • class_handler -- The relative path to the class, useful when using Git sources or code from images.

  • requirements -- List of Python requirements to be installed in the image.

  • requirements_file -- Path to a Python requirements file to be installed in the image.

  • endpoints -- A list of (name, uid) tuples of the model endpoints to get the data from. You may also provide a list of model endpoint names, or a single name. Note: providing names retrieves all the active model endpoints with those names (cross-function model endpoints). If endpoints is provided and sample_data is not set, you also have to provide the start and end times of the data to analyze from the model endpoints.

  • start -- The start time of the endpoint's data, not included. If you want the model endpoint's data at start included, you need to subtract a small datetime.timedelta from it.

  • end -- The end time of the endpoint's data, included. Please note: when start and end are set, they create a left-open time interval ("window") \((\operatorname{start}, \operatorname{end}]\) that excludes the endpoint's data at start and includes the data at end: \(\operatorname{start} < t \leq \operatorname{end}\), \(t\) is the time taken in the window's data.

  • base_period -- The window length in minutes. If None, the whole window from start to end is taken. If an integer is specified, the application is run from start to end in base_period length windows, except for the last window that ends at end and therefore may be shorter: \((\operatorname{start}, \operatorname{start} + \operatorname{base\_period}], (\operatorname{start} + \operatorname{base\_period}, \operatorname{start} + 2\cdot\operatorname{base\_period}], ..., (\operatorname{start} + m\cdot\operatorname{base\_period}, \operatorname{end}]\), where \(m\) is some positive integer.

Returns:

The output of the do_tracking() method with the given parameters and inputs, wrapped in a RunObject.
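
For example, a sketch of evaluating an application over the last day of an existing model endpoint's data (the endpoint name is a placeholder):

from datetime import datetime, timedelta, timezone

end = datetime.now(tz=timezone.utc)
start = end - timedelta(days=1)

run = MyApp.evaluate(
    endpoints=["my-endpoint"],  # placeholder model endpoint name
    start=start,
    end=end,
    run_local=True,
)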

classmethod get_job_handler(handler_to_class: str) str[source]#

A helper function to get the handler to the application job _handler.

Parameters:

handler_to_class -- The handler to the application class, e.g. my_package.sub_module1.MonitoringApp1.

Returns:

The handler to the job of the application class.

classmethod to_job(*, class_handler: str | None = None, func_path: str | None = None, func_name: str | None = None, tag: str | None = None, image: str | None = None, with_repo: bool | None = False, requirements: str | list[str] | None = None, requirements_file: str = '', project: MlrunProject | None = None) KubejobRuntime[source]#

Get the application's do_tracking() model monitoring logic as a KubejobRuntime.

The returned job can be run as any MLRun job with the relevant inputs and params to your application:

job = ModelMonitoringApplicationBase.to_job(
    class_handler="package.module.AppClass"
)
job.run(inputs={}, params={}, local=False)  # Add the relevant inputs and params

Optional inputs:

  • sample_data, pd.DataFrame

  • reference_data, pd.DataFrame

Optional params:

  • endpoints, list[tuple[str, str]]

  • start, datetime

  • end, datetime

  • base_period, int

For Git sources, add the source archive to the returned job and change the handler:

handler = ModelMonitoringApplicationBase.get_job_handler("module.AppClass")
job.with_source_archive(
    "git://github.com/owner/repo.git#branch-category/specific-task",
    workdir="path/to/application/folder",
    handler=handler,
)
Parameters:
  • class_handler -- The handler to the class, e.g. path.to.module::MonitoringApplication, useful when using Git sources or code from images. If None, the current class, deriving from ModelMonitoringApplicationBase, is used.

  • func_path -- The path to the function. If None, the current notebook is used.

  • func_name -- The name of the function. If None, the class name is used.

  • tag -- Tag for the function.

  • image -- Docker image to run the job on (when running remotely).

  • with_repo -- Whether to clone the current repo to the build source.

  • requirements -- List of Python requirements to be installed in the image.

  • requirements_file -- Path to a Python requirements file to be installed in the image.

  • project -- The project to set the function to. If not set, the current project is used.

Returns:

The KubejobRuntime job that wraps the model monitoring application's logic.

class mlrun.model_monitoring.applications.evidently.EvidentlyModelMonitoringApplicationBase(evidently_project_id: str | UUID, evidently_workspace_path: str | None = None, cloud_workspace: bool = False)[source]#

A class for integrating Evidently for MLRun model monitoring within a monitoring application.

Note

The evidently package is not installed by default in the mlrun/mlrun image. It must be installed separately to use this class.

Parameters:
  • evidently_project_id -- (str) The ID of the Evidently project.

  • evidently_workspace_path -- (str) The path to the Evidently workspace.

  • cloud_workspace -- (bool) Whether the workspace is an Evidently Cloud workspace.

get_cloud_workspace() CloudWorkspace[source]#

Load the Evidently cloud workspace according to the EVIDENTLY_API_KEY environment variable.

get_workspace() WorkspaceBase[source]#

Get the Evidently workspace. Override this method to customize access to the workspace.

load_project() Project[source]#

Load the Evidently project.

static log_evidently_object(monitoring_context: MonitoringApplicationContext, evidently_object: Snapshot, artifact_name: str, unique_per_endpoint: bool = True) None[source]#

Logs an Evidently report or suite as an artifact.

Caution

Logging Evidently objects in every model monitoring window may cause scale issues. This method should be called on special occasions only.

Parameters:
  • monitoring_context -- (MonitoringApplicationContext) The monitoring context to process.

  • evidently_object -- (Snapshot) The Evidently run to log, e.g. a report run.

  • artifact_name -- (str) The name for the logged artifact.

  • unique_per_endpoint -- True by default, meaning a different artifact is logged for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.

class mlrun.model_monitoring.applications.histogram_data_drift.DataDriftClassifier(potential: float = 0.5, detected: float = 0.7)[source]#

Classify data drift numeric values into categorical status.

value_to_status(value: float) ResultStatusApp[source]#

Translate the numeric value into a status category.

Parameters:

value -- The numeric value of the data drift metric, between 0 and 1.

Returns:

ResultStatusApp according to the classification.
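
For example, with the default thresholds:

from mlrun.model_monitoring.applications.histogram_data_drift import (
    DataDriftClassifier,
)

classifier = DataDriftClassifier(potential=0.5, detected=0.7)
classifier.value_to_status(0.3)  # below both thresholds -> no drift detected
classifier.value_to_status(0.55)  # between the thresholds -> potential drift
classifier.value_to_status(0.8)  # above the detected threshold -> drift detected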

class mlrun.model_monitoring.applications.histogram_data_drift.HistogramDataDriftApplication(value_classifier: ValueClassifier | None = None, produce_json_artifact: bool = False, produce_plotly_artifact: bool = False)[source]#

MLRun's default data drift application for model monitoring.

The application expects tabular numerical data, and calculates three metrics over the shared features' histograms. The metrics are calculated on features that have reference data from the training dataset. When there is no reference data (feature_stats), this application sends a warning log and does nothing. The three metrics are:

  • Hellinger distance.

  • Total variance distance.

  • Kullback-Leibler divergence.

Each metric is calculated for each feature individually, and the mean over the features is taken as the metric value. The average of the Hellinger and total variance distances is taken as the result.

The application can log two artifacts (disabled by default due to performance issues):

  • JSON with the general drift value per feature.

  • Plotly table with the various metrics and histograms per feature.

This application is deployed by default when calling enable_model_monitoring(). To avoid it, pass deploy_histogram_data_drift_app=False.

If you want to change the application defaults, such as the classifier or which artifacts to produce, you need to inherit from this class and deploy it as any other model monitoring application. Please make sure to keep the default application name. This ensures that the full functionality of the application, including the statistics view in the UI, is available.

Parameters:
  • value_classifier -- Classifier object that adheres to the ValueClassifier protocol. If not provided, the default DataDriftClassifier is used.

  • produce_json_artifact -- Whether to produce the JSON artifact or not, False by default.

  • produce_plotly_artifact -- Whether to produce the Plotly artifact or not, False by default.

do_tracking(monitoring_context: MonitoringApplicationContext) list[Union[mlrun.model_monitoring.applications.results.ModelMonitoringApplicationResult, mlrun.model_monitoring.applications.results.ModelMonitoringApplicationMetric, mlrun.model_monitoring.applications.results._ModelMonitoringApplicationStats]][source]#

Calculate and return the data drift metrics, averaged over the features.
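
For example, a sketch of customizing the defaults by subclassing the application; the threshold values are placeholders, and the subclass should be deployed under the default application name as noted above:

from mlrun.model_monitoring.applications.histogram_data_drift import (
    DataDriftClassifier,
    HistogramDataDriftApplication,
)


class MyHistogramDataDriftApp(HistogramDataDriftApplication):
    def __init__(self) -> None:
        super().__init__(
            # Placeholder thresholds for the drift classification
            value_classifier=DataDriftClassifier(potential=0.4, detected=0.6),
            produce_plotly_artifact=True,
        )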

exception mlrun.model_monitoring.applications.histogram_data_drift.InvalidMetricValueError[source]#
exception mlrun.model_monitoring.applications.histogram_data_drift.InvalidThresholdValueError[source]#
class mlrun.model_monitoring.applications.histogram_data_drift.ValueClassifier(*args, **kwargs)[source]#