Running a workflow with multiple functions in parallel

This example shows a workflow that runs multiple functions in parallel. The workflow fans out into several parallel branches (the "parent" pipelines), and each branch runs a sequence of functions serially.

Use this type of workflow for tasks that can run independently in parallel, for example, processing data from multiple data sources and then combining the results.
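Outside of MLRun, the same shape (several branches running side by side, each branch running its steps one after another, followed by a final combine step) can be sketched with Python's standard concurrent.futures module. This is only an illustration of the pattern, not how MLRun or Kubeflow schedule steps. The step and run_branch functions and the summing combine step are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def step(value: int) -> int:
    # Hypothetical unit of work: each step adds 1 to the previous result.
    return value + 1


def run_branch(branch_id: int, num_steps: int = 5) -> int:
    # branch_id would identify the branch (e.g. a data source); unused here.
    # Inside a branch, steps run serially: each one consumes the previous result.
    result = 0
    for _ in range(num_steps):
        result = step(result)
    return result


def run_parallel(num_branches: int = 4, num_steps: int = 5) -> int:
    # Branches run in parallel threads; the combine step sums their results.
    with ThreadPoolExecutor(max_workers=num_branches) as pool:
        results = list(pool.map(lambda b: run_branch(b, num_steps), range(num_branches)))
    return sum(results)


print(run_parallel())  # 20: 4 branches x 5 steps each
```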

import mlrun
> 2025-05-05 11:33:47,178 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=8, patch=0, prerelease='rc53', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=4, prerelease=None, build=None)}

Create the project and the function

Create a function that writes messages to the logger. This is the function that each step of the workflow runs.

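# user_project=True appends the current user name to the project name (here, parallel-workflow-jill)
project = mlrun.get_or_create_project("parallel-workflow", "./", user_project=True)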
> 2025-05-05 11:33:47,240 [info] Project loaded successfully: {'project_name': 'parallel-workflow'}
%%writefile func.py

import mlrun


# Handler that logs the digit i repeated 100 times, for i in 0-4.
# MLRun passes the run context as the first argument.
def func(context):
    for i in range(5):
        context.logger.info(str(i) * 100)

    return 1
Overwriting func.py
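To check what the handler logs without a cluster, you could call func directly with a stand-in for the context object that MLRun normally passes in. The FakeContext and ListHandler classes below are hypothetical: they mimic only the context.logger.info call that func uses, backed by Python's standard logging module, and capture the messages in a list.

```python
import logging


class ListHandler(logging.Handler):
    # Collects log messages in memory so they can be inspected afterwards.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class FakeContext:
    # Hypothetical stand-in: exposes only the `logger` attribute that func() uses.
    def __init__(self):
        self.handler = ListHandler()
        # A logger name unique to this instance keeps handlers from accumulating.
        self.logger = logging.getLogger(f"fake-mlrun-context-{id(self)}")
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False
        self.logger.addHandler(self.handler)


# Same body as func.py: log the digit i repeated 100 times, for i in 0-4.
def func(context):
    for i in range(5):
        context.logger.info(str(i) * 100)
    return 1


ctx = FakeContext()
print(func(ctx))  # 1
print(len(ctx.handler.messages))  # 5
print(ctx.handler.messages[4][:10])  # 4444444444
```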
# Set the function in the project
project.set_function(
    name="func", func="func.py", image="mlrun/mlrun", kind="job", handler="func"
)
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f1eefea8400>

Run the function

Run the function once to see its output: a single job that logs five lines, the digits 0 through 4, each repeated 100 times.

project.run_function("func")
> 2025-05-05 11:33:47,316 [info] Storing function: {'name': 'func-func', 'uid': '1df442b770724582bf0855db667322d0', 'db': 'http://mlrun-api:8080'}
> 2025-05-05 11:33:47,635 [info] Job is running in the background, pod: func-func-hh44l
> 2025-05-05 11:33:51,144 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=8, patch=0, prerelease='rc53', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=4, prerelease=None, build=None)}
> 2025-05-05 11:33:51,467 [info] 0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
> 2025-05-05 11:33:51,467 [info] 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
> 2025-05-05 11:33:51,467 [info] 2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222
> 2025-05-05 11:33:51,467 [info] 3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333
> 2025-05-05 11:33:51,467 [info] 4444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444
> 2025-05-05 11:33:51,685 [info] To track results use the CLI: {'info_cmd': 'mlrun get run 1df442b770724582bf0855db667322d0 -p parallel-workflow-jill', 'logs_cmd': 'mlrun logs 1df442b770724582bf0855db667322d0 -p parallel-workflow-jill'}
> 2025-05-05 11:33:51,685 [info] Or click for UI: {'ui_url': 'https://dashboard.default-tenant.app.cust-cs-il.iguazio-cd0.com/mlprojects/parallel-workflow-jill/jobs/monitor/1df442b770724582bf0855db667322d0/overview'}
> 2025-05-05 11:33:51,685 [info] Run execution finished: {'status': 'completed', 'name': 'func-func'}
project     parallel-workflow-jill
iter        0
start       May 05 11:33:51
state       completed
name        func-func
labels      v3io_user=Jill, kind=job, owner=Jill, mlrun/client_version=1.6.4, mlrun/client_python_version=3.9.18, host=func-func-hh44l
results     return=1

> to track results use the .show() or .logs() methods or click here to open in UI
> 2025-05-05 11:33:56,903 [info] Run execution finished: {'status': 'completed', 'name': 'func-func'}
<mlrun.model.RunObject at 0x7f1eefec9580>

Create the workflow

pipe_num determines how many pipelines run in parallel in the workflow. Within each pipeline, the jobs run one after another.

In this example, pipe_num=4, so 4 pipelines run in parallel. Each pipeline runs 5 jobs, for a total of 4*5 = 20 jobs.
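Because the pipelines run side by side while the jobs inside each pipeline run one after another, you can picture the run as 5 "waves" of 4 concurrent jobs. The short calculation below sketches that idealized schedule. It assumes every step takes the same time and that the cluster can start all pipelines at once, which real runs only approximate; the branch/step labels are illustrative names, not MLRun identifiers.

```python
pipe_num = 4    # parallel pipelines, as in workflow.py
num_steps = 5   # serial steps per pipeline (step_1 ... step_5)

# Idealized schedule: wave k runs step k of every pipeline at the same time.
schedule = {
    wave: [f"branch-{branch}/step_{wave}" for branch in range(pipe_num)]
    for wave in range(1, num_steps + 1)
}

total_jobs = sum(len(jobs) for jobs in schedule.values())
print(len(schedule), total_jobs)  # 5 20
print(schedule[1])  # ['branch-0/step_1', 'branch-1/step_1', 'branch-2/step_1', 'branch-3/step_1']
```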

%%writefile workflow.py

from kfp import dsl
import mlrun

# Number of pipelines that run in parallel
pipe_num = 4


def kfpipeline():
    # Fan out: one parallel branch per item (item is not used by the steps here)
    with dsl.ParallelFor(list(range(pipe_num))) as item:
        # Within a branch, .after() chains the five steps so they run serially
        step_1 = mlrun.run_function('func')
        step_2 = mlrun.run_function('func').after(step_1)
        step_3 = mlrun.run_function('func').after(step_2)
        step_4 = mlrun.run_function('func').after(step_3)
        step_5 = mlrun.run_function('func').after(step_4)
Overwriting workflow.py

Note

The "parallelism" parameter of dsl.ParallelFor is not supported (see the Kubeflow documentation). To limit the number of parallel executions, you can wrap the loop in the dsl.SubGraph(parallelism=) component instead; however, the workflow graph is not displayed in the MLRun UI until the workflow completes. For example:

from kfp import dsl
import mlrun

pipe_num = 4


def kfpipeline():
    # parallelism=1 limits parallel executions within this subgraph to one at a time
    with dsl.SubGraph(parallelism=1):
        with dsl.ParallelFor(list(range(pipe_num))) as item:
            step_1 = mlrun.run_function('func')
            step_2 = mlrun.run_function('func').after(step_1)
            step_3 = mlrun.run_function('func').after(step_2)
            step_4 = mlrun.run_function('func').after(step_3)
            step_5 = mlrun.run_function('func').after(step_4)
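The effect of a parallelism limit can be illustrated locally with a thread pool whose max_workers plays the role of parallelism. This is an analogy only, not how Kubeflow implements dsl.SubGraph; the peak_concurrency and run_branch helpers are hypothetical, and time.sleep stands in for a branch's five serial steps.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(pipe_num: int, parallelism: int) -> int:
    # Track how many branches are running at the same moment.
    lock = threading.Lock()
    running = 0
    peak = 0

    def run_branch(branch_id: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for the five serial steps of one branch
        with lock:
            running -= 1

    # max_workers caps how many branches may run at once, like parallelism=.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        list(pool.map(run_branch, range(pipe_num)))
    return peak


print(peak_concurrency(pipe_num=4, parallelism=1))  # 1: branches run one at a time
```

With parallelism=1 the four branches run strictly one after another, so the whole run takes roughly four times as long as with no limit.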
Run the workflow, and wait for it to complete (watch=True):

project.run(workflow_path="workflow.py", watch=True)
Pipeline running (id=9e4f313d-fe67-4a5c-a3fc-abb3ac40e9ca), click here to view the details in MLRun UI
(Figure: graph of the running workflow)

Run Results

[info] Workflow 9e4f313d-fe67-4a5c-a3fc-abb3ac40e9ca finished, state=Succeeded


start            state      name  results
May 05 11:36:09  completed  func  return=1
May 05 11:36:09  completed  func  return=1
May 05 11:36:09  completed  func  return=1
May 05 11:36:09  completed  func  return=1
May 05 11:35:38  completed  func  return=1
May 05 11:35:38  completed  func  return=1
May 05 11:35:38  completed  func  return=1
May 05 11:35:37  completed  func  return=1
May 05 11:35:07  completed  func  return=1
May 05 11:35:07  completed  func  return=1
May 05 11:35:07  completed  func  return=1
May 05 11:35:07  completed  func  return=1
May 05 11:34:40  completed  func  return=1
May 05 11:34:40  completed  func  return=1
May 05 11:34:40  completed  func  return=1
May 05 11:34:40  completed  func  return=1
May 05 11:34:12  completed  func  return=1
May 05 11:34:11  completed  func  return=1
May 05 11:34:11  completed  func  return=1
May 05 11:34:11  completed  func  return=1
9e4f313d-fe67-4a5c-a3fc-abb3ac40e9ca
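The start times in the run results reflect the serial-within-parallel structure: the 20 jobs start in groups of four, each group roughly half a minute after the previous one, with the jobs in a group starting within about a second of each other. The snippet below groups the start times copied from those results; the 2-second gap used to separate groups is an arbitrary threshold chosen for this illustration.

```python
from datetime import datetime

# Start times copied from the run results above (newest first).
starts = (
    ["11:36:09"] * 4
    + ["11:35:38"] * 3 + ["11:35:37"]
    + ["11:35:07"] * 4
    + ["11:34:40"] * 4
    + ["11:34:12"] + ["11:34:11"] * 3
)

times = sorted(datetime.strptime(t, "%H:%M:%S") for t in starts)

# Start a new group whenever the gap to the previous start exceeds 2 seconds.
waves = [[times[0]]]
for prev, cur in zip(times, times[1:]):
    if (cur - prev).total_seconds() > 2:
        waves.append([])
    waves[-1].append(cur)

print(len(times), [len(w) for w in waves])  # 20 [4, 4, 4, 4, 4]
```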

View the results in the Monitor Workflows tab of the UI

(Screenshot: the parallel workflow in the Monitor Workflows tab)