Part 4: Projects and Automated ML Pipeline

This part of the MLRun getting-started tutorial walks you through the steps for working with projects, source control (Git), and automating the ML pipeline.

An MLRun project is a container for all your work on a particular activity: all the associated code, functions, jobs/workflows, and artifacts. Projects can be mapped to Git repositories, which enables versioning, collaboration, and CI/CD.

Users can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive. Once the project is loaded, you can run jobs/workflows that refer to any project element by name, which allows separation between configuration and code. See the Projects, Automation & CI/CD section for details.

Projects contain workflows that execute the registered functions in a sequence/graph (DAG) and can reference project parameters, secrets, and artifacts by name. MLRun currently supports two workflow engines: local (for simple tasks) and Kubeflow Pipelines (for more complex/advanced tasks). MLRun also supports a real-time workflow engine (see MLRun serving graphs).

Note: The Iguazio Data Science Platform has a default (pre-deployed) shared Kubeflow Pipelines service (pipelines).

An ML Engineer can gather the different functions created by the Data Engineer and Data Scientist and create this automated pipeline.

The tutorial consists of the following steps:

  1. Setting Up Your Project

  2. Updating Project and Function Definitions

  3. Defining and Saving a Pipeline Workflow

  4. Registering the Workflow

  5. Running a Pipeline

  6. Viewing the Pipeline on the Dashboard (UI)

  7. Invoking the Model

By the end of this tutorial, you’ll learn how to:

  • Create an operational pipeline using previously defined functions.

  • Run the pipeline and track the pipeline results.

Prerequisites

The following steps are a continuation of the previous parts of this getting-started tutorial and rely on the generated outputs. Therefore, make sure to first run parts 1 to 3 of the tutorial.

Step 1: Setting Up Your Project

To run a pipeline, you first need to create a Python project object and import the required functions for its execution.

Create a project by using the new_project MLRun method, or use get_or_create_project, which loads the project from the MLRun DB or the archive/context if it exists, or creates a new project if it doesn't.

Both methods have the following parameters:

  • name (Required): the project name.

  • context: the path to a local project directory (the project’s context directory). The project directory contains a project-configuration file (default: project.yaml), which defines the project, and additional generated Python code. The project file is created when you save your project (using the save MLRun project method or when saving your first function within the project).

  • init_git: set to True to perform Git initialization of the project directory (context) if it is not already initialized.

    Note: It’s customary to store project code and definitions in a Git repository.

The following code gets or creates a user project named “getting-started-<username>”.

Note: Platform projects are currently shared among all users of the parent tenant, to facilitate collaboration. Therefore,

  • Set user_project to True if you wish to create a project unique to your user. You can easily change the default project name for this tutorial by changing the definition of the project_name_base variable in the following code.

  • Don’t include in your project proprietary information that you don’t want to expose to other users. Note that while projects are a useful tool, you can easily develop and run code in the platform without using projects.

import mlrun

# Set the base project name
project_name_base = 'getting-started'

# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name_base, context="./", user_project=True, init_git=True)

print(f'Project name: {project.metadata.name}')
> 2021-09-09 05:20:40,995 [info] loaded project getting-started from MLRun DB
Project name: getting-started-iguazio
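
The project name in the output above ends with the username because user_project is set to True. The following is a minimal sketch of that naming rule, not MLRun's actual implementation. It assumes that the username is read from the V3IO_USERNAME environment variable, with a fallback to the operating-system login name. The user_project_name helper is hypothetical and exists only for this illustration.

```python
import getpass
import os


def user_project_name(base_name, username=None):
    """Return '<base_name>-<username>' (illustrative helper, not part of MLRun)."""
    # Assumption: the platform exposes the user as V3IO_USERNAME;
    # otherwise fall back to the OS login name.
    if username is None:
        username = os.environ.get("V3IO_USERNAME") or getpass.getuser()
    return f"{base_name}-{username}"


# With the user from the tutorial output, this prints getting-started-iguazio
print(user_project_name("getting-started", username="iguazio"))
```

Because the name is derived per user, two users who run the same notebook on a shared tenant would end up with separate projects under this rule.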

Step 2: Updating Project and Function Definitions

We need to save the definitions of the functions we use in the project so that code can be automatically converted to functions, or external functions imported, whenever we load new versions of our code or run automated CI/CD workflows. In addition, we may want to set other project attributes such as global parameters, secrets, and data.

Our code may be stored in Python files, notebooks, external repositories, packaged containers, etc. We use the project.set_function() method to register our code in the project. The definitions are saved to the project object as well as to a YAML file in the root of our project. Functions can also be imported from the MLRun marketplace (using the hub:// scheme).

We used the following functions in this tutorial:

  • prep-data — the first function, which ingests the Iris data set (in Notebook 01)

  • describe — generates statistics on the data set (from the marketplace)

  • train-iris — the model-training function (in Notebook 02)

  • test-classifier — the model-testing function (from the marketplace)

  • mlrun-model — the model-serving function (in Notebook 03)

Note: set_function uses the code_to_function and import_function methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.

We add the function definitions to the project along with parameters and data artifacts and save the project.

project.set_function('01-mlrun-basics.ipynb', 'prep-data', kind='job', image='mlrun/mlrun')
project.set_function('02-model-training.ipynb', 'train', kind='job', image='mlrun/mlrun', handler='train_iris')
project.set_function('hub://describe', 'describe')
project.set_function('hub://test_classifier', 'test')
project.set_function('hub://v2_model_server', 'serving')

# set project level parameters and save
project.spec.params = {'label_column': 'label'}
project.save()


When we save the project, its definitions are stored in the project.yaml file. This allows us to load the project from source control (Git) and run it with a single command or API call.

The project YAML for this project can be printed using:

print(project.to_yaml())
kind: project
metadata:
  name: getting-started-iguazio
  created: '2021-09-08T19:34:24.802000+00:00'
spec:
  params:
    label_column: label
  functions:
  - url: 01-mlrun-basics.ipynb
    name: prep-data
    kind: job
    image: mlrun/mlrun
  - url: 02-model-training.ipynb
    name: train
    kind: job
    image: mlrun/mlrun
    handler: train_iris
  - url: hub://describe
    name: describe
  - url: hub://test_classifier
    name: test
  - url: hub://v2_model_server
    name: serving
  workflows:
  - name: main
    path: workflow.py
    engine: null
  artifacts: []
  source: ''
  subpath: ''
  origin_url: ''
  desired_state: online
  owner: iguazio
  disable_auto_mount: false
status:
  state: online
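
The functions section of the YAML above mixes local notebooks with marketplace functions, distinguished only by the url field. The sketch below classifies each entry by its URL, using the entries copied from the YAML. The source_kind helper and its category names are hypothetical and are not part of MLRun. The sketch only shows how the hub:// scheme differs from a local file reference.

```python
import os
from urllib.parse import urlparse

# Function entries copied from the project YAML above
functions = [
    {"url": "01-mlrun-basics.ipynb", "name": "prep-data"},
    {"url": "02-model-training.ipynb", "name": "train"},
    {"url": "hub://describe", "name": "describe"},
    {"url": "hub://test_classifier", "name": "test"},
    {"url": "hub://v2_model_server", "name": "serving"},
]


def source_kind(url):
    """Classify a function URL (hypothetical helper for illustration only)."""
    if urlparse(url).scheme == "hub":
        return "marketplace"
    extension = os.path.splitext(url)[1]
    return {".ipynb": "notebook", ".py": "python file"}.get(extension, "other")


for entry in functions:
    print(f"{entry['name']:<10} {source_kind(entry['url'])}")
```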

Saving and Loading Projects from Git

After saving the project and its elements (functions, workflows, artifacts, etc.), we can commit all our changes to a Git repository. This can be done using standard Git tools or using MLRun project methods such as pull, push, and remote, which call the Git API for you.

Projects can then be loaded from Git using the MLRun load_project method. For example:

project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)

or using MLRun CLI:

mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj

Read the Projects, Automation & CI/CD section for more details.

Using Kubeflow Pipelines

You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

Note: When using the Iguazio Data Science Platform, Kubeflow Pipelines is available as a default (pre-deployed) shared platform service.

Step 3: Defining and Saving a Pipeline Workflow

A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file named workflow.py in your local directory. The workflow describes a directed acyclic graph (DAG) for execution using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file has two parts: initialization of the function objects, and definition of a pipeline DSL (domain-specific language) for connecting the function inputs and outputs. Examine the code to see how function objects are initialized and used (by name) within the workflow.

The defined pipeline includes the following steps:

  • Ingest the Iris flower data set (ingest).

  • Train the model (train).

  • Test the model with its test data set (test).

  • Deploy the model as a real-time serverless function (deploy).

Note: A pipeline can also include continuous integration and continuous deployment (CI/CD) steps, such as building container images and deploying models.

%%writefile './workflow.py'

from kfp import dsl
from mlrun import run_function, deploy_function


DATASET = 'cleaned_data'
MODEL = 'iris'
LABELS = "label"

# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name="Getting-started-tutorial",
    description="This tutorial is designed to demonstrate some of the main "
                "capabilities of the Iguazio Data Science Platform.\n"
                "The tutorial uses the Iris flower data set."
)
def kfpipeline(source_url):

    # Ingest the data set
    ingest = run_function(
        'prep-data',
        handler='prep_data',
        inputs={'source_url': source_url},
        params={'label_column': LABELS},
        outputs=[DATASET])
    
    # Train a model   
    train = run_function(
        "train",
        params={"label_column": LABELS},
        inputs={"dataset": ingest.outputs[DATASET]},
        outputs=['my_model', 'test_set'])
    
    # Test and visualize the model
    test = run_function(
        "test",
        params={"label_column": LABELS},
        inputs={"models_path": train.outputs['my_model'],
                "test_set": train.outputs['test_set']})
    
    # Deploy the model as a serverless function
    deploy = deploy_function("serving", models={f"{MODEL}_v1": train.outputs['my_model']})
Overwriting ./workflow.py
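
In the workflow above, each step consumes outputs of an earlier step, and these dependencies are what make the pipeline a DAG. The following sketch is independent of MLRun and Kubeflow Pipelines. It records the dependencies as read from the workflow code and uses the standard-library graphlib module (Python 3.9 or later) to compute one valid execution order. It illustrates the ordering constraint only, and is not how the workflow engine schedules steps.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps whose outputs it consumes,
# as read from the workflow code above.
dependencies = {
    "ingest": set(),
    "train": {"ingest"},
    "test": {"train"},
    "deploy": {"train"},
}

# static_order() yields every step after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The test and deploy steps both depend only on train, so either may come first in the printed order, and a workflow engine is free to run them in parallel.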

Step 4: Registering the Workflow

Use the set_workflow MLRun project method to register your workflow with MLRun. The following code sets the name parameter to the selected workflow name (“main”) and the code parameter to the name of the workflow file that is found in your project directory (workflow.py).

# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')

Step 5: Running a Pipeline

First run the following code to save your project:

project.save()

Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines. The tutorial code sets the following method parameters (for the full parameter list, see the MLRun documentation or embedded help):

  • name: the workflow name (in this case, “main”; see the previous step).

  • arguments: a dictionary of Kubeflow Pipelines arguments (parameters). The tutorial code passes a single argument, source_url, which is the URL of the source data; you can edit the code to add arguments.

  • artifact_path: a path or URL that identifies a location for storing the workflow artifacts. You can use {{workflow.uid}} in the path to signify the ID of the current workflow run iteration. The tutorial code sets the artifacts path to a <workflow ID> directory ({{workflow.uid}}) in a pipeline directory under the default MLRun artifact path (mlconf.artifact_path).

  • dirty: set to True to allow running the workflow also when the project’s Git repository is dirty (i.e., contains uncommitted changes). (When the notebook that contains the execution code is in the same Git directory as the executed workflow, the directory will always be dirty during the execution.)

  • watch: set to True to wait for the pipeline to complete and output the execution graph as it updates.

The run method returns the ID of the executed workflow, which the code stores in a run_id variable. You can use this ID to track the progress of your workflow, as demonstrated in the following sections.
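
To make the {{workflow.uid}} placeholder concrete, the sketch below shows how an artifact path template resolves to a per-run directory. The base path is a hypothetical value (in the tutorial it comes from mlconf.artifact_path), and the resolve_artifact_path helper is only an illustration. The actual substitution is performed by the workflow engine at run time. The workflow ID is the one reported in the run results of this tutorial.

```python
import posixpath

# Hypothetical base path; in the tutorial it comes from mlconf.artifact_path
base_path = "v3io:///projects/getting-started-iguazio/artifacts"

# Same template shape that the tutorial passes as artifact_path
template = posixpath.join(base_path, "pipeline", "{{workflow.uid}}")


def resolve_artifact_path(path_template, workflow_uid):
    """Substitute the workflow ID into the path template (illustration only)."""
    return path_template.replace("{{workflow.uid}}", workflow_uid)


print(resolve_artifact_path(template, "4644392d-20ca-40fd-a8be-4cf783e29d9f"))
```

Because every run gets its own ID, artifacts from different runs of the same workflow end up in separate directories and do not overwrite each other.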

Note: You can also run the workflow from a command-line shell by using the mlrun CLI. The following CLI command defines a similar execution logic as that of the run call in the tutorial:

mlrun project /User/getting-started-tutorial/conf -r main -p "$V3IO_HOME_URL/getting-started-tutorial/pipeline/{{workflow.uid}}/"

import os
from mlrun import mlconf
url_prefix = os.environ.get('SAMPLE_DATA_SOURCE_URL_PREFIX', 'https://s3.wasabisys.com/iguazio')
# Set the source-data URL
source_url = f'{url_prefix.rstrip("/")}/data/iris/iris.data.raw.csv'
pipeline_path = mlconf.artifact_path

run_id = project.run(
    'main',
    arguments={'source_url' : source_url}, 
    artifact_path=os.path.join(pipeline_path, "pipeline", '{{workflow.uid}}'),
    dirty=True,
    watch=True)
[Image: pipeline execution graph]

Run Results

Workflow 4644392d-20ca-40fd-a8be-4cf783e29d9f finished, state=Succeeded
click the hyper links below to see detailed results

uid  start            state      name                 results              artifacts
     Sep 09 05:21:54  completed  test-classifier      accuracy=1.0         confusion-matrix
                                                      test-error=0.0       feature-importances
                                                      auc-micro=1.0        precision-recall-multiclass
                                                      auc-weighted=1.0     roc-multiclass
                                                      f1-score=1.0         test_set_preds
                                                      precision_score=1.0
                                                      recall_score=1.0
     Sep 09 05:21:40  completed  train                accuracy=1.0         confusion-matrix
                                                      test-error=0.0       feature-importances
                                                      auc-micro=1.0        precision-recall-multiclass
                                                      auc-weighted=1.0     roc-multiclass
                                                      f1-score=1.0         test_set
                                                      precision_score=1.0  my_model
                                                      recall_score=1.0
     Sep 09 05:21:27  completed  prep-data-prep_data  num_rows=150         cleaned_data

Step 6: Viewing the Pipeline on the Dashboard (UI)

Navigate to the Pipelines page on the dashboard (UI). After the pipeline execution completes, you should be able to view the pipeline and see its functions:

  • prep-data

  • train

  • test

  • deploy-serving

Step 7: Invoking the Model

Now that your model is deployed using the pipeline, you can invoke it as usual:

serving_func = project.func('serving')
my_data = {'inputs': [[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}
serving_func.invoke('/v2/models/iris_v1/infer', my_data)
> 2021-09-09 05:22:10,329 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-getting-started-iguazio-v2-model-server.default-tenant.svc.cluster.local:8080/v2/models/iris_v1/infer'}
{'id': 'cadf6104-5278-48b9-a95e-25652af1b9a2',
 'model_name': 'iris_v1',
 'outputs': [0, 2]}

You can also make an HTTP call directly:

import requests

# Call the model endpoint directly over HTTP
predict_url = f'http://{serving_func.status.address}/v2/models/iris_v1/predict'
# Pass the dict itself; requests serializes it to JSON once
resp = requests.put(predict_url, json=my_data)
print(resp.json())
{'id': '4997dfa8-d73d-447c-b3c8-efe6aa83e9f2', 'model_name': 'iris_v1', 'outputs': [0, 2]}
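
The outputs field holds numeric class labels. The tutorial does not state the label encoding, so the sketch below assumes the common Iris encoding (0 = setosa, 1 = versicolor, 2 = virginica). The decode_outputs helper is hypothetical, and the response body is copied from the output above with the id field omitted.

```python
# Assumption: labels follow the common Iris encoding
# (0 = setosa, 1 = versicolor, 2 = virginica).
IRIS_CLASSES = ["setosa", "versicolor", "virginica"]


def decode_outputs(response):
    """Map the numeric 'outputs' of an inference response to class names."""
    return [IRIS_CLASSES[index] for index in response["outputs"]]


# Response body copied from the tutorial output above (id omitted)
response = {"model_name": "iris_v1", "outputs": [0, 2]}
print(decode_outputs(response))  # prints ['setosa', 'virginica']
```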

Done!

Congratulations! You’ve completed the getting-started tutorial.

You might also want to explore the following demos:

For additional information and guidelines, see the MLRun How-To Guides and Demos.