Source code for mlrun.model_monitoring.api

# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import typing

import numpy as np
import pandas as pd

import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.datastore.base
import mlrun.feature_store
import mlrun.model_monitoring.applications as mm_app
import mlrun.serving
from mlrun.data_types.infer import InferOptions, get_df_stats
from mlrun.utils import check_if_hub_uri, datetime_now, merge_requirements

from ..common.schemas.hub import HubModuleType

# A union of all supported dataset types:
DatasetType = typing.Union[
    mlrun.datastore.base.DataItem,
    list,
    dict,
    pd.DataFrame,
    pd.Series,
    np.ndarray,
    typing.Any,
]


[docs] def get_sample_set_statistics( sample_set: DatasetType = None, model_artifact_feature_stats: dict | None = None, sample_set_columns: list | None = None, sample_set_drop_columns: list | None = None, sample_set_label_columns: list | None = None, ) -> dict: """ Get the sample set statistics either from the given sample set or the statistics logged with the model while favoring the given sample set. :param sample_set: A sample dataset to give to compare the inputs in the drift analysis. :param model_artifact_feature_stats: The `feature_stats` attribute in the spec of the model artifact, where the original sample set statistics of the model was used. :param sample_set_columns: The column names of sample_set. :param sample_set_drop_columns: ``str`` / ``int`` or a list of ``str`` / ``int`` that represent the column names / indices to drop. :param sample_set_label_columns: The target label(s) of the column(s) in the dataset. for Regression or Classification tasks. :returns: The sample set statistics. raises MLRunInvalidArgumentError: If no sample set or statistics were given. """ # Check if a sample set was provided: if sample_set is None: # Check if the model was logged with a sample set: if model_artifact_feature_stats is None: raise mlrun.errors.MLRunInvalidArgumentError( "Cannot perform drift analysis as there is no sample set to compare to. The model artifact was not " "logged with a sample set and `sample_set` was not provided to the function." ) # Return the statistics logged with the model: return model_artifact_feature_stats # Turn other object types to DataFrame: # A DataFrame is necessary for executing the "drop features" operation. dataset_types = list(DatasetType.__args__) if typing.Any in dataset_types: dataset_types.remove(typing.Any) if isinstance( sample_set, tuple(dataset_types), ): sample_set, _ = read_dataset_as_dataframe( dataset=sample_set, feature_columns=sample_set_columns, drop_columns=sample_set_drop_columns, label_columns=sample_set_label_columns, ) else: raise mlrun.errors.MLRunInvalidArgumentError( f"Parameter sample_set has an unsupported type: {type(sample_set)}" ) # Return the sample set statistics: return get_df_stats(df=sample_set, options=InferOptions.Histogram)
[docs] def read_dataset_as_dataframe( dataset: DatasetType, feature_columns: typing.Union[str, list[str]] | None = None, label_columns: typing.Union[str, list[str]] | None = None, drop_columns: typing.Union[str, list[str], int, list[int]] | None = None, ) -> tuple[pd.DataFrame, list[str]]: """ Parse the given dataset into a DataFrame and drop the columns accordingly. In addition, the label columns will be parsed and validated as well. :param dataset: A dataset that will be converted into a DataFrame. Can be either a list of lists, numpy.ndarray, dict, pd.Series, DataItem or a FeatureVector. :param feature_columns: List of feature columns that will be used to build the dataframe when dataset is from type list or numpy array. :param label_columns: The target label(s) of the column(s) in the dataset. for Regression or Classification tasks. :param drop_columns: ``str`` / ``int`` or a list of ``str`` / ``int`` that represent the column names / indices to drop. :returns: A tuple of: [0] = The parsed dataset as a DataFrame [1] = Label columns. raises MLRunInvalidArgumentError: If the `drop_columns` are not matching the dataset or unsupported dataset type. """ # Turn the `drop labels` into a list if given: if drop_columns is not None: if not isinstance(drop_columns, list): drop_columns = [drop_columns] # Check if the dataset is in fact a Feature Vector: if isinstance(dataset, mlrun.feature_store.FeatureVector): # Try to get the label columns if not provided: if label_columns is None: label_columns = dataset.status.label_column # Get the features and parse to DataFrame: dataset = dataset.get_offline_features(drop_columns=drop_columns).to_dataframe() elif isinstance(dataset, list | np.ndarray): if not feature_columns: raise mlrun.errors.MLRunInvalidArgumentError( "Feature columns list must be provided when dataset input as from type list or numpy array" ) # Parse the list / numpy array into a DataFrame: dataset = pd.DataFrame(dataset, columns=feature_columns) # Validate the `drop_columns` is given as integers: if drop_columns and not all(isinstance(col, int) for col in drop_columns): raise mlrun.errors.MLRunInvalidArgumentError( "`drop_columns` must be an integer / list of integers if provided as a list." ) elif isinstance(dataset, mlrun.DataItem): if ( not dataset.url and dataset.artifact_url and mlrun.datastore.parse_store_uri(dataset.artifact_url)[0] == mlrun.utils.StorePrefix.FeatureVector ): raise mlrun.errors.MLRunInvalidArgumentError( f"No data has been found. Make sure you have applied `get_offline_features` " f"on your feature vector {dataset.artifact_url} with a valid target before passing " f"it as an input." ) # Turn the DataItem to DataFrame: dataset = dataset.as_df() else: # Parse the object (should be a pd.DataFrame / pd.Series, dictionary) into a DataFrame: try: dataset = pd.DataFrame(dataset) except ValueError as e: raise mlrun.errors.MLRunInvalidArgumentError( f"Could not parse the given dataset of type {type(dataset)} into a pandas DataFrame. " f"Received the following error: {e}" ) # Drop columns if needed: if drop_columns: dataset.drop(drop_columns, axis=1, inplace=True) # Turn the `label_columns` into a list by default: if label_columns is None: label_columns = [] elif isinstance(label_columns, str | int): label_columns = [label_columns] return dataset, label_columns
def log_result( context: "mlrun.MLClientCtx", result_set_name: str, result_set: pd.DataFrame, artifacts_tag: str, batch_id: str, ) -> None: # Log the result set: context.logger.info("Logging result set (x | prediction)...") context.log_dataset( key=result_set_name, df=result_set, db_key=result_set_name, tag=artifacts_tag, ) # Log the batch ID: if batch_id is None: batch_id = hashlib.sha224(str(datetime_now()).encode()).hexdigest() context.log_result( key="batch_id", value=batch_id, ) def _create_model_monitoring_function_base( *, project: str, func: typing.Union[str, None] = None, application_class: typing.Union[ str, "mm_app.ModelMonitoringApplicationBase", None ] = None, name: str | None = None, image: str | None = None, tag: str | None = None, requirements: typing.Union[list[str], None] = None, requirements_file: str = "", local_path: str | None = None, otlp_enabled: bool = False, **application_kwargs, ) -> mlrun.runtimes.ServingRuntime: """ Note: this is an internal API only. This function does not set the labels or mounts v3io. """ if name in mm_constants._RESERVED_FUNCTION_NAMES: raise mlrun.errors.MLRunValueError( "An application cannot have the following names: " f"{mm_constants._RESERVED_FUNCTION_NAMES}" ) _, has_valid_suffix, suffix = mlrun.utils.helpers.ensure_batch_job_suffix(name) if name and not has_valid_suffix: raise mlrun.errors.MLRunValueError( f"Model monitoring application names cannot end with `{suffix}`" ) if func is None: func = "" if check_if_hub_uri(func): hub_module = mlrun.get_hub_module(url=func, local_path=local_path) if hub_module.kind != HubModuleType.monitoring_app: raise mlrun.errors.MLRunInvalidArgumentError( "The provided module is not a monitoring application" ) requirements = mlrun.model.ImageBuilder.resolve_requirements( requirements, requirements_file ) requirements = merge_requirements( reqs_priority=requirements, reqs_secondary=hub_module.requirements ) func = hub_module.get_module_file_path() func_obj = typing.cast( mlrun.runtimes.ServingRuntime, mlrun.code_to_function( filename=func, name=name, project=project, tag=tag, kind=mlrun.run.RuntimeKinds.serving, image=image, requirements=requirements, requirements_file=requirements_file, ), ) graph = func_obj.set_topology(mlrun.serving.states.StepKinds.flow) prepare_step = graph.to( class_name="mlrun.model_monitoring.applications._application_steps._PrepareMonitoringEvent", name="PrepareMonitoringEvent", application_name=name, ) if isinstance(application_class, str): app_step = prepare_step.to(class_name=application_class, **application_kwargs) else: app_step = prepare_step.to(class_name=application_class) app_step.__class__ = mlrun.serving.MonitoringApplicationStep app_step.to( class_name="mlrun.model_monitoring.applications._application_steps._PushToMonitoringWriter", name="PushToMonitoringWriter", project=project, ) if otlp_enabled: otel_prep = app_step.to( class_name="mlrun.model_monitoring.applications._application_steps._PrepareOTelEvent", name="PrepareOTelEvent", ) otel_prep.to( class_name="mlrun.serving.OTelMetricsExporter", name="OTelMetricsExporter", headers_source="file", ) func_obj.spec.mount_otlp_secret = otlp_enabled graph.error_handler( class_name="mlrun.model_monitoring.applications._application_steps._ApplicationErrorHandler", name="ApplicationErrorHandler", full_event=True, project=project, application_name=name, user_step_name=app_step.name, ) def block_to_mock_server(*args, **kwargs) -> typing.NoReturn: raise NotImplementedError( "Model monitoring serving functions do not support `.to_mock_server`. " "You may call your model monitoring application object logic via the `.evaluate` method." ) func_obj.to_mock_server = block_to_mock_server # Until ML-7643 is implemented return func_obj