Basic steps#

All steps are supported by the storey engine.

Choice steps#

This icon in the UI indicates choice steps: graph-steps-choice.

Choice#

  • Description: Routes each event to one or more downstream branches based on custom logic. See Choice.

  • Use case: Route events to an add or multiply branch based on an operation field. Events with "operation": "add" are sent to the addition branch, while all other events default to multiplication.

  • Graph topology:

MyChoiceStep ─┬─► add        (add_handler: x + y)
              └─► multiply   (multiply_handler: x * y)

  • Example:

%%writefile graph_handlers.py
import storey

# Custom Choice step: override select_outlets() to implement routing logic.
# select_outlets() returns the names of downstream steps; the event is forwarded to those branches.
class MyChoiceStep(storey.Choice):
    def select_outlets(self, event):
        if event.get("operation") == "add":
            return ["add"]
        return ["multiply"]


def add_handler(event):
    x = event.get("x", 0)
    y = event.get("y", 0)

    result = x + y
    print(f"[ADD] {x} + {y} = {result}")

    return {**event, "result": result, "operation_used": "addition"}


def multiply_handler(event):
    x = event.get("x", 0)
    y = event.get("y", 0)

    result = x * y
    print(f"[MULTIPLY] {x} * {y} = {result}")

    return {**event, "result": result, "operation_used": "multiplication"}


# Next notebook cell: create the serving function and build the graph
fn_choice = project.set_function(
    name="math-choice-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)

graph_choice = fn_choice.set_topology("flow", engine="async")


graph_choice.add_step(name="router", class_name="MyChoiceStep")
graph_choice.add_step(name="add", handler="add_handler", after="router")
graph_choice.add_step(name="multiply", handler="multiply_handler", after="router")

server_choice = fn_choice.to_mock_server()

test_events = [
    {"operation": "add", "x": 5, "y": 3},  # → add → 8
    {"operation": "multiply", "x": 5, "y": 3},  # → multiply → 15
    {"x": 2, "y": 4},  # → default → multiply → 8
]

for event in test_events:
    result = server_choice.test("/", body=event)

Output:

[ADD] 5 + 3 = 8
[MULTIPLY] 5 * 3 = 15
[MULTIPLY] 2 * 4 = 8
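
The routing rule can also be checked on its own, without a serving function. The sketch below mirrors the body of select_outlets() as a plain function and applies it to the same three test events. The name route_event is hypothetical and is not part of storey or MLRun.

```python
# Plain-Python mirror of the routing rule in MyChoiceStep.select_outlets().
# route_event is a hypothetical helper used only for illustration.
def route_event(body):
    if body.get("operation") == "add":
        return ["add"]
    return ["multiply"]


test_events = [
    {"operation": "add", "x": 5, "y": 3},
    {"operation": "multiply", "x": 5, "y": 3},
    {"x": 2, "y": 4},  # no "operation" field, so the default branch is used
]

routes = [route_event(e) for e in test_events]
print(routes)  # [['add'], ['multiply'], ['multiply']]
```

Any value other than "add", including a missing field, falls through to the multiply branch, which matches the third line of the output above.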

ChoiceByField#

  • Description: Routes events to downstream steps based on an event field that contains the step name or names. See ChoiceByField.

  • Use case: Use this step when routing decisions in a serving graph should be determined dynamically based on a field in the event. Instead of subclassing a choice step and implementing custom routing logic, you can add a field to the event containing the name (or names) of the downstream step(s) to route to. The value of the configured field can be either:

    • a string – the event will be forwarded to the corresponding outlet.

    • a list or tuple of strings – the event will be forwarded to all specified outlets.

    This simplifies conditional routing logic by separating decision logic (a previous step that sets the field) from routing logic (handled by ChoiceByField).

  • Example:

%%writefile example.py

# Step that decides the route and adds it to the event
def choose_route(event):
    if isinstance(event["value"], dict):
        event["route"] = "dict"
    elif isinstance(event["value"], list):
        event["route"] = "list"
    else:
        raise TypeError("Key 'value' in event must be either a dict or a list")
    return event


def handle_dict(event):
    print("handle_dict has been chosen")
    event["sum"] = sum(event["value"].values())
    return event


def handle_list(event):
    print("handle_list has been chosen")
    event["sum"] = sum(event["value"])
    return event


def pprint(event):
    print(f"sum is : {event['sum']}")
    return event


# Next notebook cell: build and test the graph
from mlrun.serving.steps import ChoiceByField

# Create a serving function
serving_fn = project.set_function(
    name="choice-example", func="example.py", kind="serving"
)

graph = serving_fn.set_topology("flow")

graph.add_step(name="router", handler="choose_route")
graph.add_step(class_name=ChoiceByField("route"), name="routing", after=["router"])
graph.add_step(name="dict", handler="handle_dict", after=["routing"])
graph.add_step(name="list", handler="handle_list", after=["routing"])
graph.add_step(name="pprint", handler="pprint", after=["dict", "list"]).respond()

server_choice = serving_fn.to_mock_server()

test_events = [{"value": [1, 2, 3]}, {"value": {"x": 1, "y": 3, "z": 4}}]

for event in test_events:
    result = server_choice.test("/", body=event)

Output:

handle_list has been chosen
sum is : 6
handle_dict has been chosen
sum is : 8
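
The two accepted shapes of the routing field (a single string, or a list or tuple of strings) can be illustrated with a small normalisation helper. This is a minimal sketch of the idea only: outlets_from_field is a hypothetical name, the error raised for other types is an assumption, and the code does not reproduce how ChoiceByField is implemented.

```python
# Hypothetical helper: turn the routing field's value into a list of outlet names.
def outlets_from_field(body, field_name):
    value = body[field_name]
    if isinstance(value, str):
        return [value]  # a single outlet
    if isinstance(value, (list, tuple)):
        return list(value)  # several outlets, in the given order
    # Assumption for this sketch: any other type is rejected.
    raise TypeError(f"Field {field_name!r} must be a string or a list/tuple of strings")


single = outlets_from_field({"route": "dict"}, "route")
multiple = outlets_from_field({"route": ("dict", "list")}, "route")
print(single)    # ['dict']
print(multiple)  # ['dict', 'list']
```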

Event operation steps#

This icon in the UI indicates event operation steps: graph-steps-event-operation.

| Class name | Description |
|---|---|
| Collector | Collects streaming chunks and emits a single event once all chunks for a stream are received. Acts as a no-op passthrough for non-streaming events. |
| Extend | Adds new fields to each event using values returned by a user-defined function. See Extend. |
| FlatMap | Applies a function that can expand a single event into multiple downstream events. |
| Flatten | Flattens iterable outputs so that each element is emitted as a separate event. |
| JoinWithTable | Joins each event with data from the given table. |
| JoinWithV3IOTable | Joins each event with a V3IO table. Used for event augmentation. |
| MapClass | Similar to Map, but instead of passing a function argument, you extend this class and override its do() method. |
| MapWithState | Maps (transforms) incoming events using a stateful user-provided function and an initial state, which can be a database table. |
| Partition | Partitions events by calling a predicate function on each event. Routes each event to the left branch if the predicate returns True, or to the right branch if it returns False. |
| storey.Reduce | Reduces incoming events into a single value that is returned upon the successful termination of the flow. |
| ReduceToDataFrame | Builds a pandas DataFrame from events and returns that DataFrame on flow termination. |
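
As an illustration of the Partition description, the following plain-Python sketch splits a list of events into a left group (predicate is True) and a right group (predicate is False). The function name and the sample events are hypothetical, and the code does not use or reproduce the storey class.

```python
# Hypothetical stand-in for the Partition idea: one predicate, two branches.
def partition(events, predicate):
    left, right = [], []
    for event in events:
        # True goes to the left branch, False to the right branch
        (left if predicate(event) else right).append(event)
    return left, right


events = [{"value": 3}, {"value": 12}, {"value": 7}, {"value": 20}]
left, right = partition(events, lambda e: e["value"] >= 10)
print(left)   # [{'value': 12}, {'value': 20}]
print(right)  # [{'value': 3}, {'value': 7}]
```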

Extend (storey.Extend)#

  • Description: Adds new fields to each event using values returned by a user-defined function.

  • Use case: Enrich an order event with total_price and is_high_value before passing it to a scoring model.

  • Graph topology:

    storey.Extend(_fn="({'total_price': event['quantity'] * event['unit_price'], 'is_high_value': ...})")
    
  • Example:

%%writefile graph_handlers.py
def passthrough(event):
    print(f"event: {event}")
    return event


# Next notebook cell: create the serving function and build the graph
fn_extend = project.set_function(
    name="extend-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun"
)

graph_extend = fn_extend.set_topology("flow", engine="async")

# storey.Extend: _fn is a Python expression evaluated with `event` in scope.
# It must return a dict; that dict is MERGED into the event body:
# new keys are added while existing keys are preserved.
graph_extend.add_step(
    name="enrich",
    class_name="storey.Extend",
    _fn=(
        "({'total_price': event['quantity'] * event['unit_price'],"
        " 'is_high_value': event['quantity'] * event['unit_price'] > 500})"
    )
)

graph_extend.add_step(name="end", handler="passthrough", after="enrich")

server_extend = fn_extend.to_mock_server()

test_orders = [
    {"order_id": "A1", "quantity": 2,  "unit_price": 49.99},
    {"order_id": "A2", "quantity": 10, "unit_price": 75.00},
]

for order in test_orders:
    result = server_extend.test("/", order)
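
To see which fields the expression adds for the two test orders, the same arithmetic can be run in plain Python. This is a sketch of the merge idea only; enrich is a hypothetical name and the dict merge below stands in for what the Extend step does to the event body.

```python
# Same expression as the _fn string above, written as a regular function.
def enrich(body):
    total = body["quantity"] * body["unit_price"]
    return {"total_price": total, "is_high_value": total > 500}


test_orders = [
    {"order_id": "A1", "quantity": 2, "unit_price": 49.99},
    {"order_id": "A2", "quantity": 10, "unit_price": 75.00},
]

# Merge the new fields into a copy of each order; the original keys are kept.
enriched = [{**order, **enrich(order)} for order in test_orders]
for item in enriched:
    print(item)
```

Order A1 totals roughly 99.98 and is not flagged as high value, while order A2 totals 750.0 and is flagged.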

Batch operation steps#

This icon in the UI indicates batch steps: graph-steps-batch.

| Class name | Description |
|---|---|
| Batch | Collects events until the batch reaches a configured size or age, then sends them downstream together. See Batch. |
| BatchHttpRequests | Sends multiple HTTP requests to remote step endpoints in parallel for batch processing. See BatchHttpRequests. |
| ForEach | Runs custom logic for every event and then passes the original event downstream. |

Batch#

  • Description: Batch collects events until the batch reaches a configured size or age, then sends them downstream together.

  • Use case: A model inference endpoint expects fixed-size batches of sensor readings rather than individual events. Batching also reduces per-request overhead.

  • Graph topology:

    storey.Batch(max_events=3, flush_after_seconds=3) → print_batch
    
  • Example:

%%writefile graph_handlers.py

def print_batch(batch):
    print(f"Batch received ({len(batch)} events):")
    for item in batch:
        print(" ", item)
    return batch


# Next notebook cell: create the serving function and build the graph
fn_batch = project.set_function(
    name="batch-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)

graph_batch = fn_batch.set_topology("flow", engine="async")

# storey.Batch buffers individual events and emits a list when either condition fires:
#   - max_events events have accumulated, OR
#   - flush_after_seconds seconds have passed since the last event
graph_batch.add_step(
    name="batcher",
    class_name="storey.Batch",
    max_events=3,           # emit a list of 3 events at once
    flush_after_seconds=3,  # or after 3s of inactivity (timeout flush)
).respond()

# print_batch receives the full list and returns it
graph_batch.add_step(name="print_batch", handler="print_batch", after="batcher")

server_batch = fn_batch.to_mock_server()

print("Sending 3 events – batch fires on the 3rd:\n")

for i in range(1, 4):
    payload = {"sensor": f"S{i}", "value": i * 10}
    result  = server_batch.test("/", payload)
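
The size-based half of the batching rule can be sketched in plain Python. The generator below is hypothetical and covers only the max_events condition; the time-based flush is represented by emitting whatever is left at the end, which is a simplification of the real step.

```python
# Hypothetical sketch of size-based batching (no timers involved).
def batch_by_size(events, max_events):
    """Yield lists of up to max_events events; the last batch may be shorter."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == max_events:
            yield batch
            batch = []
    if batch:
        # Stands in for the timeout flush of a partially filled batch
        yield batch


readings = [{"sensor": f"S{i}", "value": i * 10} for i in range(1, 8)]
batches = list(batch_by_size(readings, max_events=3))
print([len(b) for b in batches])  # [3, 3, 1]
```

With seven readings and max_events=3, two full batches are emitted and the last reading is flushed on its own.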

BatchHttpRequests#

  • Description: Sends multiple HTTP requests to remote step endpoints in parallel for batch processing.

  • Use case: Fan out scoring requests to an external inference endpoint for multiple items in a single graph step, reducing overall latency compared to sequential calls.

  • Graph topology:

    BatchHttpRequests(url_expression="event['url']", body_expression="event['data']",
                      method="POST", input_path="req", result_path="resp")
    
  • Example:

import mlrun
from mlrun.serving.remote import BatchHttpRequests

fn_bhttp = project.set_function(
    name="batch-http-example",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun",
)

graph_bhttp = fn_bhttp.set_topology("flow", engine="async")

graph_bhttp.to(
    BatchHttpRequests(
        url_expression="event['url']",
        body_expression="event['data']",
        method="POST",
        input_path="req",
        result_path="resp",
    )
).respond()


server_bhttp = fn_bhttp.to_mock_server()

# request contains a list of items, each with its own url and payload
requests = [
    {"url": "https://httpbin.org/post", "data": {"value": 1}},
    {"url": "https://httpbin.org/post", "data": {"value": 2}},
]

resp = server_bhttp.test(body={"req": requests})
print(resp)
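
The fan-out pattern itself can be tried offline with a stubbed sender. The sketch below is an illustration under stated assumptions: fake_post and fan_out are hypothetical names, a thread pool is used only to show concurrent calls, and storing the results under result_path is inferred from the parameter names in the example above. It does not reflect how BatchHttpRequests is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


# Stand-in for an HTTP POST so the sketch runs without network access.
def fake_post(url, payload):
    return {"url": url, "echo": payload}


def fan_out(event, input_path="req", result_path="resp"):
    items = event[input_path]
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() runs the calls concurrently and returns results in input order
        results = list(
            pool.map(lambda item: fake_post(item["url"], item["data"]), items)
        )
    return {**event, result_path: results}


event = {
    "req": [
        {"url": "https://httpbin.org/post", "data": {"value": 1}},
        {"url": "https://httpbin.org/post", "data": {"value": 2}},
    ]
}
out = fan_out(event)
print(out["resp"])
```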

Filter steps#

This icon in the UI indicates filter steps: graph-steps-filter.

| Class name | Description |
|---|---|
| Filter | Filters events based on a user-provided function. See Filter. |
| SampleWindow | Samples a single event from every group of events based on a configured policy such as first or last. |

Filter#

  • Description: Filters events based on a user-provided function.

  • Use case: Drop corrupted sensor readings (negative or None values) before they reach the model.

  • Graph topology:

    storey.Filter(_fn="(isinstance(event.get('value'),(int,float)) and event.get('value',-1)>=0)") → end
    
  • Example:

%%writefile graph_handlers.py
def passthrough(event):
    print(f"event: {event}")
    return event


# Next notebook cell: create the serving function and build the graph
import graph_handlers
import storey
import mlrun

fn_filter = project.set_function(
    name="filter-demo",
    func="graph_handlers.py",
    kind="serving",
    image="mlrun/mlrun"
)

graph_filter = fn_filter.set_topology("flow", engine="async")

# storey.Filter: _fn is a Python boolean expression evaluated with `event` in scope.
# Events where the expression evaluates to True pass through;
# events where it evaluates to False are silently dropped (not forwarded).
graph_filter.add_step(
    name="validate",
    class_name="storey.Filter",
    _fn="(isinstance(event.get('value'), (int, float)) and event.get('value', -1) >= 0)",
)

# Only events that passed the filter reach this step
graph_filter.add_step(name="end", handler="passthrough", after="validate")

server_filter = fn_filter.to_mock_server()

test_events = [
    {"sensor": "S1", "value": 7},  # valid
    {"sensor": "S2", "value": -6.0},  # filtered out
    {"sensor": "S3", "value": -2},  # filtered out
    {"sensor": "S4", "value": 0.0},   # valid
]

for event in test_events:
    result = server_filter.test("/", body=event)
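
The filter expression can be evaluated directly against the test events to confirm which ones pass. The sketch below wraps the same boolean expression in a plain function; is_valid is a hypothetical name, and the last two events are added here only to show the None and missing-field cases.

```python
# Same boolean expression as the _fn string above, as a regular function.
def is_valid(body):
    return isinstance(body.get("value"), (int, float)) and body.get("value", -1) >= 0


test_events = [
    {"sensor": "S1", "value": 7},
    {"sensor": "S2", "value": -6.0},
    {"sensor": "S3", "value": -2},
    {"sensor": "S4", "value": 0.0},
    {"sensor": "S5", "value": None},  # extra case: None is rejected
    {"sensor": "S6"},                 # extra case: missing field is rejected
]

kept = [e["sensor"] for e in test_events if is_valid(e)]
print(kept)  # ['S1', 'S4']
```

One edge case to be aware of: bool is a subclass of int in Python, so an event with "value": True would also pass this expression.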

Model Server#

| Class name | Description |
|---|---|
| ONNXModelServer | A model serving class for serving ONNX models. A subclass of the V2ModelServer class. |
| PyTorchModelServer | A model serving class for serving PyTorch models. A subclass of the V2ModelServer class. |
| SKLearnModelServer | A model serving class for serving scikit-learn models. A subclass of the V2ModelServer class. |
| TFKerasModelServer | A model serving class for serving TensorFlow Keras models. A subclass of the V2ModelServer class. |
| XGBModelServer | A model serving class for serving XGBoost models. A subclass of the V2ModelServer class. |