Part 4: Automated ML pipeline#

An MLRun project is a container for all your work on a particular activity: all of the associated code, functions, jobs/workflows, and artifacts. Projects can be mapped to Git repositories, which enables versioning, collaboration, and CI/CD. You can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive. Once the project is loaded, you can run jobs/workflows that refer to any project element by name, allowing separation between configuration and code.

Projects contain workflows that execute the registered functions in a sequence/graph (DAG) and can reference project parameters, secrets, and artifacts by name. This notebook demonstrates how to build an automated workflow with feature selection, training, testing, and deployment.

Step 1: Setting up your project#

To run a pipeline, you first need to get or create a project object and define/import the required functions for its execution. See Create, save, and use projects for details.

The following code gets or creates a user project named “fraud-demo”.

# Set the base project name
project_name = 'fraud-demo'
import mlrun

# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2023-02-15 14:52:09,517 [info] loaded project fraud-demo from MLRun DB

Step 2: Updating project and function definitions#

You need to save the definitions of the functions you use in the project. This enables automatically converting code to functions or importing external functions whenever you load new versions of your code or run automated CI/CD workflows. In addition, you may want to set other project attributes such as global parameters, secrets, and data.

Your code can be stored in Python files, notebooks, external repositories, packaged containers, etc. You use the project.set_function() method to register your code in the project. The definitions are saved to the project object, as well as to a YAML file in the root of your project. Functions can also be imported from the MLRun function hub (using the hub:// schema).

This tutorial uses these functions:

  • feature_selection — the first function, which determines the top features to be used for training.

  • train — the model-training function

  • evaluate — the model-testing function (the evaluate handler of the train function)

  • serving — the model-serving function

Note

set_function uses the code_to_function and import_function methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.
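For example, in addition to hub functions, you can register local code as a project function. A minimal sketch, assuming a hypothetical file src/my_training.py with a train handler (not part of this demo):

# Illustrative only: register a local Python file as a project function
# "src/my_training.py", "my-trainer", and the "train" handler are placeholders
project.set_function("src/my_training.py", name="my-trainer",
                     kind="job", image="mlrun/mlrun", handler="train")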

Add the function definitions to the project along with parameters and data artifacts and save the project.

project.set_function('hub://feature_selection', 'feature_selection')
project.set_function('hub://auto_trainer','train')
project.set_function('hub://v2_model_server', 'serving')
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing underscores with dashes.
<mlrun.runtimes.serving.ServingRuntime at 0x7f5701e79520>
# set project level parameters and save
project.spec.params = {'label_column': 'label'}
project.save()
<mlrun.projects.project.MlrunProject at 0x7f5720229280>
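Project attributes can also include secrets that your functions and workflows consume at runtime. A minimal sketch, with placeholder key names and values that are not used by this demo:

# Illustrative only: store project-level secrets in the MLRun secret store
# (the keys and values below are placeholders)
project.set_secrets(secrets={"AWS_ACCESS_KEY_ID": "<key-id>",
                             "AWS_SECRET_ACCESS_KEY": "<secret-key>"})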


When you save the project, it stores the project definitions in the project.yaml file. This allows you to load the project from source control (Git) and run it with a single command or API call.

You can print the project YAML using:

print(project.to_yaml())
kind: project
metadata:
  name: fraud-demo-dani
  created: '2023-02-15T14:40:29.807000'
spec:
  params:
    label_column: label
  functions:
  - url: hub://feature_selection
    name: feature_selection
  - url: hub://auto_trainer
    name: train
  - url: hub://v2_model_server
    name: serving
  workflows: []
  artifacts: []
  source: ''
  desired_state: online
  owner: dani
status:
  state: online

Saving and loading projects from GIT#

After you save your project and its elements (functions, workflows, artifacts, etc.), you can commit all your changes to a Git repository. This can be done using standard Git tools or using MLRun project methods such as pull, push, and remote, which call the Git API for you.
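For example, a sketch that attaches a remote repository and pushes the project state (the repository URL is a placeholder, and Git credentials are assumed to be configured):

# Illustrative only: attach a Git remote and push the project definitions
project.create_remote("git://github.com/<org>/<repo>.git")
project.push(branch="master", message="update the fraud-demo project")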

Projects can then be loaded from Git using the MLRun load_project method, for example:

project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)

or using the MLRun CLI:

mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj

Read CI/CD integration for more details.

Using Kubeflow pipelines#

You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

Step 3: Defining and saving a pipeline workflow#

A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file in your local directory (the file name is workflow.py). The workflow describes a directed acyclic graph (DAG) that is executed using Kubeflow Pipelines, and depicts the connections between the functions and the data in an end-to-end pipeline. The workflow file uses a pipeline DSL to connect the function inputs and outputs.

The defined pipeline includes the following steps:

  • Perform feature selection (feature_selection).

  • Train the model (train).

  • Test the model with its test data set (evaluate).

  • Deploy the model as a real-time serverless function (deploy).

Note

A pipeline can also include continuous build integration and deployment (CI/CD) steps, such as building container images and deploying models.
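For example, an image-build step could be added inside the workflow function shown below; a minimal sketch that is not part of this demo's workflow:

# Illustrative only: a build step inside kfpipeline(), before the training step,
# that (re)builds the container image of the registered "train" function
from mlrun import build_function

build_function("train")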

%%writefile workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions

from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)

    
@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)

def kfpipeline(vector_name='transactions-fraud'):
    
    project = mlrun.get_current_project()
    
    # Feature selection   
    feature_selection = run_function(
        "feature_selection",
        name="feature_selection",
        params={'output_vector_name': "short",
                "label_column": project.get_param('label_column', 'label'),
                "k": 18,
                "min_votes": 2,
                'ignore_type_errors': True
               },
        inputs={'df_artifact': project.get_artifact_uri(vector_name, 'feature-vector')},
        outputs=['feature_scores', 'selected_features_count', 'top_features_vector', 'selected_features'])
    
    
    # train with hyperparameters
    train = run_function(
        "train",
        name="train",
        handler="train",
        params={"sample": -1, 
                "label_column": project.get_param('label_column', 'label'),
                "test_size": 0.10},
        hyperparams={"model_name": ['transaction_fraud_rf', 
                                    'transaction_fraud_xgboost', 
                                    'transaction_fraud_adaboost'],
                     'model_class': ["sklearn.ensemble.RandomForestClassifier", 
                                     "sklearn.linear_model.LogisticRegression",
                                     "sklearn.ensemble.AdaBoostClassifier"]},
        hyper_param_options=HyperParamOptions(selector="max.accuracy"),
        inputs={"dataset": feature_selection.outputs['top_features_vector']},
        outputs=['model', 'test_set'])
    
            
    # test and visualize your model
    test = run_function(
        "train",
        name="evaluate",
        handler='evaluate',
        params={"label_columns": project.get_param('label_column', 'label'),
                "model": train.outputs["model"], 
                "drop_columns": project.get_param('label_column', 'label')},
        inputs={"dataset": train.outputs["test_set"]})
    
    # route your serving model to use enrichment
    serving_function = project.get_function('serving')
    serving_function.set_topology('router',
                                  'mlrun.serving.routers.EnrichmentModelRouter',
                                  name='EnrichmentModelRouter',
                                  feature_vector_uri="transactions-fraud-short",
                                  impute_policy={"*": "$mean"},
                                  exist_ok=True)

    
    # deploy your model as a serverless function, you can pass a list of models to serve 
    deploy = deploy_function("serving", models=[{"key": 'fraud', "model_path": train.outputs["model"]}])
Writing workflow.py

Step 4: Registering the workflow#

Use the set_workflow MLRun project method to register your workflow with MLRun. The following code registers the workflow under the name “main” and points it to the workflow file in your project directory (workflow.py).

# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')
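Optionally, the workflow code can be embedded in the project spec so that the saved project YAML is fully self-contained; a sketch using the embed flag:

# Illustrative only: embed the workflow code inside the project spec
project.set_workflow('main', 'workflow.py', embed=True)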

Step 5: Running a pipeline#

First run the following code to save your project:

project.save()
<mlrun.projects.project.MlrunProject at 0x7f5720229280>

Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines.

You can pass arguments or set the artifact_path to specify a unique path for storing the workflow artifacts.

run_id = project.run(
    'main',
    arguments={}, 
    dirty=True, watch=True)
Pipeline running (id=2e7556a2-c398-4134-8229-163bd7ee3ec3), click here to view the details in MLRun UI

Run Results

[info] Workflow 2e7556a2-c398-4134-8229-163bd7ee3ec3 finished, state=Succeeded

Click the hyperlinks in the MLRun UI to see the detailed results of each step. In summary:

  • evaluate (completed, started Feb 15 14:53:55). Parameters: label_columns=label, model=store://artifacts/fraud-demo-dani/transaction_fraud_adaboost:2e7556a2-c398-4134-8229-163bd7ee3ec3, drop_columns=label. Results: evaluation_accuracy=0.991504247876062, evaluation_f1_score=0.4137931034482759, evaluation_precision_score=0.42857142857142855, evaluation_recall_score=0.4

  • train (completed, started Feb 15 14:53:00). Parameters: sample=-1, label_column=label, test_size=0.1. Results: best_iteration=9, accuracy=0.991504247876062, f1_score=0.4137931034482759, precision_score=0.42857142857142855, recall_score=0.4

  • feature_selection (completed, started Feb 15 14:52:23). Parameters: output_vector_name=short, label_column=label, k=18, min_votes=2, ignore_type_errors=True. Results: top_features_vector=store://feature-vectors/fraud-demo-dani/short
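As noted above, you can also pass workflow arguments and a dedicated artifact_path for the run outputs. A minimal sketch, where the artifact path is a placeholder:

# Illustrative only: pass workflow arguments and a per-run artifact path
run_id = project.run(
    'main',
    arguments={'vector_name': 'transactions-fraud'},
    artifact_path='./pipeline-artifacts/{{workflow.uid}}',
    dirty=True, watch=True)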

Step 6: Test the model endpoint#

Now that your model is deployed using the pipeline, you can invoke it as usual:

# Define your serving function
serving_fn = project.get_function('serving')

# Choose an id for your test
sample_id = 'C1000148617'
model_inference_path = '/v2/models/fraud/infer'

# Send your sample ID for prediction
serving_fn.invoke(path=model_inference_path,
                  body={'inputs': [[sample_id]]})
> 2023-02-15 14:56:50,310 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-dani-serving.default-tenant.svc.cluster.local:8080/v2/models/fraud/infer'}
{'id': 'dbc3b94e-367d-4970-8825-f99ebf76320b',
 'model_name': 'fraud',
 'outputs': [0]}
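Alternatively, you can call the model endpoint over plain HTTP with the same request body; a sketch, assuming you substitute your serving function's URL:

import requests

# Illustrative only: invoke the deployed model endpoint directly over HTTP
# (replace <serving-endpoint> with the URL of your serving function)
endpoint = "http://<serving-endpoint>/v2/models/fraud/infer"
resp = requests.post(endpoint, json={"inputs": [[sample_id]]})
print(resp.json())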

Done!#