Concurrent processing
Concurrent processing is typically used when serving deep-learning models, where the preparation steps and the inference itself can be CPU- or GPU-intensive, or can involve I/O. The concurrency modes are:
asyncio — Default. For I/O implemented using asyncio.
threading — For blocking I/O.
multiprocessing — For processing-intensive tasks.
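The difference between the three modes can be sketched with the Python standard library alone. This is an illustration of the general idea, not the storey implementation: the helper names (`async_io`, `blocking_io`, `cpu_bound`, `run_event`) and the pool sizes are assumptions made for this example.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


async def async_io(x):
    # Non-blocking wait: the kind of work suited to the "asyncio" mode.
    await asyncio.sleep(0.01)
    return x + 1


def blocking_io(x):
    # Blocking wait: the kind of work suited to the "threading" mode.
    time.sleep(0.01)
    return x + 1


def cpu_bound(x):
    # Pure computation: the kind of work suited to the "multiprocessing" mode.
    return sum(i * i for i in range(x))


async def run_event(x, mode):
    """Run one event with the chosen mechanism (hypothetical helper)."""
    loop = asyncio.get_running_loop()
    if mode == "asyncio":
        # Coroutines are awaited directly on the event loop.
        return await async_io(x)
    if mode == "threading":
        # Blocking calls are moved to a worker thread so the loop stays free.
        with ThreadPoolExecutor(max_workers=2) as pool:
            return await loop.run_in_executor(pool, blocking_io, x)
    if mode == "multiprocessing":
        # CPU-heavy calls are moved to a separate process to bypass the GIL.
        with ProcessPoolExecutor(max_workers=2) as pool:
            return await loop.run_in_executor(pool, cpu_bound, x)
    raise ValueError(f"unknown mode: {mode}")
```

For example, `asyncio.run(run_event(1, "threading"))` runs the blocking function in a worker thread. On platforms that start processes with `spawn`, the multiprocessing branch should be called from under an `if __name__ == "__main__":` guard.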
Additional configuration:
max_in_flight: Maximum number of events to be processed at a time (default 8)
retries: Maximum number of retries per event (default 0)
backoff_factor: Wait time in seconds before the first retry (default 1). Each subsequent retry waits twice as long as the previous one, up to a maximum of two minutes.
pass_context: If False, the process_event function is called with just one parameter (event). If True, the process_event function is called with two parameters (event, context). (default False)
full_event: Whether the event processor should receive and return Event objects (when True), or only the payload (when False). (default False)
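As a worked example of the retry settings, the wait schedule described for backoff_factor can be computed as follows. This is a minimal sketch that assumes the wait doubles on every retry and is capped at 120 seconds (two minutes), as stated above. The function name `retry_waits` and the `max_wait` parameter are hypothetical and are not part of the storey API.

```python
def retry_waits(retries, backoff_factor=1.0, max_wait=120.0):
    """Return the assumed wait, in seconds, before each retry."""
    # The first retry waits backoff_factor seconds; each later retry
    # doubles the previous wait, capped at max_wait.
    return [min(backoff_factor * 2**attempt, max_wait) for attempt in range(retries)]


print(retry_waits(4))  # [1.0, 2.0, 4.0, 8.0]
```

With the default backoff_factor of 1, the cap is reached on the eighth retry, because the uncapped wait would be 128 seconds.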
This example illustrates a flow that uses a multiprocessing step to perform predictions.
First, define the event processors:
import mlrun
import mlrun.artifacts
import storey
import storey.steps
import asyncio
import time
import numpy as np
import pickle
model_file = "model.pkl"


class Predict:
    def __init__(self):
        # Load the pickled model once, when the object is created
        with open(model_file, "rb") as f:
            self.model = pickle.load(f)

    def predict(self, body: dict):
        print("predicting...")
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()


# Bound method used as the event processor of the prediction step
predict = Predict().predict


async def preprocess(event, context):
    context.logger.info("preprocessing...")
    await asyncio.sleep(0.1)  # simulates non-blocking (asyncio) I/O
    return event


def postprocess(event, context):
    context.logger.info("postprocessing...")
    time.sleep(0.1)  # simulates blocking I/O
    return event
Define the project and the function:
project_name = "concurrent-prediction"
project = mlrun.get_or_create_project(project_name)
function = project.set_function(
    name="test",
    kind="serving",
    image="mlrun/mlrun",
)
function.spec.build.commands = [
    "wget https://github.com/mlrun/mlrun/raw/development/tests/system/model_monitoring/assets/model.pkl",
]
graph = function.set_topology("flow", engine="async")
Finally, define the graph steps, including the multiprocessing prediction step:
graph.to(
    "storey.ConcurrentExecution",
    "preprocess",
    _event_processor="preprocess",
    pass_context=True,
    max_in_flight=8,
).to(
    "storey.ConcurrentExecution",
    "prediction",
    _event_processor="predict",
    concurrency_mechanism="multiprocessing",
    max_in_flight=2,
).to(
    "storey.ConcurrentExecution",
    "postprocess",
    _event_processor="postprocess",
    concurrency_mechanism="threading",
    pass_context=True,
    max_in_flight=8,
)
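Each step above sets max_in_flight to bound how many events it processes at once. The effect of such a bound can be approximated in plain asyncio with a semaphore. This is only a sketch of the concept, not how storey implements it: `run_bounded` and `slow_double` are hypothetical names introduced for this illustration.

```python
import asyncio


async def run_bounded(events, handler, max_in_flight=8):
    """Process all events, with at most max_in_flight running at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)
    in_flight = 0
    peak = 0  # highest number of events observed in flight together

    async def process(event):
        nonlocal in_flight, peak
        async with semaphore:  # waits while max_in_flight events are running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await handler(event)
            finally:
                in_flight -= 1

    # gather returns the results in the same order as the input events
    results = await asyncio.gather(*(process(e) for e in events))
    return results, peak


async def slow_double(event):
    # Stand-in for an I/O-bound event processor
    await asyncio.sleep(0.01)
    return event * 2


if __name__ == "__main__":
    results, peak = asyncio.run(run_bounded(range(10), slow_double, max_in_flight=2))
    print(results, peak)
```

A lower limit, such as the value of 2 used for the prediction step, keeps a resource-heavy step from being oversubscribed while the lighter steps on either side continue to accept more events.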