Source code for mlrun.model_monitoring.applications.context

# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import socket
from typing import Any, Optional, Protocol, cast

import nuclio.request
import numpy as np
import pandas as pd

import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.errors
import mlrun.feature_store as fstore
import mlrun.feature_store.feature_set as fs
import mlrun.serving
import mlrun.utils
from mlrun.artifacts import Artifact, DatasetArtifact, ModelArtifact, get_model
from mlrun.common.model_monitoring.helpers import FeatureStats
from mlrun.common.schemas import ModelEndpoint
from mlrun.model_monitoring.helpers import calculate_inputs_statistics


class _ArtifactsLogger(Protocol):
    """
    Classes that implement this protocol are :code:`MlrunProject` and :code:`MLClientCtx`.
    """

    def log_artifact(self, *args, **kwargs) -> Artifact: ...
    def log_dataset(self, *args, **kwargs) -> DatasetArtifact: ...


[docs] class MonitoringApplicationContext: _logger_name = "monitoring-application" def __init__( self, *, application_name: str, event: dict[str, Any], project: "mlrun.MlrunProject", artifacts_logger: _ArtifactsLogger, logger: mlrun.utils.Logger, nuclio_logger: nuclio.request.Logger, model_endpoint_dict: dict[str, ModelEndpoint] | None = None, sample_df: pd.DataFrame | None = None, feature_stats: FeatureStats | None = None, feature_sets_dict: dict[str, fs.FeatureSet] | None = None, ) -> None: """ The :code:`MonitoringApplicationContext` object holds all the relevant information for the model monitoring application, and can be used for logging artifacts and messages. The monitoring context has the following attributes: :param application_name: (str) The model monitoring application name. :param project: (:py:class:`~mlrun.projects.MlrunProject`) The current MLRun project object. :param project_name: (str) The project name. :param logger: (:py:class:`~mlrun.utils.Logger`) MLRun logger. :param nuclio_logger: (nuclio.request.Logger) Nuclio logger. :param sample_df_stats: (FeatureStats) The new sample distribution dictionary. :param feature_stats: (FeatureStats) The train sample distribution dictionary. :param sample_df: (pd.DataFrame) The new sample DataFrame. :param start_infer_time: (pd.Timestamp) Start time of the monitoring schedule. :param end_infer_time: (pd.Timestamp) End time of the monitoring schedule. :param endpoint_id: (str) ID of the monitored model endpoint :param feature_set: (FeatureSet) the model endpoint feature set :param endpoint_name: (str) Name of the monitored model endpoint :param output_stream_uri: (str) URI of the output stream for results :param model_endpoint: (ModelEndpoint) The model endpoint object. :param feature_names: (list[str]) List of models feature names. :param label_names: (list[str]) List of models label names. :param model: (tuple[str, ModelArtifact, dict]) The model file, model spec object, and a list of extra data items. """ self.application_name = application_name self.project = project self.project_name = project.name self._artifacts_logger = artifacts_logger # MLRun Logger self.logger = logger # Nuclio logger - `nuclio.request.Logger`. # Note: this logger accepts keyword arguments only in its `_with` methods, e.g. `info_with`. self.nuclio_logger = nuclio_logger # event data self.start_infer_time = pd.Timestamp( cast(str, event.get(mm_constants.ApplicationEvent.START_INFER_TIME)) ) self.end_infer_time = pd.Timestamp( cast(str, event.get(mm_constants.ApplicationEvent.END_INFER_TIME)) ) self.endpoint_id = cast( str, event.get(mm_constants.ApplicationEvent.ENDPOINT_ID) ) self.endpoint_name = cast( str, event.get(mm_constants.ApplicationEvent.ENDPOINT_NAME) ) self._feature_stats: FeatureStats | None = feature_stats self._sample_df_stats: FeatureStats | None = None # Default labels for the artifacts self._default_labels = self._get_default_labels() # Persistent data - fetched when needed self._sample_df: pd.DataFrame | None = sample_df self._model_endpoint: ModelEndpoint | None = ( model_endpoint_dict.get(self.endpoint_id) if model_endpoint_dict else None ) self._feature_set: fs.FeatureSet | None = ( feature_sets_dict.get(self.endpoint_id) if feature_sets_dict else None ) store, _, _ = mlrun.store_manager.get_or_create_store( mlrun.mlconf.artifact_path ) self.storage_options = store.get_storage_options() @classmethod def _from_ml_ctx( cls, context: "mlrun.MLClientCtx", *, project: Optional["mlrun.MlrunProject"] = None, application_name: str, event: dict[str, Any], model_endpoint_dict: dict[str, ModelEndpoint] | None = None, sample_df: pd.DataFrame | None = None, feature_stats: FeatureStats | None = None, ) -> "MonitoringApplicationContext": project = project or context.get_project_object() if not project: raise mlrun.errors.MLRunValueError("Could not load project from context") logger = context.logger artifacts_logger = context nuclio_logger = nuclio.request.Logger( level=mlrun.mlconf.log_level, name=cls._logger_name ) return cls( application_name=application_name, event=event, model_endpoint_dict=model_endpoint_dict, project=project, logger=logger, nuclio_logger=nuclio_logger, artifacts_logger=artifacts_logger, sample_df=sample_df, feature_stats=feature_stats, ) @classmethod def _from_graph_ctx( cls, graph_context: mlrun.serving.GraphContext, *, application_name: str, event: dict[str, Any], model_endpoint_dict: dict[str, ModelEndpoint] | None = None, sample_df: pd.DataFrame | None = None, feature_stats: FeatureStats | None = None, feature_sets_dict: dict[str, fs.FeatureSet] | None = None, ) -> "MonitoringApplicationContext": nuclio_logger = graph_context.logger artifacts_logger = graph_context.project_obj logger = mlrun.utils.create_logger( level=mlrun.mlconf.log_level, formatter_kind=mlrun.mlconf.log_formatter, name=cls._logger_name, ) return cls( application_name=application_name, event=event, project=graph_context.project_obj, model_endpoint_dict=model_endpoint_dict, logger=logger, nuclio_logger=nuclio_logger, artifacts_logger=artifacts_logger, sample_df=sample_df, feature_stats=feature_stats, feature_sets_dict=feature_sets_dict, ) def _get_default_labels(self) -> dict[str, str]: labels = { mlrun_constants.MLRunInternalLabels.runner_pod: socket.gethostname(), mlrun_constants.MLRunInternalLabels.producer_type: "model-monitoring-app", mlrun_constants.MLRunInternalLabels.app_name: self.application_name, mlrun_constants.MLRunInternalLabels.endpoint_id: self.endpoint_id, mlrun_constants.MLRunInternalLabels.endpoint_name: self.endpoint_name, } return {key: value for key, value in labels.items() if value is not None} def _add_default_labels(self, labels: dict[str, str] | None) -> dict[str, str]: """Add the default labels to logged artifacts labels""" return (labels or {}) | self._default_labels @property def sample_df(self) -> pd.DataFrame: """The new sample DataFrame for the monitored window. :raises mlrun.errors.MLRunValueError: if the sample DataFrame cannot be provided because the model endpoint's details and times were not supplied (and no ``sample_data`` was given directly). :raises mlrun.errors.MLRunEmptySampleDFError: if there is no inference data logged in the requested time window. This dedicated error type lets callers iterating over monitoring windows skip empty windows without swallowing unrelated value errors (e.g. the missing-details case above). """ if self._sample_df is None: if ( self.endpoint_name is None or self.endpoint_id is None or pd.isnull(self.start_infer_time) or pd.isnull(self.end_infer_time) ): raise mlrun.errors.MLRunValueError( "You have tried to access `monitoring_context.sample_df`, but have not provided it directly " "through `sample_data`, nor have you provided the model endpoint's name, ID, and the start and " f"end times: `endpoint_name`={self.endpoint_name}, `endpoint_uid`={self.endpoint_id}, " f"`start`={self.start_infer_time}, and `end`={self.end_infer_time}. " "You can either provide the sample dataframe directly, the model endpoint's details and times, " "or adapt the application's logic to not access the sample dataframe." ) df = self.feature_set.to_dataframe( start_time=self.start_infer_time, end_time=self.end_infer_time, time_column=mm_constants.EventFieldType.TIMESTAMP, storage_options=self.storage_options, ) if df.empty: raise mlrun.errors.MLRunEmptySampleDFError( "The sample dataframe is empty, which may indicate that there are no features logged in the " "model endpoint during the specified time window. Please check that your model endpoint is logging " "features correctly, and that the time window is correct." ) self._sample_df = df.reset_index(drop=True) return self._sample_df @property def model_endpoint(self) -> ModelEndpoint: if not self._model_endpoint: if self.endpoint_name is None or self.endpoint_id is None: raise mlrun.errors.MLRunValueError( "You have NOT provided the model endpoint's name and ID: " f"`endpoint_name`={self.endpoint_name} and `endpoint_id`={self.endpoint_id}, " "but you have tried to access `monitoring_context.model_endpoint` " "directly or indirectly in your application. You can either provide them, " "or adapt the application's logic to not access the model endpoint." ) self._model_endpoint = mlrun.db.get_run_db().get_model_endpoint( name=self.endpoint_name, project=self.project_name, endpoint_id=self.endpoint_id, feature_analysis=True, tsdb_metrics=False, ) return self._model_endpoint @property def feature_set(self) -> fs.FeatureSet: if not self._feature_set and self.model_endpoint: self._feature_set = fstore.get_feature_set( self.model_endpoint.spec.monitoring_feature_set_uri ) return self._feature_set @property def feature_stats(self) -> FeatureStats: if not self._feature_stats: self._feature_stats = self.model_endpoint.spec.feature_stats return self._feature_stats @property def sample_df_stats(self) -> FeatureStats: """statistics of the sample dataframe""" if not self._sample_df_stats: self._sample_df_stats = calculate_inputs_statistics( self.feature_stats, self.sample_df ) return self._sample_df_stats @property def feature_names(self) -> list[str]: """The feature names of the model""" return self.model_endpoint.spec.feature_names @property def label_names(self) -> list[str]: """The label names of the model""" return self.model_endpoint.spec.label_names @property def model(self) -> tuple[str, ModelArtifact, dict]: """The model file, model spec object, and a list of extra data items""" return get_model(self.model_endpoint.spec.model_uri)
[docs] @staticmethod def dict_to_histogram(histogram_dict: FeatureStats) -> pd.DataFrame: """ Convert histogram dictionary to pandas DataFrame with feature histograms as columns :param histogram_dict: Histogram dictionary :returns: Histogram dataframe """ # Create a dictionary with feature histograms as values histograms = {} for feature, stats in histogram_dict.items(): if "hist" in stats: # Normalize to probability distribution of each feature histograms[feature] = np.array(stats["hist"][0]) / stats["count"] # Convert the dictionary to pandas DataFrame histograms = pd.DataFrame(histograms) return histograms
[docs] def log_artifact( self, item, body=None, tag: str = "", local_path: str = "", artifact_path: str | None = None, format: str | None = None, upload: bool | None = None, labels: dict[str, str] | None = None, target_path: str | None = None, unique_per_endpoint: bool = True, **kwargs, ) -> Artifact: """ Log an artifact. .. caution:: Logging artifacts in every model monitoring window may cause scale issues. This method should be called on special occasions only. See :func:`~mlrun.projects.MlrunProject.log_artifact` for the full documentation, except for one new argument: :param unique_per_endpoint: by default ``True``, we will log different artifact for each model endpoint, set to ``False`` without changing item key will cause artifact override. """ labels = self._add_default_labels(labels) # By default, we want to log different artifact for each model endpoint endpoint_id = labels.get(mlrun_constants.MLRunInternalLabels.endpoint_id, "") if unique_per_endpoint and isinstance(item, str): item = f"{item}-{endpoint_id}" if endpoint_id else item elif unique_per_endpoint: # isinstance(item, Artifact) is True item.key = f"{item.key}-{endpoint_id}" if endpoint_id else item.key return self._artifacts_logger.log_artifact( item, body=body, tag=tag, local_path=local_path, artifact_path=artifact_path, format=format, upload=upload, labels=labels, target_path=target_path, **kwargs, )
[docs] def log_dataset( self, key, df, tag="", local_path=None, artifact_path=None, upload=None, labels=None, format="", preview=None, stats=None, target_path="", extra_data=None, label_column: str | None = None, unique_per_endpoint: bool = True, **kwargs, ) -> DatasetArtifact: """ Log a dataset artifact. .. caution:: Logging datasets in every model monitoring window may cause scale issues. This method should be called on special occasions only. See :func:`~mlrun.projects.MlrunProject.log_dataset` for the full documentation, except for one new argument: :param unique_per_endpoint: by default ``True``, we will log different artifact for each model endpoint, set to ``False`` without changing item key will cause artifact override. """ labels = self._add_default_labels(labels) # By default, we want to log different artifact for each model endpoint endpoint_id = labels.get(mlrun_constants.MLRunInternalLabels.endpoint_id, "") if unique_per_endpoint and isinstance(key, str): key = f"{key}-{endpoint_id}" if endpoint_id else key return self._artifacts_logger.log_dataset( key, df, tag=tag, local_path=local_path, artifact_path=artifact_path, upload=upload, labels=labels, format=format, preview=preview, stats=stats, target_path=target_path, extra_data=extra_data, label_column=label_column, **kwargs, )