Pipelines using Dask, Kubeflow and MLRun#

Create a project to host functions, jobs and artifacts#

Projects are used to package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git archive.

The following code creates a new project in a local dir and initializes git tracking on it.

import os
import mlrun
import warnings
warnings.filterwarnings("ignore")

# set project name and dir
project_name = 'sk-project-dask'
project_dir = './'

# specify artifacts target location
_, artifact_path = mlrun.set_environment(project=project_name)

# set project
sk_dask_proj = mlrun.get_or_create_project(project_name, project_dir, init_git=True)
> 2022-09-27 17:26:14,808 [info] loaded project sk-project-dask from MLRun DB
> 2022-09-27 17:26:14,839 [info] loaded project sk-project-dask from MLRun DB

Init Dask cluster#

import mlrun
# set up function from local file
dsf = mlrun.new_function(name="mydask", kind="dask", image="mlrun/ml-models")

# set up function specs for dask
dsf.spec.remote = True
dsf.spec.replicas = 5
dsf.spec.service_type = 'NodePort'
dsf.with_limits(mem="6G")
dsf.spec.nthreads = 5
# apply mount_v3io over our function so that our k8s pod which run our function
# will be able to access our data (shared data access)
dsf.apply(mlrun.mount_v3io())
<mlrun.runtimes.daskjob.DaskCluster at 0x7f47fce9c850>
dsf.save()
'db://sk-project-dask/mydask'
# init dask cluster
dsf.client
> 2022-09-27 17:26:25,134 [info] trying dask client at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786
> 2022-09-27 17:26:25,162 [info] using remote dask scheduler (mlrun-mydask-d7df9301-d) at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786

Client

Client-83392da2-3e89-11ed-b7e8-82a5d7054c46

Connection method: Direct
Dashboard: http://mlrun-mydask-d7df9301-d.default-tenant:8787/status

Scheduler Info

Scheduler

Scheduler-b8468d53-b900-4041-9982-5e14d5e5eb81

Comm: tcp://10.200.152.178:8786 Workers: 0
Dashboard: http://10.200.152.178:8787/status Total threads: 0
Started: Just now Total memory: 0 B

Workers

Load and run a functions#

Load the function object from .py .yaml file or Function Hub (marketplace).

# load function from the Function Hub
sk_dask_proj.set_function("hub://describe", name="describe")
sk_dask_proj.set_function("hub://sklearn_classifier_dask", name="dask_classifier")
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f48353d5130>

Create a fully automated ML pipeline#

Add more functions to the project to be used in the pipeline (from the Function Hub)#

Describe data, train and eval model with dask.

Define and save a pipeline#

The following workflow definition will be written into a file. It describes a Kubeflow execution graph (DAG) and how functions and data are connected to form an end to end pipeline.

  • Describe data.

  • Train, test and evaluate with dask.

Check the code below to see how functions objects are initialized and used (by name) inside the workflow.
The workflow.py file has two parts, initialize the function objects and define pipeline dsl (connect the function inputs and outputs).

Note: The pipeline can include CI steps like building container images and deploying models as illustrated in the following example.

%%writefile workflow.py
import os
from kfp import dsl
import mlrun

# params
funcs = {}
LABELS = "label"
DROP = "congestion_surcharge"
DATA_URL = mlrun.get_sample_path("data/iris/iris_dataset.csv")
DASK_CLIENT = "db://sk-project-dask/mydask"


# init functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(mlrun.mount_v3io())
        pass


@dsl.pipeline(name="Demo training pipeline", description="Shows how to use mlrun")
def kfpipeline():
    # Describe the data
    describe = funcs["describe"].as_step(
        inputs={"table": DATA_URL},
        params={"dask_function": DASK_CLIENT},
    )

    # Train, test and evaluate:
    train = funcs["dask_classifier"].as_step(
        name="train",
        handler="train_model",
        inputs={"dataset": DATA_URL},
        params={
            "label_column": LABELS,
            "dask_function": DASK_CLIENT,
            "test_size": 0.10,
            "model_pkg_class": "sklearn.ensemble.RandomForestClassifier",
            "drop_cols": DROP,
        },
        outputs=["model", "test_set"],
    )
    train.after(describe)
Overwriting workflow.py
# register the workflow file as "main", embed the workflow code into the project YAML
sk_dask_proj.set_workflow('main', 'workflow.py', embed=False)

Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo.

sk_dask_proj.save()
<mlrun.projects.project.MlrunProject at 0x7f48342e4880>

Run a pipeline workflow#

Use the run method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.
The workflow ID is returned and can be used to track the progress or you can use the hyperlinks.

Note: The same command can be issued through CLI commands:
mlrun project my-proj/ -r main -p "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"

The dirty flag lets you run a project with uncommitted changes (when the notebook is in the same git dir it is always dirty)
The watch flag waits for the pipeline to complete and print results.

artifact_path = os.path.abspath('./pipe/{{workflow.uid}}')
run_id = sk_dask_proj.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=False, 
    watch=True
)
Pipeline running (id=631ad0a3-19f1-4df0-bfa7-6c38c60275e0), click here to view the details in MLRun UI
../_images/fe40e47eb8d9e98f01323bdd698714fa348435403e346b4ce55c73c2e6a34a40.svg

Run Results

Workflow 631ad0a3-19f1-4df0-bfa7-6c38c60275e0 finished, state=Succeeded
click the hyper links below to see detailed results
uid start state name parameters results
Sep 27 17:27:09 completed train
label_column=label
dask_function=db://sk-project-dask/mydask
test_size=0.1
model_pkg_class=sklearn.ensemble.RandomForestClassifier
drop_cols=congestion_surcharge
micro=0.9944598337950138
macro=0.9945823158323159
precision-0=1.0
precision-1=0.9166666666666666
precision-2=0.8
recall-0=1.0
recall-1=0.7857142857142857
recall-2=0.9230769230769231
f1-0=1.0
f1-1=0.8461538461538461
f1-2=0.8571428571428571
Sep 27 17:26:42 completed describe
dask_function=db://sk-project-dask/mydask

back to top