Model Monitoring (beta)

Model Endpoints Overview Section

[Screenshot: the Model Endpoints Overview dashboard]

Model Endpoint Details Section

[Screenshot: the Model Endpoint Details dashboard]

Model Endpoint Performance Section

[Screenshot: the Model Endpoint Performance dashboard]

Initial setup (and prerequisites)

  1. Make sure the mlrun-api data source is configured in your Grafana instance. If it is not, add it as follows (for a programmatic alternative, see the sketch after this list):

    1. Open your Grafana instance

    2. Navigate to Configuration -> Data Sources

    3. Press Add data source

    4. Select the SimpleJson data source and configure the following parameters:

    Name: mlrun-api
    URL: http://mlrun-api:8080/api/grafana-proxy/model-endpoints
    Access: Server (default)
    
    # Add a custom header:
    X-V3io-Session-Key: <YOUR ACCESS KEY>
    
    5. Press Save & Test; a confirmation message should appear if the data source is working

  2. Download the monitoring dashboards:

    1. Overview

    2. Details

    3. Performance

  3. Import the downloaded dashboards into your Grafana instance (this can also be done programmatically; see the sketch after this list)

  4. To allow the system to perform drift analysis, make sure you supply the training set when logging the model during the training step:

    # Log the model (inside an MLRun training function, where `context` is the
    # MLRun run context and `dumps` is the model serializer, e.g. pickle.dumps
    # or cloudpickle.dumps)
    context.log_model(
        "model",
        body=dumps(model),
        artifact_path=context.artifact_subpath("models"),
        extra_data=eval_metrics,
        model_file="model.pkl",
        metrics=context.results,
        training_set=<TRAIN_SET>,  # <- training set (enables drift analysis)
        label_cols=<LABEL_COLS>,  # <- label column names of the training set
        labels={"class": "sklearn.linear_model.LogisticRegression"}
    )
    
  5. When serving the model, make sure the Nuclio function is deployed with tracking enabled by applying fn.set_tracking() on the serving function.
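For reference, the data source creation (step 1) and dashboard import (step 3) can also be done through the Grafana HTTP API. The following is a minimal sketch; the Grafana address, credentials, and dashboard file name are placeholder assumptions, so replace them with your own values:

import json
import requests

GRAFANA_URL = "http://grafana"     # assumed Grafana address
GRAFANA_AUTH = ("admin", "admin")  # assumed admin credentials
V3IO_ACCESS_KEY = "<YOUR ACCESS KEY>"

# Create the mlrun-api data source (equivalent to the UI steps above)
datasource = {
    "name": "mlrun-api",
    "type": "grafana-simple-json-datasource",  # the SimpleJson plugin id
    "url": "http://mlrun-api:8080/api/grafana-proxy/model-endpoints",
    "access": "proxy",  # "Server" access mode in the UI
    "jsonData": {"httpHeaderName1": "X-V3io-Session-Key"},
    "secureJsonData": {"httpHeaderValue1": V3IO_ACCESS_KEY},
}
resp = requests.post(f"{GRAFANA_URL}/api/datasources", json=datasource, auth=GRAFANA_AUTH)
resp.raise_for_status()

# Import a downloaded dashboard (the file name is a placeholder)
with open("model-endpoints-overview.json") as f:
    dashboard = json.load(f)
resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    json={"dashboard": dashboard, "overwrite": True},
    auth=GRAFANA_AUTH,
)
resp.raise_for_status()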

Configuration

The stream-processing portion of model monitoring can be deployed with multiple configuration options. The available options can be found under stream.Config. Once configured, supply them as environment parameters to the Nuclio function via fn.set_envs (see the sketch after the parameter list).

project: str                        # Project name
sample_window: int                  # The sampling window for the data that flows into the TSDB and the KV
tsdb_batching_max_events: int       # The maximum number of events to batch before writing the batch to the TSDB
tsdb_batching_timeout_secs: int     # The maximum number of seconds a given batch can be gathered before being emitted
parquet_batching_max_events: int    # The maximum number of events to batch before writing the batch to parquet
parquet_batching_timeout_secs: int  # The maximum number of seconds a given batch can be gathered before being written to parquet
aggregate_count_windows: List[str]  # List of window sizes for predictions count
aggregate_count_period: str         # Period of predictions count windows
aggregate_avg_windows: List[str]    # List of window sizes for average latency
aggregate_avg_period: str           # Period of average latency windows
v3io_access_key: str                # V3IO access key; if not set, taken from the environment
v3io_framesd: str                   # V3IO framesd URL; if not set, taken from the environment
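For example, a minimal sketch of supplying the configuration, where fn is the stream-processing Nuclio function created in the Deploy Stream Processing section below, and the parameter values are illustrative assumptions rather than recommended defaults:

import json

# Illustrative values only -- tune them for your workload
fn.set_envs({
    "MODEL_MONITORING_PARAMETERS": json.dumps({
        "project": project,
        "sample_window": 10,
        "tsdb_batching_max_events": 10,
        "tsdb_batching_timeout_secs": 60 * 5,
    })
})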
Set the project name that is used throughout the following examples:

# Set project name
project = ""

Deploy Model Servers

import pandas as pd
from sklearn.datasets import load_iris

from mlrun import import_function, get_dataitem
from mlrun import projects
from mlrun.platforms import auto_mount

# Create the MLRun project
proj = projects.new_project(project)

# Download a pre-trained iris model
get_dataitem("https://s3.wasabisys.com/iguazio/models/iris/model.pkl").download("model.pkl")

# Build the training set that is logged with each model (enables drift analysis)
iris = load_iris()
train_set = pd.DataFrame(iris['data'], columns=['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm'])

model_names = [
    "sklearn_ensemble_RandomForestClassifier",
    "sklearn_linear_model_LogisticRegression",
    "sklearn_ensemble_AdaBoostClassifier"
]

# Import the model server from the function hub
serving_fn = import_function('hub://v2_model_server').apply(auto_mount())

# Log each model with its training set and add it as a route on the serving function
for name in model_names:
    proj.log_model(name, model_file="model.pkl", training_set=train_set)
    serving_fn.add_model(name, model_path=f"store://models/{project}/{name}:latest")

# Enable model monitoring and deploy the serving function
serving_fn.metadata.project = project
serving_fn.set_tracking()
serving_fn.deploy()
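Once deployed, you can optionally send a single request to verify that the endpoints respond. This is a quick smoke test; the input values are arbitrary iris-like measurements:

import json

result = serving_fn.invoke(
    "v2/models/sklearn_linear_model_LogisticRegression/infer",
    json.dumps({"inputs": [[5.1, 3.5, 1.4, 0.2]]})
)
print(result)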

Deploy Stream Processing

import os
import json

from mlrun import import_function
from mlrun.platforms import mount_v3io
from mlrun.runtimes import RemoteRuntime

# Import the model-monitoring stream processing function from the function hub
fn: RemoteRuntime = import_function("hub://model_monitoring_stream")

# Trigger the function from the project's model-endpoints stream
fn.add_v3io_stream_trigger(
    stream_path=f"projects/{project}/model-endpoints/stream",
    name="monitoring_stream_trigger",
)

# Supply the stream-processing configuration as an environment variable
fn.set_env("MODEL_MONITORING_PARAMETERS", json.dumps({"project": project, "v3io_framesd": os.environ.get("V3IO_FRAMESD")}))

fn.metadata.project = project
fn.apply(mount_v3io())
fn.deploy()

Deploy Batch Processing

from mlrun import import_function
from mlrun.platforms import mount_v3io
from mlrun.runtimes import KubejobRuntime

# Import the model-monitoring batch (drift analysis) function from the function hub
fn: KubejobRuntime = import_function("hub://model_monitoring_batch")
fn.metadata.project = project
fn.apply(mount_v3io())

# Schedule the batch job to run every hour
fn.run(name='model-monitoring-batch', schedule="0 */1 * * *", params={"project": project})

Simulating Requests

import json
from time import sleep
from random import choice, uniform
from sklearn.datasets import load_iris

iris = load_iris()
iris_data = iris['data'].tolist()

# Send a random iris sample to each model endpoint, pausing between requests.
# This loop runs until interrupted.
while True:
    for name in model_names:
        data_point = choice(iris_data)
        serving_fn.invoke(f'v2/models/{name}/infer', json.dumps({'inputs': [data_point]}))
        sleep(uniform(0.1, 0.4))
    sleep(uniform(0.2, 1.7))