mlrun.model_monitoring
- mlrun.model_monitoring.api.get_sample_set_statistics(sample_set: DataItem | list | dict | DataFrame | Series | ndarray | Any = None, model_artifact_feature_stats: dict | None = None, sample_set_columns: list | None = None, sample_set_drop_columns: list | None = None, sample_set_label_columns: list | None = None) → dict
Get the sample set statistics, either from the given sample set or from the statistics logged with the model. If both are available, the given sample set is preferred.
- Parameters:
sample_set -- A sample dataset to compare against the inputs in the drift analysis.
model_artifact_feature_stats -- The feature_stats attribute in the spec of the model artifact, which holds the statistics of the sample set originally used with the model.
sample_set_columns -- The column names of sample_set.
sample_set_drop_columns -- str/int or a list of str/int that represent the column names / indices to drop.
sample_set_label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.
- Returns:
The sample set statistics.
raises MLRunInvalidArgumentError: If no sample set or statistics were given.
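The selection rule described above (prefer the given sample set, fall back to the statistics logged with the model, and fail if neither is given) can be sketched in plain Python. The helper name pick_sample_set_statistics, the dict-of-lists input, and the simple mean/std/count statistics are assumptions for illustration only; MLRun's actual statistics are richer, and it raises MLRunInvalidArgumentError rather than ValueError.

```python
from statistics import mean, pstdev
from typing import Optional


def pick_sample_set_statistics(
    sample_set: Optional[dict[str, list[float]]] = None,
    model_artifact_feature_stats: Optional[dict] = None,
) -> dict:
    """Hypothetical sketch: prefer statistics computed from the sample set,
    and fall back to the statistics logged with the model."""
    if sample_set is not None:
        # Compute a few simple per-column statistics from the given sample set.
        return {
            column: {"mean": mean(values), "std": pstdev(values), "count": len(values)}
            for column, values in sample_set.items()
        }
    if model_artifact_feature_stats is not None:
        # No sample set: reuse the statistics stored in the model artifact spec.
        return model_artifact_feature_stats
    # Mirrors the documented error case (MLRun raises MLRunInvalidArgumentError).
    raise ValueError("No sample set or statistics were given.")
```

Note that when both inputs are given, the logged statistics are ignored, which matches the "favor the given sample set" behavior.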
- mlrun.model_monitoring.api.read_dataset_as_dataframe(dataset: DataItem | list | dict | DataFrame | Series | ndarray | Any, feature_columns: str | list[str] | None = None, label_columns: str | list[str] | None = None, drop_columns: str | list[str] | int | list[int] | None = None) → tuple[DataFrame, list[str]]
Parse the given dataset into a DataFrame and drop the columns accordingly. In addition, the label columns are parsed and validated.
- Parameters:
dataset -- A dataset that will be converted into a DataFrame. Can be either a list of lists, numpy.ndarray, dict, pd.Series, DataItem or a FeatureVector.
feature_columns -- List of feature columns that will be used to build the DataFrame when dataset is of type list or numpy array.
label_columns -- The target label(s) of the column(s) in the dataset, for regression or classification tasks.
drop_columns -- str/int or a list of str/int that represent the column names / indices to drop.
- Returns:
A tuple of: [0] the parsed dataset as a DataFrame, [1] the label columns.
raises MLRunInvalidArgumentError: If drop_columns do not match the dataset, or if the dataset type is unsupported.
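For list or numpy-array inputs, feature_columns supplies the column names, drop_columns may mix names and indices, and label_columns are validated against the result. The following sketch mirrors that flow with plain Python lists instead of a DataFrame. parse_rows is a hypothetical helper, and treating integer drop columns as indices into feature_columns, as well as keeping the label columns in the returned table, are assumptions rather than confirmed MLRun behavior.

```python
from typing import Optional, Union

Column = Union[str, int]


def parse_rows(
    rows: list[list],
    feature_columns: list[str],
    label_columns: Optional[Union[str, list[str]]] = None,
    drop_columns: Optional[Union[Column, list[Column]]] = None,
) -> tuple[dict[str, list], list[str]]:
    """Hypothetical sketch: build a column-oriented table from a list of lists,
    drop columns by name or index, and validate the label columns."""
    table = {name: [row[i] for row in rows] for i, name in enumerate(feature_columns)}
    # Normalize a single str/int into a list, as the parameter types allow.
    if drop_columns is None:
        drop_columns = []
    elif not isinstance(drop_columns, list):
        drop_columns = [drop_columns]
    for column in drop_columns:
        # An int is treated as an index into feature_columns, a str as a name.
        name = feature_columns[column] if isinstance(column, int) else column
        if name not in table:
            raise ValueError(f"drop column {column!r} does not match the dataset")
        del table[name]
    # Normalize the label columns and check that each one exists.
    if label_columns is None:
        labels = []
    elif isinstance(label_columns, str):
        labels = [label_columns]
    else:
        labels = list(label_columns)
    for label in labels:
        if label not in table:
            raise ValueError(f"label column {label!r} not found")
    return table, labels
```

For example, parse_rows([[1, 2, 0], [3, 4, 1]], ["x", "y", "target"], label_columns="target", drop_columns=1) drops column "y" and returns the "target" label.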
- class mlrun.model_monitoring.applications.ModelMonitoringApplicationResult(name: str, value: float, kind: mlrun.common.schemas.model_monitoring.constants.ResultKindApp, status: mlrun.common.schemas.model_monitoring.constants.ResultStatusApp, extra_data: dict = <factory>)
Class representing the result of a custom model monitoring application.
- Parameters:
name -- (str) Name of the application result. This name must be unique within a single application, and must match the format [a-zA-Z_][a-zA-Z0-9_]*.
value -- (float) Value of the application result.
kind -- (ResultKindApp) Kind of application result.
status -- (ResultStatusApp) Status of the application result.
extra_data -- (dict) Extra data associated with the application result. Note that if the extra data exceeds the maximum size of 998 characters, it is ignored and a message is logged. In this case, we recommend logging the extra data as a separate artifact or shortening it.
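Before constructing a result, you can validate the name format and the extra_data size described above. check_result is a hypothetical helper, and measuring the 998-character limit on the JSON serialization of extra_data is an assumption about how the size is computed.

```python
import json
import re

NAME_PATTERN = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")
MAX_EXTRA_DATA_SIZE = 998  # Limit stated in the extra_data description.


def check_result(name: str, extra_data: dict) -> dict:
    """Hypothetical pre-check before building a ModelMonitoringApplicationResult.

    Assumes the size limit applies to the JSON serialization of extra_data.
    """
    # The whole name must match the pattern, not just a prefix of it.
    if not NAME_PATTERN.fullmatch(name):
        raise ValueError(f"invalid result name: {name!r}")
    if len(json.dumps(extra_data)) > MAX_EXTRA_DATA_SIZE:
        # Oversized extra data would be ignored, so drop it here as well.
        return {}
    return extra_data
```

If the check drops the extra data, consider logging it as a separate artifact instead, as recommended above.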
- class mlrun.model_monitoring.applications.ModelMonitoringApplicationMetric(name: str, value: float)
Class representing a single metric of a custom model monitoring application.
- Parameters:
name -- (str) Name of the application metric. This name must be unique for each metric in a single application, and must match the format [a-zA-Z_][a-zA-Z0-9_]*.
value -- (float) Value of the application metric.
- class mlrun.model_monitoring.applications.MonitoringApplicationContext(*, application_name: str, event: dict[str, Any], project: MlrunProject, artifacts_logger: _ArtifactsLogger, logger: Logger, nuclio_logger: Logger, model_endpoint_dict: dict[str, ModelEndpoint] | None = None, sample_df: DataFrame | None = None, feature_stats: FeatureStats | None = None, feature_sets_dict: dict[str, FeatureSet] | None = None)
The MonitoringApplicationContext object holds all the relevant information for the model monitoring application, and can be used for logging artifacts and messages. The monitoring context has the following attributes:
- Parameters:
application_name -- (str) The model monitoring application name.
project -- (MlrunProject) The current MLRun project object.
project_name -- (str) The project name.
logger -- (Logger) MLRun logger.
nuclio_logger -- (nuclio.request.Logger) Nuclio logger.
sample_df_stats -- (FeatureStats) The new sample distribution dictionary.
feature_stats -- (FeatureStats) The train sample distribution dictionary.
sample_df -- (pd.DataFrame) The new sample DataFrame.
start_infer_time -- (pd.Timestamp) Start time of the monitoring schedule.
end_infer_time -- (pd.Timestamp) End time of the monitoring schedule.
endpoint_id -- (str) ID of the monitored model endpoint.
feature_set -- (FeatureSet) The model endpoint feature set.
endpoint_name -- (str) Name of the monitored model endpoint.
output_stream_uri -- (str) URI of the output stream for results.
model_endpoint -- (ModelEndpoint) The model endpoint object.
feature_names -- (list[str]) List of the model's feature names.
label_names -- (list[str]) List of the model's label names.
model -- (tuple[str, ModelArtifact, dict]) The model file, the model spec object, and a dict of extra data items.
- static dict_to_histogram(histogram_dict: FeatureStats) → DataFrame
Convert a histogram dictionary to a pandas DataFrame with the feature histograms as columns.
- Parameters:
histogram_dict -- Histogram dictionary
- Returns:
Histogram dataframe
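The conversion performed by dict_to_histogram can be approximated without pandas. This sketch assumes that each feature entry in the FeatureStats dictionary holds a "hist" value of the form [counts, bin_edges]. histograms_as_columns is a hypothetical name; it returns raw bin counts as columns and skips features without a "hist" entry, whereas the real method may also normalize the counts.

```python
def histograms_as_columns(histogram_dict: dict) -> dict[str, list[float]]:
    """Hypothetical stand-in for dict_to_histogram that avoids pandas.

    Assumes each feature entry has a "hist" value of [counts, bin_edges],
    and returns one column of bin counts per feature.
    """
    columns = {}
    for feature, stats in histogram_dict.items():
        if "hist" not in stats:
            # Features without a histogram (e.g. non-numeric ones) are skipped.
            continue
        counts, _bin_edges = stats["hist"]
        columns[feature] = list(counts)
    # DataFrame columns must have equal length, so check the bin counts match.
    if len({len(values) for values in columns.values()}) > 1:
        raise ValueError("all features must use the same number of bins")
    return columns
```

Passing the returned dict to pandas.DataFrame would produce one column per feature and one row per bin.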
- log_artifact(item, body=None, tag: str = '', local_path: str = '', artifact_path: str | None = None, format: str | None = None, upload: bool | None = None, labels: dict[str, str] | None = None, target_path: str | None = None, unique_per_endpoint: bool = True, **kwargs) → Artifact
Log an artifact.
Caution
Logging artifacts in every model monitoring window may cause scale issues. This method should be called on special occasions only.
See log_artifact() for the full documentation, except for one new argument:
- Parameters:
unique_per_endpoint -- By default True, which logs a separate artifact for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.
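Because of the caution about scale, a common pattern is to gate artifact logging behind a condition. The sketch below is a hypothetical policy (should_log_artifact and its every_n_windows parameter are not part of MLRun): log only when drift is detected, or once every N monitoring windows.

```python
def should_log_artifact(window_index: int, drift_detected: bool, every_n_windows: int = 24) -> bool:
    """Hypothetical policy for logging artifacts only on special occasions:
    when drift is detected, or once every `every_n_windows` monitoring windows."""
    if every_n_windows <= 0:
        raise ValueError("every_n_windows must be positive")
    return drift_detected or window_index % every_n_windows == 0
```

Inside do_tracking, you would call monitoring_context.log_artifact(...) only when this check returns True; how you track the window index across runs is left to the application.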
- log_dataset(key, df, tag='', local_path=None, artifact_path=None, upload=None, labels=None, format='', preview=None, stats=None, target_path='', extra_data=None, label_column: str | None = None, unique_per_endpoint: bool = True, **kwargs) → DatasetArtifact
Log a dataset artifact.
Caution
Logging datasets in every model monitoring window may cause scale issues. This method should be called on special occasions only.
See log_dataset() for the full documentation, except for one new argument:
- Parameters:
unique_per_endpoint -- By default True, which logs a separate artifact for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.
- class mlrun.model_monitoring.applications.ModelMonitoringApplicationBase
The base class for a model monitoring application. Inherit from this class to create a custom model monitoring application.
For example, MyApp below is a simplistic custom application:

    from mlrun.common.schemas.model_monitoring.constants import (
        ResultKindApp,
        ResultStatusApp,
    )
    from mlrun.model_monitoring.applications import (
        ModelMonitoringApplicationBase,
        ModelMonitoringApplicationResult,
        MonitoringApplicationContext,
    )

    class MyApp(ModelMonitoringApplicationBase):
        def do_tracking(
            self, monitoring_context: MonitoringApplicationContext
        ) -> ModelMonitoringApplicationResult:
            monitoring_context.logger.info(
                "Running application",
                application_name=monitoring_context.application_name,
            )
            return ModelMonitoringApplicationResult(
                name="data_drift_test",
                value=0.5,
                kind=ResultKindApp.data_drift,
                status=ResultStatusApp.detected,
            )

- classmethod deploy(func_name: str, func_path: str | None = None, image: str | None = None, handler: str | None = None, with_repo: bool | None = False, tag: str | None = None, requirements: str | list[str] | None = None, requirements_file: str = '', **application_kwargs) → None
Set the application to the current project and deploy it as a Nuclio serving function. Required for your model monitoring application to work as a part of the model monitoring framework.
- Parameters:
func_name -- The name of the function.
func_path -- The path of the function. None refers to the current Jupyter notebook.
For the other arguments, refer to set_model_monitoring_function().
- abstractmethod do_tracking(monitoring_context: MonitoringApplicationContext) → ModelMonitoringApplicationResult | list[ModelMonitoringApplicationResult | ModelMonitoringApplicationMetric] | dict[str, Any]
Implement this method with your custom monitoring logic.
- Parameters:
monitoring_context -- (MonitoringApplicationContext) The monitoring context to process.
- Returns:
(ModelMonitoringApplicationResult) or (list[Union[ModelMonitoringApplicationResult, ModelMonitoringApplicationMetric]]) or a dict that contains the application metrics only (in this case, each metric name is a key and the metric value is the corresponding value).
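When do_tracking returns a plain dict, each key is a metric name and each value is the metric value. The hypothetical helper below builds such a dict: the absolute shift of each feature's mean from a reference mean. In a real application the inputs would come from the monitoring context (for example sample_df and feature_stats); plain dicts are used here, as an assumption, so the sketch runs on its own.

```python
from statistics import mean


def mean_shift_metrics(
    sample: dict[str, list[float]], reference_means: dict[str, float]
) -> dict[str, float]:
    """Hypothetical tracking logic that returns metrics only, in the dict form
    that do_tracking may return: metric name -> metric value."""
    metrics = {}
    for feature, values in sample.items():
        if feature not in reference_means:
            # Only features with reference data are compared.
            continue
        # Assumes feature names are valid identifiers, so the metric names
        # match the required [a-zA-Z_][a-zA-Z0-9_]* format.
        metrics[f"{feature}_mean_shift"] = abs(mean(values) - reference_means[feature])
    return metrics
```

A do_tracking implementation could return this dict directly; each entry is then treated as an application metric.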
- classmethod evaluate(func_path: str | None = None, func_name: str | None = None, *, tag: str | None = None, run_local: bool = True, auto_build: bool = True, sample_data: DataFrame | str | None = None, reference_data: DataFrame | str | None = None, image: str | None = None, with_repo: bool | None = False, class_handler: str | None = None, class_arguments: dict[str, Any] | None = None, requirements: str | list[str] | None = None, requirements_file: str = '', endpoints: list[tuple[str, str]] | list[str] | Literal['all'] | None = None, start: datetime | None = None, end: datetime | None = None, base_period: int | None = None, write_output: bool = False, existing_data_handling: ExistingDataHandling = ExistingDataHandling.fail_on_overlap, stream_profile: DatastoreProfile | None = None) → RunObject
Call this function to run the application's do_tracking() model monitoring logic as a KubejobRuntime, which is an MLRun function.
This function has default values for all of its arguments. Change them when you want to pass data to the application.
- Parameters:
func_path -- The path to the function. If None, the current notebook is used.
func_name -- The name of the function. If None, the normalized class name is used (mlrun.utils.helpers.normalize_name()). A "-batch" suffix is guaranteed to be added if not already there. The function name is also used as the application name for the results.
tag -- Tag for the function.
run_local -- Whether to run the function locally or remotely.
auto_build -- Whether to auto build the function.
sample_data -- Pandas DataFrame or DatasetArtifact URI as the current dataset. When set, it replaces the data read from the model endpoint's offline source.
reference_data -- Pandas DataFrame or DatasetArtifact URI as the reference dataset. When set, its statistics override the model endpoint's feature statistics. You do not need to have a model endpoint to use this option.
image -- Docker image to run the job on (when running remotely).
with_repo -- Whether to clone the current repo to the build source.
class_handler -- The relative path to the application class, useful when using Git sources or code from images.
class_arguments -- The arguments for the application class constructor. These are passed to the class __init__. The values must be JSON-serializable.
requirements -- List of Python requirements to be installed in the image.
requirements_file -- Path to a Python requirements file to be installed in the image.
endpoints --
The model endpoints to get the data from. The options are:
a list of tuples of the model endpoints [(name, uid), ...]
a list of model endpoint names [name, ...]
"all" for all the project's model endpoints
Note: a model endpoint name retrieves all the active model endpoints using this name, which may be more than one per name when the same name is used across multiple serving functions.
If provided, and sample_data is not None, you must also provide the start and end times of the data to analyze from the model endpoints.
start -- The start time of the endpoint's data, not included. If you want the model endpoint's data at start included, subtract a small datetime.timedelta from it. Make sure to include the time zone when constructing datetime.datetime objects manually. When both start and end times do not include a time zone, they are treated as UTC.
end -- The end time of the endpoint's data, included. Note: when start and end are set, they create a left-open time interval ("window") \((\operatorname{start}, \operatorname{end}]\) that excludes the endpoint's data at start and includes the data at end: \(\operatorname{start} < t \leq \operatorname{end}\), where \(t\) is the timestamp of the data in the window.
base_period -- The window length in minutes. If None, the whole window from start to end is taken. If an integer is specified, the application is run from start to end in base_period-length windows: \((\operatorname{start}, \operatorname{start} + \operatorname{base\_period}], (\operatorname{start} + \operatorname{base\_period}, \operatorname{start} + 2\cdot\operatorname{base\_period}], \ldots, (\operatorname{start} + (m - 1)\cdot\operatorname{base\_period}, \operatorname{end}]\), where \(m\) is a positive integer and \(\operatorname{end} = \operatorname{start} + m\cdot\operatorname{base\_period}\). Note that the difference between end and start must be a multiple of base_period.
write_output -- Whether to write the results and metrics to the time-series DB. Can be True only if endpoints are passed. Note: the model monitoring infrastructure must be up for the writing to work.
existing_data_handling --
How to handle the existing application data for the model endpoints when writing new data whose requested start time precedes the end time of a previous run that also wrote to the database. Relevant only when write_output=True. The options are:
"fail_on_overlap": Default. An error is raised.
"skip_overlap": The overlapping data is ignored and the time window is cut so that it starts at the earliest possible time after start.
"delete_all": Delete all the data that was written by the application to the model endpoints, regardless of the time window, and write the new data.
stream_profile -- The stream datastore profile. Provide it only when running locally and writing the outputs to the database (i.e., when both run_local and write_output are set to True). For more details on configuring the stream profile, see set_model_monitoring_credentials().
- Returns:
The output of the do_tracking() method with the given parameters and inputs, wrapped in a RunObject.
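The window arithmetic described for start, end, and base_period can be checked with a standard-library sketch. split_windows and in_window are hypothetical helpers, not MLRun's implementation; they only mirror the documented rules: windows are left-open and right-closed, and end - start must be a multiple of base_period. The example uses timezone-aware UTC datetimes, in line with the time-zone note for start.

```python
from datetime import datetime, timedelta
from typing import Optional


def split_windows(
    start: datetime, end: datetime, base_period: Optional[int] = None
) -> list[tuple[datetime, datetime]]:
    """Hypothetical helper: split the left-open interval (start, end]
    into consecutive base_period-minute windows."""
    if end <= start:
        raise ValueError("end must be after start")
    if base_period is None:
        # The whole interval is taken as a single window.
        return [(start, end)]
    step = timedelta(minutes=base_period)
    # timedelta % timedelta is a zero timedelta (falsy) only for exact multiples.
    if (end - start) % step:
        raise ValueError("end - start must be a multiple of base_period")
    windows = []
    window_start = start
    while window_start < end:
        windows.append((window_start, window_start + step))
        window_start += step
    return windows


def in_window(t: datetime, window: tuple[datetime, datetime]) -> bool:
    """Left-open, right-closed membership: start < t <= end."""
    window_start, window_end = window
    return window_start < t <= window_end
```

For example, a one-hour interval with base_period=20 yields three windows, and a timestamp exactly at start belongs to none of them.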
- classmethod get_job_handler(handler_to_class: str) → str
A helper function to get the handler to the application job _handler.
- Parameters:
handler_to_class -- The handler to the application class, e.g. my_package.sub_module1.MonitoringApp1.
- Returns:
The handler to the job of the application class.
- classmethod to_job(*, class_handler: str | None = None, func_path: str | None = None, func_name: str | None = None, tag: str | None = None, image: str | None = None, with_repo: bool | None = False, requirements: str | list[str] | None = None, requirements_file: str = '', project: MlrunProject | None = None) → KubejobRuntime
Get the application's do_tracking() model monitoring logic as a KubejobRuntime.
The returned job can be run as any MLRun job with the relevant inputs and params to your application:

    job = ModelMonitoringApplicationBase.to_job(
        class_handler="package.module.AppClass"
    )
    job.run(inputs={}, params={}, local=False)  # Add the relevant inputs and params

Optional inputs:
sample_data, pd.DataFrame
reference_data, pd.DataFrame
Optional params:
endpoints, list[tuple[str, str]]
start, datetime
end, datetime
base_period, int
write_output, bool
existing_data_handling, str
_init_args, dict: the arguments for the application class constructor (equivalent to class_arguments)
See evaluate() for more details about these inputs and params.
For Git sources, add the source archive to the returned job and change the handler:

    handler = ModelMonitoringApplicationBase.get_job_handler("module.AppClass")
    job.with_source_archive(
        "git://github.com/owner/repo.git#branch-category/specific-task",
        workdir="path/to/application/folder",
        handler=handler,
    )

- Parameters:
class_handler -- The handler to the class, e.g. path.to.module::MonitoringApplication, useful when using Git sources or code from images. If None, the current class, deriving from ModelMonitoringApplicationBase, is used.
func_path -- The path to the function. If None, the current notebook is used.
func_name -- The name of the function. If None, the normalized class name is used (mlrun.utils.helpers.normalize_name()). A "-batch" suffix is guaranteed to be added if not already there. The function name is also used as the application name for the results.
tag -- Tag for the function.
image -- Docker image to run the job on (when running remotely).
with_repo -- Whether to clone the current repo to the build source.
requirements -- List of Python requirements to be installed in the image.
requirements_file -- Path to a Python requirements file to be installed in the image.
project -- The current project to set the function to. If not set, the current project is used.
- Returns:
The KubejobRuntime job that wraps the model monitoring application's logic.
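The _init_args param, like class_arguments in evaluate(), carries the application constructor arguments, whose values must be JSON-serializable. A hypothetical pre-check, build_init_args, is sketched below. It is deliberately stricter than necessary: it also rejects values that change type in a JSON round trip (tuples become lists, integer keys become strings).

```python
import json
from typing import Any


def build_init_args(class_arguments: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical check that constructor arguments survive a JSON round trip
    before they are passed as the `_init_args` param of the job."""
    try:
        round_tripped = json.loads(json.dumps(class_arguments))
    except TypeError as err:
        # json.dumps raises TypeError for objects such as sets or datetimes.
        raise ValueError(f"class arguments must be JSON-serializable: {err}") from err
    if round_tripped != class_arguments:
        # Stricter than needed: flags values whose type changes in JSON.
        raise ValueError("class arguments change when serialized to JSON")
    return round_tripped
```

The returned dict could then be passed as params={"_init_args": ...} when calling job.run().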
- class mlrun.model_monitoring.applications.evidently.EvidentlyModelMonitoringApplicationBase(evidently_project_id: str | UUID, evidently_workspace_path: str | None = None, cloud_workspace: bool = False)
A class for integrating Evidently for MLRun model monitoring within a monitoring application.
Note
The evidently package is not installed by default in the mlrun/mlrun image. It must be installed separately to use this class.
- Parameters:
evidently_project_id -- (str) The ID of the Evidently project.
evidently_workspace_path -- (str) The path to the Evidently workspace.
cloud_workspace -- (bool) Whether the workspace is an Evidently Cloud workspace.
- get_cloud_workspace() → CloudWorkspace
Load the Evidently cloud workspace according to the EVIDENTLY_API_KEY environment variable.
- get_workspace() → WorkspaceBase
Get the Evidently workspace. Override this method to customize access to the workspace.
- static log_evidently_object(monitoring_context: MonitoringApplicationContext, evidently_object: Snapshot, artifact_name: str, unique_per_endpoint: bool = True) → None
Logs an Evidently report or suite as an artifact.
Caution
Logging Evidently objects in every model monitoring window may cause scale issues. This method should be called on special occasions only.
- Parameters:
monitoring_context -- (MonitoringApplicationContext) The monitoring context to process.
evidently_object -- (Snapshot) The Evidently run to log, e.g. a report run.
artifact_name -- (str) The name for the logged artifact.
unique_per_endpoint -- By default True, which logs a separate artifact for each model endpoint. Setting it to False without changing the item key causes the artifact to be overridden.
- class mlrun.model_monitoring.applications.histogram_data_drift.DataDriftClassifier(potential: float = 0.5, detected: float = 0.7)
Classify data drift numeric values into categorical status.
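DataDriftClassifier maps a numeric drift value to a status using the potential and detected thresholds. The sketch below shows one plausible mapping. DriftStatus is a hypothetical stand-in for mlrun's ResultStatusApp, and treating each threshold as an inclusive lower bound is an assumption that this reference does not confirm.

```python
from enum import Enum


class DriftStatus(Enum):
    # Hypothetical stand-in for the status values used by the application.
    no_detection = 0
    potential_detection = 1
    detected = 2


def classify_drift(value: float, potential: float = 0.5, detected: float = 0.7) -> DriftStatus:
    """Map a numeric drift value to a categorical status using two thresholds."""
    if potential > detected:
        raise ValueError("potential must not exceed detected")
    # Assumption: each threshold is an inclusive lower bound.
    if value >= detected:
        return DriftStatus.detected
    if value >= potential:
        return DriftStatus.potential_detection
    return DriftStatus.no_detection
```

With the defaults, values below 0.5 map to no_detection, values from 0.5 up to (but not including) 0.7 map to potential_detection, and values of 0.7 or more map to detected.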
- class mlrun.model_monitoring.applications.histogram_data_drift.HistogramDataDriftApplication(value_classifier: ValueClassifier | None = None, produce_json_artifact: bool = False, produce_plotly_artifact: bool = False)
MLRun's default data drift application for model monitoring.
The application expects tabular numerical data and calculates three metrics over the shared features' histograms. The metrics are calculated on features that have reference data from the training dataset. When there is no reference data (feature_stats), this application sends a warning log and does nothing. The three metrics are:
Hellinger distance.
Total variation distance.
Kullback-Leibler divergence.
Each metric is calculated over all the features individually, and the mean is taken as the metric value. The average of the Hellinger and total variation distances is taken as the result.
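The three metrics and the averaging rule above can be sketched with the standard library. This is an illustrative implementation of the textbook formulas over histogram bin counts, not MLRun's code. The function names are hypothetical, KL divergence is computed in one direction (sample relative to reference) with a small epsilon for empty bins, and MLRun's handling of empty bins or KL symmetry may differ.

```python
import math


def _normalize(counts: list[float]) -> list[float]:
    total = sum(counts)
    if total <= 0:
        raise ValueError("histogram must have a positive total count")
    return [c / total for c in counts]


def hellinger(p: list[float], q: list[float]) -> float:
    # H(p, q) = sqrt(1 - sum_i sqrt(p_i * q_i)); max() guards against rounding below 0.
    bc = sum(math.sqrt(pi * qi) for pi, qi in zip(p, q))
    return math.sqrt(max(0.0, 1.0 - bc))


def total_variation(p: list[float], q: list[float]) -> float:
    # TVD(p, q) = 0.5 * sum_i |p_i - q_i|
    return 0.5 * sum(abs(pi - qi) for pi, qi in zip(p, q))


def kl_divergence(p: list[float], q: list[float], eps: float = 1e-10) -> float:
    # KL(p || q) = sum_i p_i * log(p_i / q_i); eps avoids division by zero in empty bins.
    return sum(pi * math.log((pi + eps) / (qi + eps)) for pi, qi in zip(p, q) if pi > 0)


def drift_summary(sample_hists: dict, reference_hists: dict) -> dict[str, float]:
    """Compute each metric per shared feature, then average over the features."""
    shared = [f for f in sample_hists if f in reference_hists]
    if not shared:
        raise ValueError("no shared features with reference data")
    per_metric = {"hellinger": [], "tvd": [], "kld": []}
    for feature in shared:
        p = _normalize(sample_hists[feature])
        q = _normalize(reference_hists[feature])
        if len(p) != len(q):
            raise ValueError(f"bin count mismatch for feature {feature!r}")
        per_metric["hellinger"].append(hellinger(p, q))
        per_metric["tvd"].append(total_variation(p, q))
        per_metric["kld"].append(kl_divergence(p, q))
    metrics = {name: sum(vals) / len(vals) for name, vals in per_metric.items()}
    # The drift result is the average of the Hellinger and TVD means.
    metrics["result"] = (metrics["hellinger"] + metrics["tvd"]) / 2
    return metrics
```

Identical histograms give zero for every metric, while completely disjoint histograms give a Hellinger distance and a total variation distance of 1, and therefore a result of 1.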
The application can log two artifacts (disabled by default due to performance issues):
JSON with the general drift value per feature.
Plotly table with the various metrics and histograms per feature.
This application is deployed by default when calling enable_model_monitoring(). To avoid it, pass deploy_histogram_data_drift_app=False.
If you want to change the application defaults, such as the classifier or which artifacts to produce, you need to inherit from this class and deploy it as any other model monitoring application. Make sure to keep the default application name. This ensures that the full functionality of the application, including the statistics view in the UI, is available.
- Parameters:
value_classifier -- Classifier object that adheres to the ValueClassifier protocol. If not provided, the default DataDriftClassifier is used.
produce_json_artifact -- Whether to produce the JSON artifact, False by default.
produce_plotly_artifact -- Whether to produce the Plotly artifact, False by default.
- do_tracking(monitoring_context: MonitoringApplicationContext) → list[ModelMonitoringApplicationResult | ModelMonitoringApplicationMetric | _ModelMonitoringApplicationStats]
Calculate and return the data drift metrics, averaged over the features.