Running multiple functions with ExitHandler
This example uses the Kubeflow Pipelines `dsl.ExitHandler` to run multiple functions and then run one final (exit) function, regardless of whether the preceding functions succeeded. ExitHandler is especially useful for work that must be performed as the very last step; for example, sending an email or recording the run results of the previous steps elsewhere.
import mlrun
Create the project and its functions
Function D is designed to fail, to show that the ExitHandler still runs after a failure.
# Create the project, or load it if it already exists, with the current
# directory as its context.
project = mlrun.get_or_create_project(
    name="waiting-for-multiple-steps",
    context="./",
    user_project=True,
)
> 2024-04-16 09:28:55,586 [info] Project loaded successfully: {'project_name': 'waiting-for-multiple-steps'}
%%writefile funcs.py
import time
# First 3 functions to run
def func_A(context):
    """Simulate a 5-second task, log a message and return a result string.

    Registered as ``func-A``. In workflow.py it runs in parallel with func-B and
    func-C, and its return value is captured as ``first_func_res``.

    :param context: MLRun execution context; only ``context.logger`` is used.
    :returns: The fixed string ``"Function A has been triggered"``.
    """
    time.sleep(5)
    context.logger.info("Function A is running now")
    return "Function A has been triggered"
def func_B(context):
    """Simulate a 10-second task, log a message and return a result string.

    Registered as ``func-B``. In workflow.py it runs in parallel with func-A and
    func-C, and its return value is captured as ``second_func_res``.

    :param context: MLRun execution context; only ``context.logger`` is used.
    :returns: The fixed string ``"Function B has been triggered"``.
    """
    time.sleep(10)
    context.logger.info("Function B is running now")
    return "Function B has been triggered"
def func_C(context):
    """Simulate a 15-second task, log a message and return a result string.

    Registered as ``func-C``. In workflow.py it runs in parallel with func-A and
    func-B, and its return value is captured as ``third_func_res``.

    :param context: MLRun execution context; only ``context.logger`` is used.
    :returns: The fixed string ``"Function C has been triggered"``.
    """
    time.sleep(15)
    context.logger.info("Function C is running now")
    return "Function C has been triggered"
def func_D(context, func_a_res, func_b_res, func_c_res):
    """Log the three upstream results as artifacts, then raise an exception.

    This function does not wait for anything itself. In workflow.py the
    pipeline feeds the outputs of func-A, func-B and func-C into these
    parameters and adds ``.after(...)``. As a result, this step starts only
    after all three have completed.

    :param context: MLRun execution context (uses ``logger`` and ``log_artifact``).
    :param func_a_res: Result of func-A; logged as artifact ``func_a_result``.
    :param func_b_res: Result of func-B; logged as artifact ``func_b_result``.
    :param func_c_res: Result of func-C; logged as artifact ``func_c_result``.
    :raises Exception: Always, so the pipeline fails and its ExitHandler runs.
    """
    context.logger.info(
        "Function D is running now, logging the results of previous functions."
    )
    for artifact_key, result in (
        ("func_a_result", func_a_res),
        ("func_b_result", func_b_res),
        ("func_c_result", func_c_res),
    ):
        # The stringified result is stored as the artifact body.
        context.log_artifact(artifact_key, body=str(result))
    context.logger.info("Function D has been triggered")
    # Deliberate failure: demonstrates that the ExitHandler step still runs.
    raise Exception("This is an example exception")
def func_final(context):
    """Log a closing message and return a result string.

    Registered as ``func-final``. In workflow.py it is the exit step passed to
    ``dsl.ExitHandler``, so it runs after the guarded steps finish, whether or
    not they succeeded.

    :param context: MLRun execution context; only ``context.logger`` is used.
    :returns: The fixed string ``"Function final has been triggered"``.
    """
    context.logger.info(
        "The final function is now running regardless of whether all preceding functions succeeded."
    )
    return "Function final has been triggered"
