Part 4: Creating an Automated ML Pipeline

This part of the MLRun getting-started tutorial walks you through the steps for creating an automated pipeline for our project. The pipeline is created using Kubeflow Pipelines.

The integration of MLRun with Kubeflow Pipelines enables you to take the functions in your project and build a pipeline that contains these functions.

Note: The Iguazio Data Science Platform has a default (pre-deployed) shared Kubeflow Pipelines service (pipelines).

An ML Engineer can gather the different functions created by the Data Engineer and Data Scientist and create this automated pipeline.

The tutorial consists of the following steps:

  1. Setup and Configuration

  2. Setting up Your Project

  3. Importing Functions

  4. Defining and Saving a Pipeline Workflow

  5. Registering the Workflow

  6. Saving Your Project and Running the Pipeline

  7. Viewing the Pipeline on the Dashboard (UI)

  8. Invoking the Model

By the end of this tutorial you’ll learn how to

  • Create an operational pipeline using previously defined functions.

  • Run the pipeline and track the pipeline results.

Prerequisites

The following steps are a continuation of the previous parts of this getting-started tutorial and rely on their generated outputs. Therefore, make sure to first run parts 1-3 of the tutorial.

Step 1: Setup and Configuration

Importing Libraries

Run the following code to import required libraries:

from os import path
import mlrun

Step 2: Setting Up Your Project

To run a pipeline, you first need to create an MLRun project object and import the required functions for its execution.

Create a project by using the new_project MLRun method, which receives the following parameters:

  • name (Required) — the project name.

  • context — the path to a local project directory (the project’s context directory). The project directory contains a project-configuration file (default: project.yaml), which defines the project, and additional generated Python code. The project file is created when you save your project (using the save MLRun project method or when saving your first function within the project).

  • functions — a list of function objects or links to function code or objects.

  • init_git — set to True to perform Git initialization of the project directory (context).

    Note: It’s customary to store project code and definitions in a Git repository.

The following code creates a user project named “getting-started-tutorial-<username>”.

Note: Platform projects are currently shared among all users of the parent tenant, to facilitate collaboration. Therefore,

  • Set user_project to True if you wish to create a project unique to your user. You can easily change the default project name for this tutorial by changing the definition of the project_name_base variable in the following code.

  • Don’t include in your project proprietary information that you don’t want to expose to other users. Note that while projects are a useful tool, you can easily develop and run code in the platform without using projects.

project_name_base = 'getting-started-tutorial'
project_path = path.abspath('conf')
project = mlrun.new_project(project_name_base,
                            context=project_path,
                            init_git=True,
                            user_project=True)

print(f'Project path: {project_path}\nProject name: {project.name}')
Project path: /User/demos/getting-started-tutorial/conf
Project name: getting-started-tutorial-iguazio

Step 3: Importing Functions

Viewing All Existing Functions in Your Project

Run get_run_db().list_functions to get the list of functions for this project. Use the latest tag, and filter out any function that has an empty kind (which means it's a local function). In the getting-started-tutorial project, you should expect to find the following functions:

  • prep-data — the first function, which ingests the Iris data set

  • describe — generates statistics on the data set

  • train-iris — the model-training function

  • test-classifier — the model-testing function

  • serving — the model-serving function

names = [func.get('metadata').get('name')
         for func in mlrun.get_run_db().list_functions(project=project.name, tag='latest')
         if func.get('kind') != '']
print(names)
['describe', 'serving', 'prep-data', 'train-iris', 'test-classifier']

Importing Functions to Your Project

You can easily add the previously defined functions by using the set_function method, passing it a URL that consists of the db:// schema followed by the project name, a slash (/), and the function name.

You can gather functions from different sources and combine them into a pipeline. For example, you can import functions not only from the project, but also from the MLRun marketplace (using the hub:// schema); to implement drift detection, for instance, you can chain together a drift-detection function and a notification function as part of production monitoring. You can also import functions directly from Python files (*.py) or from other notebook files (*.ipynb), as shown in the sketch below. For more information and examples, see the MLRun documentation and other demos that showcase these capabilities.
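
For illustration, the following is a minimal sketch of both options. The hub://describe marketplace function exists, but the my_functions.py file, the my-func name, and the image are hypothetical placeholders to adjust for your environment:

# Hedged sketch: import a function from the MLRun marketplace and from a
# local Python file (file name, function name, and image are hypothetical).
project.set_function('hub://describe', 'describe-hub')
project.set_function('my_functions.py', 'my-func',
                     kind='job', image='mlrun/mlrun')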

project.set_function(f'db://{project.name}/prep-data')
project.set_function(f'db://{project.name}/describe')
project.set_function(f'db://{project.name}/train-iris', 'train')
project.set_function(f'db://{project.name}/test-classifier', 'test')
project.set_function(f'db://{project.name}/serving')
<mlrun.runtimes.serving.ServingRuntime at 0x7f17c9be1590>
project.functions
[{'url': 'db://getting-started-tutorial-iguazio/prep-data',
  'name': 'prep-data'},
 {'url': 'db://getting-started-tutorial-iguazio/describe', 'name': 'describe'},
 {'url': 'db://getting-started-tutorial-iguazio/train-iris', 'name': 'train'},
 {'url': 'db://getting-started-tutorial-iguazio/test-classifier',
  'name': 'test'},
 {'url': 'db://getting-started-tutorial-iguazio/serving', 'name': 'serving'}]

Using Kubeflow Pipelines

You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

Note: When using the Iguazio Data Science Platform, Kubeflow Pipelines is available as a default (pre-deployed) shared platform service.

Step 4: Defining and Saving a Pipeline Workflow

A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file named workflow.py in your project’s conf directory. The workflow describes a directed acyclic graph (DAG) that is executed using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file has two parts: initialization of the function objects, and definition of a pipeline DSL (domain-specific language) for connecting the function inputs and outputs. Examine the code to see how function objects are initialized and used (by name) within the workflow.

The defined pipeline includes the following steps:

  • Ingest the Iris flower data set (ingest).

  • Train the model (train).

  • Test the model with its test data set (test).

  • Deploy the model as a real-time serverless function (deploy).

Note: A pipeline can also include continuous integration and deployment (CI/CD) steps, such as building container images and deploying models; see the build-step sketch after the workflow file below.

%%writefile {path.join(project_path, 'workflow.py')}

from kfp import dsl
import mlrun
from mlrun.platforms import auto_mount


funcs = {}
DATASET = 'cleaned_data'
LABELS = "label"

# Configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(auto_mount())

# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name="Getting-started-tutorial",
    description="This tutorial is designed to demonstrate some of the main "
                "capabilities of the Iguazio Data Science Platform.\n"
                "The tutorial uses the Iris flower data set."
)
def kfpipeline(source_url='https://s3.wasabisys.com/iguazio/data/iris/iris_dataset.csv'):

    # Ingest the data set
    ingest = funcs['prep-data'].as_step(
        name="prep-data",
        handler='prep_data',
        inputs={'source_url': source_url},
        params={'label_column': LABELS},
        outputs=[DATASET])
    
    # Train a model   
    train = funcs["train"].as_step(
        name="train",
        params={"label_column": LABELS},
        inputs={"dataset": ingest.outputs[DATASET]},
        outputs=['model', 'test_set'])
    
    # Test and visualize the model
    test = funcs["test"].as_step(
        name="test",
        params={"label_column": LABELS},
        inputs={"models_path": train.outputs['model'],
                "test_set": train.outputs['test_set']})
    
    # Deploy the model as a serverless function
    deploy = funcs["serving"].deploy_step(
        models={f"{DATASET}_v1": train.outputs['model']})
Overwriting /User/demos/getting-started-tutorial/conf/workflow.py
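
As noted before the workflow code, a pipeline can also include build (CI) steps. The following is a minimal sketch of such a step, based on the pattern used in other MLRun demos; the deploy_step call on a job function and its image output are assumptions to verify against your MLRun version. It would replace the ingest step inside kfpipeline:

    # Hedged sketch (inside kfpipeline): first build the prep-data function
    # image as a pipeline step, then run the ingestion with the built image.
    builder = funcs['prep-data'].deploy_step(skip_deployed=True)

    ingest = funcs['prep-data'].as_step(
        name="prep-data",
        handler='prep_data',
        image=builder.outputs['image'],
        inputs={'source_url': source_url},
        params={'label_column': LABELS},
        outputs=[DATASET])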

Step 5: Registering the Workflow

Use the set_workflow MLRun project method to register your workflow with MLRun. The following code sets the name parameter to the selected workflow name (“main”) and the code parameter to the name of the workflow file that is found in your project directory (workflow.py).

# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')

Step 6: Saving Your Project and Running the Pipeline

Run the following code to save your project:

project.save()
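
Once saved, the project can later be reloaded from its context directory in a new notebook or session. Here is a minimal sketch, assuming the mlrun.load_project method and the project_path variable defined earlier:

# Hedged sketch: reload the saved project (project.yaml) from its
# context directory in a new session.
import mlrun
project = mlrun.load_project(project_path)
print(project.name)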

Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines. The tutorial code sets the following method parameters (for the full parameters list, see the MLRun documentation or the embedded help):

  • name — the workflow name (in this case, “main” — see the previous step).

  • arguments — a dictionary of Kubeflow Pipelines arguments (parameters). The tutorial code sets this parameter to an empty dictionary ({}), but you can edit the code to add arguments.

  • artifact_path — a path or URL that identifies a location for storing the workflow artifacts. You can use {{workflow.uid}} in the path to signify the ID of the current workflow run iteration. The tutorial code sets the artifacts path to a <workflow ID> directory ({{workflow.uid}}) in a pipeline directory under the projects container (/v3io/projects/<project name>/pipeline/<workflow ID>).

  • dirty — set to True to allow running the workflow even when the project’s Git repository is dirty (i.e., contains uncommitted changes). (When the notebook that contains the execution code is in the same Git directory as the executed workflow, the directory will always be dirty during execution.)

The run method returns the ID of the executed workflow, which the code stores in a run_id variable. You can use this ID to track the progress of your workflow, as demonstrated in the following sections; see also the status-polling sketch after the run results below.

Note: You can also run the workflow from a command-line shell by using the mlrun CLI. The following CLI command defines a similar execution logic as that of the run call in the tutorial:

mlrun project /User/getting-started-tutorial/conf -r main -p "$V3IO_HOME_URL/getting-started-tutorial/pipeline/{{workflow.uid}}/"
from os import path
from mlrun import mlconf

pipeline_path = mlconf.artifact_path

run_id = project.run(
    'main',
    arguments={},
    artifact_path=path.join(pipeline_path, "pipeline", '{{workflow.uid}}'),
    dirty=True,
    watch=True)
> 2021-01-25 12:59:33,106 [info] using in-cluster config.
Experiment link here
Run link here
> 2021-01-25 12:59:33,434 [info] Pipeline run id=342b653e-a696-4e89-a390-34d42ae1a555, check UI or DB for progress
> 2021-01-25 12:59:33,435 [info] waiting for pipeline run completion

Run Results

Workflow 342b653e-a696-4e89-a390-34d42ae1a555 finished, status=Succeeded
click the hyper links below to see detailed results

  • test (Jan 25 13:00:03, completed)
    results: accuracy=1.0, test-error=0.0, auc-micro=1.0, auc-weighted=1.0, f1-score=1.0, precision_score=1.0, recall_score=1.0
    artifacts: confusion-matrix, precision-recall-multiclass, roc-multiclass, test_set_preds

  • train (Jan 25 12:59:51, completed)
    results: accuracy=1.0, test-error=0.0, auc-micro=1.0, auc-weighted=1.0, f1-score=1.0, precision_score=1.0, recall_score=1.0
    artifacts: train_set, test_set, confusion-matrix, precision-recall-multiclass, roc-multiclass, model

  • prep-data (Jan 25 12:59:39, completed)
    results: num_rows=150
    artifacts: cleaned_data
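
Because the tutorial passes watch=True, the run call blocks until the pipeline completes. If you prefer to poll the status yourself with the returned run_id, you can use the Kubeflow Pipelines SDK. This is a minimal sketch, assuming the kfp package is installed and can reach the platform’s shared pipelines service:

# Hedged sketch: query the pipeline status by run ID via the KFP client.
import kfp

client = kfp.Client()                # connects to the in-cluster KFP service
run_detail = client.get_run(run_id)  # run_id was returned by project.run
print(run_detail.run.status)         # e.g., 'Succeeded'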

Step 7: Viewing the Pipeline on the Dashboard (UI)

Navigate to the Pipelines page on the dashboard (UI). After the pipeline execution completes, you should be able to view the pipeline and see its functions:

  • prep-data

  • train

  • test

  • deploy-serving

[Screenshot: the pipeline graph, as displayed on the dashboard Pipelines page]

Step 8: Invoking the Model

Now that your model is deployed using the pipeline, you can invoke it as usual:

serving_func = project.func('serving')
my_data = {'inputs': [[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}
serving_func.invoke('/v2/models/my_model/infer', my_data)
{'id': '0bb41173-1a86-4484-9220-c7b72ea0ab0e',
 'model_name': 'my_model',
 'outputs': [0, 2]}

You can also make an HTTP call directly:

import requests
import json
predict_url = f'http://{serving_func.status.address}/v2/models/my_model/predict'
resp = requests.put(predict_url, data=json.dumps(my_data))
print(resp.json())
{'id': '36798e6e-37b7-4f05-9ae2-ec67c2217d22', 'model_name': 'my_model', 'outputs': [0, 2]}

Done!

Congratulations! You’ve completed the getting-started tutorial.

For additional information and guidelines, see the MLRun How-To Guides and Demos.