Router steps#
Router steps allow branching, enrichment, aggregation, etc.
This icon in the UI indicates router steps:
.
In this section
RouterStep#
Description#
RouterStep implements routing logic for running child routes.
Note
The * prefix indicates a router class (not a simple processing step).
Examples#
VotingEnsemble#
Description#
VotingEnsemble is a router that encapsulates both execution and aggregation of multiple model routes. It outputs a single result.
An ensemble machine learning model that combines the prediction of several models.
Use Case#
Examples#
ensemble = graph.add_step(
"*mlrun.serving.VotingEnsemble", name="ensemble", vote_type="regression"
) # for numeric output
In classification mode:
ensemble = graph.add_step(
"*mlrun.serving.VotingEnsemble", name="ensemble", vote_type="classification"
) # for categorical output
A full flow example:
import mlrun
project_name = "flow-with-routes"
project = mlrun.get_or_create_project(
project_name, context="./", allow_cross_project=True
)
fn = mlrun.code_to_function(
name="routers_example",
kind="serving",
project=project_name,
filename="serving_code.py",
image="mlrun/mlrun",
)
graph = fn.set_topology("flow")
# Preprocessing
graph.add_step("PreProcess", class_name="PreProcessClass")
# Voting
models_path = "https://s3.wasabisys.com/iguazio/models/iris/model.pkl"
path1 = models_path
path2 = models_path
router = graph.add_step(
"*mlrun.serving.VotingEnsemble",
name="ensemble",
vote_type="classification",
)
router.add_route("m1", class_name="ClassifierModel", model_path=path1)
router.add_route("m2", class_name="ClassifierModel", model_path=path2)
# Postprocess
graph.add_step("PostProcess", class_name="PostProcessClass", after="enricher").respond()
# Local test
server = fn.to_mock_server()
result = server.test(body={"user_id": 123, "inputs": [0.1, 0.2, 0.3]})
print(result)
Routers are added as:
graph.add_step("*mlrun.serving.ParallelRun", name="parallel")
Parallel execution (ParallelRun)#
Description#
Use ParallelRun() to run multiple independent branches with custom merging or postprocessing. It outputs a dict or list.
Note
The * prefix indicates a router class (not a simple processing step). The graph sends the event to one of its children, based on path.
Use Cases#
Examples#
parallel = graph.add_step("*mlrun.serving.ParallelRun", name="parallel_models")
parallel.add_route("model_a", class_name="Cls1")
Downstream, you can add custom merger or processing steps.