Overwriting funcs.py
# Register the five job functions from funcs.py in the project. They share the
# same source file, image and kind, and differ only in name and handler.
func_A, func_B, func_C, func_D, func_final = (
    project.set_function(
        func="funcs.py",
        name=function_name,
        handler=handler_name,
        image="mlrun/mlrun",
        kind="job",
    )
    for function_name, handler_name in (
        ("func-A", "func_A"),
        ("func-B", "func_B"),
        ("func-C", "func_C"),
        ("func-D", "func_D"),
        ("func-final", "func_final"),
    )
)
Create the pipeline
The ExitHandler function runs after functions A, B, C, and D. It is triggered regardless of whether all of the preceding functions succeeded.
%%writefile workflow.py
from kfp import dsl
from mlrun.platforms import auto_mount
import os
import sys
import mlrun
@dsl.pipeline(
    name="ExitHandler and multiple wait pipeline",
    description=(
        "Pipeline that runs 3 functions simultaneously, waits for all of them to "
        "finish, and then runs another function that logs their results and fails. "
        "After all of this, an exit function is triggered."
    ),
)
def kfpipeline(input_val):
    # Pipeline: func-A/B/C in parallel -> func-D (fails on purpose), all guarded
    # by an ExitHandler whose exit step is func-final.
    # input_val is a pipeline parameter that no step uses.

    # The returned project is not used; the call is kept as in the original.
    # NOTE(review): presumably it also verifies a current project exists — confirm.
    current_project = mlrun.get_current_project()

    # Exit step: executed after everything inside the block finishes or crashes.
    final_step = mlrun.run_function("func-final")
    with dsl.ExitHandler(final_step):
        # No dependencies between these three, so they start simultaneously.
        step_a = mlrun.run_function("func-A", returns=["first_func_res"])
        step_b = mlrun.run_function("func-B", returns=["second_func_res"])
        step_c = mlrun.run_function("func-C", returns=["third_func_res"])

        # func-D receives the three outputs and is explicitly ordered after their
        # steps. It logs them as artifacts and then crashes.
        upstream_results = {
            "func_a_res": step_a.outputs["first_func_res"],
            "func_b_res": step_b.outputs["second_func_res"],
            "func_c_res": step_c.outputs["third_func_res"],
        }
        step_d = mlrun.run_function(
            "func-D", params=upstream_results, returns=["fourth_func_res"]
        ).after(step_a, step_b, step_c)
Overwriting workflow.py
# Register workflow.py under the name "workflow-func", then save the project.
project.set_workflow(workflow_path="workflow.py", name="workflow-func")
project.save()
<mlrun.projects.project.MlrunProject at 0x7f1fe10d5250>
Run the workflow
Now run the workflow and check the workflow graph in the UI.
# func-D fails on purpose, so the pipeline ends as Failed. With watch=True this
# call therefore raises RuntimeError, after func-final (the exit step) has run.
project.run(
    workflow_path="workflow.py",
    local=False,
    watch=True,
)
Pipeline running (id=2cb4810e-78bc-4722-9f45-6f8b33144df6), click here to view the details in MLRun UI
Run Results
[info] Workflow 2cb4810e-78bc-4722-9f45-6f8b33144df6 finished with 1 errors
click the hyper links below to see detailed results
uid | start | state | name | parameters | results |
---|---|---|---|---|---|
Apr 16 09:35:45 | completed | func-final | return=Function final has been triggered |
||
Apr 16 09:35:18 | error |
func-d | func_a_res=Function A has been triggered func_b_res=Function B has been triggered func_c_res=Function C has been triggered |
||
Apr 16 09:34:39 | completed | func-b | second_func_res=Function B has been triggered |
||
Apr 16 09:34:39 | completed | func-c | third_func_res=Function C has been triggered |
||
Apr 16 09:34:39 | completed | func-a | first_func_res=Function A has been triggered |
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Cell In[16], line 1
----> 1 project.run(workflow_path='workflow.py',watch=True, local=False)
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/project.py:2719, in MlrunProject.run(self, name, workflow_path, arguments, artifact_path, workflow_handler, namespace, sync, watch, dirty, engine, local, schedule, timeout, source, cleanup_ttl, notifications)
2716 if engine == "remote" and status_engine.engine != "local":
2717 status_engine = _RemoteRunner
-> 2719 status_engine.get_run_status(project=self, run=run, timeout=timeout)
2720 return run
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:700, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
697 notifiers.push(text, "info", runs)
699 if raise_error:
--> 700 raise raise_error
701 return state, had_errors, text
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:674, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
672 if timeout:
673 logger.info("Waiting for pipeline run completion")
--> 674 state = run.wait_for_completion(
675 timeout=timeout, expected_statuses=expected_statuses
676 )
677 except RuntimeError as exc:
678 # push runs table also when we have errors
679 raise_error = exc
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:460, in _PipelineRunStatus.wait_for_completion(self, timeout, expected_statuses)
459 def wait_for_completion(self, timeout=None, expected_statuses=None):
--> 460 self._state = self._engine.wait_for_completion(
461 self.run_id,
462 project=self.project,
463 timeout=timeout,
464 expected_statuses=expected_statuses,
465 )
466 return self._state
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:640, in _KFPRunner.wait_for_completion(run_id, project, timeout, expected_statuses)
638 timeout = 60 * 60
639 project_name = project.metadata.name if project else ""
--> 640 run_info = wait_for_pipeline_completion(
641 run_id,
642 timeout=timeout,
643 expected_statuses=expected_statuses,
644 project=project_name,
645 )
646 status = ""
647 if run_info:
File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/run.py:971, in wait_for_pipeline_completion(run_id, timeout, expected_statuses, namespace, remote, project)
969 if expected_statuses:
970 if status not in expected_statuses:
--> 971 raise RuntimeError(
972 f"Pipeline run status {status}{', ' + message if message else ''}"
973 )
975 logger.debug(
976 f"Finished waiting for pipeline completion."
977 f" run_id: {run_id},"
(...)
980 f" namespace: {namespace}"
981 )
983 return resp
RuntimeError: Pipeline run status Failed