Part 4: Automated ML Pipeline

An MLRun project is a container for all of your work on a particular activity: all the associated code, functions, jobs/workflows, and artifacts. Projects can be mapped to Git repositories, which enables versioning, collaboration, and CI/CD. You can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive. Once the project is loaded, you can run jobs/workflows that refer to any project element by name, allowing separation between configuration and code.

Projects contain workflows that execute the registered functions in a sequence/graph (DAG) and can reference project parameters, secrets, and artifacts by name. The following notebook demonstrates how to build an automated workflow with feature selection, training, testing, and deployment.

Step 1: Setting Up Your Project

To run a pipeline, you first need to get or create a project object and define/import the required functions for its execution. See the Projects, Automation & CI/CD section for details.

The following code gets or creates a user project named “fraud-demo-<username>”.

import mlrun

# Set the base project name
project_name = 'fraud-demo'

# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2021-10-28 13:54:45,892 [info] loaded project fraud-demo from MLRun DB

Step 2: Updating Project and Function Definitions

We need to save the function definitions in the project so that it is possible to automatically convert code to functions, or import external functions, whenever we load new versions of the code or run automated CI/CD workflows. In addition, we may want to set other project attributes such as global parameters, secrets, and data.

Our code may be stored in Python files, notebooks, external repositories, packaged containers, and so on. We use the project.set_function() method to register the code in the project; the definitions are saved to the project object as well as to a YAML file in the root of the project. Functions can also be imported from the MLRun marketplace (using the hub:// schema).

We use the following functions in this tutorial:

  • feature_selection — the first function, which determines the top features to be used for training

  • train — the model-training function

  • test-classifier — the model-testing function

  • v2-model-server — the model-serving function

Note: set_function uses the code_to_function and import_function methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.

We add the function definitions to the project along with parameters and data artifacts and save the project.

project.set_function('hub://feature_selection', 'feature_selection')
project.set_function('hub://sklearn-classifier', 'train')
project.set_function('hub://test_classifier', 'test')
project.set_function('hub://v2_model_server', 'serving')
<mlrun.runtimes.serving.ServingRuntime at 0x7f6229497190>
# set project level parameters and save
project.spec.params = {'label_column': 'label'}
project.save()


When we save the project, the project definitions are stored in project.yaml. This allows us to load the project from source control (Git) and run it with a single command or API call.

The project YAML for this project can be printed using:

print(project.to_yaml())
kind: project
metadata:
  name: fraud-demo-admin
  created: '2021-08-05T15:59:59.434655'
spec:
  params:
    label_column: label
  functions:
  - url: hub://feature_selection
    name: feature_selection
  - url: hub://sklearn-classifier
    name: train
  - url: hub://test_classifier
    name: test
  - url: hub://v2_model_server
    name: serving
  workflows:
  - name: main
    path: workflow.py
    engine: null
  artifacts: []
  desired_state: online
  disable_auto_mount: false
status:
  state: online

Saving and Loading Projects from Git

After we save the project and its elements (functions, workflows, artifacts, etc.), we can commit all the changes to a Git repository. This can be done with standard Git tools, or with MLRun project methods such as pull, push, and remote, which call the Git API for you.

Projects can then be loaded from Git using the MLRun load_project method, for example:

project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)

or using the MLRun CLI:

mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj

Read the Projects, Automation & CI/CD section for more details.

Using Kubeflow Pipelines

You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

Step 3: Defining and Saving a Pipeline Workflow

A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file in your local directory (the file name is workflow.py). The workflow describes a directed acyclic graph (DAG) that is executed using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file contains a pipeline DSL definition that connects the function inputs and outputs.

The defined pipeline includes the following steps:

  • Perform feature selection (feature_selection).

  • Train the model (train).

  • Test the model with its test data set.

  • Deploy the model as a real-time serverless function (deploy).

Note: A pipeline can also include continuous build integration and deployment (CI/CD) steps, such as building container images and deploying models.
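Before writing the workflow, it can help to see the step dependencies as a plain DAG. The following is a minimal pure-Python sketch (no Kubeflow or MLRun required; the dependencies are inferred from the data flow in the workflow below, where train consumes feature_selection's output, and test and deploy both consume train's):

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each step maps to the set of steps whose outputs it consumes
deps = {
    "feature_selection": set(),
    "train": {"feature_selection"},
    "test": {"train"},
    "deploy": {"train"},
}

# Resolve an execution order in which every step runs after its inputs exist
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Kubeflow Pipelines derives the same ordering automatically from the way step outputs are wired into step inputs, so no explicit ordering appears in the workflow code.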

%%writefile workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions

from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)


@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)
def kfpipeline(vector_name='transactions-fraud'):

    project = mlrun.get_current_project()

    # Feature selection
    feature_selection = run_function(
        "feature_selection",
        name="feature_selection",
        params={'sample_ratio': 0.25,
                'output_vector_name': "short",
                'ignore_type_errors': True},
        inputs={'df_artifact': project.get_artifact_uri(vector_name, 'feature-vector')},
        outputs=['top_features_vector'])

    # Train with hyper-parameters
    train = run_function(
        "train",
        name="train",
        params={"sample": -1,
                "label_column": project.get_param('label_column', 'label'),
                "test_size": 0.10},
        hyperparams={"model_name": ['transaction_fraud_rf',
                                    'transaction_fraud_xgboost',
                                    'transaction_fraud_adaboost'],
                     'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier",
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        hyper_param_options=HyperParamOptions(selector="max.accuracy"),
        inputs={"dataset": feature_selection.outputs['top_features_vector']},
        outputs=['model', 'test_set'])

    # Test and visualize the model
    test = run_function(
        "test",
        name="test",
        params={"label_column": project.get_param('label_column', 'label')},
        inputs={
            "models_path": train.outputs["model"],
            "test_set": train.outputs["test_set"]})

    # Route the serving model to use enrichment
    serving_function = project.get_function('serving')
    serving_function.set_topology('router',
                                  'mlrun.serving.routers.EnrichmentModelRouter',
                                  name='EnrichmentModelRouter',
                                  feature_vector_uri="transactions-fraud-short",
                                  impute_policy={"*": "$mean"},
                                  exist_ok=True)

    # Deploy the model as a serverless function; a list of models can be served
    deploy = deploy_function("serving", models=[{"key": 'fraud', "model_path": train.outputs["model"]}])
Overwriting workflow.py
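The hyperparams grid in the train step expands into every combination of the listed values, and selector="max.accuracy" keeps the run with the highest accuracy. The following is a rough pure-Python sketch of that behavior (train_once and its accuracy values are fabricated stand-ins, not MLRun code):

```python
import itertools

model_names = ['transaction_fraud_rf', 'transaction_fraud_xgboost',
               'transaction_fraud_adaboost']
model_classes = ["sklearn.ensemble.RandomForestClassifier",
                 "sklearn.linear_model.LogisticRegression",
                 "sklearn.ensemble.AdaBoostClassifier"]

def train_once(name, pkg_class):
    # Stand-in for a real training run; the accuracy is computed
    # deterministically here only to keep the example self-contained
    score = 0.90 + 0.01 * ((len(name) + len(pkg_class)) % 10)
    return {"model_name": name, "model_pkg_class": pkg_class, "accuracy": score}

# One child run per grid combination (3 x 3 = 9 runs)
runs = [train_once(n, c) for n, c in itertools.product(model_names, model_classes)]

# selector="max.accuracy": keep the best run by that metric
best = max(runs, key=lambda r: r["accuracy"])
print(len(runs))  # 9
```

In the real pipeline, MLRun launches the child runs for you and records the selected iteration in the run results (see best_iteration in the output below).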

Step 4: Registering the Workflow

Use the set_workflow MLRun project method to register your workflow with MLRun. The following code sets the name parameter to the selected workflow name (“main”) and points it at the workflow file in your project directory (workflow.py).

# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')

Step 5: Running a Pipeline

First run the following code to save your project:

project.save()

Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines. You can pass arguments, or set the artifact_path parameter to specify a unique path for storing the workflow artifacts.

import os

from mlrun import mlconf

pipeline_path = mlconf.artifact_path

run_id = project.run(
    'main',
    arguments={}, 
    artifact_path=os.path.join(pipeline_path, "pipeline", '{{workflow.uid}}'),
    dirty=True, watch=True)
[Pipeline execution graph: 04-pipeline_25_0.svg]

Run Results

Workflow 34db6d3c-858e-4bb5-9a6c-547baec5d0a7 finished, state=Succeeded
Click the hyperlinks below to see detailed results.

  • Oct 28 13:56:30 | completed | test-classifier
    results: accuracy=0.9883058032451396, test-error=0.0116941967548604, rocauc=0.8130881224506281, brier_score=0.22075754415862567, f1-score=0.36507936507936506, precision_score=0.6052631578947368, recall_score=0.26136363636363635
    artifacts: probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, test_set_preds

  • Oct 28 13:55:22 | completed | sklearn-classifier
    results: best_iteration=7, accuracy=0.9896594661902441, test-error=0.010340533809755834, rocauc=0.8228432450474152, brier_score=0.2209646484723041, f1-score=0.3612040133779264, precision_score=0.6206896551724138, recall_score=0.25471698113207547
    artifacts: test_set, probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, model, iteration_results

  • Oct 28 13:54:57 | completed | feature-selection
    results: top_features_vector=store://feature-vectors/fraud-demo-admin/short
    artifacts: f_classif, mutual_info_classif, f_regression, LinearSVC, LogisticRegression, ExtraTreesClassifier, feature_scores, max_scaled_scores_feature_scores, selected_features_count, selected_features

Step 6: Testing the Model Endpoint

Now that your model is deployed using the pipeline, you can invoke it as usual:

# Get the deployed serving function
serving_fn = project.get_function('serving')

# Choose an id for the test
sample_id = 'C76780537'
model_inference_path = '/v2/models/fraud/infer'

# Send the sample ID for prediction
serving_fn.invoke(path=model_inference_path,
                  body={'inputs': [[sample_id]]})
> 2021-10-28 13:56:56,170 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-admin-v2-model-server.default-tenant.svc.cluster.local:8080/v2/models/fraud/infer'}
{'id': '90f4b67c-c9e0-4e35-917f-979b71c5ad75',
 'model_name': 'fraud',
 'outputs': [0.0]}
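The request and response follow the v2 model-serving protocol: the body carries an inputs list, and the outputs list holds one prediction per input row. A small sketch of parsing the response above (the values are copied from the output; reading 0.0 as "not fraud" is an assumption about the model's label encoding):

```python
# The v2 inference request wraps each sample in an "inputs" list
sample_id = 'C76780537'
body = {'inputs': [[sample_id]]}

# Response observed above; "outputs" holds one prediction per input row
response = {'id': '90f4b67c-c9e0-4e35-917f-979b71c5ad75',
            'model_name': 'fraud',
            'outputs': [0.0]}

# Pair each input sample with its prediction
predictions = dict(zip([sample_id], response['outputs']))
print(predictions)  # {'C76780537': 0.0}
```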

Done!