mlrun.model_monitoring

mlrun.model_monitoring.api.get_or_create_model_endpoint(project: str, model_path: str = '', model_endpoint_name: str = '', endpoint_id: str = '', function_name: str = '', context: MLClientCtx | None = None, sample_set_statistics: dict[str, Any] | None = None, drift_threshold: float | None = None, possible_drift_threshold: float | None = None, monitoring_mode: ModelMonitoringMode = ModelMonitoringMode.disabled, db_session=None) → ModelEndpoint

Get a single model endpoint object. If it does not exist, generate a new model endpoint with the provided parameters. Note that when a new model endpoint is generated, the monitoring features are disabled by default. To enable them, set monitoring_mode=enabled.

Parameters:
  • project -- Project name.

  • model_path -- The model store path (applicable only to a new endpoint_id).

  • model_endpoint_name -- If a new model endpoint is created, it is registered under this name (applicable only to a new endpoint_id).

  • endpoint_id -- Model endpoint unique ID. If it does not exist in the DB, a new record is generated based on the provided endpoint_id.

  • function_name -- If a new model endpoint is created, this function name is used for generating the function URI (applicable only to a new endpoint_id).

  • context -- MLRun context. If function_name is not provided, the context is used to generate the full function hash.

  • sample_set_statistics -- Dictionary of sample set statistics that will be used as reference data for the new model endpoint (applicable only to a new endpoint_id).

  • drift_threshold -- The threshold at which drift is detected (applicable only to a new endpoint_id).

  • possible_drift_threshold -- The threshold at which possible drift is detected (applicable only to a new endpoint_id).

  • monitoring_mode -- If enabled, apply model monitoring features to the provided endpoint ID (applicable only to a new endpoint_id).

  • db_session -- A runtime session that manages the current dialog with the database.

Returns:

A ModelEndpoint object
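
A minimal usage sketch; the project name, model path, and endpoint name below are hypothetical placeholders, and the import path of ModelMonitoringMode is assumed::

    from mlrun.common.schemas.model_monitoring.constants import ModelMonitoringMode
    from mlrun.model_monitoring.api import get_or_create_model_endpoint

    # Fetch the endpoint if it exists; otherwise create it with monitoring enabled
    # (monitoring is disabled by default for newly created endpoints).
    endpoint = get_or_create_model_endpoint(
        project="my-project",
        model_path="store://models/my-project/my-model:latest",
        model_endpoint_name="my-endpoint",
        monitoring_mode=ModelMonitoringMode.enabled,
    )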

mlrun.model_monitoring.api.get_sample_set_statistics(sample_set: DataItem | list | dict | DataFrame | Series | ndarray | Any | None = None, model_artifact_feature_stats: dict | None = None, sample_set_columns: list | None = None, sample_set_drop_columns: list | None = None, sample_set_label_columns: list | None = None) → dict

Get the sample set statistics, either from the given sample set or from the statistics logged with the model, favoring the given sample set.

Parameters:
  • sample_set -- A sample dataset against which to compare the inputs in the drift analysis.

  • model_artifact_feature_stats -- The feature_stats attribute in the spec of the model artifact, where the original sample set statistics of the model are stored.

  • sample_set_columns -- The column names of sample_set.

  • sample_set_drop_columns -- A str/int or a list of str/int representing the column names/indices to drop.

  • sample_set_label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.

Returns:

The sample set statistics.

Raises:

MLRunInvalidArgumentError -- If no sample set or statistics were given.
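
For example, a sketch of computing reference statistics from a training DataFrame; the data and column names are hypothetical::

    import pandas as pd

    from mlrun.model_monitoring.api import get_sample_set_statistics

    train_df = pd.DataFrame(
        {"f0": [0.1, 0.2, 0.3], "f1": [1.0, 2.0, 3.0], "label": [0, 1, 0]}
    )

    # Compute reference statistics from the sample set, excluding the target column.
    stats = get_sample_set_statistics(
        sample_set=train_df,
        sample_set_drop_columns=["label"],
    )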

mlrun.model_monitoring.api.read_dataset_as_dataframe(dataset: DataItem | list | dict | DataFrame | Series | ndarray | Any, feature_columns: str | list[str] | None = None, label_columns: str | list[str] | None = None, drop_columns: str | list[str] | int | list[int] | None = None) → tuple[pandas.core.frame.DataFrame, list[str]]

Parse the given dataset into a DataFrame, dropping the requested columns. In addition, the label columns are parsed and validated.

Parameters:
  • dataset -- A dataset that will be converted into a DataFrame. Can be a list of lists, numpy.ndarray, dict, pd.Series, DataItem, or a FeatureVector.

  • feature_columns -- List of feature columns used to build the DataFrame when the dataset is of type list or numpy array.

  • label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.

  • drop_columns -- A str/int or a list of str/int representing the column names/indices to drop.

Returns:

A tuple of: [0] the parsed dataset as a DataFrame; [1] the label columns.

Raises:

MLRunInvalidArgumentError -- If drop_columns does not match the dataset, or the dataset type is unsupported.
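
For example, a sketch of parsing a numpy array, assuming the array columns are named via feature_columns (the names are hypothetical)::

    import numpy as np

    from mlrun.model_monitoring.api import read_dataset_as_dataframe

    data = np.array([[0.1, 1.0, 0], [0.2, 2.0, 1]])

    # Name the columns of the raw array and mark "label" as the target column.
    df, label_columns = read_dataset_as_dataframe(
        dataset=data,
        feature_columns=["f0", "f1", "label"],
        label_columns="label",
    )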

mlrun.model_monitoring.api.record_results(project: str, model_path: str, model_endpoint_name: str, endpoint_id: str = '', function_name: str = '', context: MLClientCtx | None = None, infer_results_df: DataFrame | None = None, sample_set_statistics: dict[str, Any] | None = None, monitoring_mode: ModelMonitoringMode = ModelMonitoringMode.enabled, drift_threshold: float | None = None, possible_drift_threshold: float | None = None, trigger_monitoring_job: bool = False, artifacts_tag: str = '', default_batch_image: str = 'mlrun/mlrun') → ModelEndpoint

Write a provided inference dataset to the model endpoint parquet target. If the model endpoint does not exist, generate a new model endpoint record and use the provided sample set statistics as the feature stats that will be used later for the drift analysis. To activate model monitoring, run project.enable_model_monitoring(). The model monitoring applications are triggered with the recorded data according to a periodic schedule.

Parameters:
  • project -- Project name.

  • model_path -- The model store path.

  • model_endpoint_name -- If a new model endpoint is generated, it is registered under this name.

  • endpoint_id -- Model endpoint unique ID. If it does not exist in the DB, a new record is generated based on the provided endpoint_id.

  • function_name -- If a new model endpoint is created, this function name is used for generating the function URI.

  • context -- MLRun context. Note that the context is required for logging the artifacts following the batch drift job.

  • infer_results_df -- DataFrame that will be stored under the model endpoint parquet target and used for the drift analysis. Please make sure that the DataFrame includes both feature names and label columns.

  • sample_set_statistics -- Dictionary of sample set statistics that will be used as reference data for the current model endpoint.

  • monitoring_mode -- If enabled, apply model monitoring features to the provided endpoint ID. Enabled by default.

  • drift_threshold -- (deprecated) The threshold at which drift is detected.

  • possible_drift_threshold -- (deprecated) The threshold at which possible drift is detected.

  • trigger_monitoring_job -- (deprecated) If true, run the batch drift job. If the job does not exist, the monitoring batch function is registered through the MLRun API with the provided image.

  • artifacts_tag -- (deprecated) Tag to use for all the artifacts resulting from the function. Relevant only if the monitoring batch job has been triggered.

  • default_batch_image -- (deprecated) The image used when registering the model monitoring batch job.

Returns:

A ModelEndpoint object
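
A minimal usage sketch; the project name, model path, endpoint name, and data below are hypothetical placeholders::

    import pandas as pd

    from mlrun.model_monitoring.api import get_sample_set_statistics, record_results

    train_df = pd.DataFrame({"f0": [0.1, 0.2], "f1": [1.0, 2.0], "label": [0, 1]})
    infer_df = pd.DataFrame({"f0": [0.15, 0.25], "f1": [1.5, 2.5], "label": [1, 0]})

    # Record the inference results; a new model endpoint is created if needed,
    # using the training-set statistics as the reference for drift analysis.
    endpoint = record_results(
        project="my-project",
        model_path="store://models/my-project/my-model:latest",
        model_endpoint_name="my-endpoint",
        infer_results_df=infer_df,
        sample_set_statistics=get_sample_set_statistics(sample_set=train_df),
    )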

mlrun.model_monitoring.api.write_monitoring_df(endpoint_id: str, infer_results_df: DataFrame, infer_datetime: datetime, monitoring_feature_set: FeatureSet | None = None, feature_set_uri: str = '') → None

Write the inference results DataFrame to the monitoring parquet target of the current model endpoint. The DataFrame is written using the feature set ingest process. Please make sure to provide either a valid monitoring feature set (with a parquet target) or a valid monitoring feature set URI.

Parameters:
  • endpoint_id -- Model endpoint unique ID.

  • infer_results_df -- DataFrame that will be stored under the model endpoint parquet target.

  • infer_datetime -- The datetime of the inference results.

  • monitoring_feature_set -- A mlrun.feature_store.FeatureSet object corresponding to the provided endpoint_id.

  • feature_set_uri -- If monitoring_feature_set is not provided, the feature_set_uri value is used to get the relevant mlrun.feature_store.FeatureSet.
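
A sketch of a direct call; the endpoint ID and feature set URI are placeholders for values that must already exist::

    from datetime import datetime, timezone

    import pandas as pd

    from mlrun.model_monitoring.api import write_monitoring_df

    infer_df = pd.DataFrame({"f0": [0.15], "f1": [1.5], "label": [1]})

    # Ingest the results through the endpoint's monitoring feature set.
    write_monitoring_df(
        endpoint_id="<endpoint-uid>",
        infer_results_df=infer_df,
        infer_datetime=datetime.now(tz=timezone.utc),
        feature_set_uri="<monitoring-feature-set-uri>",
    )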

class mlrun.model_monitoring.application.ModelMonitoringApplicationResult(name: str, value: float, kind: mlrun.common.schemas.model_monitoring.constants.ResultKindApp, status: mlrun.common.schemas.model_monitoring.constants.ResultStatusApp, extra_data: dict = <factory>)

Class representing the result of a custom model monitoring application.

Parameters:
  • name -- (str) Name of the application result. This name must be unique for each metric in a single application, and must match the format [a-zA-Z_][a-zA-Z0-9_]*.

  • value -- (float) Value of the application result.

  • kind -- (ResultKindApp) Kind of application result.

  • status -- (ResultStatusApp) Status of the application result.

  • extra_data -- (dict) Extra data associated with the application result.

to_dict()

Convert the object to a dictionary format suitable for writing.

Returns:

(dict) Dictionary representation of the result.
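
For example, constructing a result and serializing it, mirroring the values used in the application example below::

    import mlrun.common.schemas.model_monitoring.constants as mm_constant

    from mlrun.model_monitoring.application import ModelMonitoringApplicationResult

    result = ModelMonitoringApplicationResult(
        name="data_drift_test",
        value=0.5,
        kind=mm_constant.ResultKindApp.data_drift,
        status=mm_constant.ResultStatusApp.detected,
    )
    result_dict = result.to_dict()  # a plain dict, suitable for writing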

class mlrun.model_monitoring.application.ModelMonitoringApplicationBase

A base class for a model monitoring application. Inherit from this class to create a custom model monitoring application.

Example of a very simple custom application::

    # mlrun: start-code

    class MyApp(ModelMonitoringApplicationBase):
        def do_tracking(
            self,
            sample_df_stats: mlrun.common.model_monitoring.helpers.FeatureStats,
            feature_stats: mlrun.common.model_monitoring.helpers.FeatureStats,
            start_infer_time: pd.Timestamp,
            end_infer_time: pd.Timestamp,
            schedule_time: pd.Timestamp,
            latest_request: pd.Timestamp,
            endpoint_id: str,
            output_stream_uri: str,
        ) -> ModelMonitoringApplicationResult:
            self.context.log_artifact(
                TableArtifact(
                    "sample_df_stats", df=self.dict_to_histogram(sample_df_stats)
                )
            )
            return ModelMonitoringApplicationResult(
                name="data_drift_test",
                value=0.5,
                kind=mm_constant.ResultKindApp.data_drift,
                status=mm_constant.ResultStatusApp.detected,
            )

    # mlrun: end-code

static dict_to_histogram(histogram_dict: FeatureStats) → DataFrame

Convert a histogram dictionary to a pandas DataFrame with feature histograms as columns.

Parameters:

histogram_dict -- Histogram dictionary

Returns:

Histogram dataframe
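
For example, inside do_tracking(), where feature_stats is one of the FeatureStats arguments::

    # Convert the histogram dictionary into a DataFrame, one feature per column.
    hist_df = self.dict_to_histogram(feature_stats)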

abstract do_tracking(application_name: str, sample_df_stats: FeatureStats, feature_stats: FeatureStats, sample_df: DataFrame, start_infer_time: Timestamp, end_infer_time: Timestamp, latest_request: Timestamp, endpoint_id: str, output_stream_uri: str) → ModelMonitoringApplicationResult | list[mlrun.model_monitoring.application.ModelMonitoringApplicationResult]

Implement this method with your custom monitoring logic.

Parameters:
  • application_name -- (str) The application name.

  • sample_df_stats -- (FeatureStats) The new sample distribution dictionary.

  • feature_stats -- (FeatureStats) The train sample distribution dictionary.

  • sample_df -- (pd.DataFrame) The new sample DataFrame.

  • start_infer_time -- (pd.Timestamp) Start time of the monitoring schedule.

  • end_infer_time -- (pd.Timestamp) End time of the monitoring schedule.

  • latest_request -- (pd.Timestamp) Timestamp of the latest request on this endpoint_id.

  • endpoint_id -- (str) ID of the monitored model endpoint.

  • output_stream_uri -- (str) URI of the output stream for results.

Returns:

A ModelMonitoringApplicationResult object, or a list of such objects, holding the application results.
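
A minimal sketch of an override matching this signature; the metric computed here is a stand-in placeholder, not a real drift measure::

    import pandas as pd

    import mlrun.common.schemas.model_monitoring.constants as mm_constant
    from mlrun.common.model_monitoring.helpers import FeatureStats
    from mlrun.model_monitoring.application import (
        ModelMonitoringApplicationBase,
        ModelMonitoringApplicationResult,
    )


    class RowCountApp(ModelMonitoringApplicationBase):
        def do_tracking(
            self,
            application_name: str,
            sample_df_stats: FeatureStats,
            feature_stats: FeatureStats,
            sample_df: pd.DataFrame,
            start_infer_time: pd.Timestamp,
            end_infer_time: pd.Timestamp,
            latest_request: pd.Timestamp,
            endpoint_id: str,
            output_stream_uri: str,
        ) -> ModelMonitoringApplicationResult:
            # Stand-in metric: fraction of a nominal 1,000-row window that arrived.
            value = min(len(sample_df) / 1000.0, 1.0)
            return ModelMonitoringApplicationResult(
                name="row_count_ratio",
                value=value,
                kind=mm_constant.ResultKindApp.data_drift,
                status=mm_constant.ResultStatusApp.no_detection,
            )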