Basic steps#
All steps are supported by the storey engine.
Choice steps#
Choice steps are marked with a dedicated icon in the UI.
Choice#
Description: Routes each event to one or more downstream branches based on custom logic. See Choice.
Use case: Route events to an add or multiply branch based on an operation field. Events with "operation": "add" are sent to the addition branch, while all other events default to multiplication.
Graph topology:
MyChoiceStep ─┬─► add (add_handler: x + y)
              └─► multiply (multiply_handler: x * y)
Example:
%%writefile graph_handlers.py
import storey


# Custom Choice step: override select_outlets() to implement routing logic.
# select_outlets() returns the names of downstream steps; the event is forwarded to those branches.
class MyChoiceStep(storey.Choice):
    def select_outlets(self, event):
        if event.get("operation") == "add":
            return ["add"]
        return ["multiply"]


def add_handler(event):
    x = event.get("x", 0)
    y = event.get("y", 0)
    result = x + y
    print(f"[ADD] {x} + {y} = {result}")
    return {**event, "result": result, "operation_used": "addition"}


def multiply_handler(event):
    x = event.get("x", 0)
    y = event.get("y", 0)
    result = x * y
    print(f"[MULTIPLY] {x} * {y} = {result}")
    return {**event, "result": result, "operation_used": "multiplication"}
fn_choice = project.set_function(
    name="math-choice-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)
graph_choice = fn_choice.set_topology("flow", engine="async")
graph_choice.add_step(name="router", class_name="MyChoiceStep")
graph_choice.add_step(name="add", handler="add_handler", after="router")
graph_choice.add_step(name="multiply", handler="multiply_handler", after="router")
server_choice = fn_choice.to_mock_server()
test_events = [
    {"operation": "add", "x": 5, "y": 3},  # → add → 8
    {"operation": "multiply", "x": 5, "y": 3},  # → multiply → 15
    {"x": 2, "y": 4},  # → default → multiply → 8
]
for event in test_events:
    result = server_choice.test("/", body=event)
[ADD] 5 + 3 = 8
[MULTIPLY] 5 * 3 = 15
[MULTIPLY] 2 * 4 = 8
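The routing rule itself is plain Python, so it can be checked without building a graph. The sketch below is a simplified stand-in, not the storey implementation: `route` mirrors the body of `select_outlets()` from the example, while `BRANCHES` and `dispatch` are hypothetical helpers introduced only for illustration.

```python
# Stand-in for MyChoiceStep.select_outlets(): returns the names of the branches to run.
def route(event):
    if event.get("operation") == "add":
        return ["add"]
    return ["multiply"]


# Simplified branch handlers (the print calls from the example are omitted).
def add_handler(event):
    return {**event, "result": event.get("x", 0) + event.get("y", 0)}


def multiply_handler(event):
    return {**event, "result": event.get("x", 0) * event.get("y", 0)}


# Hypothetical lookup table from branch name to handler.
BRANCHES = {"add": add_handler, "multiply": multiply_handler}


def dispatch(event):
    """Hypothetical helper: run the event through every branch that route() selects."""
    return {name: BRANCHES[name](event) for name in route(event)}


if __name__ == "__main__":
    print(dispatch({"operation": "add", "x": 5, "y": 3}))
    print(dispatch({"x": 2, "y": 4}))
```

Because `route` returns a list, the same pattern would also cover an event that has to reach several branches at once.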
ChoiceByField#
Description: Routes events to downstream steps based on an event field that contains the step name or names. See ChoiceByField.
Use case: Use this step when routing decisions in a serving graph should be determined dynamically based on a field in the event. Instead of subclassing a choice step and implementing custom routing logic, you can add a field to the event containing the name (or names) of the downstream step(s) to route to. The value of the configured field can be either:
a string – the event will be forwarded to the corresponding outlet.
a list or tuple of strings – the event will be forwarded to all specified outlets.
This simplifies conditional routing logic by separating decision logic (a previous step that sets the field) from routing logic (handled by ChoiceByField).
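The two accepted value shapes can be illustrated with a small normalization function. This is a sketch under stated assumptions, not the ChoiceByField source: `outlets_from_field` is a hypothetical helper, and raising `TypeError` for any other value type is an assumption made for the illustration.

```python
def outlets_from_field(event, field_name="route"):
    """Hypothetical helper: return the outlet names stored in event[field_name] as a list."""
    value = event[field_name]
    if isinstance(value, str):
        # A single step name: forward to exactly one outlet.
        return [value]
    if isinstance(value, (list, tuple)):
        # Several step names: forward to all of them.
        return list(value)
    # Assumption for this sketch: any other type is rejected.
    raise TypeError(f"Field '{field_name}' must be a string, list, or tuple of step names")


if __name__ == "__main__":
    print(outlets_from_field({"value": {"x": 1}, "route": "dict"}))
    print(outlets_from_field({"value": [1, 2], "route": ("dict", "list")}))
```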
Example:
%%writefile example.py
# Step that decides the route and adds it to the event
def choose_route(event):
    if isinstance(event["value"], dict):
        event["route"] = "dict"
    elif isinstance(event["value"], list):
        event["route"] = "list"
    else:
        raise AttributeError("Key 'value' in event must be either dict or list")
    return event


def handle_dict(event):
    print("handle_dict has been chosen")
    event["sum"] = sum(event["value"].values())
    return event


def handle_list(event):
    print("handle_list has been chosen")
    event["sum"] = sum(event["value"])
    return event


def pprint(event):
    print(f"sum is : {event['sum']}")
    return event
from mlrun.serving.steps import ChoiceByField

# Create a serving function
serving_fn = project.set_function(
    name="choice-example", func="example.py", kind="serving"
)
graph = serving_fn.set_topology("flow")
graph.add_step(name="router", handler="choose_route")
graph.add_step(class_name=ChoiceByField("route"), name="routing", after=["router"])
graph.add_step(name="dict", handler="handle_dict", after=["routing"])
graph.add_step(name="list", handler="handle_list", after=["routing"])
graph.add_step(name="pprint", handler="pprint", after=["dict", "list"]).respond()
server_choice = serving_fn.to_mock_server()
test_events = [{"value": [1, 2, 3]}, {"value": {"x": 1, "y": 3, "z": 4}}]
for event in test_events:
    result = server_choice.test("/", body=event)
handle_list has been chosen
sum is : 6
handle_dict has been chosen
sum is : 8
Event operation steps#
Event operation steps are marked with a dedicated icon in the UI.
| Class name | Description |
|---|---|
| | Collects streaming chunks and emits a single event once all chunks for a stream are received. (It acts as a no-op passthrough for non-streaming events.) |
| storey.Extend | Adds new fields to each event using values returned by a user-defined function. See Extend. |
| | Applies a function that can expand a single event into multiple downstream events. |
| | Flattens iterable outputs so that each element is emitted as a separate event. |
| | Joins each event with data from the given table. |
| JoinWithV3IOTable | Joins each event with a V3IO table. Used for event augmentation. |
| | Similar to Map, but instead of a function argument, this class should be extended and its do() method overridden. |
| | Maps, or transforms, incoming events using a stateful user-provided function, and an initial state, which can be a database table. |
| | Partitions events by calling a predicate function on each event. Routes each event to the left branch if the predicate returns True, or to the right branch if it returns False. |
| storey.Reduce | Reduces incoming events into a single value that is returned upon the successful termination of the flow. |
| ReduceToDataFrame | Builds a pandas DataFrame from events and returns that DataFrame on flow termination. |
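Three of the behaviors described in the table (expanding one event into many, partitioning by a predicate, and reducing to a single value) can be pictured with ordinary Python collections. The sketch below only illustrates the semantics on a hypothetical list of events; it does not use the storey classes, and all variable names are made up for the illustration.

```python
from functools import reduce

# Hypothetical input events, each carrying a list of items.
events = [{"items": [1, 2]}, {"items": [3]}, {"items": []}]

# Expand: one input event may yield zero, one, or many output events.
expanded = [item for event in events for item in event["items"]]

# Partition: a predicate decides which of two branches receives each event.
left = [x for x in expanded if x % 2 == 1]   # predicate returned True
right = [x for x in expanded if x % 2 == 0]  # predicate returned False

# Reduce: fold all events into a single value, available once the input ends.
total = reduce(lambda acc, x: acc + x, expanded, 0)

if __name__ == "__main__":
    print(expanded, left, right, total)
```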
Extend (storey.Extend)#
Description: Adds new fields to each event using values returned by a user-defined function.
Use case: Enrich an order event with total_price and is_high_value before passing it to a scoring model.
Graph topology:
storey.Extend(_fn="({'total_price': event['quantity'] * event['unit_price'], 'is_high_value': ...})")
Example:
%%writefile graph_handlers.py
def passthrough(event):
    print(f"event: {event}")
    return event

fn_extend = project.set_function(
    name="extend-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)
graph_extend = fn_extend.set_topology("flow", engine="async")
# storey.Extend: _fn is a Python expression evaluated with `event` in scope.
# It must return a dict; that dict is MERGED into the event body:
# new keys are added while existing keys are preserved.
graph_extend.add_step(
    name="enrich",
    class_name="storey.Extend",
    _fn=(
        "({'total_price': event['quantity'] * event['unit_price'],"
        " 'is_high_value': event['quantity'] * event['unit_price'] > 500})"
    ),
)
graph_extend.add_step(name="end", handler="passthrough", after="enrich")
server_extend = fn_extend.to_mock_server()
test_orders = [
    {"order_id": "A1", "quantity": 2, "unit_price": 49.99},
    {"order_id": "A2", "quantity": 10, "unit_price": 75.00},
]
for order in test_orders:
    result = server_extend.test("/", order)
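The merge performed by the enrichment step can be reproduced in a few lines of plain Python, which is handy for checking the expression before putting it into `_fn`. This is a minimal sketch, not the storey.Extend implementation: `extend` and `enrich` are hypothetical helpers, and the sketch assumes that a returned key that already exists in the event would overwrite the old value.

```python
def extend(event, fn):
    """Hypothetical stand-in: merge the dict returned by fn(event) into a copy of the event."""
    merged = dict(event)
    merged.update(fn(event))  # assumption: colliding keys take the new value
    return merged


def enrich(event):
    """Same computation as the _fn expression in the example."""
    total = event["quantity"] * event["unit_price"]
    return {"total_price": total, "is_high_value": total > 500}


orders = [
    {"order_id": "A1", "quantity": 2, "unit_price": 49.99},
    {"order_id": "A2", "quantity": 10, "unit_price": 75.00},
]

if __name__ == "__main__":
    for order in orders:
        print(extend(order, enrich))
```

With these inputs only the second order crosses the 500 threshold, so it is the only one flagged as high value.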
Batch operation steps#
Batch steps are marked with a dedicated icon in the UI.
| Class name | Description |
|---|---|
| storey.Batch | Collects events until the batch reaches a configured size or age, then sends them downstream together. See Batch. |
| BatchHttpRequests | Sends multiple HTTP requests to remote step endpoints in parallel for batch processing. See BatchHttpRequests. |
| | Runs custom logic for every event and then passes the original event downstream. |
Batch#
Description: Batch collects events until the batch reaches a configured size or age, then sends them downstream together.
Use case: A model inference endpoint expects fixed-size batches of sensor readings, not individual events; batching reduces per-request overhead.
Graph topology:
storey.Batch(max_events=3, flush_after_seconds=3) → print_batch
Example:
%%writefile graph_handlers.py
def print_batch(batch):
    print(f"Batch received ({len(batch)} events):")
    for item in batch:
        print("  ", item)
    return batch

fn_batch = project.set_function(
    name="batch-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)
graph_batch = fn_batch.set_topology("flow", engine="async")
# storey.Batch buffers individual events and emits a list when either condition fires:
#   - max_events events have accumulated, OR
#   - flush_after_seconds seconds have passed since the last event
graph_batch.add_step(
    name="batcher",
    class_name="storey.Batch",
    max_events=3,  # emit a list of 3 events at once
    flush_after_seconds=3,  # or after 3s of inactivity (timeout flush)
).respond()
# print_batch receives the full list and returns it
graph_batch.add_step(name="print_batch", handler="print_batch", after="batcher")
server_batch = fn_batch.to_mock_server()
print("Sending 3 events – batch fires on the 3rd:\n")
for i in range(1, 4):
    payload = {"sensor": f"S{i}", "value": i * 10}
    result = server_batch.test("/", payload)
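The size-or-age rule can be sketched as a small buffer class. This is an illustration only, not the storey.Batch implementation: `SimpleBatcher` and its methods are hypothetical, the sketch measures age from the first event in the pending batch (an assumption), and the clock is injectable so the timeout path can be exercised without sleeping.

```python
import time


class SimpleBatcher:
    """Hypothetical size-or-age batcher; not the storey.Batch implementation."""

    def __init__(self, max_events, flush_after_seconds, clock=time.monotonic):
        self.max_events = max_events
        self.flush_after_seconds = flush_after_seconds
        self.clock = clock
        self.pending = []
        self.started_at = None

    def add(self, event):
        """Buffer one event; return the full batch when it reaches max_events, else None."""
        if not self.pending:
            # Assumption: the age of a batch is counted from its first event.
            self.started_at = self.clock()
        self.pending.append(event)
        if len(self.pending) >= self.max_events:
            return self._flush()
        return None

    def poll(self):
        """Return a partial batch once it is older than flush_after_seconds, else None."""
        if self.pending and self.clock() - self.started_at >= self.flush_after_seconds:
            return self._flush()
        return None

    def _flush(self):
        batch, self.pending = self.pending, []
        return batch


if __name__ == "__main__":
    batcher = SimpleBatcher(max_events=3, flush_after_seconds=3)
    for i in range(1, 4):
        out = batcher.add({"sensor": f"S{i}", "value": i * 10})
    print(out)  # the third add() returns the full batch
```

A real streaming engine would trigger the age check from a timer rather than an explicit `poll()` call; the explicit call keeps the sketch deterministic.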
BatchHttpRequests#
Description: Sends multiple HTTP requests to remote step endpoints in parallel for batch processing.
Use case: Fan out scoring requests to an external inference endpoint for multiple items in a single graph step, reducing overall latency compared to sequential calls.
Graph topology:
BatchHttpRequests(url_expression="event['url']", body_expression="event['data']", method="POST", input_path="req", result_path="resp")
Example:
import mlrun
from mlrun.serving.remote import BatchHttpRequests

fn_bhttp = project.set_function(
    name="batch-http-example",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)
graph_bhttp = fn_bhttp.set_topology("flow", engine="async")
graph_bhttp.to(
    BatchHttpRequests(
        url_expression="event['url']",
        body_expression="event['data']",
        method="POST",
        input_path="req",
        result_path="resp",
    )
).respond()
server_bhttp = fn_bhttp.to_mock_server()
# request contains a list of items, each with its own url and payload
requests = [
    {"url": "https://httpbin.org/post", "data": {"value": 1}},
    {"url": "https://httpbin.org/post", "data": {"value": 2}},
]
resp = server_bhttp.test(body={"req": requests})
print(resp)
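The latency benefit comes from issuing the requests concurrently instead of one after another. The sketch below shows that fan-out pattern with `asyncio.gather`; it is not the BatchHttpRequests implementation. `fake_post`, `send_all`, and `process` are hypothetical names, and `fake_post` echoes its input instead of touching the network so the example runs offline.

```python
import asyncio


async def fake_post(url, body):
    """Stand-in for an HTTP POST; echoes the request instead of touching the network."""
    await asyncio.sleep(0.01)  # simulated round-trip time
    return {"url": url, "json": body}


async def send_all(requests_batch, send=fake_post):
    """Issue one request per item concurrently; results keep the input order."""
    tasks = [send(item["url"], item["data"]) for item in requests_batch]
    return await asyncio.gather(*tasks)


def process(event):
    """Read the list from event['req'] and store the responses in event['resp']."""
    event["resp"] = asyncio.run(send_all(event["req"]))
    return event


if __name__ == "__main__":
    batch = [
        {"url": "https://httpbin.org/post", "data": {"value": 1}},
        {"url": "https://httpbin.org/post", "data": {"value": 2}},
    ]
    print(process({"req": batch}))
```

Here `event["req"]` and `event["resp"]` play the roles of `input_path` and `result_path` from the example, and the per-item `url` and `data` keys correspond to `url_expression` and `body_expression`.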
Filter steps#
Filter steps are marked with a dedicated icon in the UI.
| Class name | Description |
|---|---|
| storey.Filter | Filters events based on a user-provided function. See Filter. |
| | Samples a single event from every group of events based on a configured policy such as first or last. |
Filter#
Description: Filters events based on a user-provided function.
Use case: Drop corrupted sensor readings (negative or None values) before they reach the model.
Graph topology:
storey.Filter(_fn="(isinstance(event.get('value'),(int,float)) and event.get('value',-1)>=0)") → end
Example:
%%writefile graph_handlers.py
def passthrough(event):
    print(f"event: {event}")
    return event

import graph_handlers
import storey
import mlrun

fn_filter = project.set_function(
    name="filter-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)
graph_filter = fn_filter.set_topology("flow", engine="async")
# storey.Filter: _fn is a Python boolean expression evaluated with `event` in scope.
# Events where the expression evaluates to True pass through;
# events where it evaluates to False are silently dropped (not forwarded).
graph_filter.add_step(
    name="validate",
    class_name="storey.Filter",
    _fn="(isinstance(event.get('value'), (int, float)) and event.get('value', -1) >= 0)",
)
# Only events that passed the filter reach this step
graph_filter.add_step(name="end", handler="passthrough", after="validate")
server_filter = fn_filter.to_mock_server()
test_events = [
    {"sensor": "S1", "value": 7},  # valid
    {"sensor": "S2", "value": -6.0},  # filtered out
    {"sensor": "S3", "value": -2},  # filtered out
    {"sensor": "S4", "value": 0.0},  # valid
]
for event in test_events:
    result = server_filter.test("/", body=event)
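The filter expression can be tried on sample readings in plain Python before it is placed in `_fn`. The sketch below is an illustration, not storey code: `is_valid` is a hypothetical function with the same logic as the expression, and the `None` and missing-value readings are added here to cover the None case named in the use case.

```python
def is_valid(event):
    """Same check as the _fn expression: value must be a non-negative int or float."""
    value = event.get("value")
    # The isinstance test runs first, so None is rejected before the comparison.
    return isinstance(value, (int, float)) and value >= 0


readings = [
    {"sensor": "S1", "value": 7},
    {"sensor": "S2", "value": -6.0},
    {"sensor": "S3", "value": None},
    {"sensor": "S4", "value": 0.0},
    {"sensor": "S5"},
]

# Names of the sensors whose readings would pass the filter.
kept = [reading["sensor"] for reading in readings if is_valid(reading)]

if __name__ == "__main__":
    print(kept)
```

Ordering the type check before the comparison matters: comparing `None >= 0` directly would raise a `TypeError` in Python 3.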
Model Server#
| Class name | Description |
|---|---|
| | A model serving class for serving ONNX models. A subclass of the V2ModelServer class. |
| | A model serving class for serving PyTorch models. A subclass of the V2ModelServer class. |
| | A model serving class for serving scikit-learn models. A subclass of the V2ModelServer class. |
| | A model serving class for serving TFKeras models. A subclass of the V2ModelServer class. |
| | A model serving class for serving XGBoost models. A subclass of the V2ModelServer class. |