Custom steps

Serving graphs execute built-in task classes, or task classes and functions that you implement.

This icon in the UI indicates remote steps: [steps-custom icon].

Basics

The task parameters include the following:

  • class_name (str): the relative or absolute class name.

  • handler (str): the handler name. If class_name is specified, this is the class method to call on each event; otherwise, it is the name of a standalone function.

  • **class_args: keyword arguments passed to the class __init__.

For example, see the following simple echo class:

import mlrun
# mlrun: start
# echo class, custom class example
class Echo:
    def __init__(self, context, name=None, **kw):
        self.context = context
        self.name = name
        self.kw = kw

    def do(self, x):
        print("Echo:", self.name, x)
        return x
# mlrun: end

Test the graph: first convert the code to a function, and then add the step to the graph:

project = mlrun.get_or_create_project("myproj", "./")

fn_echo = project.set_function("echo_function", kind="serving", image="mlrun/mlrun")

graph_echo = fn_echo.set_topology("flow")

graph_echo.to(class_name="Echo", name="pre-process", some_arg="abc")

graph_echo.plot(rankdir="LR")
[Figure: plot of the echo graph with the pre-process step]

Create a mock server to test this locally:

echo_server = fn_echo.to_mock_server(current_function="*")

result = echo_server.test("", {"inputs": 123})
echo_server.wait_for_completion()

print(result)
{'id': '97397ea412334afdb5e4cb7d7c2e6dd3'}
Echo: pre-process {'inputs': 123}

For more information, see the Advanced model serving graph notebook example.

You can use any Python function by specifying the handler name (e.g. handler=json.dumps). The function is triggered with the event.body as the first argument, and its result is passed to the next step.
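A minimal sketch of a function step, assuming a hypothetical handler named `add_total` defined in the function's code: it receives `event.body` as its first argument, and whatever it returns becomes the body passed to the next step.

```python
def add_total(body: dict) -> dict:
    # Hypothetical handler step: it is called with event.body as the first argument
    values = body.get("inputs", [])
    # The returned value becomes the event body for the next step
    return {**body, "total": sum(values)}


# Local check: call the handler directly with a sample body
print(add_total({"inputs": [1, 2, 3]}))  # {'inputs': [1, 2, 3], 'total': 6}
```

In a graph you would then reference it by name, for example `graph.to(name="add-total", handler="add_total")`.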

Alternatively, you can use classes, which can also store state/configuration and separate the one-time init logic from the per-event logic. The classes are initialized with the class_args. If the class __init__ arguments include context or name, they are initialized with the graph context and the step name.
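As a sketch, the hypothetical class `Scaler` below does its setup once in `__init__` and reuses it for every event. Its `factor` argument stands for a value passed through `class_args`; the class and argument names are illustrative only.

```python
class Scaler:
    def __init__(self, context=None, name=None, factor=1.0, **kw):
        # One-time init: context and name come from the graph, factor from class_args
        self.context = context
        self.name = name
        self.factor = float(factor)
        self.calls = 0  # state kept across events

    def do(self, body):
        # Per-event logic: scale every value in the event body
        self.calls += 1
        return [x * self.factor for x in body]


step = Scaler(name="scale", factor=2)
print(step.do([1, 2, 3]))  # [2.0, 4.0, 6.0]
```

In a graph this would be added with something like `graph.to(class_name="Scaler", name="scale", factor=2)`, in the same way `some_arg="abc"` is passed in the echo example.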

By default, the class_name and handler specify a class/function name in the globals() (i.e. this module). Alternatively, those can be full paths to the class (module.submodule.class), e.g. storey.WriteToParquet. You can also pass the module as an argument to functions such as function.to_mock_server(namespace=module). In this case the class or handler names are also searched in the provided module.
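To illustrate these lookup rules, here is a simplified, hypothetical resolver. It is not MLRun's implementation, and the order in which it checks the sources (dotted path first, then the provided namespace, then the module globals) is an assumption.

```python
import importlib
import json
import types


def resolve(name, namespace=None, scope=None):
    """Hypothetical lookup of a class_name or handler string."""
    # Full path such as "module.submodule.Class": import the module, then read the attribute
    if "." in name:
        module_name, attr = name.rsplit(".", 1)
        return getattr(importlib.import_module(module_name), attr)
    # A module passed explicitly (like namespace=module) is searched next
    if namespace is not None and hasattr(namespace, name):
        return getattr(namespace, name)
    # Otherwise fall back to globals() of this module (or an explicit scope dict)
    return (scope if scope is not None else globals())[name]


# Stand-in for a user module passed as namespace
my_steps = types.ModuleType("my_steps")
my_steps.Echo = type("Echo", (), {})

print(resolve("json.dumps") is json.dumps)           # True
print(resolve("Echo", namespace=my_steps).__name__)  # Echo
```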

When using a class, its event handler method is invoked on every event with event.body. If the task step's full_event parameter is set to True, the handler receives and returns the full event object instead. If no handler is specified, the class do() method is invoked.
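These dispatch rules can be modeled in plain Python. This is a hypothetical simplification, not MLRun code: `Event` is a minimal stand-in for the graph event, and `run_class_step` and `Greeter` are illustrative names.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Event:
    # Minimal stand-in for the graph event object
    body: Any
    key: Optional[str] = None
    path: str = "/"


def run_class_step(obj, event, handler=None, full_event=False):
    # Use the configured handler method, or do() when no handler is given
    method = getattr(obj, handler or "do")
    if full_event:
        # The method receives the whole event and returns the (possibly updated) event
        return method(event)
    # Default: the method receives event.body and its result becomes the new body
    event.body = method(event.body)
    return event


class Greeter:
    def do(self, body):
        return f"hello {body}"

    def shout(self, body):
        return f"HELLO {body.upper()}"

    def tag(self, event):
        # With full_event=True, attributes other than the body can be read and set
        event.key = event.body
        return event


print(run_class_step(Greeter(), Event("bob")).body)                   # hello bob
print(run_class_step(Greeter(), Event("bob"), handler="shout").body)  # HELLO BOB
print(run_class_step(Greeter(), Event("bob"), handler="tag", full_event=True).key)  # bob
```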

If you need to implement async behavior, then subclass storey.MapClass.

Create a single step

When a step is implemented as a class, the graph context is passed to the class __init__, and you can use this context to read the function's parameters.

When a step is a plain function (no class), no context is created for it, so context.get_param() is not available. The following example shows how to create the context inside the handler and then read the parameters.

%%writefile serving-handler-func.py
import mlrun
import mlrun.serving
import mlrun.utils


def test(event):
    # A plain function step gets no context, so build one from the serving spec
    serving_spec = mlrun.utils.get_serving_spec()
    server_context = mlrun.serving.GraphServer().from_dict(serving_spec)
    context = mlrun.serving.GraphContext(server=server_context)
    # Read the "Test" parameter set in the function's spec.parameters
    param = context.get_param("Test")
    return param
Overwriting serving-handler-func.py
serving_func_handler = project.set_function(
    name="serving-handler-func",
    func="serving-handler-func.py",
    image="mlrun/mlrun",
    kind="serving",
)
serving_func_handler.spec.parameters = {"Test": "test"}
graph = serving_func_handler.set_topology("flow")

graph.to(name="test", handler="test").respond()
serving_func_deploy = project.deploy_function("serving-handler-func")
> 2023-05-09 14:24:55,287 [info] Starting remote function deploy
2023-05-09 14:24:56  (info) Deploying function
2023-05-09 14:24:56  (info) Building
2023-05-09 14:24:56  (info) Staging files and preparing base images
2023-05-09 14:24:57  (info) Building processor image
2023-05-09 14:26:02  (info) Build complete
> 2023-05-09 14:26:08,232 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-serving-context-shapira-serving-handler-func.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['serving-context-shapira-serving-handler-func-serving-c-6v7nqbg6.default-tenant.app.cust-cs-il-3-5-2.iguazio-cd2.com/']}
serving_func_deploy.function.invoke("/")

Graph state machine

This section describes the event object and the context object used by the serving graph state machine.

The event object

The graph state machine accepts an event object (similar to a Nuclio event) and passes it along the pipeline. An event object hosts the event body along with other attributes such as path (HTTP request path), method (GET, POST, …), and id (unique event ID).

In some cases the events represent a record with a unique key, which can be read or set through event.key.

The task steps are called with the event.body by default. If a task step needs to read or set other event elements (key, path, time, …) you should set the task full_event argument to True.
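A sketch of a function step written for `full_event=True`, assuming a hypothetical handler `set_key` and a body that contains a `user_id` field. For local testing, `types.SimpleNamespace` stands in for the real event object.

```python
from types import SimpleNamespace


def set_key(event):
    # With full_event=True the handler receives the whole event, not just event.body
    if event.method == "POST":
        # Use a field from the body as the record key for downstream steps
        event.key = event.body["user_id"]
    # Return the event object so the next step receives it
    return event


event = SimpleNamespace(body={"user_id": "u-17"}, method="POST", path="/", key=None, id="1")
print(set_key(event).key)  # u-17
```

In a graph, such a step would be added with the full_event flag set, for example `graph.to(name="set-key", handler="set_key", full_event=True)`.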

Task steps support optional input_path and result_path attributes that allow controlling which portion of the event is sent as input to the step, and where to update the returned result.

For example, for an event body {"req": {"body": "x"}}, input_path="req.body" and result_path="resp", the step gets "x" as the input. The output after the step is {"req": {"body": "x"}, "resp": <step output>}. Note that input_path and result_path do not work together with full_event=True.
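The example above can be reproduced with a small, hypothetical model of the input_path/result_path semantics. It is plain Python, not MLRun's code: the step gets the value at input_path, and its output is written into a copy of the body at result_path.

```python
import copy


def get_path(obj, path):
    # Walk a dotted path such as "req.body" through nested dicts
    for part in path.split("."):
        obj = obj[part]
    return obj


def set_path(obj, path, value):
    # Create intermediate dicts as needed, then set the final key
    parts = path.split(".")
    for part in parts[:-1]:
        obj = obj.setdefault(part, {})
    obj[parts[-1]] = value


def run_task(body, step, input_path=None, result_path=None):
    # Without input_path the step receives the whole body
    step_input = get_path(body, input_path) if input_path else body
    output = step(step_input)
    # Without result_path the output replaces the body
    if not result_path:
        return output
    new_body = copy.deepcopy(body)
    set_path(new_body, result_path, output)
    return new_body


print(run_task({"req": {"body": "x"}}, str.upper, input_path="req.body", result_path="resp"))
# {'req': {'body': 'x'}, 'resp': 'X'}
```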

The context object

The step classes are initialized with a context object (when they have context in their __init__ args). The context is used to pass data and for interfacing with system services. The context object has the following attributes and methods.

Attributes:

  • logger: Central logger (Nuclio logger when running in Nuclio).

  • verbose: True if in verbose/debug mode.

  • root: The graph object.

  • current_function: When running in a distributed graph, the current child function name.

Methods:

  • get_param(key, default=None): Get the graph parameter by key. Parameters are set at the serving function (e.g. function.spec.parameters = {"param1": "x"}).

  • get_secret(key): Get the value of a project/user secret.

  • get_store_resource(uri, use_cache=True): Get the mlrun store object (data item, artifact, model, feature set, feature vector).

  • get_remote_endpoint(name, external=False): Return the remote Nuclio/serving function HTTP(S) endpoint given its [project/]function-name[:tag].

  • Response(headers=None, body=None, content_type=None, status_code=200): Create a Nuclio response object, for returning detailed HTTP responses.

Example, using the context:

# log only when running in verbose/debug mode
if self.context.verbose:
    self.context.logger.info("my message", some_arg="text")
# read the graph parameter "x", defaulting to 0 if it is not set
x = self.context.get_param("x", 0)
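To unit-test a step class outside a serving function, you can pass it a stand-in context. The `FakeContext`, `FakeLogger`, and `AddX` classes below are hypothetical; they only mimic the `verbose`, `logger`, and `get_param` members listed above and are not part of MLRun.

```python
class FakeLogger:
    # Records log calls instead of printing them; accepts keyword args like the snippet above
    def __init__(self):
        self.records = []

    def info(self, message, **kwargs):
        self.records.append((message, kwargs))


class FakeContext:
    # Hypothetical stand-in for the graph context: verbose, logger, and get_param only
    def __init__(self, parameters=None, verbose=False):
        self.verbose = verbose
        self.logger = FakeLogger()
        self._parameters = parameters or {}

    def get_param(self, key, default=None):
        return self._parameters.get(key, default)


class AddX:
    # Example step class that uses the context like the snippet above
    def __init__(self, context, name=None, **kw):
        self.context = context
        self.name = name

    def do(self, body):
        if self.context.verbose:
            self.context.logger.info("my message", some_arg="text")
        x = self.context.get_param("x", 0)
        return body + x


ctx = FakeContext(parameters={"x": 5}, verbose=True)
step = AddX(ctx, name="add-x")
print(step.do(10))         # 15
print(ctx.logger.records)  # [('my message', {'some_arg': 'text'})]
```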