Deploying graphs
Graph serving function
To start using a serving graph, you first need a serving function. A serving function contains the serving class code to run the model and all the code necessary to run the tasks. MLRun comes with a wide library of tasks. If you use only those, you don't need to add any special code to the serving function; you only provide the code that runs the model. For more information about serving classes, see Build your own model serving class.
For example, the following code is a basic model serving class:
# mlrun: start-code
from cloudpickle import load
from typing import List
import numpy as np
import mlrun


class ClassifierModel(mlrun.serving.V2ModelServer):
    def load(self):
        """load and initialize the model and/or other elements"""
        model_file, extra_data = self.get_model(".pkl")
        self.model = load(open(model_file, "rb"))

    def predict(self, body: dict) -> List:
        """Generate model predictions from sample."""
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()
# mlrun: end-code
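The request and response shape used by the serving class above can be tried without MLRun or a pickled model. The following is a minimal sketch under stated assumptions: `ThresholdModel` and `SketchClassifier` are hypothetical stand-ins invented for illustration, and only the `load`/`predict` shape and the `"inputs"` key come from the class above.

```python
from typing import List


class ThresholdModel:
    """Hypothetical stand-in for a trained model; not part of MLRun."""

    def predict(self, rows: List[List[float]]) -> List[int]:
        # Label a row 1 when its first feature exceeds 5.0, otherwise 0.
        return [1 if row[0] > 5.0 else 0 for row in rows]


class SketchClassifier:
    """Mirrors only the load/predict shape of the serving class above."""

    def load(self) -> None:
        # The real class deserializes a .pkl file here;
        # this sketch builds the stub model directly.
        self.model = ThresholdModel()

    def predict(self, body: dict) -> List[int]:
        # The request body carries the feature rows under the "inputs" key.
        return self.model.predict(body["inputs"])


server = SketchClassifier()
server.load()
predictions = server.predict({"inputs": [[5.1, 3.5], [4.9, 3.0]]})
print(predictions)
```

The point of the sketch is the contract: the body is a dict with an `"inputs"` list of feature rows, and the return value is a plain list with one prediction per row.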
To define the serving function, create the project, then create the function with project.set_function, setting kind to serving.
project = mlrun.get_or_create_project("serving")
fn = project.set_function(name="serving_example", kind="serving", image="mlrun/mlrun")
Graph engines
Once you have a serving function, choose the graph topology: router (the default) or flow.
Note
Once a function's topology is set, it cannot be changed.
Router
The default topology is the router topology. It is a minimal configuration with a single router and one or more child routes/models, used for simple model serving or single hop configurations. The basic routing logic is to route to the child routes based on the event.path.
With the router topology you can specify different machine learning models. Each model has a logical name. This name is used to route to the correct model when calling the serving function.
More advanced or custom routing can also be used. For example, the ensemble router sends the event to all child routes in parallel, aggregates the results, and responds.
from sklearn.datasets import load_iris
# set the topology/router
graph = fn.set_topology("router")
# Add the model
fn.add_model(
    "model1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)
# Add additional models
# fn.add_model("model2", class_name="ClassifierModel", model_path="<path2>")
# create and use the graph simulator
server = fn.to_mock_server()
x = load_iris()["data"].tolist()
result = server.test("/v2/models/model1/infer", {"inputs": x})
server.wait_for_completion()
print(result)
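The two routing behaviors described for the router topology (selecting one child route from the event path, and fanning out to every child route and aggregating) can be pictured with a small plain-Python sketch. This is not MLRun's implementation: `route_by_path` and `ensemble` are hypothetical names, the child "models" are simple lambdas, averaging is only one assumed way to aggregate, and the children are called one after another rather than in parallel.

```python
from statistics import mean
from typing import Callable, Dict, List

Model = Callable[[List[float]], float]


def route_by_path(path: str, routes: Dict[str, Model], inputs: List[float]) -> float:
    """Pick one child route from a path such as /v2/models/<name>/infer."""
    parts = path.strip("/").split("/")
    # For "/v2/models/model1/infer", parts is ["v2", "models", "model1", "infer"].
    name = parts[2]
    if name not in routes:
        raise KeyError(f"no model named {name!r}")
    return routes[name](inputs)


def ensemble(routes: Dict[str, Model], inputs: List[float]) -> float:
    """Send the same inputs to every child route and average the results."""
    return mean(model(inputs) for model in routes.values())


# Two hypothetical child routes keyed by their logical names.
routes: Dict[str, Model] = {
    "model1": lambda xs: sum(xs),
    "model2": lambda xs: max(xs),
}

single = route_by_path("/v2/models/model1/infer", routes, [1.0, 2.0, 3.0])
combined = ensemble(routes, [1.0, 2.0, 3.0])
print(single, combined)
```

The logical model name in the path is what ties a request to a route, which is why each model added to the router needs a unique name.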
Flow
The flow topology is a full graph/DAG. It is implemented using two engines: async (the default), which is based on Storey and an asynchronous event loop; and sync, which supports a simple sequence of steps. You can use the flow topology to specify tasks, which typically manipulate the data. The most common scenario is pre-processing data before model execution.
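As a rough illustration of the difference between the two engines, the sketch below runs the same two steps first as a simple sequence and then on an asyncio event loop, where several events are in flight at once. It is a conceptual sketch only: it does not use Storey, and `double`, `add_one`, `run_sync`, and `run_async` are hypothetical names.

```python
import asyncio
from typing import Callable, List


def double(x: int) -> int:
    return x * 2


def add_one(x: int) -> int:
    return x + 1


STEPS: List[Callable[[int], int]] = [double, add_one]


def run_sync(events: List[int]) -> List[int]:
    """Sync style: each event passes through every step before the next event starts."""
    results = []
    for event in events:
        for step in STEPS:
            event = step(event)
        results.append(event)
    return results


async def process(event: int) -> int:
    """Async style: yield to the event loop between steps so events can interleave."""
    for step in STEPS:
        await asyncio.sleep(0)
        event = step(event)
    return event


async def run_async(events: List[int]) -> List[int]:
    # gather returns results in the same order as the submitted events.
    return list(await asyncio.gather(*(process(e) for e in events)))


sync_results = run_sync([1, 2, 3])
async_results = asyncio.run(run_async([1, 2, 3]))
print(sync_results, async_results)
```

Both styles produce the same results here; the difference is in how the work is scheduled, not in what each step computes.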
In this topology, you build and connect the graph (DAG) by adding steps using the step.to() method or the graph.add_step() method.
step.to() is typically used to chain steps together. graph.add_step() can add steps anywhere on the graph and has before and after parameters to specify the location of the step.
fn2 = project.set_function(
    name="serving_example_flow", kind="serving", image="mlrun/mlrun"
)
graph2 = fn2.set_topology("flow")
graph2_enrich = graph2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})')
# add a model router with a child model (route)
router = graph2.add_step(mlrun.serving.ModelRouter(), name="router", after="enrich")
router.add_route(
    "m1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)
router.respond()
# add an error handling step, run only when/if the "pre-process" step fails
# ("raising_step" and "handle_error" are assumed to be handlers defined in the function code)
graph2.to(name="pre-process", handler="raising_step").error_handler(
    name="catcher", handler="handle_error", full_event=True
)
# Add additional models
# router.add_route("m2", class_name="ClassifierModel", model_path=path2)
# plot the graph (using Graphviz)
graph2.plot(rankdir="LR")
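The effect of the before and after parameters can be pictured with a small plain-Python sketch. This is an illustration of step placement only, under the assumption of a simple linear flow: `SketchGraph` is a hypothetical class, not MLRun's graph object, and it does not model branching, routers, or error handlers.

```python
from typing import Callable, Dict, List, Optional

Handler = Callable[[dict], dict]


class SketchGraph:
    """Hypothetical linear flow that illustrates step ordering only."""

    def __init__(self) -> None:
        self.order: List[str] = []
        self.handlers: Dict[str, Handler] = {}

    def add_step(
        self,
        name: str,
        handler: Handler,
        after: Optional[str] = None,
        before: Optional[str] = None,
    ) -> "SketchGraph":
        # Place the step relative to an existing one, or append it at the end.
        if after is not None:
            index = self.order.index(after) + 1
        elif before is not None:
            index = self.order.index(before)
        else:
            index = len(self.order)
        self.order.insert(index, name)
        self.handlers[name] = handler
        return self

    def run(self, event: dict) -> dict:
        # Pass the event through each step in graph order.
        for name in self.order:
            event = self.handlers[name](event)
        return event


sketch = SketchGraph()
sketch.add_step("enrich", lambda e: {**e, "tag": "something"})
sketch.add_step("respond", lambda e: {**e, "done": True})
# Added last, but placed between the two existing steps.
sketch.add_step(
    "scale",
    lambda e: {**e, "inputs": [v * 2 for v in e["inputs"]]},
    after="enrich",
)

result = sketch.run({"inputs": [1, 2]})
print(sketch.order, result)
```

The order in which steps are declared and the order in which they run are independent; the placement parameters decide where a step sits in the flow.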
Deploy the function to a mock server
Use MLRun's mock server to test and debug your model before deploying it. Create a mock server with either to_mock_server() or by passing mock=True to deploy_function().
fn2_server = fn2.to_mock_server()
result = fn2_server.test("/v2/models/m1/infer", {"inputs": x})
fn2_server.wait_for_completion()
print(result)
Deploy the function as a Nuclio function
To deploy a graph to your cluster as a real-time Nuclio serverless function, call function.deploy(). See deploy().
fn2.deploy()
Deploy the function as a Kubernetes job
You can deploy serving graphs as a one-time or scheduled KubejobRuntime. This enables use cases such as batch inference and various evaluation options. You can also run the graph on demand with a list of inputs. Use to_job(). See an example in Batch inference and drift detection.
job = fn2.to_job()
run_obj = project.run_function(job)