mlrun.serving#

class mlrun.serving.states.BaseStep(name: str | None = None, after: list | None = None, shape: str | None = None)[source]#
error_handler(name: str | None = None, class_name=None, handler=None, before=None, function=None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#

set error handler on a step or the entire graph (to be executed on failure/raise)

When setting the error_handler on the graph object, the graph completes after the error handler execution.

Example

In the example below, an 'error_catcher' step is set as the error_handler of the 'raise' step: on an error/raise in the 'raise' step, the 'handle_error' handler runs, and after that the 'echo' step runs.

graph = function.set_topology('flow', engine='async')
graph.to(name='raise', handler='raising_step')\
    .error_handler(name='error_catcher', handler='handle_error', full_event=True, before='echo')
graph.add_step(name="echo", handler='echo', after="raise").respond()
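The error handler can also be set on the graph object itself, in which case the graph completes after the error handler runs; a minimal sketch, reusing the 'handle_error' handler from the example above:

graph.error_handler(name='error_catcher', handler='handle_error')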

Parameters:
  • name -- unique name (and path) for the error handler step, default is class name

  • class_name -- class name or step object to build the step from. The error handler step is derived from a task step (i.e. no router/queue functionality)

  • handler -- class/function handler to invoke on run/event

  • before -- string or list of the next step name(s) to run after this error handler step. The before parameter must not reference upstream steps, as that would create a loop. If before is not specified, the graph completes after the error handler execution.

  • function -- function this step should run in

  • full_event -- this step accepts the full event (not just the body)

  • input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step receives 7 as input

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="y" means the output of the step is written to event["y"], resulting in {"x": 5, "y": <result>}

  • class_args -- class init arguments

to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#

add a step right after this step and return the new step

Example

a 4-step pipeline ending with a stream:

graph.to('URLDownloader')\
    .to('ToParagraphs')\
    .to(name='to_json', handler='json.dumps')\
    .to('>>', 'to_v3io', path=stream_path)

Parameters:
  • class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'

  • name -- unique name (and path) for the child step, default is class name

  • handler -- class/function handler to invoke on run/event

  • graph_shape -- graphviz shape name

  • function -- function this step should run in

  • full_event -- this step accepts the full event (not just body)

  • input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step receives 7 as input

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="y" means the output of the step is written to event["y"], resulting in {"x": 5, "y": <result>} (see the sketch after this parameter list)

  • class_args -- class init arguments
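A minimal sketch of input_path and result_path with to(); the step name and handler here are illustrative:

# for an incoming event body {"data": {"a": 5, "b": 7}} the handler receives 7,
# and its return value is written back to event["sum"]
graph.to(name="add_ten", handler="add_ten", input_path="data.b", result_path="sum")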

class mlrun.serving.ErrorStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#

Bases: TaskStep

error execution step, runs a class or handler

kind = 'error_step'#
class mlrun.serving.GraphContext(level='info', logger=None, server=None, nuclio_context: Context | None = None)[source]#

Bases: object

Graph context object
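A hedged sketch of using the graph context from a custom step class, assuming the class accepts a context argument at init (as the model server classes do); the parameter and secret keys are illustrative:

class MyStep:
    def __init__(self, context=None, **kwargs):
        # the graph context is provided by the serving runtime at init time
        self.context = context

    def do(self, event):
        # read an init parameter and a project secret (keys are illustrative)
        threshold = self.context.get_param("threshold", default=0.5)
        token = self.context.get_secret("MY_API_TOKEN")
        return {"threshold": threshold, "has_token": token is not None}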

get_param(key: str, default=None)[source]#
get_remote_endpoint(name, external=True)[source]#

return the remote nuclio/serving function http(s) endpoint given its name

Parameters:
  • name -- the function name/uri in the form [project/]function-name[:tag]

  • external -- return the external url (returns the external url by default)

get_secret(key: str)[source]#
property project: str#

current project name (for the current function)

push_error(event, message, source=None, **kwargs)[source]#
property server#
class mlrun.serving.GraphServer(graph=None, parameters=None, load_mode=None, function_uri=None, verbose=False, version=None, functions=None, graph_initializer=None, error_stream=None, track_models=None, tracking_policy=None, secret_sources=None, default_content_type=None)[source]#

Bases: ModelObj

property graph: RootFlowStep | RouterStep#
init_object(namespace)[source]#
init_states(context, namespace, resource_cache: ResourceCache | None = None, logger=None, is_mock=False, monitoring_mock=False)[source]#

for internal use, initialize all steps (recursively)

kind = 'server'#
run(event, context=None, get_body=False, extra_args=None)[source]#
set_current_function(function)[source]#

set which child function this server is currently running on

set_error_stream(error_stream)[source]#

set/initialize the error notification stream

test(path: str = '/', body: str | bytes | dict | None = None, method: str = '', headers: str | None = None, content_type: str | None = None, silent: bool = False, get_body: bool = True, event_id: str | None = None, trigger: MockTrigger | None = None, offset=None, time=None)[source]#

invoke a test event into the server to simulate/test server behavior

example:

server = create_graph_server()
server.add_model("my", class_name=MyModelClass, model_path="{path}", z=100)
print(server.test("my/infer", testdata))
Parameters:
  • path -- API path, e.g. /{router.url_prefix}/{model-name}/..

  • body -- message body (dict or json str/bytes)

  • method -- optional, GET, POST, ..

  • headers -- optional, request headers, ..

  • content_type -- optional, http mime type

  • silent -- don't raise on error responses (when not 20X)

  • get_body -- return the body as py object (vs serialize response into json)

  • event_id -- specify the unique event ID (by default a random value will be generated)

  • trigger -- nuclio trigger info or mlrun.serving.server.MockTrigger class (holds kind and name)

  • offset -- trigger offset (for streams)

  • time -- event time, datetime or str, defaults to now()

wait_for_completion()[source]#

wait for async operation to complete

class mlrun.serving.MonitoringApplicationStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#

Bases: TaskStep

monitoring application execution step, runs the user's class code

kind = 'monitoring_application'#
class mlrun.serving.QueueStep(name: str | None = None, path: str | None = None, after: list | None = None, shards: int | None = None, retention_in_hours: int | None = None, trigger_args: dict | None = None, **options)[source]#

Bases: BaseStep

queue step, implement an async queue or represent a stream

property async_object#
default_shape = 'cds'#
init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#

init the step class

kind = 'queue'#
run(event, *args, **kwargs)[source]#
to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#

add a step right after this step and return the new step

Example

a 4-step pipeline ending with a stream:

graph.to('URLDownloader')\
    .to('ToParagraphs')\
    .to(name='to_json', handler='json.dumps')\
    .to('>>', 'to_v3io', path=stream_path)

Parameters:
  • class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'

  • name -- unique name (and path) for the child step, default is class name

  • handler -- class/function handler to invoke on run/event

  • graph_shape -- graphviz shape name

  • function -- function this step should run in

  • full_event -- this step accepts the full event (not just body)

  • input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step receives 7 as input

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="y" means the output of the step is written to event["y"], resulting in {"x": 5, "y": <result>}

  • class_args -- class init arguments

class mlrun.serving.RouterStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, routes: list | None = None, name: str | None = None, function: str | None = None, input_path: str | None = None, result_path: str | None = None)[source]#

Bases: TaskStep

router step, implement routing logic for running child routes

add_route(key, route=None, class_name=None, handler=None, function=None, **class_args)[source]#

add child route step or class to the router

Parameters:
  • key -- unique name (and route path) for the child step

  • route -- child step object (Task, ..)

  • class_name -- class name to build the route step from (when route is not provided)

  • class_args -- class init arguments

  • handler -- class handler to invoke on run/event

  • function -- function this step should run in
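A minimal sketch of adding routes to a router topology; the route names, class, and model paths are illustrative:

graph = fn.set_topology("router")
graph.add_route("m1", class_name="MyModelClass", model_path="<model-uri-1>")
graph.add_route("m2", class_name="MyModelClass", model_path="<model-uri-2>")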

clear_children(routes: list)[source]#

clear child steps (routes)

default_shape = 'doubleoctagon'#
get_children()[source]#

get child steps (routes)

init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#

init the step class

kind = 'router'#
plot(filename=None, format=None, source=None, **kw)[source]#

plot/save graph using graphviz

Parameters:
  • filename -- target filepath for the image (None for the notebook)

  • format -- The output format used for rendering ('pdf', 'png', etc.)

  • source -- source step to add to the graph

  • kw -- kwargs passed to graphviz, e.g. rankdir="LR" (see: https://graphviz.org/doc/info/attrs.html)

Returns:

graphviz graph object

property routes#

child routes/steps, traffic is routed to routes based on router logic

class mlrun.serving.TaskStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#

Bases: BaseStep

task execution step, runs a class or handler

property async_object#

return the sync or async (storey) class instance

clear_object()[source]#
get_full_class_args(namespace, class_object, **extra_kwargs)[source]#
get_step_class_object(namespace)[source]#
init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#

init the step class

kind = 'task'#
respond()[source]#

mark this step as the responder.

The step output is returned as the flow result; no other step can follow.

run(event, *args, **kwargs)[source]#

run this step, in async flows the run is done through storey

class mlrun.serving.V2ModelServer(context=None, name: str | None = None, model_path: str | None = None, model=None, protocol=None, input_path: str | None = None, result_path: str | None = None, **kwargs)[source]#

Bases: StepToDict

base model serving class (v2), using similar API to KFServing v2 and Triton

The class is initialized automatically by the model server and can run locally as part of a nuclio serverless function, or as part of a real-time pipeline. The default model URL is: /v2/models/<model>[/versions/<ver>]/operation

You need to implement two mandatory methods:

  • load() -- download the model file(s) and load the model into memory

  • predict() -- accept the request payload and return prediction/inference results

You can override additional methods: preprocess, validate, postprocess, explain. You can add a custom API endpoint by adding a method op_xx(event); it is invoked by calling the <model-url>/xx (operation = xx).

Model server classes are subclassed (the subclass implements the load() and predict() methods); the subclass can be added to a serving graph or to a model router.

defining a sub class:

class MyClass(V2ModelServer):
    def load(self):
        # load and initialize the model and/or other elements
        model_file, extra_data = self.get_model(suffix=".pkl")
        self.model = load(open(model_file, "rb"))

    def predict(self, request):
        events = np.array(request["inputs"])
        dmatrix = xgb.DMatrix(events)
        result: xgb.DMatrix = self.model.predict(dmatrix)
        return {"outputs": result.tolist()}

usage example:

# adding a model to a serving graph using the subclass MyClass
# MyClass will be initialized with the name "my", the model_path, and an arg called my_param
graph = fn.set_topology("router")
fn.add_model("my", class_name="MyClass", model_path="<model-uri>", my_param=5)
Parameters:
  • context -- for internal use (passed in init)

  • name -- step name

  • model_path -- model file/dir or artifact path

  • model -- model object (for local testing)

  • protocol -- serving API protocol (default "v2")

  • input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="resp" means the returned response is written to event["resp"], resulting in {"x": 5, "resp": <result>}

  • kwargs -- extra arguments (can be accessed using self.get_param(key))

do_event(event, *args, **kwargs)[source]#

main model event handler method

explain(request: dict) dict[source]#

model explain operation

get_model(suffix='')[source]#

get the model file(s) and metadata from model store

the method returns a path to the model file and the extra data (dict of dataitem objects) it also loads the model metadata into the self.model_spec attribute, allowing direct access to all the model metadata attributes.

get_model() is usually called in the model's load() method to initialize the model.

Example

def load(self):
    model_file, extra_data = self.get_model(suffix=".pkl")
    self.model = load(open(model_file, "rb"))
    categories = extra_data["categories"].as_df()
Parameters:

suffix (str) -- optional, model file suffix (when the model_path is a directory)

Returns:

  • str -- (local) model file

  • dict -- extra dataitems dictionary

get_param(key: str, default=None)[source]#

get param by key (specified in the model or the function)

load()[source]#

model loading function, see also .get_model() method

logged_results(request: dict, response: dict, op: str)[source]#

hook for controlling which results are tracked by the model monitoring

This hook allows controlling which input/output data is logged by model monitoring: you can filter out columns or add custom values. It can also be used to monitor derived metrics, for example in image classification you can calculate and track the RGB values instead of the image bitmap.

request["inputs"] holds a list of input values/arrays and response["outputs"] holds a list of corresponding output values/arrays (the schema of the input/output fields is stored in the model object). This method should return the alternative input and output lists that will be monitored.

Parameters:
  • request -- predict/explain request, see model serving docs for details

  • response -- result from the model predict/explain (after postprocess())

  • op -- operation (predict/infer or explain)

Returns:

the input and output lists to track
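A minimal sketch of overriding this hook, assuming the request/response schema described above; the column selection is illustrative:

class MyModel(V2ModelServer):
    def logged_results(self, request: dict, response: dict, op: str):
        # track only the first two input features, keep the outputs as-is
        inputs = [row[:2] for row in request["inputs"]]
        outputs = response["outputs"]
        return inputs, outputs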

post_init(mode='sync')[source]#

sync/async model loading, for internal use

postprocess(request: dict) dict[source]#

postprocess, before returning response

predict(request: dict) list[source]#

model prediction operation

Returns:

list with the model prediction results (can be multi-port) or a list of lists for multiple predictions

preprocess(request: dict, operation) dict[source]#

preprocess the event body before validation and action

set_metric(name: str, value)[source]#

set real time metric (for model monitoring)

validate(request, operation)[source]#

validate the event body (after preprocess)

class mlrun.serving.VotingEnsemble(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, vote_type: str | None = None, weights: dict[str, float] | None = None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, format_response_with_col_name_flag: bool = False, prediction_col_name: str = 'prediction', **kwargs)[source]#

Bases: ParallelRun

Voting Ensemble

The VotingEnsemble class enables you to apply prediction logic on top of the different added models.

You can use it by calling:

  • <prefix>/<model>[/versions/<ver>]/operation

    Sends the event to the specific <model>[/versions/<ver>]

  • <prefix>/operation

    Sends the event to all models and applies vote(self, event)

The VotingEnsemble applies the following logic: Incoming Event -> Router Preprocessing -> Send to model/s -> Apply all model/s logic (Preprocessing -> Prediction -> Postprocessing) -> Router Voting logic -> Router Postprocessing -> Response

This enables you to do the general preprocessing and postprocessing steps once on the router level, with only model-specific adjustments at the model level.

When model tracking is enabled via set_tracking(), the ensemble's predictions appear under the given VotingEnsemble name, or "VotingEnsemble" by default.

Example:

# Define a serving function
# Note: You can point the function to a file containing your own Router or Classifier Model class
#       this basic class supports sklearn based models (with `<model>.predict()` api)
fn = mlrun.code_to_function(name='ensemble',
                            kind='serving',
                            filename='model-server.py',
                            image='mlrun/mlrun')

# Set the router class
# You can set your own classes by simply changing the `class_name`
fn.set_topology(class_name='mlrun.serving.routers.VotingEnsemble')

# Add models
fn.add_model(<model_name>, <model_path>, <model_class_name>)
fn.add_model(<model_name>, <model_path>, <model_class_name>)

How to extend the VotingEnsemble:

The VotingEnsemble applies its voting via the logic(predictions, weights) method. logic() receives an array of shape (# samples, # predictors), which you can use to apply whatever logic you need.

If we use this VotingEnsemble as an example, the logic() function tries to figure out whether you are trying to do a classification or a regression prediction by the prediction type or by the given vote_type parameter. Then we apply the appropriate max_vote() or mean_vote() which calculates the actual prediction result and returns it as the VotingEnsemble's prediction.
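A minimal sketch of such an extension, overriding logic() with a simple unweighted mean per sample (illustrative; it ignores the weights argument):

import numpy as np

class AveragingEnsemble(VotingEnsemble):
    def logic(self, predictions, weights):
        # predictions has shape (# samples, # predictors); average across predictors
        return np.mean(np.array(predictions), axis=1).tolist()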

Parameters:
  • context -- for internal use (passed in init)

  • name -- step name

  • routes -- for internal use (routes passed in init)

  • protocol -- serving API protocol (default "v2")

  • url_prefix -- url prefix for the router (default /v2/models)

  • health_prefix -- health api url prefix (default /v2/health)

  • input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="resp" means the returned response is written to event["resp"], resulting in {"x": 5, "resp": <result>}

  • vote_type -- voting type to be used (from VotingTypes). By default it is deduced from the first event: float prediction type means regression, int prediction type means classification

  • ({"<model_name>" (weights A dictionary) -- <model_weight>}) that specified each model weight, if there is a model that didn't appear in the dictionary his weight will be count as a zero. None means that all the models have the same weight.

  • executor_type -- Parallelism mechanism, out of ParallelRunnerModes, by default threads

  • format_response_with_col_name_flag --

    If this flag is True the model's responses output format is

    {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}

    Else

    {id: <id>, model_name: <name>, outputs: [<predictions>]}

  • prediction_col_name -- the dict key for the predictions column in the model's response output. Example: if the model returns {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}, prediction_col_name should be "prediction". Defaults to "prediction"

  • kwargs -- extra arguments

do_event(event, *args, **kwargs)[source]#

Handles incoming requests.

Parameters:

event (nuclio.Event) -- Incoming request as a nuclio.Event.

Returns:

Event response after running the requested logic

Return type:

Response

extract_results_from_response(response)[source]#

Extracts the prediction from the model response. This allows multiple model return types and easy extension to the user's ensemble and model best practices.

Parameters:

response (Union[List, Dict]) -- The model response's output field.

Returns:

The model's predictions

Return type:

List

logic(predictions: list[list[Union[int, float]]], weights: list[float])[source]#

Returns the final prediction of all the models after applying the desired logic

Parameters:
  • predictions -- The predictions from all models, per event

  • weights -- models weights in the prediction order

Returns:

List of the resulting voted predictions

post_init(mode='sync')[source]#
validate(request: dict, method: str)[source]#

Validate the event body (after preprocessing)

Parameters:
  • request -- Event body.

  • method -- Event method.

Returns:

The given Event body (request).

Raises:

Exception -- If validation failed.

mlrun.serving.create_graph_server(parameters=None, load_mode=None, graph=None, verbose=False, current_function=None, **kwargs) GraphServer[source]#

create graph server host/emulator for local or test runs

Usage example:

server = create_graph_server(graph=RouterStep(), parameters={})
server.init(None, globals())
server.graph.add_route("my", class_name=MyModelClass, model_path="{path}", z=100)
print(server.test("/v2/models/my/infer", testdata))
class mlrun.serving.remote.RemoteStep(url: str, subpath: str | None = None, method: str | None = None, headers: dict | None = None, url_expression: str | None = None, body_expression: str | None = None, return_json: bool = True, input_path: str | None = None, result_path: str | None = None, max_in_flight=None, retries=None, backoff_factor=None, timeout=None, **kwargs)[source]#

class for calling remote endpoints

Sync and async graph step implementation for request/response to a remote service (class shortcut = "$remote"). The url can be an http(s) URL (e.g. "https://myservice/path") or an mlrun function URI ([project/]name); alternatively, url_expression can be specified to build the URL from the event (e.g. "event['url']"), as shown in the sketch after the example pipeline.

example pipeline:

flow = function.set_topology("flow", engine="async")
flow.to(name="step1", handler="func1")\
    .to(RemoteStep(name="remote_echo", url="https://myservice/path", method="POST"))\
    .to(name="laststep", handler="func2").respond()
Parameters:
  • url -- http(s) url or function [project/]name to call

  • subpath -- path (which follows the url), use $path to use the event.path

  • method -- HTTP method (GET, POST, ..), default to POST

  • headers -- dictionary with http header values

  • url_expression -- an expression for getting the url from the event, e.g. "event['url']"

  • body_expression -- an expression for getting the request body from the event, e.g. "event['data']"

  • return_json -- indicate the returned value is json, and convert it to a py object

  • input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: for event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7

  • result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: for event {"x": 5}, result_path="resp" means the returned response is written to event["resp"], resulting in {"x": 5, "resp": <result>}

  • retries -- number of retries (in exponential backoff)

  • backoff_factor -- A backoff factor in seconds to apply between attempts after the second try

  • timeout -- How long to wait for the server to send data before giving up, float in seconds

__init__(url: str, subpath: str | None = None, method: str | None = None, headers: dict | None = None, url_expression: str | None = None, body_expression: str | None = None, return_json: bool = True, input_path: str | None = None, result_path: str | None = None, max_in_flight=None, retries=None, backoff_factor=None, timeout=None, **kwargs)[source]#

class mlrun.serving.utils.StepToDict[source]#

auto serialization of graph steps to a python dictionary

to_dict(fields: list | None = None, exclude: list | None = None, strip: bool = False)[source]#

convert the step object to a python dictionary