Running multiple functions with ExitHandler#
This example runs several MLRun functions in parallel, waits for all of them to complete, and then uses the Kubeflow Pipelines (KFP) ExitHandler to run one final function. The ExitHandler is especially useful for a step that must run last, regardless of whether the preceding steps succeeded; for example, sending an email or recording the run results of the previous steps somewhere else.
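In KFP terms, the pattern is a `with dsl.ExitHandler(...)` block: the step passed to the handler runs last, whether the steps inside the block succeed or fail. A minimal sketch (the `notify` and `train` function names are placeholders, not part of this example):

```python
from kfp import dsl

import mlrun


@dsl.pipeline(name="exit-handler-sketch")
def sketch_pipeline():
    # The step passed to ExitHandler ("notify") runs last, regardless of
    # whether the steps inside the block ("train") succeed or fail.
    with dsl.ExitHandler(mlrun.run_function("notify")):
        mlrun.run_function("train")
```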
import mlrun
Create the project and its functions#
Function B is designed to fail, to show the ExitHandler running after a failure.
project = mlrun.get_or_create_project(
    "waiting-for-multiple-steps", "./", user_project=True
)
> 2025-05-06 08:14:15,715 [info] Project loaded successfully: {'project_name': 'waiting-for-multiple-steps'}
%%writefile funcs.py
import time


# The first 3 functions to run in parallel
def func_A(context):
    time.sleep(5)
    context.logger.info("Function A is running now")
    return "Function A has been triggered"


def func_B(context):
    time.sleep(10)
    context.logger.info("Function B is running now")
    # Raise an exception so that this function fails
    raise Exception("This is an example exception")


def func_C(context):
    time.sleep(15)
    context.logger.info("Function C is running now")
    return "Function C has been triggered"


# This function runs only after the 3 functions above complete, then logs
# their results as artifacts.
def func_D(context, func_a_res, func_b_res, func_c_res):
    context.logger.info(
        "Function D is running now, logging the results of the previous functions."
    )
    context.log_artifact("func_a_result", str(func_a_res))
    context.log_artifact("func_b_result", str(func_b_res))
    context.log_artifact("func_c_result", str(func_c_res))
    context.logger.info("Function D has been triggered")


# This function runs as the ExitHandler step.
def func_final(context):
    context.logger.info(
        "The final function is now running, regardless of whether all "
        "preceding functions succeeded."
    )
    return "Function final has been triggered"
Overwriting funcs.py
# Set the functions in the project
func_A = project.set_function(
    func="funcs.py", name="func-A", handler="func_A", image="mlrun/mlrun", kind="job"
)
func_B = project.set_function(
    func="funcs.py", name="func-B", handler="func_B", image="mlrun/mlrun", kind="job"
)
func_C = project.set_function(
    func="funcs.py", name="func-C", handler="func_C", image="mlrun/mlrun", kind="job"
)
func_D = project.set_function(
    func="funcs.py", name="func-D", handler="func_D", image="mlrun/mlrun", kind="job"
)
func_final = project.set_function(
    func="funcs.py",
    name="func-final",
    handler="func_final",
    image="mlrun/mlrun",
    kind="job",
)
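Before running the full pipeline, you can optionally smoke-test a single handler locally. A minimal sketch (`local=True` executes the handler in the notebook's process instead of as a Kubernetes job):

```python
# Optional sanity check: run one handler locally before building the pipeline.
local_run = project.run_function("func-A", local=True)
print(local_run.outputs)  # {'return': 'Function A has been triggered'}
```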
Create the pipeline#
The ExitHandler step runs after functions A, B, C, and D. It is triggered regardless of whether the preceding functions succeeded.
%%writefile workflow.py
from kfp import dsl

import mlrun


@dsl.pipeline(
    name="ExitHandler and multiple wait pipeline",
    description="Pipeline that runs 3 functions simultaneously, waits for all of "
    "them to finish, and then runs another function that logs their results. "
    "After all of this, an exit function is triggered.",
)
def kfpipeline(input_val):
    project = mlrun.get_current_project()

    # 'func-final' is executed after everything inside the block finishes or fails.
    with dsl.ExitHandler(mlrun.run_function("func-final")):
        # Start 3 functions simultaneously
        step_1 = mlrun.run_function("func-A", returns=["first_func_res"])
        step_2 = mlrun.run_function("func-B", returns=["second_func_res"])
        step_3 = mlrun.run_function("func-C", returns=["third_func_res"])

        # Start this function only after the first 3 functions are done.
        # It logs the outputs of the previous functions as artifacts.
        step_4 = mlrun.run_function(
            "func-D",
            params={
                "func_a_res": step_1.outputs["first_func_res"],
                "func_b_res": step_2.outputs["second_func_res"],
                "func_c_res": step_3.outputs["third_func_res"],
            },
            returns=["fourth_func_res"],
        ).after(step_1, step_2, step_3)
Overwriting workflow.py
project.set_workflow(name="workflow-func", workflow_path="workflow.py")
project.save()
<mlrun.projects.project.MlrunProject at 0x7f9713208b80>
Run the workflow#
Now run the workflow and check the workflow graph in the UI.
project.run("workflow-func", watch=True, local=False)
Pipeline running (id=f8c36890-494f-47f3-9ab7-2b9cd9da7451), click here to view the details in MLRun UI
Run Results
[info] Workflow f8c36890-494f-47f3-9ab7-2b9cd9da7451 finished with 1 errors
click the hyper links below to see detailed results
| uid | start | state | name | parameters | results |
|---|---|---|---|---|---|
| | May 06 08:21:13 | completed | func-final | | return=Function final has been triggered |
| | May 06 08:20:33 | error | func-b | | |
| | May 06 08:20:33 | completed | func-a | | first_func_res=Function A has been triggered |
| | May 06 08:20:33 | completed | func-c | | third_func_res=Function C has been triggered |
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[9], line 1
----> 1 project.run("workflow-func", watch=True, local=False)
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/projects/project.py:2726, in MlrunProject.run(self, name, workflow_path, arguments, artifact_path, workflow_handler, namespace, sync, watch, dirty, engine, local, schedule, timeout, source, cleanup_ttl, notifications)
2723 if engine == "remote" and status_engine.engine != "local":
2724 status_engine = _RemoteRunner
-> 2726 status_engine.get_run_status(project=self, run=run, timeout=timeout)
2727 return run
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/projects/pipelines.py:705, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
702 notifiers.push(text, "info", runs)
704 if raise_error:
--> 705 raise raise_error
706 return state, had_errors, text
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/projects/pipelines.py:679, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
677 if timeout:
678 logger.info("Waiting for pipeline run completion")
--> 679 state = run.wait_for_completion(
680 timeout=timeout, expected_statuses=expected_statuses
681 )
682 except RuntimeError as exc:
683 # push runs table also when we have errors
684 raise_error = exc
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/projects/pipelines.py:464, in _PipelineRunStatus.wait_for_completion(self, timeout, expected_statuses)
463 def wait_for_completion(self, timeout=None, expected_statuses=None):
--> 464 self._state = self._engine.wait_for_completion(
465 self.run_id,
466 project=self.project,
467 timeout=timeout,
468 expected_statuses=expected_statuses,
469 )
470 return self._state
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/projects/pipelines.py:645, in _KFPRunner.wait_for_completion(run_id, project, timeout, expected_statuses)
643 timeout = 60 * 60
644 project_name = project.metadata.name if project else ""
--> 645 run_info = wait_for_pipeline_completion(
646 run_id,
647 timeout=timeout,
648 expected_statuses=expected_statuses,
649 project=project_name,
650 )
651 status = ""
652 if run_info:
File /conda/envs/mlrun-base/lib/python3.9/site-packages/mlrun/run.py:973, in wait_for_pipeline_completion(run_id, timeout, expected_statuses, namespace, remote, project)
971 if expected_statuses:
972 if status not in expected_statuses:
--> 973 raise RuntimeError(
974 f"Pipeline run status {status}{', ' + message if message else ''}"
975 )
977 logger.debug(
978 f"Finished waiting for pipeline completion."
979 f" run_id: {run_id},"
(...)
982 f" namespace: {namespace}"
983 )
985 return resp
RuntimeError: Pipeline run status Failed
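Because func-B fails, the workflow finishes in a Failed state, and `project.run()` re-raises that as a `RuntimeError` after printing the run table. If the notebook should continue past this expected failure, wrap the call; a minimal sketch:

```python
# The workflow is expected to fail (func-B raises an exception), so catch
# the RuntimeError that project.run() raises for a Failed pipeline.
try:
    project.run("workflow-func", watch=True, local=False)
except RuntimeError as exc:
    print(f"Workflow finished with errors: {exc}")
```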
Viewing the results in the UI Monitor Workflows tab#
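If the UI is not at hand, the step runs can also be inspected programmatically. A minimal sketch (assumes the default `list_runs` filters; adjust for your MLRun version):

```python
# List the project's runs and print each step's final state, as an
# alternative to the Monitor Workflows tab in the UI.
for run in project.list_runs():
    print(run["metadata"]["name"], run["status"]["state"])
```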
