Demo: End-to-End scikit-learn Pipeline (Iris)

This notebook demonstrates how to create a local function, run predefined library functions, and create and run a full ML pipeline from local and library functions.

This notebook covers how to:

  • Create and test a simple function

  • Examine data using a serverless (containerized) describe function

  • Create an automated ML pipeline from various library functions

  • Run and track the pipeline results and artifacts

Create and Test a Local Function (Iris Data Generator)

Import the MLRun SDK and notebook magics. Do not remove this cell or its # mlrun: ignore comment; the annotation excludes this cell from the generated function code.

# mlrun: ignore
import mlrun

Specify function dependencies and configuration

%%nuclio cmd -c
pip install scikit-learn
pip install pyarrow
%nuclio config spec.build.baseImage = "mlrun/mlrun"
%nuclio: setting spec.build.baseImage to 'mlrun/mlrun'

Function code

Generate the Iris dataset and log the dataframe (as a CSV or Parquet file)

from sklearn.datasets import load_iris
import pandas as pd

def iris_generator(context, format='csv'):
    iris = load_iris()
    iris_dataset = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    iris_labels = pd.DataFrame(data=iris.target, columns=['label'])
    iris_dataset = pd.concat([iris_dataset, iris_labels], axis=1)
    
    context.logger.info('saving iris dataframe to {}'.format(context.artifact_path))
    context.log_dataset('iris_dataset', df=iris_dataset, format=format, index=False)

The following end-code annotation tells the nuclio-jupyter parser to stop parsing the notebook at this cell. Please do not remove it:

# mlrun: end-code
# marks the end of a code section

Create a project to host our functions, jobs and artifacts

Projects are used to package multiple functions, workflows, and artifacts. We usually store project code and definitions in a Git archive.

The following code creates a new project in a local directory and initializes Git tracking on it.

from os import path
from mlrun import mount_v3io, new_project, code_to_function


# set the default environment configuration
project_name_base = 'sk-project'
project_name, artifact_path = mlrun.set_environment(project=project_name_base, artifact_path='./', user_project=True)

# Create the project
project_dir = './project'
skproj = new_project(project_name, project_dir, init_git=True)

Run the data generator function locally

The function above can be tested locally. Parameters, inputs, and outputs can be specified in the API or the Task object.
When using run_local(), the function inputs and outputs are automatically recorded in the MLRun experiment and data tracking DB.

In each run we can specify the function, inputs, parameters/hyper-parameters, etc. For more details, see the mlrun_basics notebook.

# run the function locally
gen = mlrun.run_local(name='iris_gen', handler=iris_generator, 
                      artifact_path=path.join(artifact_path, 'data')) 
> 2021-01-26 18:30:34,491 [info] starting run iris_gen uid=089bde62946e402bb42705dc3202b5f8 DB=http://mlrun-api:8080
> 2021-01-26 18:30:34,619 [info] saving iris dataframe to /User/mlrun-demos/demos/scikit-learn-pipeline/data
iris_gen (completed, Jan 26 18:30:34)
  project: sk-project-orz, uid: 089bde62946e402bb42705dc3202b5f8, iter: 0
  labels: v3io_user=orz, kind=handler, owner=orz, host=jupyter-orz-6fd46d5d99-t2d5w
  artifacts: iris_dataset
to track results use .show() or .logs() or in CLI: 
!mlrun get run 089bde62946e402bb42705dc3202b5f8 --project sk-project-orz , !mlrun logs 089bde62946e402bb42705dc3202b5f8 --project sk-project-orz
> 2021-01-26 18:30:34,779 [info] run executed, status=completed

Convert our local code to a distributed serverless function object

gen_func = code_to_function(name='gen_iris', kind='job')
skproj.set_function(gen_func)
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fd6f5308110>
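Before composing the pipeline, the converted function can also be run on its own as a Kubernetes job. A minimal sketch, assuming a cluster with the v3io shared file system (deploy() builds the container image first; the parameters mirror the run_local() call above):

# build the container image for the job (skipped if already built)
gen_func.deploy(skip_deployed=True)

# run the containerized generator as a Kubernetes job
run = gen_func.apply(mount_v3io()).run(
    handler='iris_generator',
    params={'format': 'csv'},
    artifact_path=path.join(artifact_path, 'data'))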

Load and run a library function (visualize dataset features and stats)

Step 1: load the function object from the function hub (marketplace) into our project

Note: the function marketplace location is configurable; by default it points to the mlrun/functions Git repo.

describe = skproj.set_function('hub://describe')
# read the remote function doc, params, usage
describe.doc()
function: describe
describe and visualizes dataset stats
default handler: summarize
entry points:
  summarize: Summarize a table
    context(MLClientCtx)  - the function context, default=
    table(DataItem)  - MLRun input pointing to pandas dataframe (csv/parquet file path), default=
    label_column(str)  - ground truth column label, default=None
    class_labels(List[str])  - label for each class in tables and plots, default=[]
    plot_hist(bool)  - (True) set this to False for large tables, default=True
    plots_dest(str)  - destination folder of summary plots (relative to artifact_path), default=plots
    update_dataset  - when the table is a registered dataset update the charts in-place, default=False

Step 2: Run the describe function as a Kubernetes job with specified parameters.

mount_v3io() connects our function to the v3io shared file system, allowing us to pass the data and get the results (plots) back directly into our notebook. Other mount options can be used instead (e.g., NFS or object storage); see the sketch after the run output below.

describe.apply(mount_v3io()).run(params={'label_column': 'label'},
                                 inputs={"table": gen.outputs['iris_dataset']},
                                 artifact_path=artifact_path)
> 2021-01-26 18:30:48,582 [info] starting run describe-summarize uid=940f2a9e2e33485c9f81155fe9baad38 DB=http://mlrun-api:8080
> 2021-01-26 18:30:48,848 [info] Job is running in the background, pod: describe-summarize-6n7pt
> 2021-01-26 18:30:59,151 [info] run executed, status=completed
final state: completed
describe-summarize (completed, Jan 26 18:30:54)
  project: sk-project-orz, uid: 940f2a9e2e33485c9f81155fe9baad38, iter: 0
  labels: v3io_user=orz, kind=job, owner=orz, host=describe-summarize-6n7pt
  inputs: table
  parameters: label_column=label
  artifacts: histograms, violin, imbalance, imbalance-weights-vec, correlation-matrix, correlation
to track results use .show() or .logs() or in CLI: 
!mlrun get run 940f2a9e2e33485c9f81155fe9baad38 --project sk-project-orz , !mlrun logs 940f2a9e2e33485c9f81155fe9baad38 --project sk-project-orz
> 2021-01-26 18:31:08,093 [info] run executed, status=completed
<mlrun.model.RunObject at 0x7fd6f4e45c10>
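If your cluster does not use v3io, other mount decorators can be applied before run(). A hedged sketch: mount_pvc() and auto_mount() come from mlrun.platforms, and the PVC/volume names below are placeholders:

from mlrun.platforms import mount_pvc, auto_mount

# mount an existing Kubernetes PVC (names here are placeholders)
describe.apply(mount_pvc('shared-pvc', 'shared-pvc', '/mnt/data'))

# or let MLRun pick a mount based on the environment
# (v3io when available, otherwise a PVC configured via env vars)
describe.apply(auto_mount())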

Create a Fully Automated ML Pipeline

Add more functions to our project to be used in our pipeline (from the functions hub/marketplace)

AutoML training (sklearn_classifier), model validation (test_classifier), a real-time model server (v2_model_server), and a model REST API tester (v2_model_tester)

skproj.set_function('hub://sklearn_classifier', 'train')
skproj.set_function('hub://test_classifier', 'test')
skproj.set_function('hub://v2_model_server', 'serving')
skproj.set_function('hub://v2_model_tester', 'live_tester')
#print(skproj.to_yaml())
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fd6f4f96710>

Define and save a pipeline

The following workflow definition will be written into a file. It describes a Kubeflow execution graph (DAG)
and how functions and data are connected to form an end-to-end pipeline.

  • Build the iris generator (ingest) function container

  • Ingest the iris data

  • Analyze the dataset (describe)

  • Train and test the model

  • Deploy the model as a real-time serverless function

  • Test the serverless function REST API with test dataset

Check the code below to see how function objects are initialized and used (by name) inside the workflow.
The workflow.py file has two parts: initializing the function objects and defining the pipeline DSL (connecting the function inputs and outputs).

Note: the pipeline can include CI steps like building container images and deploying models as illustrated in the following example.

%%writefile project/workflow.py
from kfp import dsl
from mlrun import mount_v3io

funcs = {}
DATASET = 'iris_dataset'
LABELS  = "label"


# init functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(mount_v3io())
     
    # uncomment this line to collect the inference results into a stream
    # and specify a path in V3IO (<datacontainer>/<subpath>)
    # functions['serving'].set_tracking(f'projects/{project.name}/model_stream')

    
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun."
)
def kfpipeline():
    
    # build our ingestion function (container image)
    builder = funcs['gen-iris'].deploy_step(skip_deployed=True)
    
    # run the ingestion function with the new image and params
    ingest = funcs['gen-iris'].as_step(
        name="get-data",
        handler='iris_generator',
        image=builder.outputs['image'],
        params={'format': 'pq'},
        outputs=[DATASET])

    # analyze our dataset
    describe = funcs["describe"].as_step(
        name="summary",
        params={"label_column": LABELS},
        inputs={"table": ingest.outputs[DATASET]})
    
    # train with hyper-parameters
    train = funcs["train"].as_step(
        name="train",
        params={"sample"          : -1, 
                "label_column"    : LABELS,
                "test_size"       : 0.10},
        hyperparams={'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier", 
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        selector='max.accuracy',
        inputs={"dataset"         : ingest.outputs[DATASET]},
        outputs=['model', 'test_set'])

    # test and visualize our model
    test = funcs["test"].as_step(
        name="test",
        params={"label_column": LABELS},
        inputs={"models_path" : train.outputs['model'],
                "test_set"    : train.outputs['test_set']})

    # deploy our model as a serverless function, we can pass a list of models to serve 
    deploy = funcs["serving"].deploy_step(models=[{"key": f"{DATASET}:v1", "model_path": train.outputs['model']}])
    
    # test out new model server (via REST API calls)
    tester = funcs["live_tester"].as_step(name='model-tester',
        params={'addr': deploy.outputs['endpoint'], 'model': f"{DATASET}:v1"},
        inputs={'table': train.outputs['test_set']})
Overwriting project/workflow.py
# register the workflow file as "main", embed the workflow code into the project YAML
skproj.set_workflow('main', 'workflow.py', embed=True)

Save the project definitions to a file (project.yaml); it is recommended to commit all changes to a Git repo.

skproj.save()
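Once saved, the project can be reloaded elsewhere (e.g., on another cluster or from CI). A minimal sketch using mlrun's load_project; the Git URL below is a placeholder:

from mlrun import load_project

# reload the project from the local context directory created above
skproj2 = load_project('./project')

# or clone it from a Git remote into a new context directory (placeholder URL)
# skproj2 = load_project('./project-clone', url='git://github.com/<org>/<repo>.git')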

Run a pipeline workflow

Use the run method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.
The workflow ID is returned and can be used to track progress, or you can use the hyperlinks printed in the output.

Note: The same command can be issued through CLI commands:
mlrun project my-proj/ -r main -p "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"

The dirty flag allows us to run a project with uncommitted changes (when the notebook is in the same Git directory, it will always be dirty).
The watch flag waits for the pipeline to complete and prints the results.

artifact_path = path.abspath('./pipe/{{workflow.uid}}')
run_id = skproj.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=True, watch=True)
> 2021-01-26 18:31:09,236 [info] using in-cluster config.
Experiment link / Run link (hyperlinks to the Kubeflow Pipelines UI appear in the notebook output)
> 2021-01-26 18:31:09,886 [info] Pipeline run id=ffd2c3b1-6d9c-4501-bbf9-c93a85510b1a, check UI or DB for progress
> 2021-01-26 18:31:09,887 [info] waiting for pipeline run completion

Run Results

Workflow ffd2c3b1-6d9c-4501-bbf9-c93a85510b1a finished, status=Succeeded
Click the hyperlinks in the notebook output to see detailed results for each step:
model-tester (completed, Jan 26 18:32:58)
  results: total_tests=15, errors=0, match=14, avg_latency=20047, min_latency=13516, max_latency=83878
  artifacts: latency

test (completed, Jan 26 18:32:47)
  results: accuracy=0.9333333333333333, test-error=0.06666666666666667, auc-micro=0.9733333333333333, auc-weighted=0.9888888888888889, f1-score=0.9137254901960784, precision_score=0.8888888888888888, recall_score=0.9629629629629629
  artifacts: confusion-matrix, feature-importances, precision-recall-multiclass, roc-multiclass, test_set_preds

summary (completed, Jan 26 18:32:31)
  artifacts: histograms, violin, imbalance, imbalance-weights-vec, correlation-matrix, correlation

train (completed, Jan 26 18:32:30)
  results: best_iteration=1, accuracy=0.975609756097561, test-error=0.024390243902439025, auc-micro=0.9979179060083283, auc-weighted=0.9966358284272497, f1-score=0.9721739130434783, precision_score=0.9743589743589745, recall_score=0.9722222222222222
  artifacts: test_set, confusion-matrix, feature-importances, precision-recall-multiclass, roc-multiclass, model, iteration_results

get-data (completed, Jan 26 18:32:20)
  artifacts: iris_dataset
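The model-tester step exercises the deployed model server's REST API; the same check can be done by hand. A hedged sketch assuming the v2 serving protocol; the endpoint address below is a placeholder (use the deploy step's 'endpoint' output):

import json
import requests

# placeholder address -- take it from the deploy step's 'endpoint' output
addr = 'http://v2-model-server.default-tenant.svc:8080'

# v2 protocol inference request with a single iris sample (4 features)
event = {'inputs': [[5.1, 3.5, 1.4, 0.2]]}
resp = requests.post(f'{addr}/v2/models/iris_dataset:v1/infer', json=event)
print(json.loads(resp.text))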
