mlrun.serving#
- class mlrun.serving.states.BaseStep(name: str | None = None, after: list | None = None, shape: str | None = None)[source]#
- error_handler(name: str | None = None, class_name=None, handler=None, before=None, function=None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
set error handler on a step or the entire graph (to be executed on failure/raise)
When setting the error_handler on the graph object, the graph completes after the error handler execution.
Example
In the example below, an 'error_catcher' step is set as the error_handler of the 'raise' step: in case of an error/raise in the 'raise' step, the 'handle_error' handler runs, and after that the 'echo' step runs.

    graph = function.set_topology('flow', engine='async')
    graph.to(name='raise', handler='raising_step') \
         .error_handler(name='error_catcher', handler='handle_error',
                        full_event=True, before='echo')
    graph.add_step(name="echo", handler='echo', after="raise").respond()
- Parameters:
name -- unique name (and path) for the error handler step, default is class name
class_name -- class name or step object to build the step from. The error handler step is derived from a task step (i.e. no router/queue functionality).
handler -- class/function handler to invoke on run/event
before -- name or list of names of the next step(s) to run after this step. The before param must not specify upstream steps, as that would cause a loop. If before is not specified, the graph completes after the error handler execution.
function -- function this step should run in
full_event -- this step accepts the full event (not just the body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}.
class_args -- class init arguments
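The input_path/result_path semantics apply to any task step. A minimal sketch (assuming a serving function fn whose code defines a hypothetical 'double' handler):

    # sketch: pick event["data"]["b"] as the handler input and write the
    # result to event["data"]["doubled"]
    graph = fn.set_topology("flow", engine="async")
    graph.to(name="double", handler="double",
             input_path="data.b", result_path="data.doubled").respond()
    # event {"data": {"a": 5, "b": 7}} -> handler receives 7,
    # response: {"data": {"a": 5, "b": 7, "doubled": 14}}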
- to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
add a step right after this step and return the new step
Example
A 4-step pipeline ending with a stream:

    graph.to('URLDownloader') \
         .to('ToParagraphs') \
         .to(name='to_json', handler='json.dumps') \
         .to('>>', 'to_v3io', path=stream_path)
- Parameters:
class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'.
name -- unique name (and path) for the child step, default is class name
handler -- class/function handler to invoke on run/event
graph_shape -- graphviz shape name
function -- function this step should run in
full_event -- this step accepts the full event (not just body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}.
class_args -- class init arguments
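A short sketch of the queue shortcut described above (the stream path and handler name are placeholders; a '*'-prefixed class name would similarly build a router step):

    # sketch: '>>' (or '$queue') builds a queue/stream step
    graph = fn.set_topology("flow", engine="async")
    graph.to(name="prep", handler="prep") \
         .to(">>", "q1", path="v3io:///projects/demo/streams/out")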
- class mlrun.serving.ErrorStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
error execution step, runs a class or handler
- kind = 'error_step'#
- class mlrun.serving.GraphContext(level='info', logger=None, server=None, nuclio_context: Context | None = None)[source]#
Bases:
object
Graph context object
- get_remote_endpoint(name, external=True)[source]#
return the remote nuclio/serving function http(s) endpoint given its name
- Parameters:
name -- the function name/uri in the form [project/]function-name[:tag]
external -- whether to return the external url (default: True)
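A hedged sketch of using this from a custom step class, assuming the step class receives the graph context in its constructor (all names are placeholders):

    class EndpointResolver:
        def __init__(self, context, name=None, **kwargs):
            self.context = context  # GraphContext
            self.name = name

        def do(self, event):
            # resolve a sibling serving function's http(s) endpoint by name
            event["endpoint"] = self.context.get_remote_endpoint("my-project/my-func")
            return event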
- property project: str#
current project name (for the current function)
- property server#
- class mlrun.serving.GraphServer(graph=None, parameters=None, load_mode=None, function_uri=None, verbose=False, version=None, functions=None, graph_initializer=None, error_stream=None, track_models=None, tracking_policy=None, secret_sources=None, default_content_type=None)[source]#
Bases:
ModelObj
- property graph: RootFlowStep | RouterStep#
- init_states(context, namespace, resource_cache: ResourceCache | None = None, logger=None, is_mock=False, monitoring_mock=False)[source]#
for internal use, initialize all steps (recursively)
- kind = 'server'#
- set_current_function(function)[source]#
set which child function this server is currently running on
- test(path: str = '/', body: str | bytes | dict | None = None, method: str = '', headers: str | None = None, content_type: str | None = None, silent: bool = False, get_body: bool = True, event_id: str | None = None, trigger: MockTrigger | None = None, offset=None, time=None)[source]#
invoke a test event into the server to simulate/test server behavior
example:
    server = create_graph_server()
    server.add_model("my", class_name=MyModelClass, model_path="{path}", z=100)
    print(server.test("my/infer", testdata))
- Parameters:
path -- api path, e.g. /{router.url_prefix}/{model-name}/..
body -- message body (dict or json str/bytes)
method -- optional, GET, POST, ..
headers -- optional, request headers, ..
content_type -- optional, http mime type
silent -- don't raise on error responses (when not 20X)
get_body -- return the body as py object (vs serialize response into json)
event_id -- specify the unique event ID (by default a random value will be generated)
trigger -- nuclio trigger info or mlrun.serving.server.MockTrigger class (holds kind and name)
offset -- trigger offset (for streams)
time -- event time, datetime or str, defaults to now()
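A hedged sketch of exercising test() against a mock (local) server; the file and handler names are placeholders:

    import mlrun

    fn = mlrun.code_to_function("demo", kind="serving",
                                filename="steps.py", image="mlrun/mlrun")
    graph = fn.set_topology("flow", engine="async")
    graph.to(name="echo", handler="echo").respond()

    server = fn.to_mock_server()
    resp = server.test(path="/", body={"x": 1}, method="POST")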
- class mlrun.serving.MonitoringApplicationStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
monitoring application execution step, runs the user's class code
- kind = 'monitoring_application'#
- class mlrun.serving.QueueStep(name: str | None = None, path: str | None = None, after: list | None = None, shards: int | None = None, retention_in_hours: int | None = None, trigger_args: dict | None = None, **options)[source]#
Bases:
BaseStep
queue step, implements an async queue or represents a stream
- property async_object#
- default_shape = 'cds'#
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'queue'#
- to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
add a step right after this step and return the new step
Example
A 4-step pipeline ending with a stream:

    graph.to('URLDownloader') \
         .to('ToParagraphs') \
         .to(name='to_json', handler='json.dumps') \
         .to('>>', 'to_v3io', path=stream_path)
- Parameters:
class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'.
name -- unique name (and path) for the child step, default is class name
handler -- class/function handler to invoke on run/event
graph_shape -- graphviz shape name
function -- function this step should run in
full_event -- this step accepts the full event (not just body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}.
class_args -- class init arguments
- class mlrun.serving.RouterStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, routes: list | None = None, name: str | None = None, function: str | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
router step, implements routing logic for running child routes
- add_route(key, route=None, class_name=None, handler=None, function=None, **class_args)[source]#
add child route step or class to the router
- Parameters:
key -- unique name (and route path) for the child step
route -- child step object (Task, ..)
class_name -- class name to build the route step from (when route is not provided)
class_args -- class init arguments
handler -- class handler to invoke on run/event
function -- function this step should run in
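A sketch of wiring routes into a router topology (the class name and store paths are placeholders):

    graph = fn.set_topology("router")
    graph.add_route("m1", class_name="MyModelClass",
                    model_path="store://models/proj/model-a")
    graph.add_route("m2", class_name="MyModelClass",
                    model_path="store://models/proj/model-b")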
- default_shape = 'doubleoctagon'#
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'router'#
- plot(filename=None, format=None, source=None, **kw)[source]#
plot/save graph using graphviz
- Parameters:
filename -- target filepath for the image (None for the notebook)
format -- the output format used for rendering ('pdf', 'png', etc.)
source -- source step to add to the graph
kw -- kwargs passed to graphviz, e.g. rankdir="LR" (see: https://graphviz.org/doc/info/attrs.html)
- Returns:
graphviz graph object
- property routes#
child routes/steps, traffic is routed to routes based on router logic
- class mlrun.serving.TaskStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
BaseStep
task execution step, runs a class or handler
- property async_object#
return the sync or async (storey) class instance
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'task'#
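A hedged sketch of a class that a task step can run, assuming the common pattern of a do(event) method (all names are illustrative):

    class Multiply:
        def __init__(self, context, name=None, factor=2, **kwargs):
            self.context = context
            self.name = name
            self.factor = factor

        def do(self, event):
            # receives the event body unless full_event=True
            return event * self.factor

    graph.add_step(name="mult", class_name="Multiply", factor=3, after="prep")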
- class mlrun.serving.V2ModelServer(context=None, name: str | None = None, model_path: str | None = None, model=None, protocol=None, input_path: str | None = None, result_path: str | None = None, **kwargs)[source]#
Bases:
StepToDict
base model serving class (v2), using similar API to KFServing v2 and Triton
The class is initialized automatically by the model server and can run locally as part of a nuclio serverless function, or as part of a real-time pipeline. The default model url is: /v2/models/<model>[/versions/<ver>]/operation
- You need to implement two mandatory methods:
load() -- download the model file(s) and load the model into memory
predict() -- accept a request payload and return prediction/inference results
You can override additional methods: preprocess, validate, postprocess, explain. You can add a custom api endpoint by adding a method op_xx(event); it will be invoked by calling <model-url>/xx (operation = xx).
Model server classes are subclassed (the subclass implements the load() and predict() methods); the subclass can be added to a serving graph or to a model router.
defining a subclass:

    class MyClass(V2ModelServer):
        def load(self):
            # load and initialize the model and/or other elements
            model_file, extra_data = self.get_model(suffix=".pkl")
            self.model = load(open(model_file, "rb"))

        def predict(self, request):
            events = np.array(request["inputs"])
            dmatrix = xgb.DMatrix(events)
            result: xgb.DMatrix = self.model.predict(dmatrix)
            return {"outputs": result.tolist()}
usage example:
    # adding a model to a serving graph using the subclass MyClass
    # MyClass will be initialized with the name "my", the model_path, and an arg called my_param
    graph = fn.set_topology("router")
    fn.add_model("my", class_name="MyClass", model_path="<model-uri>", my_param=5)
- Parameters:
context -- for internal use (passed in init)
name -- step name
model_path -- model file/dir or artifact path
model -- model object (for local testing)
protocol -- serving API protocol (default "v2")
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}.
kwargs -- extra arguments (can be accessed using self.get_param(key))
- get_model(suffix='')[source]#
get the model file(s) and metadata from model store
the method returns a path to the model file and the extra data (dict of dataitem objects); it also loads the model metadata into the self.model_spec attribute, allowing direct access to all the model metadata attributes.
get_model is usually used in the model .load() method to init the model.
Example:

    def load(self):
        model_file, extra_data = self.get_model(suffix=".pkl")
        self.model = load(open(model_file, "rb"))
        categories = extra_data["categories"].as_df()
- Parameters:
suffix (str) -- optional, model file suffix (when the model_path is a directory)
- Returns:
str -- (local) model file
dict -- extra dataitems dictionary
- get_param(key: str, default=None)[source]#
get param by key (specified in the model or the function)
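A short sketch of reading a class/function parameter inside a model class (my_param mirrors the usage example above):

    class MyClass(V2ModelServer):
        def load(self):
            # 'my_param' was passed as a class arg, e.g. add_model(..., my_param=5)
            self.factor = self.get_param("my_param", default=1)
            model_file, _ = self.get_model(suffix=".pkl")
            self.model = load(open(model_file, "rb"))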
- logged_results(request: dict, response: dict, op: str)[source]#
hook for controlling which results are tracked by the model monitoring
this hook allows controlling which input/output data is logged by the model monitoring: it can filter out columns or add custom values, and can also be used to monitor derived metrics. For example, in image classification, calculate and track the RGB values rather than the image bitmap.
request["inputs"] holds a list of input values/arrays and response["outputs"] holds a list of corresponding output values/arrays (the schema of the input/output fields is stored in the model object). This method should return lists of alternative inputs and outputs, which will be monitored.
- Parameters:
request -- predict/explain request, see model serving docs for details
response -- result from the model predict/explain (after postprocess())
op -- operation (predict/infer or explain)
- Returns:
the input and output lists to track
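A hedged sketch of overriding the hook (the derived metric here is a placeholder):

    class MyModel(V2ModelServer):
        def logged_results(self, request: dict, response: dict, op: str):
            # sketch: track one derived value per input row instead of the raw arrays
            inputs = [[sum(row)] for row in request["inputs"]]
            outputs = response["outputs"]
            return inputs, outputs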
- predict(request: dict) list [source]#
model prediction operation
- Returns:
list with the model prediction results (can be multi-port) or list of lists for multiple predictions
- class mlrun.serving.VotingEnsemble(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, vote_type: str | None = None, weights: dict[str, float] | None = None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, format_response_with_col_name_flag: bool = False, prediction_col_name: str = 'prediction', **kwargs)[source]#
Bases:
ParallelRun
Voting Ensemble
The VotingEnsemble class enables you to apply prediction logic on top of the different added models.
You can use it by calling:
- <prefix>/<model>[/versions/<ver>]/operation
Sends the event to the specific <model>[/versions/<ver>]
- <prefix>/operation
Sends the event to all models and applies vote(self, event)
The VotingEnsemble applies the following logic: Incoming Event -> Router Preprocessing -> Send to model/s -> Apply all model/s logic (Preprocessing -> Prediction -> Postprocessing) -> Router Voting logic -> Router Postprocessing -> Response
This enables you to do the general preprocessing and postprocessing steps once on the router level, with only model-specific adjustments at the model level.
When model tracking is enabled via set_tracking(), the ensemble's predictions appear under the name given to the VotingEnsemble, or "VotingEnsemble" by default.
Example:
    # Define a serving function
    # Note: You can point the function to a file containing your own Router or Classifier Model class
    # this basic class supports sklearn based models (with `<model>.predict()` api)
    fn = mlrun.code_to_function(name='ensemble', kind='serving',
                                filename='model-server.py', image='mlrun/mlrun')

    # Set the router class
    # You can set your own classes by simply changing the `class_name`
    fn.set_topology(class_name='mlrun.serving.routers.VotingEnsemble')

    # Add models
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
How to extend the VotingEnsemble:
The VotingEnsemble applies its logic using the logic(predictions) function. The logic() function receives an array of shape (# samples, # predictors), which you can then use to apply whatever logic you may need.
Using this VotingEnsemble as an example, the logic() function tries to determine whether the task is classification or regression, either from the prediction type or from the given vote_type parameter. It then applies the appropriate max_vote() or mean_vote(), which calculates the actual prediction result and returns it as the VotingEnsemble's prediction.
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}.
vote_type -- voting type to be used (from VotingTypes). By default, tries to self-deduce from the first event: float prediction type -> regression; int prediction type -> classification.
weights -- a dictionary ({"<model_name>": <model_weight>}) that specifies each model's weight; a model that does not appear in the dictionary gets a weight of zero. None means all models have the same weight.
executor_type -- Parallelism mechanism, out of ParallelRunnerModes, by default threads
format_response_with_col_name_flag -- if this flag is True, the model's response output format is {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}; otherwise it is {id: <id>, model_name: <name>, outputs: [<predictions>]}
prediction_col_name -- the dict key for the predictions column in the model's response output. Example: if the model returns {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}, prediction_col_name should be 'prediction'. Defaults to 'prediction'.
kwargs -- extra arguments
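A hedged sketch of configuring the ensemble with explicit weights, assuming set_topology forwards extra class args to the router class:

    graph = fn.set_topology("router",
                            class_name="mlrun.serving.routers.VotingEnsemble",
                            vote_type="regression",
                            weights={"model-a": 0.7, "model-b": 0.3})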
- do_event(event, *args, **kwargs)[source]#
Handles incoming requests.
- Parameters:
event (nuclio.Event) -- Incoming request as a nuclio.Event.
- Returns:
Event response after running the requested logic
- Return type:
Response
- extract_results_from_response(response)[source]#
Extracts the prediction from the model response. This function is used to allow multiple model return types, and allows for easy extension to the user's ensemble and model best practices.
- Parameters:
response (Union[List, Dict]) -- The model response's output field.
- Returns:
The model's predictions
- Return type:
List
- logic(predictions: list[list[Union[int, float]]], weights: list[float])[source]#
Returns the final prediction of all the models after applying the desired logic
- Parameters:
predictions -- The predictions from all models, per event
weights -- models weights in the prediction order
- Returns:
List of the resulting voted predictions
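A hedged sketch of a custom ensemble overriding logic(); median voting is illustrative, not part of the library:

    import numpy as np
    from mlrun.serving import VotingEnsemble

    class MedianEnsemble(VotingEnsemble):
        def logic(self, predictions, weights):
            # predictions has shape (# samples, # predictors); ignore the weights
            # and take the per-sample median across predictors
            return np.median(np.array(predictions), axis=1).tolist()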
- mlrun.serving.create_graph_server(parameters=None, load_mode=None, graph=None, verbose=False, current_function=None, **kwargs) GraphServer [source]#
create graph server host/emulator for local or test runs
Usage example:
    server = create_graph_server(graph=RouterStep(), parameters={})
    server.init(None, globals())
    server.graph.add_route("my", class_name=MyModelClass, model_path="{path}", z=100)
    print(server.test("/v2/models/my/infer", testdata))
- class mlrun.serving.remote.RemoteStep(url: str, subpath: str | None = None, method: str | None = None, headers: dict | None = None, url_expression: str | None = None, body_expression: str | None = None, return_json: bool = True, input_path: str | None = None, result_path: str | None = None, max_in_flight=None, retries=None, backoff_factor=None, timeout=None, **kwargs)[source]#
class for calling remote endpoints
sync and async graph step implementation for request/resp to a remote service (class shortcut = "$remote"). The url can be an http(s) url (e.g. "https://myservice/path") or an mlrun function uri ([project/]name). Alternatively, url_expression can be specified to build the url from the event (e.g. "event['url']").
example pipeline:
    flow = function.set_topology("flow", engine="async")
    flow.to(name="step1", handler="func1") \
        .to(RemoteStep(name="remote_echo", url="https://myservice/path", method="POST")) \
        .to(name="laststep", handler="func2").respond()
- Parameters:
url -- http(s) url or function [project/]name to call
subpath -- path (which follows the url), use $path to use the event.path
method -- HTTP method (GET, POST, ..), defaults to POST
headers -- dictionary with http header values
url_expression -- an expression for getting the url from the event, e.g. "event['url']"
body_expression -- an expression for getting the request body from the event, e.g. "event['data']"
return_json -- indicate the returned value is json, and convert it to a py object
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. For example, with event {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7.
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example, with event {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}.
retries -- number of retries (in exponential backoff)
backoff_factor -- A backoff factor in seconds to apply between attempts after the second try
timeout -- How long to wait for the server to send data before giving up, float in seconds
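A hedged sketch of dynamic dispatch with url_expression (field names are placeholders):

    # sketch: take the target url and request body from each event
    flow.to(RemoteStep(name="dispatch",
                       url_expression="event['url']",
                       body_expression="event['data']",
                       method="POST",
                       retries=3, backoff_factor=1.0, timeout=10))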