mlrun.serving#
- class mlrun.serving.states.BaseStep(name: str | None = None, after: list | None = None, shape: str | None = None)[source]#
- error_handler(name: str | None = None, class_name=None, handler=None, before=None, function=None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
set error handler on a step or the entire graph (to be executed on failure/raise)
When setting the error_handler on the graph object, the graph completes after the error handler execution.
Example
In the below example, an 'error_catcher' step is set as the error_handler of the 'raise' step: in case of an error/raise in the 'raise' step, handle_error will run, and after that the 'echo' step will run.

    graph = function.set_topology('flow', engine='async')
    graph.to(name='raise', handler='raising_step')\
         .error_handler(name='error_catcher', handler='handle_error', full_event=True, before='echo')
    graph.add_step(name="echo", handler='echo', after="raise").respond()
- Parameters:
name -- unique name (and path) for the error handler step, default is class name
class_name -- class name or step object to build the step from. The error handler step is derived from a task step (i.e. no router/queue functionality)
handler -- class/function handler to invoke on run/event
before -- string or list of the next step name(s) that will run after this step. The before param must not specify upstream steps, as that would cause a loop. If before is not specified, the graph will complete after the error handler execution.
function -- function this step should run in
full_event -- this step accepts the full event (not just the body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. For example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. For example: event: {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}
class_args -- class init arguments
- to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
add a step right after this step and return the new step
Example
a 4-step pipeline ending with a stream:

    graph.to('URLDownloader')\
         .to('ToParagraphs')\
         .to(name='to_json', handler='json.dumps')\
         .to('>>', 'to_v3io', path=stream_path)
- Parameters:
class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'
name -- unique name (and path) for the child step, default is class name
handler -- class/function handler to invoke on run/event
graph_shape -- graphviz shape name
function -- function this step should run in
full_event -- this step accepts the full event (not just body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}
class_args -- class init arguments
- class mlrun.serving.ErrorStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
error execution step, runs a class or handler
- kind = 'error_step'#
- class mlrun.serving.GraphContext(level='info', logger=None, server=None, nuclio_context: Context | None = None)[source]#
Bases:
object
Graph context object
- get_remote_endpoint(name, external=True)[source]#
return the remote nuclio/serving function http(s) endpoint given its name
- Parameters:
name -- the function name/uri in the form [project/]function-name[:tag]
external -- whether to return the external URL (default True)
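For example, a custom graph step can use the context to call another serving function in the project. A minimal sketch, assuming the step class receives the graph context in its constructor (the function uri "my-project/echo-func" and the /predict path are placeholders):

```python
import requests


class CallPeerFunction:
    """Illustrative step that forwards each event to another serving function."""

    def __init__(self, context, name=None, **kwargs):
        self.context = context  # GraphContext provided by the graph server
        self.name = name

    def do(self, event):
        # resolve the remote function's HTTP(S) endpoint by its [project/]name uri
        url = self.context.get_remote_endpoint("my-project/echo-func", external=True)
        resp = requests.post(f"{url}/predict", json=event)
        return resp.json()
```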
- property project: str#
current project name (for the current function)
- property server#
- class mlrun.serving.GraphServer(graph=None, parameters=None, load_mode=None, function_uri=None, verbose=False, version=None, functions=None, graph_initializer=None, error_stream=None, track_models=None, tracking_policy=None, secret_sources=None, default_content_type=None, function_name=None, function_tag=None, project=None)[source]#
Bases:
ModelObj
- property graph: RootFlowStep | RouterStep#
- init_states(context, namespace, resource_cache: ResourceCache | None = None, logger=None, is_mock=False, monitoring_mock=False)[source]#
for internal use, initialize all steps (recursively)
- kind = 'server'#
- set_current_function(function)[source]#
set which child function this server is currently running on
- test(path: str = '/', body: str | bytes | dict | None = None, method: str = '', headers: str | None = None, content_type: str | None = None, silent: bool = False, get_body: bool = True, event_id: str | None = None, trigger: MockTrigger | None = None, offset=None, time=None)[source]#
invoke a test event into the server to simulate/test server behavior
example:
    server = create_graph_server()
    server.add_model("my", class_name=MyModelClass, model_path="{path}", z=100)
    print(server.test("my/infer", testdata))
- Parameters:
path -- API path, e.g. /{router.url_prefix}/{model-name}/..
body -- message body (dict or json str/bytes)
method -- optional, GET, POST, ..
headers -- optional, request headers, ..
content_type -- optional, http mime type
silent -- don't raise on error responses (when not 20X)
get_body -- return the body as py object (vs serialize response into json)
event_id -- specify the unique event ID (by default a random value will be generated)
trigger -- nuclio trigger info or mlrun.serving.server.MockTrigger class (holds kind and name)
offset -- trigger offset (for streams)
time -- event time as datetime or str, defaults to now()
- class mlrun.serving.Model(name: str, **kwargs)[source]#
Bases:
ParallelExecutionRunnable
- predict(body: Any) Any [source]#
Override to implement prediction logic. If the logic requires asyncio, override predict_async() instead.
- async predict_async(body: Any) Any [source]#
Override to implement prediction logic if the logic requires asyncio.
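A minimal sketch of custom model runnables (the class names and the {"inputs": [...]} body shape below are illustrative assumptions, not part of the API):

```python
from mlrun.serving import Model


class DoublingModel(Model):
    """Illustrative Model subclass that doubles every input value."""

    def predict(self, body):
        # body is the event body routed to this model; the {"inputs": [...]}
        # shape is assumed here just for the sketch
        return {"outputs": [x * 2 for x in body["inputs"]]}


class AsyncDoublingModel(Model):
    """Same logic, implemented with asyncio via predict_async()."""

    async def predict_async(self, body):
        return {"outputs": [x * 2 for x in body["inputs"]]}
```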
- class mlrun.serving.ModelRunner(*args, model_selector: ModelSelector | None = None, **kwargs)[source]#
Bases:
ParallelExecution
Runs multiple Models on each event. See ModelRunnerStep.
- Parameters:
model_selector -- ModelSelector instance whose select() method will be used to select models to run on each event. Optional. If not passed, all models will be run.
- class mlrun.serving.ModelRunnerStep(*args, model_selector: str | ModelSelector | None = None, **kwargs)[source]#
Bases:
TaskStep
Runs multiple Models on each event.
example:
    model_runner_step = ModelRunnerStep(name="my_model_runner")
    model_runner_step.add_model(MyModel(name="my_model"))
    graph.to(model_runner_step)
- Parameters:
model_selector -- ModelSelector instance whose select() method will be used to select models to run on each event. Optional. If not passed, all models will be run.
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'model_runner'#
- class mlrun.serving.ModelSelector[source]#
Bases:
object
Used to select which models to run on each event.
- select(event, available_models: list[mlrun.serving.states.Model]) list[str] | list[mlrun.serving.states.Model] [source]#
Given an event, returns a list of model names or a list of model objects to run on the event. If None is returned, all models will be run.
- Parameters:
event -- The full event
available_models -- List of available models
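A sketch of a custom selector, assuming the event body is a dict with a "segment" field (the field name and the model-naming convention are illustrative):

```python
from mlrun.serving import ModelRunnerStep, ModelSelector


class SegmentSelector(ModelSelector):
    """Run only the models whose name contains the event's segment value."""

    def select(self, event, available_models):
        segment = event.body.get("segment")  # assumes a dict event body
        if segment is None:
            return None  # None means: run all models
        # a list of model names (or of Model objects) may be returned
        return [model.name for model in available_models if segment in model.name]


# attach the selector to a ModelRunnerStep
runner = ModelRunnerStep(name="my_model_runner", model_selector=SegmentSelector())
```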
- class mlrun.serving.MonitoringApplicationStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
monitoring application execution step, runs the user's class code
- kind = 'monitoring_application'#
- class mlrun.serving.QueueStep(name: str | None = None, path: str | None = None, after: list | None = None, shards: int | None = None, retention_in_hours: int | None = None, trigger_args: dict | None = None, **options)[source]#
Bases:
BaseStep
queue step, implements an async queue or represents a stream
- property async_object#
- default_shape = 'cds'#
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'queue'#
- to(class_name: str | StepToDict | None = None, name: str | None = None, handler: str | None = None, graph_shape: str | None = None, function: str | None = None, full_event: bool | None = None, input_path: str | None = None, result_path: str | None = None, **class_args)[source]#
add a step right after this step and return the new step
Example
a 4-step pipeline ending with a stream:

    graph.to('URLDownloader')\
         .to('ToParagraphs')\
         .to(name='to_json', handler='json.dumps')\
         .to('>>', 'to_v3io', path=stream_path)
- Parameters:
class_name -- class name or step object to build the step from. For router steps the class name should start with '*'; for a queue/stream step the class should be '>>' or '$queue'
name -- unique name (and path) for the child step, default is class name
handler -- class/function handler to invoke on run/event
graph_shape -- graphviz shape name
function -- function this step should run in
full_event -- this step accepts the full event (not just body)
input_path -- selects the key/path in the event to use as input to the step. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the step will receive 7 as input
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="y" means the output of the step will be written to event["y"], resulting in {"x": 5, "y": <result>}
class_args -- class init arguments
- class mlrun.serving.RouterStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, routes: list | None = None, name: str | None = None, function: str | None = None, input_path: str | None = None, result_path: str | None = None)[source]#
Bases:
TaskStep
router step, implements routing logic for running child routes
- add_route(key, route=None, class_name=None, handler=None, function=None, creation_strategy: ModelEndpointCreationStrategy = ModelEndpointCreationStrategy.INPLACE, **class_args)[source]#
add child route step or class to the router, if key exists it will be updated
- Parameters:
key -- unique name (and route path) for the child step
route -- child step object (Task, ..)
class_name -- class name to build the route step from (when route is not provided)
class_args -- class init arguments
handler -- class handler to invoke on run/event
function -- function this step should run in
creation_strategy -- Strategy for creating or updating the model endpoint:
- overwrite: if model endpoints with the same name exist, delete the latest one, then create a new model endpoint entry and set it as latest.
- inplace (default): if model endpoints with the same name exist, update the latest entry; otherwise, create a new entry.
- archive: if model endpoints with the same name exist, preserve them, then create a new model endpoint with the same name and set it to latest.
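For example, adding two routes to a router topology (the class names and model URIs are placeholders; passing model_path as a class argument follows the V2ModelServer convention documented below):

```python
# build a router topology on a serving function
graph = fn.set_topology("router")

# each route becomes a child task step built from the given class
graph.add_route("modelA", class_name="MyModelClassA", model_path="<model-uri-a>")
graph.add_route("modelB", class_name="MyModelClassB", model_path="<model-uri-b>")
```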
- default_shape = 'doubleoctagon'#
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'router'#
- plot(filename=None, format=None, source=None, **kw)[source]#
plot/save graph using graphviz
- Parameters:
filename -- target filepath for the image (None for the notebook)
format -- the output format used for rendering ('pdf', 'png', etc.)
source -- source step to add to the graph
kw -- kwargs passed to graphviz, e.g. rankdir="LR" (see: https://graphviz.org/doc/info/attrs.html)
- Returns:
graphviz graph object
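For example (a left-to-right layout; the target filename is arbitrary, and exact file handling follows graphviz):

```python
# render inline in a notebook
graph.plot(rankdir="LR")

# or save the rendered graph to a file
graph.plot("graph.png", format="png", rankdir="LR")
```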
- property routes#
child routes/steps, traffic is routed to routes based on router logic
- class mlrun.serving.TaskStep(class_name: str | type | None = None, class_args: dict | None = None, handler: str | None = None, name: str | None = None, after: list | None = None, full_event: bool | None = None, function: str | None = None, responder: bool | None = None, input_path: str | None = None, result_path: str | None = None, model_endpoint_creation_strategy: ModelEndpointCreationStrategy | None = ModelEndpointCreationStrategy.SKIP, endpoint_type: EndpointType | None = EndpointType.NODE_EP)[source]#
Bases:
BaseStep
task execution step, runs a class or handler
- property async_object#
return the sync or async (storey) class instance
- init_object(context, namespace, mode='sync', reset=False, **extra_kwargs)[source]#
init the step class
- kind = 'task'#
- class mlrun.serving.V2ModelServer(context=None, name: str | None = None, model_path: str | None = None, model=None, protocol=None, input_path: str | None = None, result_path: str | None = None, shard_by_endpoint: bool | None = None, **kwargs)[source]#
Bases:
StepToDict
base model serving class (v2), using similar API to KFServing v2 and Triton
The class is initialized automatically by the model server and can run locally as part of a nuclio serverless function, or as part of a real-time pipeline. The default model URL is: /v2/models/<model>[/versions/<ver>]/operation
- You need to implement two mandatory methods:
load() - download the model file(s) and load the model into memory
predict() - accept request payload and return prediction/inference results
You can override additional methods: preprocess, validate, postprocess, explain. You can add a custom API endpoint by adding a method op_xx(event); it will be invoked by calling <model-url>/xx (operation = xx).
Model server classes are subclassed (the subclass implements the load() and predict() methods); the subclass can be added to a serving graph or to a model router.
defining a sub class:
    class MyClass(V2ModelServer):
        def load(self):
            # load and initialize the model and/or other elements
            model_file, extra_data = self.get_model(suffix=".pkl")
            self.model = load(open(model_file, "rb"))

        def predict(self, request):
            events = np.array(request["inputs"])
            dmatrix = xgb.DMatrix(events)
            result: xgb.DMatrix = self.model.predict(dmatrix)
            return {"outputs": result.tolist()}
usage example:
    # adding a model to a serving graph using the subclass MyClass
    # MyClass will be initialized with the name "my", the model_path, and an arg called my_param
    graph = fn.set_topology("router")
    fn.add_model("my", class_name="MyClass", model_path="<model-uri>", my_param=5)
- Parameters:
context -- for internal use (passed in init)
name -- step name
model_path -- model file/dir or artifact path
model -- model object (for local testing)
protocol -- serving API protocol (default "v2")
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
shard_by_endpoint -- whether to use the endpoint as the partition/sharding key when writing to model monitoring stream. Defaults to True.
kwargs -- extra arguments (can be accessed using self.get_param(key))
- get_model(suffix='')[source]#
get the model file(s) and metadata from model store
the method returns a path to the model file and the extra data (dict of dataitem objects). It also loads the model metadata into the self.model_spec attribute, allowing direct access to all the model metadata attributes.
get_model is usually used in the model's load() method to init the model.
Examples

    def load(self):
        model_file, extra_data = self.get_model(suffix=".pkl")
        self.model = load(open(model_file, "rb"))
        categories = extra_data["categories"].as_df()
- Parameters:
suffix (str) -- optional, model file suffix (when the model_path is a directory)
- Returns:
str -- (local) model file
dict -- extra dataitems dictionary
- get_param(key: str, default=None)[source]#
get param by key (specified in the model or the function)
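For example, inside a V2ModelServer subclass (my_param is the illustrative argument passed via add_model(..., my_param=5) in the usage example above):

```python
def load(self):
    # read an init/function argument, with a fallback when it was not provided
    self.my_param = self.get_param("my_param", default=5)
```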
- logged_results(request: dict, response: dict, op: str)[source]#
hook for controlling which results are tracked by the model monitoring
this hook allows controlling which input/output data is logged by the model monitoring: filtering out columns, adding custom values, or monitoring derived metrics. For example, in image classification you could calculate and track the RGB values instead of the image bitmap.
request["inputs"] holds a list of input values/arrays and response["outputs"] holds a list of corresponding output values/arrays (the schema of the input/output fields is stored in the model object). This method should return lists of alternative inputs and outputs, which will be monitored.
- Parameters:
request -- predict/explain request, see model serving docs for details
response -- result from the model predict/explain (after postprocess())
op -- operation (predict/infer or explain)
- Returns:
the input and output lists to track
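A minimal sketch of overriding the hook for an image model, tracking per-channel means instead of raw bitmaps (load()/predict() are omitted, and the numpy-based feature extraction is illustrative):

```python
import numpy as np


class MyImageModel(V2ModelServer):
    def logged_results(self, request: dict, response: dict, op: str):
        # monitor mean R/G/B values per image rather than the full bitmaps
        inputs = [
            np.asarray(img).reshape(-1, 3).mean(axis=0).tolist()
            for img in request["inputs"]
        ]
        outputs = response["outputs"]  # keep the model outputs as-is
        return inputs, outputs
```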
- predict(request: dict) list [source]#
model prediction operation
- Returns:
list with the model prediction results (can be multi-port) or a list of lists for multiple predictions
- class mlrun.serving.VotingEnsemble(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, vote_type: str | None = None, weights: dict[str, float] | None = None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, format_response_with_col_name_flag: bool = False, prediction_col_name: str = 'prediction', shard_by_endpoint: bool | None = None, **kwargs)[source]#
Bases:
ParallelRun
Voting Ensemble
The VotingEnsemble class enables you to apply prediction logic on top of the different added models.
You can use it by calling:
- <prefix>/<model>[/versions/<ver>]/operation
Sends the event to the specific <model>[/versions/<ver>]
- <prefix>/operation
Sends the event to all models and applies vote(self, event)
The VotingEnsemble applies the following logic: Incoming Event -> Router Preprocessing -> Send to model/s -> Apply all model/s logic (Preprocessing -> Prediction -> Postprocessing) -> Router Voting logic -> Router Postprocessing -> Response
This enables you to do the general preprocessing and postprocessing steps once on the router level, with only model-specific adjustments at the model level.
When enabling model tracking via set_tracking(), the ensemble's predictions appear under the model name given to the VotingEnsemble, or "VotingEnsemble" by default.
Example:
    # Define a serving function
    # Note: You can point the function to a file containing your own Router or Classifier Model class
    # this basic class supports sklearn based models (with `<model>.predict()` api)
    fn = mlrun.code_to_function(name='ensemble', kind='serving',
                                filename='model-server.py', image='mlrun/mlrun')

    # Set the router class
    # You can set your own classes by simply changing the `class_name`
    fn.set_topology(class_name='mlrun.serving.routers.VotingEnsemble')

    # Add models
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
How to extend the VotingEnsemble:
The VotingEnsemble applies its logic using the logic(predictions) function. The logic() function receives an array of (# samples, # predictors) which you can then use to apply whatever logic you may need.
If we use this VotingEnsemble as an example, the logic() function tries to figure out whether you are trying to do a classification or a regression prediction by the prediction type or by the given vote_type parameter. Then we apply the appropriate max_vote() or mean_vote() which calculates the actual prediction result and returns it as the VotingEnsemble's prediction.
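A sketch of a custom ensemble that replaces the voting logic with a weighted average (the class name is illustrative; it ignores the deduced vote type):

```python
import numpy as np

from mlrun.serving import VotingEnsemble


class WeightedMeanEnsemble(VotingEnsemble):
    def logic(self, predictions, weights):
        # predictions arrive as (# samples, # predictors); weights follow the
        # prediction order (see the parameters below)
        preds = np.array(predictions, dtype=float)
        w = np.array(weights, dtype=float)
        return np.average(preds, axis=1, weights=w).tolist()
```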
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
vote_type -- voting type to be used (from VotingTypes). By default, the type is deduced from the first event: a float prediction type means regression, an int prediction type means classification
({"<model_name>" (weights A dictionary) -- <model_weight>}) that specified each model weight, if there is a model that didn't appear in the dictionary his weight will be count as a zero. None means that all the models have the same weight.
executor_type -- Parallelism mechanism, out of ParallelRunnerModes, by default threads
format_response_with_col_name_flag --
- If this flag is True the model's responses output format is
{id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}
- Else
{id: <id>, model_name: <name>, outputs: [<predictions>]}
prediction_col_name -- The dict key for the predictions column in the model's responses output. Example: If the model returns {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}} the prediction_col_name should be prediction. by default, prediction
shard_by_endpoint -- whether to use the endpoint as the partition/sharding key when writing to model monitoring stream. Defaults to True.
kwargs -- extra arguments
- do_event(event, *args, **kwargs)[source]#
Handles incoming requests.
- Parameters:
event (nuclio.Event) -- Incoming request as a nuclio.Event.
- Returns:
Event response after running the requested logic
- Return type:
Response
- extract_results_from_response(response)[source]#
Extracts the prediction from the model response. This function is used to allow multiple model return types and easy extension to the user's ensemble and model best practices.
- Parameters:
response (Union[List, Dict]) -- The model response's output field.
- Returns:
The model's predictions
- Return type:
List
- logic(predictions: list[list[Union[int, float]]], weights: list[float])[source]#
Returns the final prediction of all the models after applying the desired logic
- Parameters:
predictions -- The predictions from all models, per event
weights -- models weights in the prediction order
- Returns:
List of the resulting voted predictions
- mlrun.serving.create_graph_server(parameters=None, load_mode=None, graph=None, verbose=False, current_function=None, **kwargs) GraphServer [source]#
create graph server host/emulator for local or test runs
Usage example:
    server = create_graph_server(graph=RouterStep(), parameters={})
    server.init(None, globals())
    server.graph.add_route("my", class_name=MyModelClass, model_path="{path}", z=100)
    print(server.test("/v2/models/my/infer", testdata))
- class mlrun.serving.remote.BatchHttpRequests(url: str | None = None, subpath: str | None = None, method: str | None = None, headers: dict | None = None, url_expression: str | None = None, body_expression: str | None = None, return_json: bool = True, input_path: str | None = None, result_path: str | None = None, retries=None, backoff_factor=None, timeout=None, **kwargs)[source]#
class for calling remote endpoints in parallel
sync and async graph step implementation for request/response to a remote service (class shortcut = "$remote"). The url can be an http(s) url (e.g. https://myservice/path) or an mlrun function uri ([project/]name). Alternatively, url_expression can be specified to build the url from the event (e.g. "event['url']").
example pipeline:
    function = mlrun.new_function("myfunc", kind="serving")
    flow = function.set_topology("flow", engine="async")
    flow.to(
        BatchHttpRequests(
            url_expression="event['url']",
            body_expression="event['data']",
            method="POST",
            input_path="req",
            result_path="resp",
        )
    ).respond()
    server = function.to_mock_server()

    # request contains a list of elements, each with url and data
    request = [{"url": f"{base_url}/{i}", "data": i} for i in range(2)]
    resp = server.test(body={"req": request})
- Parameters:
url -- http(s) url or function [project/]name to call
subpath -- path (which follows the url)
method -- HTTP method (GET, POST, ..), default to POST
headers -- dictionary with http header values
url_expression -- an expression for getting the url from the event, e.g. "event['url']"
body_expression -- an expression for getting the request body from the event, e.g. "event['data']"
return_json -- indicate the returned value is json, and convert it to a py object
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
retries -- number of retries (in exponential backoff)
backoff_factor -- A backoff factor in seconds to apply between attempts after the second try
timeout -- How long to wait for the server to send data before giving up, float in seconds
- class mlrun.serving.remote.RemoteStep(url: str, subpath: str | None = None, method: str | None = None, headers: dict | None = None, url_expression: str | None = None, body_expression: str | None = None, return_json: bool = True, input_path: str | None = None, result_path: str | None = None, max_in_flight=None, retries=None, backoff_factor=None, timeout=None, headers_expression: str | None = None, **kwargs)[source]#
class for calling remote endpoints
sync and async graph step implementation for request/response to a remote service (class shortcut = "$remote"). The url can be an http(s) url (e.g. https://myservice/path) or an mlrun function uri ([project/]name). Alternatively, url_expression can be specified to build the url from the event (e.g. "event['url']").
example pipeline:
    flow = function.set_topology("flow", engine="async")
    flow.to(name="step1", handler="func1")\
        .to(RemoteStep(name="remote_echo", url="https://myservice/path", method="POST"))\
        .to(name="laststep", handler="func2").respond()
- Parameters:
url -- http(s) url or function [project/]name to call
subpath -- path (which follows the url), use $path to use the event.path
method -- HTTP method (GET, POST, ..), default to POST
headers -- dictionary with http header values
url_expression -- an expression for getting the url from the event, e.g. "event['url']"
body_expression -- an expression for getting the request body from the event, e.g. "event['data']"
return_json -- indicate the returned value is json, and convert it to a py object
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
retries -- number of retries (in exponential backoff)
backoff_factor -- A backoff factor in seconds to apply between attempts after the second try
timeout -- How long to wait for the server to send data before giving up, float in seconds
headers_expression -- an expression for getting the request headers from the event, e.g. "event['headers']"
- class mlrun.serving.routers.BaseModelRouter(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, input_path: str | None = None, result_path: str | None = None, **kwargs)[source]#
base model router class
Model Serving Router, route between child models
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
kwargs -- extra arguments
- class mlrun.serving.routers.EnrichmentModelRouter(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, feature_vector_uri: str = '', impute_policy: dict | None = None, **kwargs)[source]#
Model router with feature enrichment and imputing
Model router with feature enrichment (from the feature store)
The EnrichmentModelRouter class enriches the incoming event with real-time features read from a feature vector (in the MLRun feature store) and forwards the enriched event to the child models
The feature vector is specified using feature_vector_uri. In addition, an imputing policy can be specified to substitute None/NaN values with predefined constants or stats.
- Parameters:
feature_vector_uri -- feature vector uri in the form: [project/]name[:tag]
impute_policy -- value imputing (substitute NaN/Inf values with a statistical or constant value). You can set the impute_policy parameter with the imputing policy and specify which constant or statistical value will be used instead of NaN/Inf values. This can be defined per column or for all columns ("*"). The replaced value can be a fixed number for constants, or $mean, $max, $min, $std, $count for statistical values. "*" is used to specify the default for all features, example: impute_policy={"*": "$mean", "age": 33}
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
kwargs -- extra arguments
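A sketch of wiring the enrichment router into a serving function (the feature vector name, impute policy, and model details are placeholders):

```python
import mlrun

fn = mlrun.code_to_function(name="enriched", kind="serving",
                            filename="model-server.py", image="mlrun/mlrun")

# events are enriched from the feature vector before reaching the child models
fn.set_topology(
    class_name="mlrun.serving.routers.EnrichmentModelRouter",
    feature_vector_uri="my-feature-vector",
    impute_policy={"*": "$mean"},
)
fn.add_model("my-model", class_name="MyModelClass", model_path="<model-uri>")
```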
- class mlrun.serving.routers.EnrichmentVotingEnsemble(context=None, name: str | None = None, routes=None, protocol=None, url_prefix: str | None = None, health_prefix: str | None = None, vote_type: str | None = None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, prediction_col_name: str | None = None, feature_vector_uri: str = '', impute_policy: dict | None = None, **kwargs)[source]#
Voting Ensemble with feature enrichment (from the feature store)
The EnrichmentVotingEnsemble class enables you to enrich the incoming event with real-time features read from a feature vector (in the MLRun feature store) and apply prediction logic on top of the different added models.
You can use it by calling:
- <prefix>/<model>[/versions/<ver>]/operation
Sends the event to the specific <model>[/versions/<ver>]
- <prefix>/operation
Sends the event to all models and applies vote(self, event)
The VotingEnsemble applies the following logic: Incoming Event -> Feature enrichment -> Send to model/s -> Apply all model/s logic (Preprocessing -> Prediction -> Postprocessing) -> Router Voting logic -> Router Postprocessing -> Response
The feature vector is specified using feature_vector_uri. In addition, an imputing policy can be specified to substitute None/NaN values with predefined constants or stats.
When enabling model tracking via set_tracking(), the ensemble's predictions appear under the model name given to the VotingEnsemble, or "VotingEnsemble" by default.
Example:
    # Define a serving function
    # Note: You can point the function to a file containing your own Router or Classifier Model class
    # this basic class supports sklearn based models (with `<model>.predict()` api)
    fn = mlrun.code_to_function(
        name='ensemble', kind='serving',
        filename='model-server.py', image='mlrun/mlrun')

    # Set the router class
    # You can set your own classes by simply changing the `class_name`
    fn.set_topology(
        class_name='mlrun.serving.routers.EnrichmentVotingEnsemble',
        feature_vector_uri="transactions-fraud",
        impute_policy={"*": "$mean"})

    # Add models
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
How to extend the VotingEnsemble#
The VotingEnsemble applies its logic using the logic(predictions) function. The logic() function receives an array of (# samples, # predictors) which you can then use to apply whatever logic you may need.
If we use this VotingEnsemble as an example, the logic() function tries to figure out whether you are trying to do a classification or a regression prediction by the prediction type or by the given vote_type parameter. Then we apply the appropriate max_vote() or mean_vote() which calculates the actual prediction result and returns it as the VotingEnsemble's prediction.
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
feature_vector_uri -- feature vector uri in the form: [project/]name[:tag]
impute_policy -- value imputing (substitute NaN/Inf values with a statistical or constant value). You can set the impute_policy parameter with the imputing policy and specify which constant or statistical value will be used instead of NaN/Inf values. This can be defined per column or for all columns ("*"). The replaced value can be a fixed number for constants, or $mean, $max, $min, $std, $count for statistical values. "*" is used to specify the default for all features, example: impute_policy={"*": "$mean", "age": 33}
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
vote_type -- voting type to be used (from VotingTypes). By default, the type is deduced from the first event: a float prediction type means regression, an int prediction type means classification
executor_type -- Parallelism mechanism, out of ParallelRunnerModes, by default thread
prediction_col_name -- The dict key for the predictions column in the model's responses output. Example: if the model returns {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}, the prediction_col_name should be prediction. By default, prediction.
kwargs -- extra arguments
- class mlrun.serving.routers.ModelRouter(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, input_path: str | None = None, result_path: str | None = None, **kwargs)[source]#
Model Serving Router, route between child models
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
kwargs -- extra arguments
- class mlrun.serving.routers.OperationTypes(value)[source]#
Supported operations for VotingEnsemble
- explain = 'explain'#
- infer = 'infer'#
- predict = 'predict'#
- class mlrun.serving.routers.ParallelRun(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, extend_event=None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, **kwargs)[source]#
Process multiple steps (child routes) in parallel and merge the results
By default, the results dict from each step are merged (by key); when extend_event is set, the results start from the event body dict (values can be overwritten)
Users can override the merger() method to implement custom merging logic.
Example:
    # create a function with a parallel router and 3 children
    fn = mlrun.new_function("parallel", kind="serving")
    graph = fn.set_topology(
        "router",
        mlrun.serving.routers.ParallelRun(
            extend_event=True, executor_type=executor
        ),
    )
    graph.add_route("child1", class_name="Cls1")
    graph.add_route("child2", class_name="Cls2", my_arg={"c": 7})
    graph.add_route("child3", handler="my_handler")
    server = fn.to_mock_server()
    resp = server.test("", {"x": 8})
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
executor_type -- parallelism mechanism, one of: array (run one by one), process (run in separate processes), thread (run in separate threads, the default)
extend_event -- True will add the event body to the result
kwargs -- extra arguments
- class mlrun.serving.routers.ParallelRunnerModes(value)[source]#
Supported parallel running modes for VotingEnsemble
- array = 'array'#
- process = 'process'#
- thread = 'thread'#
- class mlrun.serving.routers.VotingEnsemble(context=None, name: str | None = None, routes=None, protocol: str | None = None, url_prefix: str | None = None, health_prefix: str | None = None, vote_type: str | None = None, weights: dict[str, float] | None = None, executor_type: ParallelRunnerModes | str = ParallelRunnerModes.thread, format_response_with_col_name_flag: bool = False, prediction_col_name: str = 'prediction', shard_by_endpoint: bool | None = None, **kwargs)[source]#
Voting Ensemble
The VotingEnsemble class enables you to apply prediction logic on top of the different added models.
You can use it by calling:
- <prefix>/<model>[/versions/<ver>]/operation
Sends the event to the specific <model>[/versions/<ver>]
- <prefix>/operation
Sends the event to all models and applies vote(self, event)
The VotingEnsemble applies the following logic: Incoming Event -> Router Preprocessing -> Send to model/s -> Apply all model/s logic (Preprocessing -> Prediction -> Postprocessing) -> Router Voting logic -> Router Postprocessing -> Response
This enables you to do the general preprocessing and postprocessing steps once on the router level, with only model-specific adjustments at the model level.
When enabling model tracking via set_tracking(), the ensemble's predictions appear under the model name given to the VotingEnsemble, or "VotingEnsemble" by default.
Example:
    # Define a serving function
    # Note: You can point the function to a file containing your own Router or Classifier Model class
    # this basic class supports sklearn based models (with `<model>.predict()` api)
    fn = mlrun.code_to_function(name='ensemble', kind='serving',
                                filename='model-server.py', image='mlrun/mlrun')

    # Set the router class
    # You can set your own classes by simply changing the `class_name`
    fn.set_topology(class_name='mlrun.serving.routers.VotingEnsemble')

    # Add models
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
    fn.add_model(<model_name>, <model_path>, <model_class_name>)
How to extend the VotingEnsemble:
The VotingEnsemble applies its logic using the logic(predictions) function. The logic() function receives an array of (# samples, # predictors) which you can then use to apply whatever logic you may need.
If we use this VotingEnsemble as an example, the logic() function tries to figure out whether you are trying to do a classification or a regression prediction by the prediction type or by the given vote_type parameter. Then we apply the appropriate max_vote() or mean_vote() which calculates the actual prediction result and returns it as the VotingEnsemble's prediction.
- Parameters:
context -- for internal use (passed in init)
name -- step name
routes -- for internal use (routes passed in init)
protocol -- serving API protocol (default "v2")
url_prefix -- url prefix for the router (default /v2/models)
health_prefix -- health api url prefix (default /v2/health)
input_path -- when specified, selects the key/path in the event to use as the body. This requires that the event body behaves like a dict. Example: event: {"data": {"a": 5, "b": 7}}, input_path="data.b" means the request body will be 7
result_path -- selects the key/path in the event to write the results to. This requires that the event body behaves like a dict. Example: event: {"x": 5}, result_path="resp" means the returned response will be written to event["resp"], resulting in {"x": 5, "resp": <result>}
vote_type -- voting type to be used (from VotingTypes). By default, the type is deduced from the first event: a float prediction type means regression, an int prediction type means classification
({"<model_name>" (weights A dictionary) -- <model_weight>}) that specified each model weight, if there is a model that didn't appear in the dictionary his weight will be count as a zero. None means that all the models have the same weight.
executor_type -- Parallelism mechanism, out of ParallelRunnerModes, by default threads
format_response_with_col_name_flag --
- If this flag is True the model's responses output format is
{id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}}
- Else
{id: <id>, model_name: <name>, outputs: [<predictions>]}
prediction_col_name -- The dict key for the predictions column in the model's responses output. Example: If the model returns {id: <id>, model_name: <name>, outputs: {..., prediction: [<predictions>], ...}} the prediction_col_name should be prediction. by default, prediction
shard_by_endpoint -- whether to use the endpoint as the partition/sharding key when writing to model monitoring stream. Defaults to True.
kwargs -- extra arguments
- do_event(event, *args, **kwargs)[source]#
Handles incoming requests.
- Parameters:
event (nuclio.Event) -- Incoming request as a nuclio.Event.
- Returns:
Event response after running the requested logic
- Return type:
Response
- extract_results_from_response(response)[source]#
Extracts the prediction from the model response. This function is used to allow multiple model return types and easy extension to the user's ensemble and model best practices.
- Parameters:
response (Union[List, Dict]) -- The model response's output field.
- Returns:
The model's predictions
- Return type:
List
- logic(predictions: list[list[Union[int, float]]], weights: list[float])[source]#
Returns the final prediction of all the models after applying the desired logic
- Parameters:
predictions -- The predictions from all models, per event
weights -- models weights in the prediction order
- Returns:
List of the resulting voted predictions