Writing custom steps

The graph executes built-in task classes, as well as task classes and functions that you implement. A task step accepts the following parameters:

  • class_name (str): the relative or absolute class name.

  • handler (str): the name of the class method to invoke; if class_name is not specified, it is the name of a standalone function.

  • **class_args: a set of class __init__ arguments.

For example, see the following simple echo class:

import mlrun
# mlrun: start
# echo class, custom class example
class Echo:
    def __init__(self, context, name=None, **kw):
        self.context = context
        self.name = name
        self.kw = kw

    def do(self, x):
        print("Echo:", self.name, x)
        return x
# mlrun: end

Test the graph: first convert the code to a function, and then add the step to the graph:

fn_echo = mlrun.code_to_function("echo_function", kind="serving", image="mlrun/mlrun")

graph_echo = fn_echo.set_topology("flow")

graph_echo.to(class_name="Echo", name="pre-process", some_arg="abc")


Create a mock server to test this locally:

echo_server = fn_echo.to_mock_server(current_function="*")

result = echo_server.test("", {"inputs": 123})

{'id': '97397ea412334afdb5e4cb7d7c2e6dd3'}
Echo: pre-process {'inputs': 123}

For more information, see the Advanced model serving graph notebook example.

You can use any Python function by specifying the handler name (e.g. handler=json.dumps). The function is triggered with the event.body as the first argument, and its result is passed to the next step.
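The calling convention for function handlers can be pictured with a small, pure-Python stand-in. This is only an illustrative sketch, not MLRun's implementation: `run_steps` and `add_source` are hypothetical names, and the loop merely mimics "call each handler with the body, pass the result to the next step".

```python
import json


def run_steps(handlers, body):
    """Hypothetical stand-in for a flow: feed each handler's result to the next."""
    for handler in handlers:
        body = handler(body)
    return body


def add_source(body):
    # A plain function step: receives the event body and returns the new body.
    return {**body, "source": "demo"}


# The second step is an ordinary library function, as in handler=json.dumps.
result = run_steps([add_source, json.dumps], {"inputs": 123})
print(result)
```

Because each step only sees the previous step's return value, any function that takes one argument and returns a value can serve as a handler in this picture.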

Alternatively, you can use classes, which can also store state and configuration and which separate the one-time init logic from the per-event logic. The classes are initialized with the class_args. If the class __init__ arguments include context or name, they are set to the graph context and the step name, respectively.
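The sketch below illustrates the idea of injecting context and name only when the class asks for them. It is an assumption-laden illustration in plain Python, not the way MLRun itself is written: `init_step` and `Scale` are hypothetical, and the context is a simple dict standing in for the graph context.

```python
import inspect


def init_step(step_class, context, name, **class_args):
    """Hypothetical helper: build a step, adding context/name if __init__ accepts them."""
    params = inspect.signature(step_class.__init__).parameters
    kwargs = dict(class_args)
    if "context" in params:
        kwargs["context"] = context
    if "name" in params:
        kwargs["name"] = name
    return step_class(**kwargs)


class Scale:
    def __init__(self, context, name=None, factor=1):
        # One-time init logic: keep configuration and set up state.
        self.context = context
        self.name = name
        self.factor = factor
        self.seen = 0

    def do(self, x):
        # Per-event logic: update state and transform the body.
        self.seen += 1
        return x * self.factor


step = init_step(Scale, context={"env": "test"}, name="scale", factor=3)
outputs = [step.do(value) for value in (1, 2)]
print(step.name, outputs, step.seen)
```

The instance is created once and reused for every event, which is why it can hold counters, loaded models, or other state between calls.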

By default, class_name and handler refer to a class or function name in globals() (i.e. the current module). Alternatively, they can be full paths to the class or function (module.submodule.class), e.g. storey.WriteToParquet. You can also pass a module as an argument to functions such as function.to_mock_server(namespace=module). In this case, the class or handler names are also searched for in the provided module.
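The lookup order described above can be sketched with the standard library. This is a simplified illustration under stated assumptions, not MLRun's resolver: `resolve_name` is a hypothetical helper, and the namespace is an explicit dict playing the role of globals() or a provided module.

```python
import importlib


def resolve_name(name, namespace):
    """Hypothetical lookup: bare names from the namespace, dotted names by import."""
    # 1. Bare names are searched in the supplied namespace (e.g. globals()).
    if name in namespace:
        return namespace[name]
    # 2. Dotted names are treated as module.submodule.attribute.
    module_path, _, attr = name.rpartition(".")
    if not module_path:
        raise LookupError(f"{name!r} was not found in the namespace")
    return getattr(importlib.import_module(module_path), attr)


class Echo:
    def do(self, x):
        return x


namespace = {"Echo": Echo}
echo_class = resolve_name("Echo", namespace)
dumps = resolve_name("json.dumps", namespace)
print(echo_class.__name__, dumps([1, 2]))
```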

When using classes, the class event handler is invoked on every event with the event.body. If the task step's full_event parameter is set to True, the handler receives and returns the full event object instead of only the body. If the class event handler is not specified, the class do() method is invoked.
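The difference between body-only and full-event invocation can be shown with a toy dispatcher. This is a minimal sketch, assuming a simplified event with only body and headers; `Event`, `Tagger`, and `invoke` are hypothetical names and do not mirror MLRun's internal classes.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    # Simplified stand-in for a graph event.
    body: object
    headers: dict = field(default_factory=dict)


class Tagger:
    def do(self, body):
        # Default handler: sees only the body.
        return {"value": body}

    def tag(self, event):
        # Full-event handler: can read and modify anything on the event.
        event.headers["tagged"] = "yes"
        return event


def invoke(step, event, handler=None, full_event=False):
    """Hypothetical dispatcher: fall back to do(), pass body or full event."""
    method = getattr(step, handler or "do")
    if full_event:
        return method(event)
    event.body = method(event.body)
    return event


body_result = invoke(Tagger(), Event(body=5))
event_result = invoke(Tagger(), Event(body=5), handler="tag", full_event=True)
print(body_result.body, event_result.headers)
```

Working on the full event is useful when a step needs metadata such as headers or an event ID, while body-only handlers stay simpler and easier to reuse.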

If you need to implement async behavior, then subclass storey.MapClass.

Create a single step

When you create a serving function whose step is implemented as a class, the graph context is available in the class __init__, and you can use this context to get the function's parameters.

When you create a single step from a plain function (no class), the context is not created, and therefore get_param does not work. The following example illustrates how to create the context and then use the parameters.

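%%writefile serving-handler-func.py
import json
import os

import mlrun


def test(event):
    # Rebuild the graph server from the serving spec stored in the environment
    server_context = mlrun.serving.GraphServer().from_dict(
        json.loads(os.environ["SERVING_SPEC_ENV"])
    )
    # Create the graph context manually, since a plain function step does not receive one
    context = mlrun.serving.GraphContext(server=server_context)
    # Read the "Test" parameter from the function parameters
    param = context.get_param("Test")
    return param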
Overwriting serving-handler-func.py
serving_func_handler = project.set_function(
serving_func_handler.spec.parameters = {"Test": "test"}
graph = serving_func_handler.set_topology("flow")

graph.to(name="test", handler="test").respond()
serving_func_deploy = project.deploy_function("serving-handler-func")
> 2023-05-09 14:24:55,287 [info] Starting remote function deploy
2023-05-09 14:24:56  (info) Deploying function
2023-05-09 14:24:56  (info) Building
2023-05-09 14:24:56  (info) Staging files and preparing base images
2023-05-09 14:24:57  (info) Building processor image
2023-05-09 14:26:02  (info) Build complete
> 2023-05-09 14:26:08,232 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-serving-context-shapira-serving-handler-func.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['serving-context-shapira-serving-handler-func-serving-c-6v7nqbg6.default-tenant.app.cust-cs-il-3-5-2.iguazio-cd2.com/']}