Running a conditional workflow#

This example demonstrates creating a workflow that runs a function based on the result of the previous function.

import mlrun

Create the project and the functions#

project = mlrun.get_or_create_project("condition-workflow", "./", user_project=True)
> 2024-02-12 16:29:38,982 [info] Project loaded successfully: {'project_name': 'workflow-based-on-results2'}
%%writefile funcs.py
import time

def func_A(context, input_value):
    
    context.logger.info(f"Function A is running with input {input_value}")
    return int(input_value)

def func_B(context):
    
    context.logger.info(f"Function B is running now")        
    return "Function B has been triggered"

def func_C(context):
    
    context.logger.info(f"Function C is running now")        
    return "Function C has been triggered"
Overwriting funcs.py
# Set the functions in the project
func_A = project.set_function(
    func="funcs.py", name="func-A", handler="func_A", image="mlrun/mlrun", kind="job"
)
func_B = project.set_function(
    func="funcs.py", name="func-B", handler="func_B", image="mlrun/mlrun", kind="job"
)
func_C = project.set_function(
    func="funcs.py", name="func-C", handler="func_C", image="mlrun/mlrun", kind="job"
)
> 2024-02-12 16:29:39,017 [warning] Failed to add git metadata, ignore if path is not part of a git repo: {'path': './', 'error': '/User/workflows/workflow-based-on-results'}
> 2024-02-12 16:29:39,039 [warning] Failed to add git metadata, ignore if path is not part of a git repo: {'path': './', 'error': '/User/workflows/workflow-based-on-results'}
> 2024-02-12 16:29:39,064 [warning] Failed to add git metadata, ignore if path is not part of a git repo: {'path': './', 'error': '/User/workflows/workflow-based-on-results'}
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing 'func_A' underscores with dashes.
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing 'func_B' underscores with dashes.
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing 'func_C' underscores with dashes.

Create the workflow#

%%writefile workflow.py

from kfp import dsl
from mlrun.platforms import auto_mount
import os
import sys
import mlrun

@dsl.pipeline(name="Condition pipeline", description="Pipeline that runs a function based on the result of a previous function")
def kfpipeline(input_val):
    project = mlrun.get_current_project()
    
    # Run first function with the input value given to the workflow
    step_1 = mlrun.run_function('func-A', params={"input_value":str(input_val)}, returns=['first_func_res'])    

    # Based on func_A result, we run either func_B or func_C
    step_1_output = step_1.outputs["first_func_res"]
    
    with dsl.Condition(step_1_output > 5) as condition1:
        step_2 = mlrun.run_function('func-B', returns=['second_func_res'])
    condition1.after(step_1)
        
    with dsl.Condition(step_1_output <= 5) as condition2:
        step_2 = mlrun.run_function('func-C', returns=['second_func_res'])
    condition2.after(step_1)

    
Overwriting workflow.py
project.set_workflow(name="workflow-func", workflow_path="workflow.py")
project.save()
<mlrun.projects.project.MlrunProject at 0x7fea35651310>

Run the workflow with input_val > 5. Func_B will be triggered#

project.run(
    workflow_path="workflow.py", watch=True, arguments={"input_val": 6}, local=False
)
Pipeline running (id=6eb7fc4f-d4e5-45c6-9d18-875ed716d01c), click here to view the details in MLRun UI
../_images/f44d60e2327486571e0fbf4772cc8e159835efbedc8bcb1f8ca2086928022178.svg

Run Results

[info] Workflow 6eb7fc4f-d4e5-45c6-9d18-875ed716d01c finished, state=Succeeded


click the hyper links below to see detailed results
uid start state name parameters results
Feb 12 16:30:20 completed func-b
second_func_res=Function B has been triggered
Feb 12 16:29:48 completed func-a
input_value=6
first_func_res=6
6eb7fc4f-d4e5-45c6-9d18-875ed716d01c

Run the workflow with input_val < 5. Func_C will be triggered#

project.run(
    workflow_path="workflow.py", watch=True, arguments={"input_val": 3}, local=False
)
Pipeline running (id=9f1337a5-3e9c-4957-b2e1-1e73744507da), click here to view the details in MLRun UI
../_images/2d08f2eb3313cd9fd765e93f9bfde968aa8f41a6037b19497ded8e9c5a0bbf17.svg

Run Results

[info] Workflow 9f1337a5-3e9c-4957-b2e1-1e73744507da finished, state=Succeeded


click the hyper links below to see detailed results
uid start state name parameters results
Feb 12 16:31:29 completed func-c
second_func_res=Function C has been triggered
Feb 12 16:31:00 completed func-a
input_value=3
first_func_res=3
9f1337a5-3e9c-4957-b2e1-1e73744507da