class mlrun.serving.GraphContext(level='info', logger=None, server=None, nuclio_context=None)[source]

Bases: object

Graph context object

get_param(key: str, default=None)[source]
get_remote_endpoint(name, external=False)[source]

return the remote nuclio/serving function HTTP(S) endpoint given its name

Parameters

  • name – the function name/uri in the form [project/]function-name[:tag]

  • external – return the external url (returns the in-cluster url by default)
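The `[project/]function-name[:tag]` uri form can be pictured with a small parser. This is a hypothetical helper for illustration only, not part of mlrun's API:

```python
def parse_function_uri(uri: str):
    """Split '[project/]name[:tag]' into (project, name, tag); absent parts are None."""
    project, tag = None, None
    if "/" in uri:
        project, uri = uri.split("/", 1)   # leading 'project/' prefix
    if ":" in uri:
        uri, tag = uri.split(":", 1)       # trailing ':tag' suffix
    return project, uri, tag

print(parse_function_uri("iris/serving:v2"))  # ('iris', 'serving', 'v2')
print(parse_function_uri("serving"))          # (None, 'serving', None)
```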

get_secret(key: str)[source]
push_error(event, message, source=None, **kwargs)[source]
property server
class mlrun.serving.GraphServer(graph=None, parameters=None, load_mode=None, function_uri=None, verbose=False, version=None, functions=None, graph_initializer=None, error_stream=None, track_models=None, secret_sources=None, default_content_type=None)[source]

Bases: mlrun.model.ModelObj

property graph
init_states(context, namespace, resource_cache: Optional[mlrun.datastore.store_resources.ResourceCache] = None, logger=None, is_mock=False)[source]

for internal use, initialize all steps (recursively)

kind = 'server'
run(event, context=None, get_body=False, extra_args=None)[source]

set_current_function(function)[source]

set which child function this server is currently running on


set_error_stream(error_stream)[source]

set/initialize the error notification stream

test(path: str = '/', body: Optional[Union[str, bytes, dict]] = None, method: str = '', content_type: Optional[str] = None, silent: bool = False, get_body: bool = True, event_id: Optional[str] = None)[source]

invoke a test event into the server to simulate/test server behavior


server = create_graph_server()
server.graph.add_route("my", class_name=MyModelClass, model_path="{path}", z=100)
print(server.test("my/infer", testdata))
Parameters

  • path – api path, e.g. /{router.url_prefix}/{model-name}/..

  • body – message body (dict or json str/bytes)

  • method – optional, GET, POST, ..

  • content_type – optional, http mime type

  • silent – don’t raise on error responses (when not 20X)

  • get_body – return the body as py object (vs serialize response into json)

  • event_id – specify the unique event ID (by default a random value will be generated)


wait_for_completion()[source]

wait for async operation to complete

class mlrun.serving.QueueStep(name: Optional[str] = None, path: Optional[str] = None, after: Optional[list] = None, shards: Optional[int] = None, retention_in_hours: Optional[int] = None, **options)[source]

Bases: mlrun.serving.states.BaseStep

queue step, implement an async queue or represent a stream


specify the previous step name

property async_object
default_shape = 'cds'
init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]

init the step class

kind = 'queue'
run(event, *args, **kwargs)[source]
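Conceptually, a queue step decouples the producing side of the graph from its consumers, much like this minimal in-memory sketch. It is illustrative only; the real step is backed by a stream (e.g. V3IO or Kafka), not a Python queue:

```python
from collections import deque

class InMemoryQueueStep:
    """Toy stand-in: buffers events so a downstream consumer can process them later."""
    def __init__(self):
        self._buffer = deque()

    def run(self, event):
        self._buffer.append(event)   # producer side: enqueue and return immediately
        return event

    def drain(self):
        while self._buffer:          # consumer side: process buffered events in order
            yield self._buffer.popleft()

q = InMemoryQueueStep()
for e in ("a", "b", "c"):
    q.run(e)
print(list(q.drain()))  # ['a', 'b', 'c']
```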
class mlrun.serving.RouterStep(class_name: Optional[Union[str, type]] = None, class_args: Optional[dict] = None, handler: Optional[str] = None, routes: Optional[list] = None, name: Optional[str] = None, function: Optional[str] = None, input_path: Optional[str] = None, result_path: Optional[str] = None)[source]

Bases: mlrun.serving.states.TaskStep

router step, implement routing logic for running child routes

add_route(key, route=None, class_name=None, handler=None, function=None, **class_args)[source]

add child route step or class to the router

Parameters

  • key – unique name (and route path) for the child step

  • route – child step object (Task, ..)

  • class_name – class name to build the route step from (when route is not provided)

  • class_args – class init arguments

  • handler – class handler to invoke on run/event

clear_children(routes: list)[source]

clear child steps (routes)

default_shape = 'doubleoctagon'

get_children()[source]

get child steps (routes)

init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]

init the step class

kind = 'router'
plot(filename=None, format=None, source=None, **kw)[source]

plot/save a graphviz plot

property routes

child routes/steps, traffic is routed to routes based on router logic
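Stripped of mlrun specifics, the routing idea is key-based dispatch: the first path segment of the event selects a child step. A hypothetical sketch (names and behavior are illustrative, not mlrun's implementation):

```python
class ToyRouter:
    """Route an event to a child handler chosen by the model name in its path."""
    def __init__(self):
        self.routes = {}

    def add_route(self, key, handler):
        self.routes[key] = handler
        return self

    def run(self, path, event):
        key = path.strip("/").split("/")[0]   # e.g. 'my/infer' -> 'my'
        if key not in self.routes:
            raise ValueError(f"no route for {key!r}")
        return self.routes[key](event)

router = ToyRouter().add_route("my", lambda body: {"outputs": body["inputs"]})
print(router.run("my/infer", {"inputs": [1, 2]}))  # {'outputs': [1, 2]}
```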

class mlrun.serving.TaskStep(class_name: Optional[Union[str, type]] = None, class_args: Optional[dict] = None, handler: Optional[str] = None, name: Optional[str] = None, after: Optional[list] = None, full_event: Optional[bool] = None, function: Optional[str] = None, responder: Optional[bool] = None, input_path: Optional[str] = None, result_path: Optional[str] = None)[source]

Bases: mlrun.serving.states.BaseStep

task execution step, runs a class or handler

property async_object

return the sync or async (storey) class instance

init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]

init the step class

kind = 'task'

respond()[source]

mark this step as the responder

step output will be returned as the flow result; no other step can follow

run(event, *args, **kwargs)[source]

run this step, in async flows the run is done through storey

class mlrun.serving.V2ModelServer(context=None, name: Optional[str] = None, model_path: Optional[str] = None, model=None, protocol=None, **kwargs)[source]

Bases: mlrun.serving.utils.StepToDict

base model serving class (v2), using similar API to KFServing v2 and Triton

The class is initialized automatically by the model server and can run locally as part of a nuclio serverless function, or as part of a real-time pipeline. The default model url is: /v2/models/<model>[/versions/<ver>]/operation

You need to implement two mandatory methods:

load() – download the model file(s) and load the model into memory

predict() – accept the request payload and return prediction/inference results

you can override additional methods: preprocess, validate, postprocess, explain. you can add a custom api endpoint by adding a method named op_xx(event); it will be invoked by calling the <model-url>/xx (operation = xx)


defining a class:

import pickle

import numpy as np
import xgboost as xgb

class MyClass(V2ModelServer):
    def load(self):
        # load and initialize the model and/or other elements
        model_file, extra_data = self.get_model(suffix='.pkl')
        self.model = pickle.load(open(model_file, "rb"))

    def predict(self, request):
        events = np.array(request['inputs'])
        dmatrix = xgb.DMatrix(events)
        result: np.ndarray = self.model.predict(dmatrix)
        return {"outputs": result.tolist()}
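The custom `op_xx` convention described above can be pictured with a stand-alone stand-in. `ToyModelServer` and its `do_event` dispatcher are hypothetical simplifications; in mlrun the real dispatch happens inside `V2ModelServer.do_event`:

```python
class ToyModelServer:
    """Stand-in showing how an op_xx method maps to <model-url>/xx."""
    def do_event(self, operation, event):
        # look up the op_<operation> method by name and invoke it
        handler = getattr(self, f"op_{operation}", None)
        if handler is None:
            raise ValueError(f"unsupported operation {operation!r}")
        return handler(event)

    def op_echo(self, event):
        # would be invoked by calling <model-url>/echo
        return {"echo": event}

server = ToyModelServer()
print(server.do_event("echo", {"x": 1}))  # {'echo': {'x': 1}}
```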
do_event(event, *args, **kwargs)[source]

main model event handler method

explain(request: Dict) → Dict[source]

model explain operation


get_model(suffix='')[source]

get the model file(s) and metadata from model store

the method returns a path to the model file and the extra data (dict of dataitem objects). it also loads the model metadata into the self.model_spec attribute, allowing direct access to all the model metadata attributes.

get_model is usually used in the model .load() method to init the model

Examples

def load(self):
    model_file, extra_data = self.get_model(suffix='.pkl')
    self.model = pickle.load(open(model_file, "rb"))  # assumes a pickled model
    categories = extra_data['categories'].as_df()

Parameters

suffix (str) – optional, model file suffix (when the model_path is a directory)


Returns

  • str – (local) model file

  • dict – extra dataitems dictionary

get_param(key: str, default=None)[source]

get param by key (specified in the model or the function)


load()[source]

model loading function, see also .get_model() method


post_init(mode='sync')[source]

sync/async model loading, for internal use

postprocess(request: Dict) → Dict[source]

postprocess, before returning response

predict(request: Dict) → Dict[source]

model prediction operation

preprocess(request: Dict, operation) → Dict[source]

preprocess the event body before validate and action

set_metric(name: str, value)[source]

set real time metric (for model monitoring)

validate(request, operation)[source]

validate the event body (after preprocess)

class mlrun.serving.VotingEnsemble(context=None, name: Optional[str] = None, routes=None, protocol: Optional[str] = None, url_prefix: Optional[str] = None, health_prefix: Optional[str] = None, vote_type=None, executor_type=None, prediction_col_name=None, **kwargs)[source]

Bases: mlrun.serving.routers.BaseModelRouter

do_event(event, *args, **kwargs)[source]

Handles incoming requests.


Parameters

event (nuclio.Event) – Incoming request as a nuclio.Event.

Returns

Event response after running the requested logic

Return type



extract_results_from_response(response)[source]

Extracts the prediction from the model response. This function is used to allow multiple model return types, and allows for easy extension to the user’s ensemble and model best practices.


Parameters

response (Union[List, Dict]) – The model response’s output field.

Returns

The model’s predictions

Return type



validate(request)[source]

Validate the event body (after preprocessing)


Parameters

request (dict) – Event body.

Returns

Event body after validation

Return type


Raises

  • Exception – inputs key not found in request

  • Exception – inputs should be of type List
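The ensemble's majority-vote idea for classification can be sketched independently of mlrun. This is an illustrative reduction, not VotingEnsemble's actual implementation (which also supports regression averaging and configurable vote types):

```python
from collections import Counter

def majority_vote(predictions):
    """predictions: list of per-model output lists; returns the winning label per sample."""
    results = []
    for sample in zip(*predictions):                  # group the models' answers per sample
        label, _count = Counter(sample).most_common(1)[0]
        results.append(label)
    return results

# three models vote on two samples; the majority label wins each sample
print(majority_vote([[0, 1], [0, 1], [1, 1]]))  # [0, 1]
```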

mlrun.serving.create_graph_server(parameters={}, load_mode=None, graph=None, verbose=False, current_function=None, **kwargs) → mlrun.serving.server.GraphServer[source]

create graph server host/emulator for local or test runs

Usage example:

server = create_graph_server(graph=RouterStep(), parameters={})
server.init(None, globals())
server.graph.add_route("my", class_name=MyModelClass, model_path="{path}", z=100)
print(server.test("/v2/models/my/infer", testdata))