Part 3: Serving

In this part we will use MLRun's serving runtime to deploy the models trained in the previous stage as a Voting Ensemble using max-vote logic.
We will also use MLRun's feature store to retrieve the latest version of the online feature vector we defined in the previous stage.

By the end of this tutorial you’ll learn how to:

  • Define a model class to load our models, run preprocessing and predict on the data

  • Define Voting Ensemble function on top of our models

  • Test the serving function locally using our mock server

  • Deploy the function to the cluster and test it live

Environment Setup

Since our work is done within a project scope, we will first define the project itself for all our MLRun work in this notebook.

project_name = 'fraud-demo'
import mlrun

# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2021-09-27 05:26:55,710 [info] loaded project fraud-demo from MLRun DB

Define Model Class

  • Load models

  • Predict from the feature store online service via the source key

# mlrun: start-code
import numpy as np
from cloudpickle import load
from mlrun.serving.v2_serving import V2ModelServer

class ClassifierModel(V2ModelServer):
    
    def load(self):
        """load and initialize the model and/or other elements"""
        model_file, extra_data = self.get_model('.pkl')
        self.model = load(open(model_file, 'rb'))
        
    def predict(self, body: dict) -> list:
        """Generate model predictions from sample"""
        print(f"Input -> {body['inputs']}")
        feats = np.asarray(body['inputs'])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()
# mlrun: end-code
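The `predict()` contract above can be sanity-checked outside of MLRun with a stand-in model (the stub below is hypothetical, not one of the trained artifacts):

```python
import numpy as np

class StubModel:
    """Hypothetical stand-in for a pickled classifier: always predicts class 0."""
    def predict(self, feats: np.ndarray) -> np.ndarray:
        return np.zeros(len(feats), dtype=int)

def predict_body(model, body: dict) -> list:
    # Mirrors ClassifierModel.predict: read 'inputs', predict, return a plain list
    feats = np.asarray(body["inputs"])
    return model.predict(feats).tolist()

print(predict_body(StubModel(), {"inputs": [[14.68, 14.68, 1.0, 14.68, 53.25]]}))  # → [0]
```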

Define a Serving Function

MLRun serving can produce managed real-time serverless pipelines from various tasks, including MLRun models or standard model files. The pipelines use the Nuclio real-time serverless engine, which can be deployed anywhere. Nuclio is a high-performance open-source serverless framework that’s focused on data, I/O, and compute-intensive workloads.

In the code below we perform the following steps:

  • Gather ClassifierModel code from this notebook

  • Define a max-vote based VotingEnsemble router

  • Add the previously trained models to the ensemble

# Create the serving function from our code above
serving_fn = mlrun.code_to_function('transaction-fraud', kind='serving', image="mlrun/mlrun")

serving_fn.set_topology('router', 'mlrun.serving.routers.EnrichmentVotingEnsemble', name='VotingEnsemble',
                        feature_vector_uri="transactions-fraud-short", impute_policy={"*": "$mean"})

model_names = [
'RandomForestClassifier',
'GradientBoostingClassifier',
'AdaBoostClassifier'
]

for i, name in enumerate(model_names, start=1):
    serving_fn.add_model(name, class_name="ClassifierModel", model_path=project.get_artifact_uri(f"training_model#{i}:latest"))

# Plot the ensemble configuration
serving_fn.spec.graph.plot()
[Graph plot (03-deploy-serving-model_9_0.svg): the VotingEnsemble router with its three model routes]
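The router's max-vote behavior can be illustrated with plain Python (a simplified sketch of majority voting, not the actual EnrichmentVotingEnsemble internals):

```python
from collections import Counter

def max_vote(predictions: list) -> int:
    """Return the class predicted by the most models (ties broken by first seen)."""
    return Counter(predictions).most_common(1)[0][0]

# Three classifiers vote on one transaction: two say "not fraud" (0), one says "fraud" (1)
print(max_vote([0, 1, 0]))  # → 0
```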

Deploy the Function on the Kubernetes Cluster

We can now deploy the function. Once deployed, we get a function with an HTTP trigger that can be called from other locations.

import os

# Enable model monitoring
serving_fn.set_tracking()
project.set_model_monitoring_credentials(os.getenv('V3IO_ACCESS_KEY'))

# Deploy the serving function
serving_fn.deploy()
> 2021-09-27 05:27:05,199 [info] Starting remote function deploy
2021-09-27 05:27:06  (info) Deploying function
2021-09-27 05:27:06  (info) Building
2021-09-27 05:27:06  (info) Staging files and preparing base images
2021-09-27 05:27:06  (info) Building processor image
2021-09-27 05:27:07  (info) Build complete
2021-09-27 05:27:13  (info) Function deploy complete
> 2021-09-27 05:27:14,661 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['default-tenant.app.insucefujgwr.iguazio-cd1.com:31078']}
Input -> [[14.68, 14.68, 1.0, 14.68, 53.25]]
'http://default-tenant.app.insucefujgwr.iguazio-cd1.com:31078'

Test the Server

We can test the serving function and examine the model output.

# Choose an id for our test
sample_id = 'C76780537'

model_inference_path = '/v2/models/infer'

# Send our sample ID for prediction
serving_fn.invoke(path=model_inference_path,
                  body={'inputs': [[sample_id]]})
> 2021-09-27 05:27:26,328 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080/v2/models/infer'}
{'id': '3ab4374f-e93b-45c1-8821-48d8c4c878f9',
 'model_name': 'VotingEnsemble',
 'outputs': [0],
 'model_version': 'v1'}
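The response follows the v2 inference protocol; its fields can be unpacked with a small helper (a sketch over the response shape shown above, not an MLRun API):

```python
def parse_v2_response(resp: dict) -> dict:
    # Pull the prediction and model metadata out of a v2 inference response
    return {
        "prediction": resp["outputs"][0],
        "model": resp["model_name"],
        "version": resp.get("model_version", ""),
    }

sample = {
    "id": "3ab4374f-e93b-45c1-8821-48d8c4c878f9",
    "model_name": "VotingEnsemble",
    "outputs": [0],
    "model_version": "v1",
}
print(parse_v2_response(sample))  # → {'prediction': 0, 'model': 'VotingEnsemble', 'version': 'v1'}
```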

We can also directly query the feature store values that are used in the enrichment.

import mlrun.feature_store as fstore

# Create the online feature service
svc = fstore.get_online_feature_service('transactions-fraud-short:latest')

# Get sample feature vector
sample_fv = svc.get([{'source': sample_id}])
sample_fv
[{'amount_max_2h': -inf,
  'amount_max_12h': 53.25,
  'amount_sum_2h': 0.0,
  'amount_count_2h': 0.0,
  'amount_avg_2h': nan}]
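Note the `-inf` and `nan` values in the raw vector; the `impute_policy={"*": "$mean"}` set on the router replaces such values at serving time. A minimal illustration of mean-imputation (not the MLRun implementation):

```python
import math

def impute_with_mean(row: dict, means: dict) -> dict:
    """Replace nan/inf feature values with a precomputed per-feature mean."""
    out = {}
    for name, value in row.items():
        if value is None or math.isnan(value) or math.isinf(value):
            out[name] = means[name]  # fall back to the feature's mean
        else:
            out[name] = value
    return out

row = {"amount_max_2h": float("-inf"), "amount_avg_2h": float("nan"), "amount_max_12h": 53.25}
means = {"amount_max_2h": 20.0, "amount_avg_2h": 15.5, "amount_max_12h": 40.0}
print(impute_with_mean(row, means))
# → {'amount_max_2h': 20.0, 'amount_avg_2h': 15.5, 'amount_max_12h': 53.25}
```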

Simulate Incoming Data

# Load the dataset
data = mlrun.get_dataitem('https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/data.csv').as_df()

# Sample 50,000 rows
data = data.sample(50000)

# Collect the source keys for sampling
sample_ids = data['source'].to_list()
from random import choice, uniform
from time import sleep

# Send random requests
for _ in range(4000):
    data_point = choice(sample_ids)
    try:
        serving_fn.invoke(path=model_inference_path, body={'inputs': [[data_point]]})
        sleep(uniform(0.2, 1.7))
    except OSError:
        pass
> 2021-09-27 05:27:37,106 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080/v2/models/infer'}
> 2021-09-27 05:27:38,332 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080/v2/models/infer'}
> 2021-09-27 05:27:39,912 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080/v2/models/infer'}
> 2021-09-27 05:27:41,600 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-iguazio-transaction-fraud.default-tenant.svc.cluster.local:8080/v2/models/infer'}