Deploying graphs

Flow topology and engines

The flow topology is a full graph (DAG). Use it to define tasks, which typically manipulate the data; a common scenario is pre-processing data before model execution. The flow topology is implemented using two engines:

  • async is the default. It is used for ModelRunnerStep and for more advanced graphs (looping, branching, and so on). When you trigger a graph, you immediately get a tracking ID while the graph keeps running in the background. The async engine is based on storey, a streaming library for real-time event processing and feature extraction. Storey is built on asyncio and offers both synchronous and asynchronous APIs. Storey flows are graphs of steps that perform computational and IO tasks.

  • sync supports a simple sequence of steps. In a synchronous flow, you trigger the graph and wait for the response; a typical example is invoking a model.
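
The difference between the two triggering styles can be illustrated with plain asyncio. This is only a conceptual sketch: it does not use MLRun or storey, and the names run_graph, trigger_sync, and trigger_async are hypothetical stand-ins invented for the illustration.

```python
import asyncio
import uuid


async def run_graph(event: dict) -> dict:
    """Hypothetical stand-in for a graph: enrich the event, then 'predict'."""
    await asyncio.sleep(0.01)  # simulate IO-bound work
    enriched = {**event, "tag": "something"}
    return {"outputs": [sum(enriched["inputs"])], "tag": enriched["tag"]}


async def trigger_sync(event: dict) -> dict:
    """Sync style: the caller waits until the graph returns its response."""
    return await run_graph(event)


def trigger_async(event: dict, results: dict):
    """Async style: schedule the graph and return a tracking ID right away."""
    tracking_id = uuid.uuid4().hex

    async def _runner():
        results[tracking_id] = await run_graph(event)

    task = asyncio.create_task(_runner())  # keeps running in the background
    return tracking_id, task


async def main():
    results = {}
    sync_result = await trigger_sync({"inputs": [1, 2, 3]})
    tracking_id, task = trigger_async({"inputs": [4, 5, 6]}, results)
    pending_at_return = tracking_id not in results  # no result stored yet
    await task  # later, collect the result by its tracking ID
    return sync_result, pending_at_return, results[tracking_id]


sync_result, pending_at_return, async_result = asyncio.run(main())
print(sync_result)
print(pending_at_return)
print(async_result)
```

In the sketch, the sync call blocks until the response is ready, while the async call hands back an ID first and the result becomes available under that ID only after the background task finishes.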

With flow topology, you build and connect the graph (DAG) by adding steps using:

  • the to method, typically used to chain steps together.

  • the add_step method, to add steps anywhere on the graph. It has before and after parameters to specify the location of the step.

import mlrun

# assumes `project` is an existing MLRun project object
fn2 = project.set_function(
    name="serving_example_flow", kind="serving", image="mlrun/mlrun"
)

graph2 = fn2.set_topology("flow")

graph2_enrich = graph2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})')

# add a model router after the "enrich" step, with one child model (route)
router = graph2.add_step(mlrun.serving.ModelRouter(), name="router", after="enrich")
router.add_route(
    "m1",
    class_name="ClassifierModel",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)
router.respond()

# add an error handling step, run only when/if the "pre-process" step fails
graph2.to(name="pre-process", handler="raising_step").error_handler(
    name="catcher", handler="handle_error", full_event=True
)

# Add additional models
# router.add_route("m2", class_name="ClassifierModel", model_path=path2)

# plot the graph (using Graphviz)
graph2.plot(rankdir="LR")
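
The example above refers to the handlers raising_step and handle_error by name but does not define them. A minimal sketch of what they might look like follows; the function bodies are assumptions, and so are the event attributes error and origin_state, which are read defensively with getattr. The last part simulates the failure locally with a stand-in event object and does not involve MLRun.

```python
from types import SimpleNamespace


def raising_step(event):
    """Handler for the "pre-process" step; always fails to exercise the error path."""
    raise ValueError("pre-process failed")


def handle_error(event):
    """Error handler; receives the whole event because full_event=True."""
    # Assumption: the failed event carries `error` and `origin_state`;
    # fall back to None if the attributes are missing.
    event.body = {
        "error": getattr(event, "error", None),
        "origin_state": getattr(event, "origin_state", None),
        "original_body": event.body,
    }
    return event


# Local simulation of the error path, using a stand-in event object
event = SimpleNamespace(body={"inputs": [1, 2]})
try:
    raising_step(event)
except ValueError as exc:
    event.error = str(exc)
    event.origin_state = "pre-process"
    event = handle_error(event)

print(event.body)
```

Because the steps name their handlers as strings, functions like these would typically need to be defined in the serving function's source code so they can be resolved at run time.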

Deploy the function to a mock server

Use MLRun's mock server to test and debug your model before deploying it. Create a mock server either by calling to_mock_server() or by passing mock=True to deploy_function().

fn2_server = fn2.to_mock_server()

# x is the input data (for example, a list of feature vectors), assumed to be defined earlier
result = fn2_server.test("/v2/models/m1/infer", {"inputs": x})
fn2_server.wait_for_completion()

print(result)
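
The test call above passes x without defining it. As a rough sketch, the request body could be built as follows; the sample values are hypothetical Iris-style rows (four features each), chosen only because the route points at an Iris model.

```python
import json

# Hypothetical input rows: sepal length, sepal width, petal length, petal width
x = [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]]

# Request body in the shape used by the test call: {"inputs": [...]}
body = {"inputs": x}

# The body must be JSON-serializable, since it is sent as the request payload
payload = json.dumps(body)
print(payload)
```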

Deploy the function as a Nuclio function

To deploy a graph to your cluster as a real-time Nuclio serverless function, call the function's deploy() method. See deploy().

fn2.deploy()

Deploy the function as a Kubernetes job

You can deploy a serving graph as a one-time or scheduled job (KubejobRuntime). This enables use cases such as batch inference and various evaluation options. You can also run the graph on demand with a list of inputs. Use to_job() to create the job. See an example in Batch inference and drift detection.

job = fn2.to_job()
run_obj = project.run_function(job)