Running multiple functions with ExitHandler#

This example runs multiple functions in parallel, waits for all of them to complete before continuing to the next function, and uses the Kubeflow Pipelines dsl.ExitHandler to run a final step after everything else. The exit handler is especially useful for work that must be performed as the very last step; for example, sending an email or recording the run results of the previous steps somewhere else.
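The core pattern is an exit handler that wraps the steps of the workflow. As a minimal sketch (the function names here are placeholders; the full, runnable workflow for this example is built later in this notebook):

from kfp import dsl
import mlrun

@dsl.pipeline(name="exit-handler-sketch")
def sketch_pipeline():
    # The step passed to dsl.ExitHandler runs after everything inside the
    # block completes, whether the block succeeds or fails.
    with dsl.ExitHandler(mlrun.run_function("cleanup-func")):
        mlrun.run_function("work-func")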

import mlrun

Create the project and its functions#

Function D is designed to fail, to show that the ExitHandler runs even after a failure.

project = mlrun.get_or_create_project(
    "waiting-for-multiple-steps", "./", user_project=True
)
> 2024-04-16 09:28:55,586 [info] Project loaded successfully: {'project_name': 'waiting-for-multiple-steps'}
%%writefile funcs.py
import time

# First 3 functions to run
def func_A(context):
    time.sleep(5)
    context.logger.info("Function A is running now")
    return "Function A has been triggered"

def func_B(context):
    time.sleep(10)
    context.logger.info("Function B is running now")
    return "Function B has been triggered"

def func_C(context):
    time.sleep(15)
    context.logger.info("Function C is running now")
    return "Function C has been triggered"


# This function waits for the 3 functions to complete, then logs their results before raising an exception.
def func_D(context, func_a_res, func_b_res, func_c_res):
    context.logger.info(f"Function D is running now, logging the results of previous functions.")
    context.log_artifact("func_a_result", str(func_a_res))
    context.log_artifact("func_b_result", str(func_b_res))
    context.log_artifact("func_c_result", str(func_c_res))
    context.logger.info("Function D has been triggered")
    
    # Now it raises an Exception so the function fails.
    raise Exception("This is an example exception")

# This function will be part of the ExitHandler.
def func_final(context):
    context.logger.info(f"The final function is now running regardless of whether all preceding functions succeeded.")
    return "Function final has been triggered"
Overwriting funcs.py
# Set the functions in the project
func_A = project.set_function(
    func="funcs.py", name="func-A", handler="func_A", image="mlrun/mlrun", kind="job"
)
func_B = project.set_function(
    func="funcs.py", name="func-B", handler="func_B", image="mlrun/mlrun", kind="job"
)
func_C = project.set_function(
    func="funcs.py", name="func-C", handler="func_C", image="mlrun/mlrun", kind="job"
)
func_D = project.set_function(
    func="funcs.py", name="func-D", handler="func_D", image="mlrun/mlrun", kind="job"
)
func_final = project.set_function(
    func="funcs.py",
    name="func-final",
    handler="func_final",
    image="mlrun/mlrun",
    kind="job",
)
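Optionally, before composing the pipeline, you can smoke-test one of the functions locally. This quick check is not part of the original flow; it runs the handler in the local Python process using the project functions defined above:

# Run func-A locally to verify that the handler works before building the workflow.
smoke_run = project.run_function("func-A", local=True)
print(smoke_run.outputs)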

Create the pipeline#

The ExitHandler function runs after functions A, B, C, and D. It is triggered regardless of whether the preceding functions succeeded.

%%writefile workflow.py

from kfp import dsl

import mlrun

@dsl.pipeline(
    name="ExitHandler and multiple wait pipeline",
    description=(
        "Pipeline that runs 3 functions simultaneously, waits for all of them to finish, "
        "and then runs another function that logs their results and fails. "
        "After all of this, an exit function is triggered."
    ),
)
def kfpipeline(input_val):
    project = mlrun.get_current_project()
    
    # 'func_final' is executed after everything inside the block finishes or crashes.
    with dsl.ExitHandler(mlrun.run_function("func-final")):
        # Start 3 functions simultaneously
        step_1 = mlrun.run_function("func-A", returns=["first_func_res"])
        step_2 = mlrun.run_function("func-B", returns=["second_func_res"])
        step_3 = mlrun.run_function("func-C", returns=["third_func_res"])

        # Start this function only after the first 3 functions are done.
        # It logs the outputs of the previous functions as artifacts, and then raises an exception.
        step_4 = mlrun.run_function(
            "func-D",
            params={
                "func_a_res": step_1.outputs["first_func_res"],
                "func_b_res": step_2.outputs["second_func_res"],
                "func_c_res": step_3.outputs["third_func_res"],
            },
            returns=["fourth_func_res"],
        ).after(step_1, step_2, step_3)
Overwriting workflow.py
project.set_workflow(name="workflow-func", workflow_path="workflow.py")
project.save()
<mlrun.projects.project.MlrunProject at 0x7f1fe10d5250>
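The workflow is now registered in the project under the name workflow-func. As an optional alternative to the run-by-path call in the next section, the saved workflow can also be launched by name:

# Optional: launch the registered workflow by its name instead of by path.
project.run("workflow-func", watch=True)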

Run the workflow#

Now run the workflow and check the workflow graph in the UI. Because func-D raises an exception by design, the workflow finishes in a Failed state: the exit handler (func-final) still runs, but project.run() raises a RuntimeError at the end, as shown in the traceback below.

project.run(workflow_path="workflow.py", watch=True, local=False)
Pipeline running (id=2cb4810e-78bc-4722-9f45-6f8b33144df6), click here to view the details in MLRun UI
../_images/dbabf999be7e8b3bcc34010822e8c381589c1863921a1a6ad470e68de1ddf8ba.svg

Run Results

[info] Workflow 2cb4810e-78bc-4722-9f45-6f8b33144df6 finished with 1 errors

click the hyperlinks below to see detailed results

| uid | start | state | name | parameters | results |
| --- | --- | --- | --- | --- | --- |
|  | Apr 16 09:35:45 | completed | func-final |  | return=Function final has been triggered |
|  | Apr 16 09:35:18 | error | func-d | func_a_res=Function A has been triggered, func_b_res=Function B has been triggered, func_c_res=Function C has been triggered |  |
|  | Apr 16 09:34:39 | completed | func-b |  | second_func_res=Function B has been triggered |
|  | Apr 16 09:34:39 | completed | func-c |  | third_func_res=Function C has been triggered |
|  | Apr 16 09:34:39 | completed | func-a |  | first_func_res=Function A has been triggered |
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[16], line 1
----> 1 project.run(workflow_path='workflow.py',watch=True, local=False)

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/project.py:2719, in MlrunProject.run(self, name, workflow_path, arguments, artifact_path, workflow_handler, namespace, sync, watch, dirty, engine, local, schedule, timeout, source, cleanup_ttl, notifications)
   2716     if engine == "remote" and status_engine.engine != "local":
   2717         status_engine = _RemoteRunner
-> 2719     status_engine.get_run_status(project=self, run=run, timeout=timeout)
   2720 return run

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:700, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
    697 notifiers.push(text, "info", runs)
    699 if raise_error:
--> 700     raise raise_error
    701 return state, had_errors, text

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:674, in _KFPRunner.get_run_status(project, run, timeout, expected_statuses, notifiers)
    672     if timeout:
    673         logger.info("Waiting for pipeline run completion")
--> 674         state = run.wait_for_completion(
    675             timeout=timeout, expected_statuses=expected_statuses
    676         )
    677 except RuntimeError as exc:
    678     # push runs table also when we have errors
    679     raise_error = exc

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:460, in _PipelineRunStatus.wait_for_completion(self, timeout, expected_statuses)
    459 def wait_for_completion(self, timeout=None, expected_statuses=None):
--> 460     self._state = self._engine.wait_for_completion(
    461         self.run_id,
    462         project=self.project,
    463         timeout=timeout,
    464         expected_statuses=expected_statuses,
    465     )
    466     return self._state

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/projects/pipelines.py:640, in _KFPRunner.wait_for_completion(run_id, project, timeout, expected_statuses)
    638     timeout = 60 * 60
    639 project_name = project.metadata.name if project else ""
--> 640 run_info = wait_for_pipeline_completion(
    641     run_id,
    642     timeout=timeout,
    643     expected_statuses=expected_statuses,
    644     project=project_name,
    645 )
    646 status = ""
    647 if run_info:

File /conda/envs/mlrun-extended/lib/python3.9/site-packages/mlrun/run.py:971, in wait_for_pipeline_completion(run_id, timeout, expected_statuses, namespace, remote, project)
    969 if expected_statuses:
    970     if status not in expected_statuses:
--> 971         raise RuntimeError(
    972             f"Pipeline run status {status}{', ' + message if message else ''}"
    973         )
    975 logger.debug(
    976     f"Finished waiting for pipeline completion."
    977     f" run_id: {run_id},"
   (...)
    980     f" namespace: {namespace}"
    981 )
    983 return resp

RuntimeError: Pipeline run status Failed

Viewing the results in the UI Monitor Workflows tab#

../_images/multi-step-workflow.png
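If you prefer to check the results programmatically instead of (or in addition to) the UI, you can list the project's runs and print each step's final state. This is an optional sketch; it assumes the standard MLRun run-dictionary layout:

# List the runs recorded for this project and show each step's name and state.
for run in project.list_runs():
    print(run["metadata"]["name"], "->", run["status"]["state"])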