Creating a custom model serving class#

Model serving classes implement the full model serving functionality, which includes loading models, pre- and post-processing, prediction, explainability, and model monitoring.

Model serving classes must inherit from mlrun.serving.V2ModelServer and, at a minimum, implement the load() method (download the model file(s) and load the model into memory) and the predict() method (accept the request payload and return prediction/inference results).

The class is initialized automatically by the model server and can run locally as part of a nuclio serverless function, or as part of a real-time pipeline.

You need to implement two mandatory methods:

  • load() - download the model file(s) and load the model into memory. This can be done synchronously or asynchronously.

  • predict() - accept the request payload and return prediction/inference results

You can override additional methods: preprocess, validate, postprocess, and explain.
You can add a custom API endpoint by adding a method named op_xx(event), and invoke it by calling the /xx operation (operation = xx).
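
The op_xx naming convention can be pictured as a lookup by method name. The following is a minimal plain-Python sketch of that idea, not MLRun's own dispatch code: TinyServer, its dispatch() helper, and the op_stats operation are hypothetical names chosen for illustration.

```python
class TinyServer:
    """Hypothetical stand-in for a serving class; not an MLRun class."""

    def predict(self, body: dict) -> list:
        # Placeholder "model": double every input value.
        return [x * 2 for x in body["inputs"]]

    def op_stats(self, event: dict) -> dict:
        # Custom operation; under the op_xx convention it maps to /stats.
        return {"count": len(event.get("inputs", []))}

    def dispatch(self, operation: str, event: dict):
        """Route an operation name to the matching method (illustrative only)."""
        if operation in ("infer", "predict"):
            return self.predict(event)
        handler = getattr(self, f"op_{operation}", None)
        if handler is None:
            raise ValueError(f"unsupported operation: {operation}")
        return handler(event)


server = TinyServer()
stats = server.dispatch("stats", {"inputs": [1, 2, 3]})   # {'count': 3}
preds = server.dispatch("infer", {"inputs": [1, 2, 3]})   # [2, 4, 6]
```

In a real serving class only the op_stats-style method would be needed; the routing itself is handled by the model server.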

Minimal sklearn serving function example#

from cloudpickle import load
import numpy as np
import mlrun

class ClassifierModel(mlrun.serving.V2ModelServer):
    def load(self):
        """load and initialize the model and/or other elements"""
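        # get_model() returns the local model file path and a dict of extra data items
        model_file, extra_data = self.get_model('.pkl')
        # close the file handle once the model is unpickled
        with open(model_file, 'rb') as f:
            self.model = load(f)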

    def predict(self, body: dict) -> list:
        """Generate model predictions from sample"""
        feats = np.asarray(body['inputs'])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()

Test the function locally using the mock server:

import mlrun
from sklearn.datasets import load_iris

fn = mlrun.new_function('my_server', kind='serving')

# set the topology/router and add models
graph = fn.set_topology("router")
fn.add_model("model1", class_name="ClassifierModel", model_path="<path1>")
fn.add_model("model2", class_name="ClassifierModel", model_path="<path2>")

# create and use the graph simulator
server = fn.to_mock_server()
x = load_iris()['data'].tolist()
result = server.test("/v2/models/model1/infer", {"inputs": x})

load() method#

In the load method, download the model from an external store, run the algorithm/framework load() call, and run any other initialization logic.

By default, load() runs synchronously (the deploy is stalled until the load completes). For large models this can cause a readiness timeout. You can increase the function spec.readiness_timeout or, alternatively, choose async loading (load() runs in the background) by setting the function spec.load_mode = "async".
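
The difference between the two load modes can be sketched in plain Python. This is an illustration of the concept only, not MLRun's implementation: LoaderSketch, slow_load, and the ready flag are hypothetical names, and a threading.Event stands in for a slow model download.

```python
import threading
from typing import Callable, Optional


class LoaderSketch:
    """Hypothetical stand-in showing synchronous vs. background loading."""

    def __init__(self, load_fn: Callable[[], object], load_mode: str = "sync"):
        self.load_fn = load_fn
        self.load_mode = load_mode
        self.model = None
        self.ready = False
        self._thread: Optional[threading.Thread] = None

    def _load(self) -> None:
        self.model = self.load_fn()
        self.ready = True

    def start(self) -> None:
        if self.load_mode == "async":
            # Return immediately; loading continues in the background.
            self._thread = threading.Thread(target=self._load, daemon=True)
            self._thread.start()
        else:
            # Block the caller until the model is in memory.
            self._load()

    def wait(self) -> None:
        # Wait for a background load (no-op in sync mode).
        if self._thread is not None:
            self._thread.join()


download_done = threading.Event()  # stands in for a slow model download


def slow_load() -> str:
    download_done.wait()
    return "model-object"


server = LoaderSketch(slow_load, load_mode="async")
server.start()
ready_during_load = server.ready  # False: start() returned before the load finished
download_done.set()
server.wait()
ready_after_load = server.ready   # True
```

With load_mode="sync" the same start() call would not return until slow_load() finished, which is the situation that can trigger a readiness timeout for large models.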

The function self.get_model() downloads the model metadata object and main file (into model_file path). Additional files can be accessed using the returned extra_data (dict of dataitem objects).

The model metadata object is stored in self.model_spec and provides model parameters, metrics, schema, etc. Parameters can be accessed using self.get_param(key). The parameters can be specified in the model or during the function/model deployment.

predict() method#

The predict method is called when you access the /infer or /predict url suffix (operation). The method accepts the request object (as a dict; see Model server API) and should return the specified response object.

explain() method#

The explain method provides a hook for model explainability, and is accessed using the /explain operation.

pre/post and validate hooks#

You can override the preprocess, validate, and postprocess methods for additional control. The call flow is:

pre-process -> validate -> predict/explain -> post-process 
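
The call flow above can be traced with a small plain-Python sketch. FlowSketch is a hypothetical stand-in rather than an MLRun class, and the hook signatures are simplified assumptions (the real V2ModelServer hooks may take additional arguments); the point is only the order in which the hooks run.

```python
class FlowSketch:
    """Hypothetical stand-in that records the order in which hooks run."""

    def __init__(self):
        self.trace = []

    def preprocess(self, request: dict) -> dict:
        self.trace.append("preprocess")
        # Example transformation: coerce every input to float.
        request["inputs"] = [float(x) for x in request["inputs"]]
        return request

    def validate(self, request: dict) -> dict:
        self.trace.append("validate")
        if not request.get("inputs"):
            raise ValueError("request must contain a non-empty 'inputs' list")
        return request

    def predict(self, request: dict) -> list:
        self.trace.append("predict")
        # Placeholder "model": add one to every input.
        return [x + 1.0 for x in request["inputs"]]

    def postprocess(self, response: dict) -> dict:
        self.trace.append("postprocess")
        response["count"] = len(response["outputs"])
        return response

    def run(self, request: dict) -> dict:
        # pre-process -> validate -> predict -> post-process
        request = self.validate(self.preprocess(request))
        return self.postprocess({"outputs": self.predict(request)})


flow = FlowSketch()
response = flow.run({"inputs": [1, 2]})
# flow.trace is ['preprocess', 'validate', 'predict', 'postprocess']
# response is {'outputs': [2.0, 3.0], 'count': 2}
```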

Models, routers and graphs#

Every serving function can host multiple models and logical steps. Multiple functions can connect in a graph to form complex real-time pipelines.

The basic serving function has a logical router with routes to multiple child models. The url or the message determines which model is selected, e.g. using the url schema:

/v2/models/<model>[/versions/<ver>]/operation

Note: the model, version and operation can also be specified in the message body to support streaming protocols (e.g. Kafka).
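
To make the url schema concrete, here is a small parser written against the pattern shown above. It is an illustrative sketch, not the router's actual parsing code; ROUTE_PATTERN, Route, and parse_route are hypothetical names.

```python
import re
from typing import NamedTuple, Optional

# Mirrors /v2/models/<model>[/versions/<ver>]/operation
ROUTE_PATTERN = re.compile(
    r"^/v2/models/(?P<model>[^/]+)"
    r"(?:/versions/(?P<version>[^/]+))?"
    r"/(?P<operation>[^/]+)$"
)


class Route(NamedTuple):
    model: str
    version: Optional[str]
    operation: str


def parse_route(path: str) -> Route:
    """Split a request path into model, optional version, and operation."""
    match = ROUTE_PATTERN.match(path)
    if match is None:
        raise ValueError(f"path does not follow the schema: {path}")
    return Route(match["model"], match["version"], match["operation"])


plain = parse_route("/v2/models/model1/infer")
versioned = parse_route("/v2/models/mymodel/versions/v2/predict")
# plain     is Route(model='model1', version=None, operation='infer')
# versioned is Route(model='mymodel', version='v2', operation='predict')
```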

More complex routers can be used to support ensembles (send the request to all child models and aggregate the result), multi-armed-bandit, etc.
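
As an illustration of the ensemble idea, the sketch below sends the same inputs to every child model and aggregates the answers by majority vote. It is a plain-Python example under stated assumptions, not one of MLRun's router classes: vote() and the three threshold "models" are hypothetical, and majority voting is just one possible aggregation.

```python
from collections import Counter
from typing import Callable, Dict, List


def vote(models: Dict[str, Callable[[list], list]], inputs: list) -> List[int]:
    """Send the inputs to every child model and return the majority label per row."""
    # Each child returns one label per input row.
    per_model = [predict(inputs) for predict in models.values()]
    results = []
    for row_votes in zip(*per_model):
        # Pick the label that most children agreed on for this row.
        results.append(Counter(row_votes).most_common(1)[0][0])
    return results


# Three toy classifiers that differ only in their decision threshold.
children = {
    "low": lambda rows: [int(r[0] > 1) for r in rows],
    "mid": lambda rows: [int(r[0] > 2) for r in rows],
    "high": lambda rows: [int(r[0] > 3) for r in rows],
}

labels = vote(children, [[0.5], [2.5], [3.5]])  # [0, 1, 1]
```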

You can use a pre-defined Router class, or write your own custom router. Routers can route to models on the same function or access models on a separate function.

To specify the topology, router class, and class args, use .set_topology() with your function.

Creating a model serving function (service)#

To provision a serving function, you need to create an MLRun function of type serving. This can be done by using the code_to_function() call from a notebook. You can also import an existing serving function/template from the marketplace.

Example (run inside a notebook): this code converts a notebook to a serving function and adds a model to it:

from mlrun import code_to_function
fn = code_to_function('my-function', kind='serving')
fn.add_model('m1', model_path='<model-artifact/dir>', class_name='MyClass', x=100)

See .add_model() docstring for help and parameters.

See the full Model Server example.

If you want to use multiple versions of the same model, use a colon (:) to separate the name from the version. For example, the name mymodel:v2 means model name mymodel, version v2.

You should specify the model_path (url of the model artifact/dir) and the class_name (the class name, or the full module.submodule.class path). Alternatively, you can set the model_url for calling a model that is served by another function (can be used for ensembles).
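
The name:version convention can be combined with the url schema described earlier. The helpers below are a hypothetical sketch (split_model_key and infer_path are not MLRun functions) showing how a key such as mymodel:v2 would map to a request path under that schema.

```python
from typing import Optional, Tuple


def split_model_key(key: str) -> Tuple[str, Optional[str]]:
    """Split 'name:version' into its parts; the version is None when absent."""
    name, _, version = key.partition(":")
    return name, version or None


def infer_path(key: str) -> str:
    """Build the /infer path for a model key, following the url schema."""
    name, version = split_model_key(key)
    version_part = f"/versions/{version}" if version else ""
    return f"/v2/models/{name}{version_part}/infer"


parts = split_model_key("mymodel:v2")     # ('mymodel', 'v2')
versioned_path = infer_path("mymodel:v2")  # '/v2/models/mymodel/versions/v2/infer'
plain_path = infer_path("m1")              # '/v2/models/m1/infer'
```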

The function object (fn) accepts many options. You can specify the replica range (auto-scaling), cpu/gpu/mem resources, shared volume mounts, secrets, and any other Kubernetes resource through the fn.spec object or fn methods.

For example, fn.gpu(1) means each replica uses one GPU.

To deploy a model, simply call:

fn.deploy()

You can also deploy a model from within an ML pipeline (check the various demos for details).

Model monitoring#

Model activities can be tracked into a real-time stream and time-series DB. The monitoring data is used to create real-time dashboards and track model accuracy and drift. To set the tracking stream options, call the function's set_tracking() method:

    fn.set_tracking(stream_path, batch, sample)

  • stream_path - the v3io stream path (e.g. v3io:///users/..)

  • batch - optional, send micro-batches every N requests

  • sample - optional, sample every N requests