Pipelines Using Dask, Kubeflow and MLRun

Create a project to host our functions, jobs and artifacts

Projects package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git repository.

The following code creates a new project in a local directory and initializes Git tracking for it:

import os
import mlrun
import warnings
warnings.filterwarnings("ignore")

# set project name and dir
project_name = 'sk-project-dask'
project_dir = './project'

# specify artifacts target location
_, artifact_path = mlrun.set_environment(project=project_name)

# set project
sk_dask_proj = mlrun.new_project(project_name, project_dir, init_git=True)
> 2021-01-24 16:39:27,665 [warning] Failed resolving version info. Ignoring and using defaults
> 2021-01-24 16:39:29,248 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': 'unstable', 'client_version': 'unstable'}

Init Dask Cluster

import mlrun
# create a new Dask function object
dsf = mlrun.new_function(name="mydask", kind="dask", image="mlrun/ml-models")

# set up function specs for dask
dsf.spec.remote = True
dsf.spec.replicas = 5
dsf.spec.service_type = 'NodePort'
dsf.with_limits(mem="6G")
dsf.spec.nthreads = 5
> 2021-01-24 16:39:36,831 [info] using in-cluster config.
# apply mount_v3io to the function so that the k8s pods that run it
# can access our data (shared data access)
dsf.apply(mlrun.mount_v3io())
<mlrun.runtimes.daskjob.DaskCluster at 0x7f5dfe154550>
dsf.save()
'52f5dcddb916b12943e9d44e9e2b75f48e286ec7'
# init dask cluster
dsf.client
> 2021-01-24 20:15:37,716 [info] trying dask client at: tcp://mlrun-mydask-997e6385-a.default-tenant:8786
> 2021-01-24 20:15:48,564 [warning] remote scheduler at tcp://mlrun-mydask-997e6385-a.default-tenant:8786 not ready, will try to restart Timed out trying to connect to 'tcp://mlrun-mydask-997e6385-a.default-tenant:8786' after 10 s: Timed out trying to connect to 'tcp://mlrun-mydask-997e6385-a.default-tenant:8786' after 10 s: [Errno -2] Name or service not known
> 2021-01-24 20:15:54,442 [info] using remote dask scheduler (mlrun-mydask-b4eb4ec5-8) at: tcp://mlrun-mydask-b4eb4ec5-8.default-tenant:8786

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B
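
The widget above shows zero workers, most likely because it was rendered before any worker had connected to the scheduler. As a rough sketch of the capacity to expect once all workers are up, assuming `replicas` is the number of worker pods, `nthreads` is the thread count per worker, and the 6G memory limit applies to each worker pod (the helper `cluster_capacity` is hypothetical and not part of MLRun or Dask):

```python
def cluster_capacity(replicas: int, nthreads: int, mem_gb_per_worker: float) -> dict:
    """Aggregate capacity of a cluster of identical workers (illustrative only)."""
    return {
        "workers": replicas,
        "threads": replicas * nthreads,
        "memory_gb": replicas * mem_gb_per_worker,
    }

# Values taken from the function spec above: 5 replicas, 5 threads each, 6G limit
capacity = cluster_capacity(replicas=5, nthreads=5, mem_gb_per_worker=6)
print(capacity)
```

If the assumptions hold, this is the upper bound the Dask dashboard should approach. The actual figures depend on how the cluster schedules the pods.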

Load and run functions

Load the function objects from a .py file, a .yaml file, or the function hub (marketplace).

# load function from the marketplace
sk_dask_proj.set_function('hub://describe_dask', name='describe')
sk_dask_proj.set_function('hub://sklearn_classifier_dask', name='dask_classifier')
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5dfc3b8d90>
sk_dask_proj.set_function('/User/dask/04-describe.py', name='describe', kind='job', image='mlrun/ml-models')

Create a Fully Automated ML Pipeline

Add more functions to our project to be used in our pipeline (from the functions hub/marketplace)

Describe data, train and eval model with dask

Define and save a pipeline

The following workflow definition is written to a file. It describes a Kubeflow execution graph (DAG)
and how functions and data are connected to form an end-to-end pipeline:

  • Ingest data

  • Describe data

  • Train, test and evaluate with dask
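
The idea of an execution graph can be sketched in plain Python, independent of Kubeflow. The example below is only an illustration: the step names mirror the list above, the dependencies between them are assumed to be a simple chain, and the standard-library `graphlib` module (Python 3.9+) stands in for the pipeline engine.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps that must finish before it can start.
# The chain ingest -> describe -> train is an assumption for illustration.
dependencies = {
    "ingest": set(),
    "describe": {"ingest"},
    "train": {"describe"},
}

# A topological sort yields an execution order that respects every dependency.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['ingest', 'describe', 'train']
```

A pipeline engine does the same kind of ordering, but also schedules each step as a container and passes artifacts between steps.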

Check the code below to see how function objects are initialized and used (by name) inside the workflow.
The workflow.py file has two parts: initializing the function objects, and defining the pipeline DSL (connecting the function inputs and outputs).

Note: a pipeline can also include CI steps such as building container images and deploying models. The example below contains only the describe and train steps.

%%writefile project/workflow.py
from kfp import dsl
from mlrun import mount_v3io

# params
funcs       = {}
LABELS      = "label"
DROP        = 'congestion_surcharge'
DATA_URL    = "/User/iris.csv"
DASK_CLIENT = "db://sk-project-dask/mydask"

# init_functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        # mount the shared v3io data volume on every function
        f.apply(mount_v3io())
     
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun"
)
def kfpipeline():
    
    # describe data
    describe = funcs['describe'].as_step(
        params={"dask_function"  : DASK_CLIENT},
        inputs={"dataset"       : DATA_URL}
    )
    
    # get data, train, test and evaluate 
    train = funcs['dask_classifier'].as_step(
        name="train",
        handler="train_model",
        params={"label_column"    : LABELS,
                "dask_function"    : DASK_CLIENT,
                "test_size"       : 0.10,
                "model_pkg_class" : "sklearn.ensemble.RandomForestClassifier",
                "drop_cols"       : DROP},
        inputs={"dataset"         : DATA_URL},
        outputs=['model', 'test_set']
    )
    
    train.after(describe)
Overwriting project/workflow.py
# register the workflow file as "main", embed the workflow code into the project YAML
sk_dask_proj.set_workflow('main', 'workflow.py', embed=True)
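
The `DASK_CLIENT` value in the workflow file is a database-style reference to the Dask function saved earlier. A minimal sketch of how such a string breaks down, assuming the layout `db://<project>/<function-name>` inferred from the value used here (`split_db_uri` is a hypothetical helper, not an MLRun API):

```python
from urllib.parse import urlparse

def split_db_uri(uri: str) -> tuple:
    """Split a db:// reference into (project, function name). Illustrative only."""
    parsed = urlparse(uri)
    if parsed.scheme != "db":
        raise ValueError(f"expected a db:// reference, got {uri!r}")
    # The part after db:// is read as the project, the path as the function name
    return parsed.netloc, parsed.path.lstrip("/")

project, function_name = split_db_uri("db://sk-project-dask/mydask")
print(project, function_name)
```

Under that assumption, the reference points at the function named `mydask` in the `sk-project-dask` project, which is why the Dask function has to be saved before the pipeline runs.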

Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo.

sk_dask_proj.save()

Run a pipeline workflow

Use the run method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.
The workflow ID is returned and can be used to track progress, or you can follow the hyperlinks printed in the output.

Note: The same run can be started from the CLI:
mlrun project my-proj/ -r main -p "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"

The dirty flag allows us to run a project with uncommitted changes (when the notebook is in the same Git directory, the project is always dirty).
The watch flag waits for the pipeline to complete and prints the results.

artifact_path = os.path.abspath('./pipe/{{workflow.uid}}')
run_id = sk_dask_proj.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=False, watch=True)
Experiment link here
Run link here
> 2021-01-24 21:30:12,077 [info] Pipeline run id=c1b351fc-073b-4cdd-a6c3-fc167afbce8e, check UI or DB for progress
> 2021-01-24 21:30:12,079 [info] waiting for pipeline run completion
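
The `{{workflow.uid}}` token in the artifact path is a placeholder that is filled in at run time, so each run writes its artifacts to its own directory. The effect can be illustrated with plain string substitution. This is only a sketch: the real substitution is performed by the pipeline engine, `render_artifact_path` is a hypothetical helper, and the uid is the run id from the log above.

```python
def render_artifact_path(template: str, workflow_uid: str) -> str:
    """Replace the {{workflow.uid}} placeholder with a concrete run id."""
    return template.replace("{{workflow.uid}}", workflow_uid)

template = "./pipe/{{workflow.uid}}"
rendered = render_artifact_path(template, "c1b351fc-073b-4cdd-a6c3-fc167afbce8e")
print(rendered)  # ./pipe/c1b351fc-073b-4cdd-a6c3-fc167afbce8e
```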

Run Results

Workflow c1b351fc-073b-4cdd-a6c3-fc167afbce8e finished, status=Succeeded
Click the hyperlinks below to see detailed results.

Run: train
  start:     Jan 24 21:30:37
  state:     completed
  results:   micro=0.9979224376731302
             macro=1.0
             precision-1=1.0
             precision-0=1.0
             precision-2=0.8571428571428571
             recall-1=1.0
             recall-0=0.8461538461538461
             recall-2=1.0
             f1-1=1.0
             f1-0=0.9166666666666666
             f1-2=0.923076923076923
  artifacts: ROCAUC, ClassificationReport, ConfusionMatrix, FeatureImportances,
             model, standard_scaler, label_encoder, test_set

Run: describe-dask
  start:     Jan 24 21:30:20
  state:     completed
  results:   scale_pos_weight=1.00
  artifacts: histograms, imbalance, correlation
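
The f1 values reported for the train step can be cross-checked against the per-class precision and recall, since F1 is the harmonic mean of the two. The sketch below uses only the numbers printed in the results above; `f1_score` here is a small local helper, not the scikit-learn function of the same name.

```python
import math

def f1_score(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# class label -> (precision, recall, reported f1), copied from the run results
reported = {
    0: (1.0, 0.8461538461538461, 0.9166666666666666),
    1: (1.0, 1.0, 1.0),
    2: (0.8571428571428571, 1.0, 0.923076923076923),
}

for label, (precision, recall, f1) in reported.items():
    # Each recomputed value should agree with the reported one
    assert math.isclose(f1_score(precision, recall), f1, rel_tol=1e-9)
    print(label, f1_score(precision, recall))
```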
