Model monitoring using LLM#

This tutorial illustrates a model monitoring system that leverages LLMs to maintain high standards for deployed models.

In this tutorial

This tutorial explains how an LLM can be monitored. To see it in action, run the Large Language Model Monitoring demo.

Prerequisites#

  • A GPU node with NVIDIA drivers is required for the serving function

import mlrun
from mlrun.features import Feature
from mlrun.datastore.datastore_profile import DatastoreProfileV3io

Create the project

project = mlrun.get_or_create_project(name="llm-monitoring-intro", context="./")
> 2024-12-15 08:42:56,260 [info] Project loaded successfully: {"project_name":"llm-monitoring-intro"}

Set the credentials

tsdb_profile = DatastoreProfileV3io(name="v3io-tsdb-profile")
project.register_datastore_profile(tsdb_profile)

stream_profile = DatastoreProfileV3io(
    name="v3io-stream-profile",
    v3io_access_key=mlrun.mlconf.get_v3io_access_key(),
)
project.register_datastore_profile(stream_profile)
project.set_model_monitoring_credentials(
    tsdb_profile_name=tsdb_profile.name,
    stream_profile_name=stream_profile.name,
)

Enable model monitoring for the project

project.enable_model_monitoring(
    base_period=2,  # frequency (in minutes) at which the monitoring applications are triggered
)
> 2024-12-15 08:42:58,423 [warning] enable_model_monitoring: 'base_period' < 10 minutes is not supported in production environments: {"project":"llm-monitoring-intro"}

Add the monitoring-function code#

The monitoring function code collects the traffic to the serving function, analyzes it, and generates results for the specified metric.

%%writefile monit-code.py
import re
from typing import Any, Union

import mlrun
import mlrun.common.schemas
from mlrun.model_monitoring.applications import (
    ModelMonitoringApplicationBase,
    ModelMonitoringApplicationResult,
)

STATUS_RESULT_MAPPING = {
    0: mlrun.common.schemas.model_monitoring.constants.ResultStatusApp.detected,
    1: mlrun.common.schemas.model_monitoring.constants.ResultStatusApp.no_detection,
}


class LLMAsAJudgeApplication(ModelMonitoringApplicationBase):

    def do_tracking(
        self,
        monitoring_context,
    ) -> Union[
        ModelMonitoringApplicationResult, list[ModelMonitoringApplicationResult]
    ]:
        
        # User monitoring sampling, in this case a float representing model performance.
        # It can be calculated from the traffic to the function using monitoring_context.sample_df
        result = 0.9

        # Build an artifact tag from the window end time, replacing characters
        # that are not allowed in tags
        tag = re.sub(r"[^a-zA-Z0-9-]", "-", str(monitoring_context.end_infer_time))
        monitoring_context.log_dataset(
            key="llm-monitoring-df",
            df=monitoring_context.sample_df,
            tag=tag,
        )

        # get status:
        status = STATUS_RESULT_MAPPING[round(result)]

        return ModelMonitoringApplicationResult(
            name="llm-monitoring-df",
            value=result,
            kind=mlrun.common.schemas.model_monitoring.constants.ResultKindApp.model_performance,
            status=status,
            extra_data={},
        )
Writing monit-code.py
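
The example above hard-codes result = 0.9. In a real application you would derive the score from the sampled traffic, for example by sending each question/answer pair to a judge model. Below is a minimal, hypothetical scoring helper; the answer column name is an assumption, not the actual schema of monitoring_context.sample_df, so inspect the DataFrame in your environment before using it:

import pandas as pd


def judge_responses(sample_df: pd.DataFrame, answer_col: str = "answer") -> float:
    """Toy scoring heuristic: the fraction of non-empty answers in the sampled traffic.

    A real LLM-as-a-judge application would instead send each question/answer pair
    to a judge model and aggregate its scores.
    """
    if sample_df.empty:
        return 0.0
    answers = sample_df[answer_col].fillna("").astype(str)
    return float((answers.str.strip() != "").mean())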

Define the model monitoring custom function that scans the traffic and calculates the performance metrics

application = project.set_model_monitoring_function(
    func="monit-code.py",
    application_class="LLMAsAJudgeApplication",
    name="llm-monit",
    image="mlrun/mlrun",
)
application.spec.readiness_timeout = 1200
project.deploy_function(application)
> 2024-12-15 08:43:12,586 [info] Starting remote function deploy
2024-12-15 08:43:12  (info) Deploying function
2024-12-15 08:43:13  (info) Building
2024-12-15 08:43:13  (info) Staging files and preparing base images
2024-12-15 08:43:13  (warn) Using user provided base image, runtime interpreter version is provided by the base image
2024-12-15 08:43:13  (info) Building processor image
2024-12-15 08:50:13  (info) Build complete
2024-12-15 08:50:22  (info) Function deploy complete
> 2024-12-15 08:50:30,666 [info] Successfully deployed function: {"external_invocation_urls":[],"internal_invocation_urls":["nuclio-llm-monitoring-intro-llm-monit.default-tenant.svc.cluster.local:8080"]}
DeployStatus(state=ready, outputs={'endpoint': 'http://nuclio-llm-monitoring-intro-llm-monit.default-tenant.svc.cluster.local:8080', 'name': 'llm-monitoring-intro-llm-monit'})

Create a model serving class that loads the LLM and generates responses

%%writefile model-serving.py
import mlrun
from mlrun.serving.v2_serving import V2ModelServer
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import Any

class LLMModelServer(V2ModelServer):

    def __init__(
        self,
        context: mlrun.MLClientCtx = None,
        name: str = None,
        model_path: str = None,
        model_name: str = None,
        **kwargs
    ):
        super().__init__(name=name, context=context, model_path=model_path, **kwargs)
        self.model_name = model_name
    
    def load(
        self,
    ):
        # Load the model from Hugging Face
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_name)


    def predict(self, request: dict[str, Any]):
        inputs = request.get("inputs", [])
      
        input_ids, attention_mask = self.tokenizer(
            inputs[0], return_tensors="pt"
        ).values()

        outputs = self.model.generate(input_ids=input_ids, attention_mask=attention_mask)

        # Remove input:
        outputs = self.tokenizer.decode(outputs[0])
        outputs = outputs.split(inputs[0])[-1].replace(self.tokenizer.eos_token, "")
        return [{"generated_text": outputs}]
Overwriting model-serving.py
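
To sanity-check the predict() logic outside MLRun, you can run the same tokenize/generate/decode round-trip directly with transformers. This sketch downloads the gpt2 weights locally, and the generation arguments are illustrative only:

from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")

prompt = "What is a mortgage?"
encoded = tokenizer(prompt, return_tensors="pt")

# Generate a short continuation; max_new_tokens keeps the example fast
output_ids = model.generate(**encoded, max_new_tokens=40)

# Decode, then strip the echoed prompt - the same trick used in the serving class
text = tokenizer.decode(output_ids[0])
answer = text.split(prompt)[-1].replace(tokenizer.eos_token, "")
print(answer)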

Create the serving function using the class you just defined

serving_fn = project.set_function(
    func="model-serving.py",
    name="llm-server",
    kind="serving",
    image="gcr.io/iguazio/llm-serving-base:1.0",
    requirements=[
        "mlrun==" + mlrun.__version__,
        "transformers",
        "adapters",
        "openai",
        "protobuf==3.20.3",
    ],
)

# Set the readiness timeout to 20 minutes, since deployment might take a while.
serving_fn.spec.readiness_timeout = 1200

# Attach fuse mount to the function
serving_fn.apply(mlrun.auto_mount())

serving_fn.with_limits(gpus=1)

Deploy the model, enable tracking, and deploy the function#

This tutorial uses the gpt2 model from Hugging Face (GPT-2, released by OpenAI).

Log the model to the project. The model file is only a placeholder artifact; the serving class downloads the actual weights from Hugging Face based on the model_name parameter.

base_model = "gpt2"
project.log_model(
    base_model,
    model_file="src/model-iris.pkl",
    inputs=[Feature(value_type="str", name="question")],
    outputs=[Feature(value_type="str", name="answer")],
)
<mlrun.artifacts.model.ModelArtifact at 0x7f97892bd4c0>

Add the model parameters to the endpoint. This allows the model server class to initialize.

serving_fn.add_model(
    "gpt2",
    class_name="LLMModelServer",
    model_path=f"store://models/{project.name}/gpt2:latest",
    model_name="gpt2",
)
<mlrun.serving.states.TaskStep at 0x7f97892bdc10>
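
Optionally, you can smoke-test the serving graph locally before deploying, using MLRun's mock server. This is a sketch: it loads gpt2 on the local machine, so it needs the transformers package (and enough memory) available locally.

# Optional local smoke test before deploying (loads gpt2 locally)
mock_server = serving_fn.to_mock_server()
resp = mock_server.test(
    path=f"/v2/models/{base_model}/infer",
    body={"inputs": ["What is a mortgage?"]},
)
print(resp)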

Enable tracking for the function, then deploy it.

serving_fn.set_tracking()
deployment = serving_fn.deploy()
> 2024-12-15 09:01:57,479 [info] Starting remote function deploy
2024-12-15 09:01:57  (info) Deploying function
2024-12-15 09:01:57  (info) Building
2024-12-15 09:01:58  (info) Staging files and preparing base images
2024-12-15 09:01:58  (warn) Using user provided base image, runtime interpreter version is provided by the base image
2024-12-15 09:01:58  (info) Building processor image
2024-12-15 09:09:33  (info) Build complete
2024-12-15 09:10:35  (info) Function deploy complete
> 2024-12-15 09:10:41,055 [info] Successfully deployed function: {"external_invocation_urls":["llm-monitoring-intro-llm-server.default-tenant.app.llm-3-6-0.iguazio-cd1.com/"],"internal_invocation_urls":["nuclio-llm-monitoring-intro-llm-server.default-tenant.svc.cluster.local:8080"]}

Let's generate traffic against the model:

import time


def question_model(questions, serving_function, base_model):
    for question in questions:
        seconds = 0.5
        # Invoke the deployed model through the serving function:
        ret = serving_function.invoke(
            path=f"/v2/models/{base_model}/infer",
            body={"inputs": [question]},
        )
        print(ret)
        time.sleep(seconds)


example_questions = [
    "What is a mortgage?",
    "How does a credit card work?",
    "Who painted the Mona Lisa?",
    "Please plan me a 4-days trip to north Italy",
    "Write me a song",
    "How much people are there in the world?",
    "What is climate change?",
    "How does the stock market work?",
    "Who wrote 'To Kill a Mockingbird'?",
    "Please plan me a 3-day trip to Paris",
    "Write me a poem about the ocean",
    "How many continents are there in the world?",
    "What is artificial intelligence?",
    "How does a hybrid car work?",
    "Who invented the telephone?",
    "Please plan me a week-long trip to New Zealand",
]
question_model(
    questions=example_questions,
    serving_function=serving_fn,
    base_model=base_model,
)

The monitoring application now analyzes the traffic to the serving function in each monitoring window (every 2 minutes, as configured by base_period) and calculates the performance result.
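
After at least one monitoring window has completed, you can verify that the monitoring application ran, for example by checking the datasets logged in the project (a rough sketch; you can also inspect the results in the Model Endpoints page of the MLRun UI):

# After a monitoring window completes, the dataset logged by the monitoring
# application should show up among the project's dataset artifacts
datasets = project.list_artifacts(kind="dataset")
print(f"{len(datasets)} datasets logged in the project so far")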