Using model serving graphs#

A graph is composed of steps, routers, and queues, and has two modes of operation (topologies), all described and used in the examples in this section.

  • Step: A step runs a function or class handler, or makes a REST API call. MLRun comes with a list of pre-built steps that includes data manipulation, readers, writers, and model serving. You can also write your own steps using standard Python functions or custom functions/classes, or call an external REST API (using the special $remote class); see the sketch after this list.

  • Router: A special type of step with routing logic and multiple child routes/models. The basic routing logic routes to a child route based on the event.path. More advanced or custom routing can also be used; for example, the ensemble router sends the event to all child routes in parallel, aggregates the results, and responds.

  • Queue: A queue or stream that accepts data from one or more source steps and publishes to one or more output steps. Queues are best used to connect independent functions/containers. Queues can run in-memory or be implemented using a stream, which allows the queue to span processes/containers. Queues currently support Iguazio V3IO and Kafka streams.
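
For illustration, a custom step can be a plain Python handler function, or a class whose do() method is called for each event. The sketch below uses hypothetical names; such steps are wired into a graph with step.to() or graph.add_step(), as shown later in this section.

# a handler-function step (hypothetical): adds a static tag to the event body
def add_tag(event):
    event["tag"] = "something"
    return event


# a class step (hypothetical): the graph calls its do() method for every event
class Multiplier:
    def __init__(self, context=None, name=None, factor=2, **kwargs):
        self.context = context
        self.name = name
        self.factor = factor

    def do(self, event):
        event["value"] = event.get("value", 0) * self.factor
        return event

Such a class could then be added to a graph with, for example, graph.to(class_name="Multiplier", factor=3).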

Serving functions#

To start using a serving graph, you first need a serving function. A serving function contains the serving class code that runs the model and all the code necessary to run the graph's tasks. MLRun comes with a wide library of tasks. If you use only those, you don't have to add any special code to the serving function; you only need to provide the code that runs the model. For more information about serving classes, see Build your own model serving class.

For example, the following code is a basic model serving class:

# mlrun: start-code
from cloudpickle import load
from typing import List
import numpy as np

import mlrun


class ClassifierModel(mlrun.serving.V2ModelServer):
    def load(self):
        """load and initialize the model and/or other elements"""
        model_file, extra_data = self.get_model(".pkl")
        self.model = load(open(model_file, "rb"))

    def predict(self, body: dict) -> List:
        """Generate model predictions from sample."""
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()
# mlrun: end-code

To define the serving function, create the project, and then create the function with project.set_function, specifying the kind as serving.

project = mlrun.get_or_create_project("serving")
fn = project.set_function(name="serving_example", kind="serving", image="mlrun/mlrun")

Topology#

Once you have a serving function, you need to choose the graph topology:

Note

Once the topology is set, you cannot change it for an existing function.

Router#

The default topology is router: a minimal configuration with a single router and child tasks/routes. This can be used for simple model serving or single-hop configurations. With the router topology you can specify multiple machine learning models. Each model has a logical name, which is used to route requests to the correct model when calling the serving function.

from sklearn.datasets import load_iris

# set the topology/router
graph = fn.set_topology("router")

# Add the model
fn.add_model(
    "model1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)

# Add additional models
# fn.add_model("model2", class_name="ClassifierModel", model_path="<path2>")

# create and use the graph simulator
server = fn.to_mock_server()
x = load_iris()["data"].tolist()
result = server.test("/v2/models/model1/infer", {"inputs": x})
server.wait_for_completion()

print(result)
> 2021-11-02 04:18:36,925 [info] model model1 was loaded
> 2021-11-02 04:18:36,926 [info] Initializing endpoint records
> 2021-11-02 04:18:36,965 [info] Loaded ['model1']
{'id': '6bd11e864805484ea888f58e478d1f91', 'model_name': 'model1', 'outputs': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]}

Flow#

The flow topology is a full graph/DAG. It is implemented using two engines: async (the default), which is based on Storey and an asynchronous event loop; and sync, which supports a simple sequence of steps. You can use the flow topology to specify tasks, which typically manipulate the data. The most common scenario is pre-processing of data prior to model execution.

In this topology, you build and connect the graph (DAG) by adding steps using the step.to() method, or by using the graph.add_step() method.

step.to() is typically used to chain steps together. graph.add_step() can add a step anywhere in the graph, and has before and after parameters to specify the step's location.

fn2 = project.set_function(
    name="serving_example_flow", kind="serving", image="mlrun/mlrun"
)

graph2 = fn2.set_topology("flow")

graph2_enrich = graph2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})')

# add a model router with a child model (route); additional routes can be added as below
router = graph2.add_step(mlrun.serving.ModelRouter(), name="router", after="enrich")
router.add_route(
    "m1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)
router.respond()

# An error-handling step can be attached to a step, to run only when/if that step
# fails. Illustrative only, since the "raising_step" and "handle_error" handlers
# are not defined in this example:
# graph2.to(name="pre-process", handler="raising_step").error_handler(
#     name="catcher", handler="handle_error", full_event=True
# )

# Add additional models
# router.add_route("m2", class_name="ClassifierModel", model_path=path2)

# plot the graph (using Graphviz)
graph2.plot(rankdir="LR")
fn2_server = fn2.to_mock_server()

result = fn2_server.test("/v2/models/m1/infer", {"inputs": x})
fn2_server.wait_for_completion()

print(result)
> 2021-11-02 04:18:42,142 [info] model m1 was loaded
> 2021-11-02 04:18:42,142 [info] Initializing endpoint records
> 2021-11-02 04:18:42,183 [info] Loaded ['m1']
{'id': 'f713fd7eedeb431eba101b13c53a15b5'}

Remote execution#

You can chain functions together with remote execution. This allows you to:

  • Call existing functions from the graph and reuse them from other graphs.

  • Scale up and down different components individually.

A remote function can be called either over HTTP or via a queue (streaming).

HTTP#

Calling a function over HTTP uses the special $remote class. First, deploy the remote function:

remote_func_name = "serving-example-flow"
fn_remote = project.set_function(
    name=remote_func_name, kind="serving", image="mlrun/mlrun"
)

fn_remote.add_model(
    "model1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)

remote_addr = fn_remote.deploy()
> 2022-03-17 08:20:40,674 [info] Starting remote function deploy
2022-03-17 08:20:40  (info) Deploying function
2022-03-17 08:20:40  (info) Building
2022-03-17 08:20:40  (info) Staging files and preparing base images
2022-03-17 08:20:40  (info) Building processor image
2022-03-17 08:20:42  (info) Build complete
2022-03-17 08:20:47  (info) Function deploy complete
> 2022-03-17 08:20:48,289 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-graph-basic-concepts-serving-example-flow.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['graph-basic-concepts-serving-example-flow-graph-basic-concepts.default-tenant.app.maor-gcp2.iguazio-cd0.com/']}

Create a new function with a graph and call the remote function above:

fn_preprocess = project.set_function(name="preprocess", kind="serving")
graph_preprocessing = fn_preprocess.set_topology("flow")

graph_preprocessing.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
    "$remote", "remote_func", url=f"{remote_addr}v2/models/model1/infer", method="put"
).respond()

graph_preprocessing.plot(rankdir="LR")
fn3_server = fn_preprocess.to_mock_server()
my_data = """{"inputs":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}"""
result = fn3_server.test("/v2/models/my_model/infer", body=my_data)
fn3_server.wait_for_completion()
print(result)
> 2022-03-17 08:20:48,374 [warning] run command, file or code were not specified
{'id': '3a1dd36c-e7de-45af-a0c4-72e3163ba92a', 'model_name': 'model1', 'outputs': [0, 2]}

Queue (streaming)#

You can use queues to send events from one part of the graph to another and to decouple the processing of those parts. Queues are better suited to dealing with bursts of events, since all the events are stored in the queue until they are processed.

V3IO stream example#

The example below uses a V3IO stream, a fast, real-time stream implementation that allows processing events at very low latency.

%%writefile echo.py
def echo_handler(x):
    print(x)
    return x
Overwriting echo.py

Configure the streams:

import os

streams_prefix = (
    f"v3io:///users/{os.getenv('V3IO_USERNAME')}/examples/graph-basic-concepts"
)

input_stream = streams_prefix + "/in-stream"
out_stream = streams_prefix + "/out-stream"
err_stream = streams_prefix + "/err-stream"

Create the graph. In the to method, use >> (or $queue) as the class name to specify that the step is a queue. To configure a consumer group for the step, include the group argument in the to method.

fn_preprocess2 = project.set_function("preprocess", kind="serving")
fn_preprocess2.add_child_function("echo_func", "./echo.py", "mlrun/mlrun")

graph_preprocess2 = fn_preprocess2.set_topology("flow")

graph_preprocess2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
    ">>", "input_stream", path=input_stream, group="mygroup"
).to(name="echo", handler="echo_handler", function="echo_func").to(
    ">>", "output_stream", path=out_stream, sharding_func="partition"
)

graph_preprocess2.plot(rankdir="LR")
from echo import *

fn4_server = fn_preprocess2.to_mock_server(current_function="*")

my_data = """{"inputs": [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], "partition": 0}"""

result = fn4_server.test("/v2/models/my_model/infer", body=my_data)
fn4_server.wait_for_completion()

print(result)
> 2022-03-17 08:20:55,182 [warning] run command, file or code were not specified
{'id': 'a6efe8217b024ec7a7e02cf0b7850b91'}
{'inputs': [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], 'tag': 'something'}

Kafka stream example#

You can also use Kafka to configure the streams.

%%writefile echo.py
def echo_handler(x):
    print(x)
    return x
Overwriting echo.py

Configure the streams:

# replace this with your Kafka broker address
brokers = "<broker IP>"

kafka_prefix = f"kafka://{brokers}/"
input_topic = kafka_prefix + "in-topic"
out_topic = kafka_prefix + "out-topic"
err_topic = kafka_prefix + "err-topic"

Create the graph. In the to method, use >> (or $queue) as the class name to specify that the step is a queue. To configure a consumer group for the step, include the group argument in the to method.

import mlrun

fn_preprocess2 = project.set_function("preprocess", kind="serving")
fn_preprocess2.add_child_function("echo_func", "./echo.py", "mlrun/mlrun")

graph_preprocess2 = fn_preprocess2.set_topology("flow")

graph_preprocess2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
    ">>",
    "input_stream",
    path=input_topic,
    group="mygroup",
    kafka_brokers=brokers,
).to(name="echo", handler="echo_handler", function="echo_func").to(
    ">>", "output_stream", path=out_topic, kafka_brokers=brokers
)

graph_preprocess2.plot(rankdir="LR")

from echo import *

fn4_server = fn_preprocess2.to_mock_server(current_function="*")

# err_topic already includes the kafka:// prefix
fn4_server.set_error_stream(err_topic)

my_data = """{"inputs":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}"""

result = fn4_server.test("/v2/models/my_model/infer", body=my_data)
fn4_server.wait_for_completion()

print(result)

Examples of graph functionality#

Distributed graphs#

Graphs can be hosted by a single function (using zero to n containers), or span multiple functions, where each function can have its own container image and resources (replicas, GPUs/CPUs, volumes, etc.). The graph has a root function, which is where you configure triggers (HTTP, incoming stream, cron, ...), and optional downstream child functions.

You can specify the function attribute in task or router steps to indicate where the step should run. When the function attribute is not specified, the step runs on the root function. function="*" means the step can run on any of the child functions.

Steps on different functions should be connected using a queue step (a stream).

Adding a child function:

fn.add_child_function(
    "enrich",
    "./entity_extraction.ipynb",
    image="mlrun/mlrun",
    requirements=["storey", "sklearn"],
)
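
Assigning steps to functions and connecting them through a queue might look like the following sketch; the function name, handler names, and stream path are hypothetical:

fn_dist = project.set_function(
    name="distributed-example", kind="serving", image="mlrun/mlrun"
)
fn_dist.add_child_function("enrich", "./entity_extraction.ipynb", image="mlrun/mlrun")

graph_dist = fn_dist.set_topology("flow")

# "validate" runs on the root function; the queue (stream) passes its output
# to "extract", which runs on the "enrich" child function
graph_dist.to(name="validate", handler="validate_event").to(
    ">>", "to-enrich", path="v3io:///users/admin/examples/to-enrich-stream"
).to(name="extract", handler="extract_entities", function="enrich")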

A distributed graph looks like this:

distributed graph

Graph that splits and rejoins#

You can define a graph that splits into two parallel steps, and the output of both steps join back together.

In this basic example, all input goes into both stepB and stepC, and then both stepB and stepC forward the input to stepD. This means that a dataset of 5 rows generates an output of 10 rows (barring any filtering or other processing that would change the number of rows).

Note

Use this configuration to join the graph branches, not to join the events into a single large event.

Example:

graph.to("stepB")
graph.to("stepC")
graph.add_step(name="stepD", after=["stepB", "stepC"])
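
Another example, fanning a single step out to two parallel branches and gathering their results: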


graph = fn.set_topology("flow", exist_ok=True)
dbl = graph.to(name="double", handler="double")
dbl.to(name="add3", class_name="Adder", add=3)
dbl.to(name="add2", class_name="Adder", add=2)
graph.add_step("Gather").after("add2", "add3")

Graphs that split and rejoin can also be used for these types of scenarios:

  • Steps B and C are filter steps that complement each other. For example, B passes events where key < X, and C passes events where key >= X. The resulting DataFrame contains exactly the events ingested, since each event is handled once, on one of the branches (see the sketch after this list).

  • Steps B and C modify the content of the event in different ways. B adds a column col1 with value X, and C adds a column col2 with value X. The resulting DataFrame contains both col1 and col2. Each key is represented twice: once with col1 == X, col2 == null and once with col1 == null, col2 == X.
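
As a sketch of the first scenario, two complementary storey.Filter steps could implement the split; the key name, threshold, and downstream handler are hypothetical:

graph = fn.set_topology("flow", exist_ok=True)

# stepB passes events with key < 50, stepC passes the rest,
# so each event continues down exactly one branch
graph.to("storey.Filter", "stepB", _fn="(event['key'] < 50)")
graph.to("storey.Filter", "stepC", _fn="(event['key'] >= 50)")
graph.add_step(name="stepD", handler="process", after=["stepB", "stepC"])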

Concurrent processing#

Concurrent processing is typically used for serving deep-learning models, where preparation steps and inference can be CPU/GPU heavy or involve I/O. The concurrency modes are:

  • asyncio — Default. For I/O implemented using asyncio.

  • threading — For blocking I/O.

  • multiprocessing — For processing-intensive tasks.

Additional configuration:

  • max_in_flight: Maximum number of events to be processed at a time (default 8)

  • retries: Maximum number of retries per event (default 0)

  • backoff_factor: Wait time in seconds before the first retry (default 1). Subsequent retries each wait twice as long as the previous retry, up to a maximum of two minutes.

  • pass_context: If False, the process_event function is called with just one parameter (event). If True, the process_event function is called with two parameters (event, context). (Defaults to False)

  • full_event: Whether the event processor should receive and return Event objects (when True), or only the payload (when False). (Defaults to False)

The following example illustrates a multiprocessing step that performs predictions.

First define the processes:

import mlrun
import mlrun.artifacts
import storey
import storey.steps
import asyncio
import time
import numpy as np
import pickle


model_file = "model.pkl"


class Predict:
    def __init__(self):
        self.model = pickle.load(open(model_file, "rb"))

    def predict(self, body: dict):
        print("predicting...")
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()


predict = Predict().predict


async def preprocess(event, context):
    context.logger.info("preprocessing...")
    await asyncio.sleep(0.1)
    return event


def postprocess(event, context):
    context.logger.info("postprocessing...")
    time.sleep(0.1)
    return event

Define the project and the function:

project_name = "concurrent-prediction"
project = mlrun.get_or_create_project(project_name)
function = project.set_function(
    name="test",
    kind="serving",
    image="mlrun/mlrun",
)

function.spec.build.commands = [
    "wget https://github.com/mlrun/mlrun/raw/development/tests/system/model_monitoring/assets/model.pkl",
]

graph = function.set_topology("flow", engine="async")

And, finally, the steps themselves: a pre-processing step, the multiprocessing prediction step, and a threaded post-processing step:

graph.to(
    "storey.ConcurrentExecution",
    "preprocess",
    _event_processor="preprocess",
    pass_context=True,
    max_in_flight=8,
).to(
    "storey.ConcurrentExecution",
    "prediction",
    _event_processor="predict",
    concurrency_mechanism="multiprocessing",
    max_in_flight=2,
).to(
    "storey.ConcurrentExecution",
    "postprocess",
    _event_processor="postprocess",
    concurrency_mechanism="threading",
    pass_context=True,
    max_in_flight=8,
)
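
The remaining configuration options (retries, backoff_factor, full_event) are presumably passed the same way as max_in_flight and pass_context above. For example, a sketch of the prediction step with retries enabled, assuming these options map directly to keyword arguments of the step:

graph.to(
    "storey.ConcurrentExecution",
    "prediction",
    _event_processor="predict",
    concurrency_mechanism="multiprocessing",
    max_in_flight=2,
    retries=2,  # retry a failed event up to 2 times
    backoff_factor=1,  # wait 1 second before the first retry, doubling afterwards
)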