Writing custom steps#
The Graph executes built-in task classes, or task classes and functions that you implement. The task parameters include the following:
class_name
(str): the relative or absolute class name.handler
(str): the function handler (if class_name is not specified it is the function handler).**class_args
: a set of class__init__
arguments.
For example, see the following simple echo
class:
import mlrun
# mlrun: start
# echo class, custom class example
class Echo:
def __init__(self, context, name=None, **kw):
self.context = context
self.name = name
self.kw = kw
def do(self, x):
print("Echo:", self.name, x)
return x
# mlrun: end
Test the graph: first convert the code to function, and then add the step to the graph:
fn_echo = mlrun.code_to_function("echo_function", kind="serving", image="mlrun/mlrun")
graph_echo = fn_echo.set_topology("flow")
graph_echo.to(class_name="Echo", name="pre-process", some_arg="abc")
graph_echo.plot(rankdir="LR")
Create a mock server to test this locally:
echo_server = fn_echo.to_mock_server(current_function="*")
result = echo_server.test("", {"inputs": 123})
echo_server.wait_for_completion()
print(result)
{'id': '97397ea412334afdb5e4cb7d7c2e6dd3'}
Echo: pre-process {'inputs': 123}
For more information, see the Advanced model serving graph notebook example
You can use any Python function by specifying the handler name (e.g. handler=json.dumps
).
The function is triggered with the event.body
as the first argument, and its result
is passed to the next step.
Alternatively, you can use classes that can also store some step/configuration and separate the
one time init logic from the per event logic. The classes are initialized with the class_args
.
If the class init args contain context
or name
, they are initialized with the
graph context and the step name.
By default, the class_name
and handler specify a class/function name in the globals()
(i.e. this module).
Alternatively, those can be full paths to the class (module.submodule.class), e.g. storey.WriteToParquet
.
You can also pass the module as an argument to functions such as function.to_mock_server(namespace=module)
.
In this case the class or handler names are also searched in the provided module.
When using classes the class event handler is invoked on every event with the event.body
.
If the Task step full_event
parameter is set to True
the handler is invoked and returns
the full event
object. If the class event handler is not specified, it invokes the class do()
method.
If you need to implement async behavior, then subclass storey.MapClass
.
Create a single step#
When creating a serving function with a step that is a part of the class, the graph context is created in the init, and you can use this context to get the functions' parameters.
When creating a single step (no class), the context is not created, and therefore the get_param
does not work. The following example illustrates how to create the context and then use the parameters.
%%writefile serving-handler-func.py
import pandas as pd
import mlrun
import mlrun.utils
import os
import json
def test(event):
serving_spec = mlrun.utils.get_serving_spec()
server_context = mlrun.serving.GraphServer().from_dict(serving_spec)
context = mlrun.serving.GraphContext(server=server_context)
param = context.get_param("Test")
return param
Overwriting serving-handler-func.py
serving_func_handler = project.set_function(
name="serving-handler-func",
func="serving-handler-func.py",
image="mlrun/mlrun",
kind="serving",
)
serving_func_handler.spec.parameters = {"Test": "test"}
graph = serving_func_handler.set_topology("flow")
graph.to(name="test", handler="test").respond()
serving_func_deploy = project.deploy_function("serving-handler-func")
> 2023-05-09 14:24:55,287 [info] Starting remote function deploy
2023-05-09 14:24:56 (info) Deploying function
2023-05-09 14:24:56 (info) Building
2023-05-09 14:24:56 (info) Staging files and preparing base images
2023-05-09 14:24:57 (info) Building processor image
2023-05-09 14:26:02 (info) Build complete
> 2023-05-09 14:26:08,232 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-serving-context-shapira-serving-handler-func.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['serving-context-shapira-serving-handler-func-serving-c-6v7nqbg6.default-tenant.app.cust-cs-il-3-5-2.iguazio-cd2.com/']}
serving_func_deploy.function.invoke(
"/",
)