Running a workflow with multiple functions in parallel
This example illustrates a workflow that triggers multiple functions in parallel. The "parent" pipelines run in parallel, and each one encapsulates several function runs that execute serially.
Use this type of workflow for any work that can be split into independent parts: for example, processing data from multiple data sources in parallel and then combining the results.
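As a rough local analogy (plain Python, not MLRun or Kubeflow), the structure is "run several serial chains concurrently, then combine". The sketch below assumes 4 pipelines of 5 steps, matching the workflow later on this page; the names step, pipeline, and run_parallel are hypothetical, and threads stand in for the pods that MLRun would actually launch.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-in for one job. In the real workflow each step is an
# MLRun job (mlrun.run_function('func')) that runs in its own pod.
def step(pipeline_id: int, step_id: int) -> int:
    return 1  # mirrors func, which returns 1


def pipeline(pipeline_id: int, num_steps: int = 5) -> list[int]:
    # Steps inside one pipeline run serially: each starts after the previous one ends.
    return [step(pipeline_id, s) for s in range(num_steps)]


def run_parallel(pipe_num: int = 4, num_steps: int = 5) -> list[list[int]]:
    # The pipelines themselves run concurrently, one worker thread per pipeline.
    with ThreadPoolExecutor(max_workers=pipe_num) as pool:
        futures = [pool.submit(pipeline, p, num_steps) for p in range(pipe_num)]
        return [f.result() for f in futures]


results = run_parallel()
# "Combine" stage: aggregate the per-pipeline outputs.
total = sum(sum(r) for r in results)
print(total)  # 20
```

The combine stage runs only after every pipeline has returned, because f.result() blocks until each pipeline finishes.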
```python
import mlrun
```
> 2025-05-05 11:33:47,178 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=8, patch=0, prerelease='rc53', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=4, prerelease=None, build=None)}
Create the project and the function
Create a function that prints to the logger. This is the function that will be encapsulated.
```python
project = mlrun.get_or_create_project("parallel-workflow", "./", user_project=True)
```
> 2025-05-05 11:33:47,240 [info] Project loaded successfully: {'project_name': 'parallel-workflow'}
```python
%%writefile func.py
import mlrun


# Handler that logs str(i) repeated 100 times, for i from 0 to 4
def func(context):
    for i in range(5):
        context.logger.info(str(i) * 100)
    return 1
```
Overwriting func.py
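Before running on the cluster, the handler logic can be checked locally. This sketch copies func and calls it with a hypothetical StubContext that provides only a logger with an info() method (the real MLRun context offers much more); the stub records each message instead of printing it.

```python
class RecordingLogger:
    """Collects messages passed to info() instead of printing them."""

    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(message)


class StubContext:
    """Hypothetical stand-in for the MLRun context: only .logger is provided."""

    def __init__(self):
        self.logger = RecordingLogger()


# Same logic as the handler written to func.py
def func(context):
    for i in range(5):
        context.logger.info(str(i) * 100)
    return 1


ctx = StubContext()
result = func(ctx)
print(result)  # 1
print(len(ctx.logger.messages))  # 5
print(ctx.logger.messages[2][:10])  # 2222222222
```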
```python
# Set the function in the project
project.set_function(
    name="func", func="func.py", image="mlrun/mlrun", kind="job", handler="func"
)
```
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f1eefea8400>
Run the function
Run the function on its own to observe its output: a single job that logs five lines, one for each value of i from 0 to 4.
```python
project.run_function("func")
```
> 2025-05-05 11:33:47,316 [info] Storing function: {'name': 'func-func', 'uid': '1df442b770724582bf0855db667322d0', 'db': 'http://mlrun-api:8080'}
> 2025-05-05 11:33:47,635 [info] Job is running in the background, pod: func-func-hh44l
> 2025-05-05 11:33:51,144 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=8, patch=0, prerelease='rc53', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=4, prerelease=None, build=None)}
> 2025-05-05 11:33:51,467 [info] 0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
> 2025-05-05 11:33:51,467 [info] 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
> 2025-05-05 11:33:51,467 [info] 2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222
> 2025-05-05 11:33:51,467 [info] 3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333
> 2025-05-05 11:33:51,467 [info] 4444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444
> 2025-05-05 11:33:51,685 [info] To track results use the CLI: {'info_cmd': 'mlrun get run 1df442b770724582bf0855db667322d0 -p parallel-workflow-jill', 'logs_cmd': 'mlrun logs 1df442b770724582bf0855db667322d0 -p parallel-workflow-jill'}
> 2025-05-05 11:33:51,685 [info] Or click for UI: {'ui_url': 'https://dashboard.default-tenant.app.cust-cs-il.iguazio-cd0.com/mlprojects/parallel-workflow-jill/jobs/monitor/1df442b770724582bf0855db667322d0/overview'}
> 2025-05-05 11:33:51,685 [info] Run execution finished: {'status': 'completed', 'name': 'func-func'}
| project | uid | iter | start | state | name | labels | inputs | parameters | results |
|---|---|---|---|---|---|---|---|---|---|
| parallel-workflow-jill | 1df442b770724582bf0855db667322d0 | 0 | May 05 11:33:51 | completed | func-func | v3io_user=Jill, kind=job, owner=Jill, mlrun/client_version=1.6.4, mlrun/client_python_version=3.9.18, host=func-func-hh44l | | | return=1 |
> 2025-05-05 11:33:56,903 [info] Run execution finished: {'status': 'completed', 'name': 'func-func'}
<mlrun.model.RunObject at 0x7f1eefec9580>
Create the workflow
The pipe_num variable determines how many pipelines run in parallel in the workflow. Each pipeline runs 5 jobs, one after another.
In this example, pipe_num=4, so 4 pipelines run in parallel and each runs 5 jobs, for a total of 4*5=20 jobs.
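The job count can be checked by modeling the workflow graph as plain data. In this minimal sketch, each step maps to the steps it must run after, mirroring the .after() calls in workflow.py; the step names such as "p0-step1" and the helpers build_graph and chain_length are hypothetical, since Kubeflow generates its own step names.

```python
def build_graph(pipe_num: int, steps: int) -> dict[str, list[str]]:
    """Map each step name to the list of steps it must run after."""
    graph = {}
    for p in range(pipe_num):
        for s in range(1, steps + 1):
            name = f"p{p}-step{s}"
            # step_1 has no dependency; step_k runs .after(step_{k-1})
            graph[name] = [] if s == 1 else [f"p{p}-step{s - 1}"]
    return graph


def chain_length(graph: dict[str, list[str]], node: str) -> int:
    """Number of steps in the serial chain that ends at node."""
    deps = graph[node]
    return 1 if not deps else 1 + chain_length(graph, deps[0])


graph = build_graph(pipe_num=4, steps=5)
print(len(graph))  # 20
# Steps with no dependencies can start immediately: one per pipeline.
print(sum(1 for deps in graph.values() if not deps))  # 4
# The longest serial chain sets the minimum number of sequential "waves".
print(max(chain_length(graph, n) for n in graph))  # 5
```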
```python
%%writefile workflow.py
from kfp import dsl
import mlrun

# Number of pipelines to run in parallel
pipe_num = 4


def kfpipeline():
    # Each ParallelFor iteration is one pipeline; the iterations run in parallel
    with dsl.ParallelFor(list(range(pipe_num))) as item:
        # Within a pipeline, each step starts only after the previous one finishes
        step_1 = mlrun.run_function('func')
        step_2 = mlrun.run_function('func').after(step_1)
        step_3 = mlrun.run_function('func').after(step_2)
        step_4 = mlrun.run_function('func').after(step_3)
        step_5 = mlrun.run_function('func').after(step_4)
```
Overwriting workflow.py
Note
The "parallelism" parameter of dsl.ParallelFor is not supported (see the Kubeflow documentation). You can use the dsl.SubGraph(parallelism=) component to limit parallel executions; however, the workflow graph is not displayed in the MLRun UI until the workflow completes. For example:
```python
from kfp import dsl
import mlrun

pipe_num = 4


def kfpipeline():
    # SubGraph limits how many of the enclosed branches run at the same time
    with dsl.SubGraph(parallelism=1):
        with dsl.ParallelFor(list(range(pipe_num))) as item:
            step_1 = mlrun.run_function('func')
            step_2 = mlrun.run_function('func').after(step_1)
            step_3 = mlrun.run_function('func').after(step_2)
            step_4 = mlrun.run_function('func').after(step_3)
            step_5 = mlrun.run_function('func').after(step_4)
```
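The effect of a parallelism limit can be illustrated locally, as an analogy only: in Kubeflow the limit is enforced by the workflow engine, whereas in this sketch the max_workers argument of concurrent.futures.ThreadPoolExecutor plays that role. The helper max_concurrency is hypothetical; it runs dummy pipelines and reports the highest number that were running at the same moment.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def max_concurrency(pipe_num: int, parallelism: int) -> int:
    """Run pipe_num dummy pipelines with at most `parallelism` at once and
    return the highest number observed running simultaneously."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def pipeline(_: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for the serial steps of one pipeline
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        list(pool.map(pipeline, range(pipe_num)))
    return peak


print(max_concurrency(4, parallelism=1))  # 1
print(max_concurrency(4, parallelism=4))  # at most 4; usually 4
```

With parallelism=1 the pipelines run strictly one after another, which is the behavior the SubGraph(parallelism=1) example above asks for.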
Run the workflow and wait for it to complete:
```python
project.run(workflow_path="workflow.py", watch=True)
```
Run Results
[info] Workflow 9e4f313d-fe67-4a5c-a3fc-abb3ac40e9ca finished, state=Succeeded
| uid | start | state | name | parameters | results |
|---|---|---|---|---|---|
| | May 05 11:36:09 | completed | func | | return=1 |
| | May 05 11:36:09 | completed | func | | return=1 |
| | May 05 11:36:09 | completed | func | | return=1 |
| | May 05 11:36:09 | completed | func | | return=1 |
| | May 05 11:35:38 | completed | func | | return=1 |
| | May 05 11:35:38 | completed | func | | return=1 |
| | May 05 11:35:38 | completed | func | | return=1 |
| | May 05 11:35:37 | completed | func | | return=1 |
| | May 05 11:35:07 | completed | func | | return=1 |
| | May 05 11:35:07 | completed | func | | return=1 |
| | May 05 11:35:07 | completed | func | | return=1 |
| | May 05 11:35:07 | completed | func | | return=1 |
| | May 05 11:34:40 | completed | func | | return=1 |
| | May 05 11:34:40 | completed | func | | return=1 |
| | May 05 11:34:40 | completed | func | | return=1 |
| | May 05 11:34:40 | completed | func | | return=1 |
| | May 05 11:34:12 | completed | func | | return=1 |
| | May 05 11:34:11 | completed | func | | return=1 |
| | May 05 11:34:11 | completed | func | | return=1 |
| | May 05 11:34:11 | completed | func | | return=1 |
9e4f313d-fe67-4a5c-a3fc-abb3ac40e9ca
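The start times in the Run Results table show the expected shape: five waves of four runs each, roughly 30 seconds apart, consistent with 4 parallel pipelines of 5 serial steps. This sketch groups the 20 start times copied from that table; the 5-second gap threshold and the helper group_into_waves are illustrative choices, not part of MLRun.

```python
from datetime import datetime

# Start times copied from the Run Results table (20 runs).
start_times = [
    "11:36:09", "11:36:09", "11:36:09", "11:36:09",
    "11:35:38", "11:35:38", "11:35:38", "11:35:37",
    "11:35:07", "11:35:07", "11:35:07", "11:35:07",
    "11:34:40", "11:34:40", "11:34:40", "11:34:40",
    "11:34:12", "11:34:11", "11:34:11", "11:34:11",
]


def group_into_waves(times, max_gap_seconds=5):
    """Sort start times and start a new wave wherever two consecutive
    runs start more than max_gap_seconds apart."""
    parsed = sorted(datetime.strptime(t, "%H:%M:%S") for t in times)
    waves = [[parsed[0]]]
    for prev, cur in zip(parsed, parsed[1:]):
        if (cur - prev).total_seconds() > max_gap_seconds:
            waves.append([])
        waves[-1].append(cur)
    return waves


waves = group_into_waves(start_times)
print([len(w) for w in waves])  # [4, 4, 4, 4, 4]
```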
View the results in the UI Monitor Workflows tab