Concurrent processing

Concurrent processing is typically used when serving deep-learning models, where the preparation steps and inference can be CPU- or GPU-heavy, or can involve I/O. The concurrency modes are:

  • asyncio — Default. For I/O implemented using asyncio.

  • threading — For blocking I/O.

  • multiprocessing — For processing-intensive tasks.
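
As a rough illustration of which kind of work suits each mode, here is a minimal sketch that uses only the Python standard library. It is not how storey implements these modes, and the names `fetch_async`, `fetch_blocking`, and `crunch` are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


async def fetch_async(x):
    # Non-blocking I/O written with asyncio: a fit for the asyncio mode.
    await asyncio.sleep(0.01)
    return x + 1


def fetch_blocking(x):
    # Blocking I/O (here simulated with time.sleep): a fit for the threading mode.
    time.sleep(0.01)
    return x + 1


def crunch(x):
    # CPU-bound work: a fit for the multiprocessing mode.
    return sum(i * i for i in range(x))


async def main():
    loop = asyncio.get_running_loop()
    # Coroutines run directly on the event loop.
    a = await fetch_async(1)
    # Blocking calls are handed to worker threads so the loop stays responsive.
    with ThreadPoolExecutor(max_workers=2) as threads:
        b = await loop.run_in_executor(threads, fetch_blocking, 1)
    # CPU-bound calls are handed to worker processes to sidestep the GIL.
    with ProcessPoolExecutor(max_workers=2) as procs:
        c = await loop.run_in_executor(procs, crunch, 4)
    return a, b, c


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The `if __name__ == "__main__"` guard matters for the process pool: worker processes may re-import the main module, and the guard keeps them from starting pools of their own.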

Additional configuration:

  • max_in_flight: Maximum number of events to be processed at a time (default 8)

  • retries: Maximum number of retries per event (default 0)

  • backoff_factor: Wait time in seconds before the first retry (default 1). Each subsequent retry waits twice as long as the previous one, up to a maximum of two minutes.

  • pass_context: If False, the process_event function is called with just one parameter (event). If True, the process_event function is called with two parameters (event, context). (Defaults to False)

  • full_event: Whether the event processor should receive and return Event objects (when True), or only the payload (when False). (Defaults to False)
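
The retry timing described for backoff_factor can be worked out with a few lines of Python. This is a sketch of the schedule as described above, not storey's actual code; the function `retry_delays` and its `max_delay` parameter are hypothetical names, and the 120-second cap is the two-minute maximum.

```python
def retry_delays(retries, backoff_factor=1.0, max_delay=120.0):
    """Return the wait time in seconds before each retry.

    Assumes the first retry waits `backoff_factor` seconds and each later
    retry waits twice as long as the previous one, capped at `max_delay`.
    """
    delays = []
    delay = backoff_factor
    for _ in range(retries):
        delays.append(min(delay, max_delay))
        delay *= 2
    return delays


print(retry_delays(4))       # [1.0, 2.0, 4.0, 8.0]
print(retry_delays(9)[-2:])  # [120.0, 120.0]
```

With the default backoff_factor of 1, the cap is reached on the eighth retry (128 seconds would exceed two minutes), so a large retries value mostly adds two-minute waits.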

This example illustrates a multiprocess step to perform predictions.

First, define the event-processing functions:

import mlrun
import mlrun.artifacts
import storey
import storey.steps
import asyncio
import time
import numpy as np
import pickle


model_file = "model.pkl"


class Predict:
    def __init__(self):
        # Load the model once; the context manager closes the file handle.
        with open(model_file, "rb") as f:
            self.model = pickle.load(f)

    def predict(self, body: dict):
        print("predicting...")
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()


predict = Predict().predict


async def preprocess(event, context):
    context.logger.info("preprocessing...")
    await asyncio.sleep(0.1)
    return event


def postprocess(event, context):
    context.logger.info("postprocessing...")
    time.sleep(0.1)
    return event

Define the project and the function:

project_name = "concurrent-prediction"
project = mlrun.get_or_create_project(project_name)
function = project.set_function(
    name="test",
    kind="serving",
    image="mlrun/mlrun",
)

function.spec.build.commands = [
    "wget https://github.com/mlrun/mlrun/raw/development/tests/system/model_monitoring/assets/model.pkl",
]

graph = function.set_topology("flow", engine="async")

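Finally, add the steps to the graph: preprocessing with asyncio (the default), prediction with multiprocessing, and postprocessing with threading: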

graph.to(
    "storey.ConcurrentExecution",
    "preprocess",
    _event_processor="preprocess",
    pass_context=True,
    max_in_flight=8,
).to(
    "storey.ConcurrentExecution",
    "prediction",
    _event_processor="predict",
    concurrency_mechanism="multiprocessing",
    max_in_flight=2,
).to(
    "storey.ConcurrentExecution",
    "postprocess",
    _event_processor="postprocess",
    concurrency_mechanism="threading",
    pass_context=True,
    max_in_flight=8,
)