Part 4: Automated ML pipeline#

An MLRun project is a container for all your work on a particular activity: all the associated code, functions, jobs/workflows, and artifacts. Projects can be mapped to Git repositories, which enables versioning, collaboration, and CI/CD. You can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive. Once the project is loaded, you can run jobs/workflows that refer to any project element by name, allowing separation between configuration and code.

Projects contain workflows that execute the registered functions in a sequence/graph (DAG). Workflows can reference project parameters, secrets, and artifacts by name. This notebook demonstrates how to build an automated workflow with feature selection, training, testing, and deployment.

Step 1: Setting up your project#

To run a pipeline, you first need to get or create a project object and define/import the required functions for its execution. See Create and load projects for details.

The following code gets or creates a user project named “fraud-demo-<username>”.

# Set the base project name
project_name = 'fraud-demo'
import mlrun

# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2021-10-28 13:54:45,892 [info] loaded project fraud-demo from MLRun DB
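
With user_project=True, the user name is appended to the base project name, which is why the project YAML printed later in this tutorial shows fraud-demo-admin for the user admin. The naming rule can be sketched in plain Python as follows. Note that user_project_name is a hypothetical helper written for illustration, not an MLRun function, and falling back to the OS login name is an assumption.

```python
import getpass


def user_project_name(base_name, username=None):
    """Return '<base_name>-<username>' (hypothetical helper, not part of MLRun)."""
    # Assumption: fall back to the OS login name when no user name is given.
    username = username or getpass.getuser()
    return f"{base_name}-{username}"


print(user_project_name("fraud-demo", "admin"))  # prints fraud-demo-admin
```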

Step 2: Updating project and function definitions#

You need to save the definitions of the functions you use in the project so that code can be automatically converted to functions, or external functions imported, whenever you load new versions of the code or run automated CI/CD workflows. In addition, you may want to set other project attributes such as global parameters, secrets, and data.

The code can be stored in Python files, notebooks, external repositories, packaged containers, etc. Use the project.set_function() method to register the code in the project. The definitions are saved to the project object, as well as in a YAML file in the root of your project. Functions can also be imported from the MLRun marketplace (using the hub:// scheme).

You used the following functions in this tutorial:

  • feature_selection — the first function, which determines the top features to be used for training

  • train — the model-training function

  • test — the model-testing function

  • serving — the model-serving function

Note: set_function uses the code_to_function and import_function methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.


Add the function definitions to the project along with parameters and data artifacts and save the project.

project.set_function('hub://feature_selection', 'feature_selection')
project.set_function('hub://sklearn-classifier','train')
project.set_function('hub://test_classifier', 'test')
project.set_function('hub://v2_model_server', 'serving')
<mlrun.runtimes.serving.ServingRuntime at 0x7f6229497190>
# set project level parameters and save
project.spec.params = {'label_column': 'label'}
project.save()


When you save the project, its definitions are stored in the project.yaml file. This means that you can load the project from source control (Git) and run it with a single command or API call.

The project YAML for this project can be printed using:

print(project.to_yaml())
kind: project
metadata:
  name: fraud-demo-admin
  created: '2021-08-05T15:59:59.434655'
spec:
  params:
    label_column: label
  functions:
  - url: hub://feature_selection
    name: feature_selection
  - url: hub://sklearn-classifier
    name: train
  - url: hub://test_classifier
    name: test
  - url: hub://v2_model_server
    name: serving
  workflows:
  - name: main
    path: workflow.py
    engine: null
  artifacts: []
  desired_state: online
  disable_auto_mount: false
status:
  state: online
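
Because the spec stores each function as a name and a source URL, a workflow can refer to a function by name alone. The following is a minimal sketch of that lookup, with the relevant part of the YAML above copied into a Python dict. The helper resolve_function_url is hypothetical and only illustrates the idea; it is not how MLRun itself resolves functions.

```python
# Relevant part of the project spec printed above, written as a Python dict.
project_spec = {
    "params": {"label_column": "label"},
    "functions": [
        {"url": "hub://feature_selection", "name": "feature_selection"},
        {"url": "hub://sklearn-classifier", "name": "train"},
        {"url": "hub://test_classifier", "name": "test"},
        {"url": "hub://v2_model_server", "name": "serving"},
    ],
    "workflows": [{"name": "main", "path": "workflow.py"}],
}


def resolve_function_url(spec, name):
    """Return the source URL registered under `name` (hypothetical helper)."""
    for entry in spec["functions"]:
        if entry["name"] == name:
            return entry["url"]
    raise KeyError(f"function {name!r} is not registered in the project")


print(resolve_function_url(project_spec, "train"))  # prints hub://sklearn-classifier
```

Changing the URL behind a name in the spec leaves any code that refers to "train" untouched, which is the separation between configuration and code described at the start of this part.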

Saving and loading projects from Git#

After you save the project and its elements (functions, workflows, artifacts, etc.), you can commit all the changes to a Git repository. Do this using standard Git tools or using MLRun project methods such as pull, push, and remote, which call the Git API for you.

Projects can then be loaded from Git using the MLRun load_project method, for example:

project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)

or using the MLRun CLI:

mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj

Read Create and load projects for more details.

Using Kubeflow pipelines#

You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable, machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

Step 3: Defining and saving a pipeline workflow#

A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file named workflow.py in your local directory. The workflow describes a directed acyclic graph (DAG) for execution using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file defines the pipeline using a DSL that connects the function inputs and outputs.

The defined pipeline includes the following steps:

  • Perform feature selection (feature_selection).

  • Train the model (train).

  • Test the model with its test data set (test).

  • Deploy the model as a real-time serverless function (deploy).

Note: A pipeline can also include continuous integration and deployment (CI/CD) steps, such as building container images and deploying models.
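
To make the DAG idea concrete, the steps listed above can be written as a dependency map and ordered with Python's standard library. This is only an illustrative sketch: the dependency map is read off the workflow in this tutorial (train consumes the feature-selection output, while test and deploy both consume the trained model), and Kubeflow Pipelines performs its own scheduling rather than using this code.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "feature_selection": set(),
    "train": {"feature_selection"},
    "test": {"train"},
    "deploy": {"train"},
}

# static_order() yields every step after all of its predecessors.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Since test and deploy depend only on train, they have no ordering constraint between them and could run in parallel.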

%%writefile workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions

from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)

    
@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)
def kfpipeline(vector_name='transactions-fraud'):
    
    project = mlrun.get_current_project()
    
    # Feature selection   
    feature_selection = run_function(
        "feature_selection",
        name="feature_selection",
        params={'sample_ratio':0.25,'output_vector_name': "short",
                'ignore_type_errors': True},
        inputs={'df_artifact': project.get_artifact_uri(vector_name, 'feature-vector')},
        outputs=['top_features_vector'])
    
    
    # train with hyperparameters
    train = run_function(
        "train",
        name="train",
        params={"sample": -1, 
                "label_column": project.get_param('label_column', 'label'),
                "test_size": 0.10},
        hyperparams={"model_name": ['transaction_fraud_rf', 
                                  'transaction_fraud_xgboost', 
                                  'transaction_fraud_adaboost'],
                     
                     'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier", 
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        hyper_param_options=HyperParamOptions(selector="max.accuracy"),
        inputs={"dataset": feature_selection.outputs['top_features_vector']},
        outputs=['model', 'test_set'])
    
            
    # test and visualize our model
    test = run_function(
        "test",
        name="test",
        params={"label_column": project.get_param('label_column', 'label')},
        inputs={
            "models_path": train.outputs["model"],
            "test_set": train.outputs["test_set"]})
    
    # route the serving model to use enrichment
    funcs['serving'].set_topology('router', 
                                  'mlrun.serving.routers.EnrichmentModelRouter', 
                                  name='EnrichmentModelRouter', 
                                  feature_vector_uri="transactions-fraud-short", 
                                  impute_policy={"*": "$mean"},
                                  exist_ok=True)

    
    # deploy the model as a serverless function, you can pass a list of models to serve 
    deploy = deploy_function("serving", models=[{"key": 'fraud', "model_path": train.outputs["model"]}])
Overwriting workflow.py
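
The train step passes two hyperparameter lists and the selector "max.accuracy". Assuming a grid strategy, in which every combination of the listed values is tried, the expansion and selection can be sketched in plain Python as follows. The helpers expand_grid and select_best are hypothetical and do not reproduce MLRun internals, the 1-based iteration numbering is an assumption, and the accuracy values are made up for illustration.

```python
from itertools import product

hyperparams = {
    "model_name": [
        "transaction_fraud_rf",
        "transaction_fraud_xgboost",
        "transaction_fraud_adaboost",
    ],
    "model_pkg_class": [
        "sklearn.ensemble.RandomForestClassifier",
        "sklearn.linear_model.LogisticRegression",
        "sklearn.ensemble.AdaBoostClassifier",
    ],
}


def expand_grid(params):
    """Return one dict per combination of the listed values."""
    keys = list(params)
    return [dict(zip(keys, values)) for values in product(*(params[k] for k in keys))]


def select_best(results, selector="max.accuracy"):
    """Return the 1-based iteration whose metric is best, e.g. 'max.accuracy'."""
    mode, metric = selector.split(".", 1)
    pick = max if mode == "max" else min
    return pick(range(len(results)), key=lambda i: results[i][metric]) + 1


iterations = expand_grid(hyperparams)
print(len(iterations))  # prints 9

# Made-up accuracies, only to show how the selector picks a winner.
fake_results = [{"accuracy": 0.90 + 0.01 * (i % 4)} for i in range(len(iterations))]
print(select_best(fake_results))
```

Under this assumption, three model names and three classes give nine training iterations, and the selector picks one of them as the best.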

Step 4: Registering the workflow#

Use the set_workflow MLRun project method to register your workflow with MLRun. The following code sets the name parameter to the selected workflow name (“main”) and the code parameter to the name of the workflow file that is found in your project directory (workflow.py).

# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')

Step 5: Running a pipeline#

First run the following code to save your project:

project.save()

Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines.

You can pass arguments or set the artifact_path to specify a unique path for storing the workflow artifacts.

run_id = project.run(
    'main',
    arguments={}, 
    dirty=True, watch=True)

Run Results

Workflow 34db6d3c-858e-4bb5-9a6c-547baec5d0a7 finished, state=Succeeded

test-classifier (started Oct 28 13:56:30, state: completed)
  results:
    accuracy=0.9883058032451396
    test-error=0.0116941967548604
    rocauc=0.8130881224506281
    brier_score=0.22075754415862567
    f1-score=0.36507936507936506
    precision_score=0.6052631578947368
    recall_score=0.26136363636363635
  artifacts:
    probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, test_set_preds

sklearn-classifier (started Oct 28 13:55:22, state: completed)
  results:
    best_iteration=7
    accuracy=0.9896594661902441
    test-error=0.010340533809755834
    rocauc=0.8228432450474152
    brier_score=0.2209646484723041
    f1-score=0.3612040133779264
    precision_score=0.6206896551724138
    recall_score=0.25471698113207547
  artifacts:
    test_set, probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, model, iteration_results

feature-selection (started Oct 28 13:54:57, state: completed)
  results:
    top_features_vector=store://feature-vectors/fraud-demo-admin/short
  artifacts:
    f_classif, mutual_info_classif, f_regression, LinearSVC, LogisticRegression, ExtraTreesClassifier, feature_scores, max_scaled_scores_feature_scores, selected_features_count, selected_features
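
The reported metrics are related to each other, which gives a quick sanity check on the test-classifier results. The sketch below recomputes the F1 score as the harmonic mean of precision and recall, and the test error as one minus the accuracy, using the values reported for that run.

```python
# Values copied from the test-classifier run above.
accuracy = 0.9883058032451396
precision = 0.6052631578947368
recall = 0.26136363636363635

# F1 is the harmonic mean of precision and recall.
f1 = 2 * precision * recall / (precision + recall)
test_error = 1 - accuracy

print(round(f1, 4))          # prints 0.3651
print(round(test_error, 4))  # prints 0.0117
```

Both values match the reported f1-score and test-error. The combination of very high accuracy with low recall suggests that the positive class is rare in this data set, so accuracy alone may overstate how well the model detects fraud.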

Step 6: Test the model endpoint#

Now that your model is deployed using the pipeline, you can invoke it as usual:

# Define the serving function
serving_fn = project.func('serving')

# Choose an id for the test
sample_id = 'C76780537'
model_inference_path = '/v2/models/fraud/infer'

# Send the sample ID for prediction
serving_fn.invoke(path=model_inference_path,
                  body={'inputs': [[sample_id]]})
> 2021-10-28 13:56:56,170 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-admin-v2-model-server.default-tenant.svc.cluster.local:8080/v2/models/fraud/infer'}
{'id': '90f4b67c-c9e0-4e35-917f-979b71c5ad75',
 'model_name': 'fraud',
 'outputs': [0.0]}
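
The request above sends only a sample ID, because the workflow routes the serving function through an enrichment router that is configured with a feature vector. As a minimal sketch, the path and body used in this step can be built for any number of IDs as follows. The helper build_infer_request is hypothetical, and the example only constructs the request without calling the endpoint.

```python
def build_infer_request(model_name, sample_ids):
    """Return (path, body) for a V2 inference call (hypothetical helper)."""
    path = f"/v2/models/{model_name}/infer"
    # One inner list per sample; here each sample holds only its ID.
    body = {"inputs": [[sample_id] for sample_id in sample_ids]}
    return path, body


path, body = build_infer_request("fraud", ["C76780537"])
print(path)  # prints /v2/models/fraud/infer
print(body)  # prints {'inputs': [['C76780537']]}
```

The resulting path and body match the arguments passed to serving_fn.invoke in the code above.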

Done!#