Part 4: Automated ML pipeline#
An MLRun project is a container for all your work on a particular activity: all the associated code, functions, jobs/workflows, and artifacts. Projects can be mapped to Git repositories, which enables versioning, collaboration, and CI/CD.
Users can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive.
Once the project is loaded you can run jobs/workflows that refer to any project element by name, allowing separation between configuration and code.
Projects contain workflows that execute the registered functions in a sequence/graph (DAG). Workflows can reference project parameters, secrets, and artifacts by name. The following notebook demonstrates how to build an automated workflow with feature selection, training, testing, and deployment.
Step 1: Setting up your project#
To run a pipeline, you first need to get or create a project object and define/import the required functions for its execution. See Create and load projects for details.
The following code gets or creates a user project named “fraud-demo-<username>”.
# Set the base project name
project_name = 'fraud-demo'
import mlrun
# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2021-10-28 13:54:45,892 [info] loaded project fraud-demo from MLRun DB
Step 2: Updating project and function definitions#
You need to save the definitions of the functions used in the project so that you can automatically convert code to functions, or import external functions, whenever you load new versions of the code or run automated CI/CD workflows. In addition, you may want to set other project attributes such as global parameters, secrets, and data.
The code can be stored in Python files, notebooks, external repositories, packaged containers, etc. Use the project.set_function() method to register the code in the project. The definitions are saved to the project object, as well as to a YAML file in the root of the project.
Functions can also be imported from the MLRun marketplace (using the hub:// schema).
You used the following functions in this tutorial:
- feature_selection — the first function, which determines the top features to be used for training
- train — the model-training function
- test-classifier — the model-testing function
- mlrun-model — the model-serving function
```{admonition} Note
`set_function` uses the `code_to_function` and `import_function` methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.
```
Add the function definitions to the project along with parameters and data artifacts and save the project.
project.set_function('hub://feature_selection', 'feature_selection')
project.set_function('hub://sklearn-classifier','train')
project.set_function('hub://test_classifier', 'test')
project.set_function('hub://v2_model_server', 'serving')
<mlrun.runtimes.serving.ServingRuntime at 0x7f6229497190>
# set project level parameters and save
project.spec.params = {'label_column': 'label'}
project.save()
When you save the project, it stores the project definitions in the project.yaml file. This means that you can load the project from source control (Git) and run it with a single command or API call.
The project YAML for this project can be printed using:
print(project.to_yaml())
kind: project
metadata:
  name: fraud-demo-admin
  created: '2021-08-05T15:59:59.434655'
spec:
  params:
    label_column: label
  functions:
  - url: hub://feature_selection
    name: feature_selection
  - url: hub://sklearn-classifier
    name: train
  - url: hub://test_classifier
    name: test
  - url: hub://v2_model_server
    name: serving
  workflows:
  - name: main
    path: workflow.py
    engine: null
  artifacts: []
  desired_state: online
  disable_auto_mount: false
status:
  state: online
Saving and loading projects from Git#
After you save the project and its elements (functions, workflows, artifacts, etc.), you can commit all the changes to a Git repository. Do this using standard Git tools, or using MLRun project methods such as pull, push, and remote, which call the Git API for you.
Projects can then be loaded from Git using the MLRun load_project method, for example:
project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)
or using MLRun CLI:
mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj
Read Create and load projects for more details.
Using Kubeflow pipelines#
You’re now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable, machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.
Step 3: Defining and saving a pipeline workflow#
A pipeline is created by running an MLRun “workflow”. The following code defines a workflow and writes it to a file in your local directory (the file name is workflow.py). The workflow describes a directed acyclic graph (DAG) for execution using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file uses a pipeline DSL to connect the function inputs and outputs.
The defined pipeline includes the following steps:
- Perform feature selection (feature_selection).
- Train the model (train).
- Test the model with its test data set (test).
- Deploy the model as a real-time serverless function (deploy).
Note: A pipeline can also include continuous build integration and deployment (CI/CD) steps, such as building container images and deploying models.
%%writefile workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions
from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)

@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)
def kfpipeline(vector_name='transactions-fraud'):
    project = mlrun.get_current_project()

    # Feature selection
    feature_selection = run_function(
        "feature_selection",
        name="feature_selection",
        params={'sample_ratio': 0.25,
                'output_vector_name': "short",
                'ignore_type_errors': True},
        inputs={'df_artifact': project.get_artifact_uri(vector_name, 'feature-vector')},
        outputs=['top_features_vector'])

    # Train with hyper-parameters
    train = run_function(
        "train",
        name="train",
        params={"sample": -1,
                "label_column": project.get_param('label_column', 'label'),
                "test_size": 0.10},
        hyperparams={"model_name": ['transaction_fraud_rf',
                                    'transaction_fraud_xgboost',
                                    'transaction_fraud_adaboost'],
                     'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier",
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        hyper_param_options=HyperParamOptions(selector="max.accuracy"),
        inputs={"dataset": feature_selection.outputs['top_features_vector']},
        outputs=['model', 'test_set'])

    # Test and visualize the model
    test = run_function(
        "test",
        name="test",
        params={"label_column": project.get_param('label_column', 'label')},
        inputs={"models_path": train.outputs["model"],
                "test_set": train.outputs["test_set"]})

    # Route the serving model to use enrichment
    serving_function = project.get_function('serving')
    serving_function.set_topology('router',
                                  'mlrun.serving.routers.EnrichmentModelRouter',
                                  name='EnrichmentModelRouter',
                                  feature_vector_uri="transactions-fraud-short",
                                  impute_policy={"*": "$mean"},
                                  exist_ok=True)

    # Deploy the model as a serverless function; you can pass a list of models to serve
    deploy = deploy_function("serving",
                             models=[{"key": 'fraud', "model_path": train.outputs["model"]}])
Overwriting workflow.py
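Note that the workflow above never states the step order explicitly: Kubeflow Pipelines infers the DAG from the data dependencies, since a step that consumes another step's outputs (e.g. train reads feature_selection's top_features_vector) must run after it. As an illustration only (plain Python, not MLRun code), the dependency structure of this workflow resolves to an execution order like so:

```python
from graphlib import TopologicalSorter

# Each step lists the steps whose outputs it consumes,
# mirroring the data flow in workflow.py
dependencies = {
    "feature_selection": set(),      # reads the stored feature vector
    "train": {"feature_selection"},  # consumes top_features_vector
    "test": {"train"},               # consumes the trained model and test set
    "deploy": {"train"},             # serves the trained model
}

# Resolve the DAG into a valid execution order
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # feature_selection first; test and deploy only after train
```

Steps with no dependency between them (here test and deploy) can run in parallel, which is exactly what the pipeline engine does.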
Step 4: Registering the workflow#
Use the set_workflow MLRun project method to register your workflow with MLRun. The following code sets the name parameter to the selected workflow name (“main”) and the code parameter to the name of the workflow file in your project directory (workflow.py).
# Register the workflow file as "main"
project.set_workflow('main', 'workflow.py')
Step 5: Running a pipeline#
First run the following code to save your project:
project.save()
Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines. You can pass arguments or set the artifact_path to specify a unique path for storing the workflow artifacts.
run_id = project.run(
    'main',
    arguments={},
    dirty=True,
    watch=True)
Run Results

Workflow 34db6d3c-858e-4bb5-9a6c-547baec5d0a7 finished, state=Succeeded. Click the hyperlinks below to see detailed results.

| uid | start | state | name | results | artifacts |
|---|---|---|---|---|---|
| | Oct 28 13:56:30 | completed | test-classifier | accuracy=0.9883058032451396, test-error=0.0116941967548604, rocauc=0.8130881224506281, brier_score=0.22075754415862567, f1-score=0.36507936507936506, precision_score=0.6052631578947368, recall_score=0.26136363636363635 | probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, test_set_preds |
| | Oct 28 13:55:22 | completed | sklearn-classifier | best_iteration=7, accuracy=0.9896594661902441, test-error=0.010340533809755834, rocauc=0.8228432450474152, brier_score=0.2209646484723041, f1-score=0.3612040133779264, precision_score=0.6206896551724138, recall_score=0.25471698113207547 | test_set, probability-calibration, confusion-matrix, feature-importances, precision-recall-binary, roc-binary, model, iteration_results |
| | Oct 28 13:54:57 | completed | feature-selection | top_features_vector=store://feature-vectors/fraud-demo-admin/short | f_classif, mutual_info_classif, f_regression, LinearSVC, LogisticRegression, ExtraTreesClassifier, feature_scores, max_scaled_scores_feature_scores, selected_features_count, selected_features |
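As a quick sanity check on the table above, the reported classifier metrics are internally consistent: test-error is the complement of accuracy, and the F1 score is the harmonic mean of precision and recall. The values below are copied from the test-classifier row:

```python
# Metric values reported by the test-classifier step above
accuracy = 0.9883058032451396
test_error = 0.0116941967548604
precision = 0.6052631578947368
recall = 0.26136363636363635
f1 = 0.36507936507936506

# test-error is the complement of accuracy
assert abs((1 - accuracy) - test_error) < 1e-9

# F1 is the harmonic mean of precision and recall: 2PR / (P + R)
harmonic_mean = 2 * precision * recall / (precision + recall)
assert abs(harmonic_mean - f1) < 1e-9
print("metrics are consistent")
```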
Step 6: Test the model endpoint#
Now that your model is deployed using the pipeline, you can invoke it as usual:
# Get the serving function
serving_fn = project.get_function('serving')

# Choose an id for the test
sample_id = 'C76780537'
model_inference_path = '/v2/models/fraud/infer'

# Send the sample ID for prediction
serving_fn.invoke(path=model_inference_path,
                  body={'inputs': [[sample_id]]})
> 2021-10-28 13:56:56,170 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-admin-v2-model-server.default-tenant.svc.cluster.local:8080/v2/models/fraud/infer'}
{'id': '90f4b67c-c9e0-4e35-917f-979b71c5ad75',
'model_name': 'fraud',
'outputs': [0.0]}
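The request body and response follow the v2 model inference protocol: a JSON request with an inputs list, and a JSON response carrying id, model_name, and outputs (one prediction per input). A stdlib-only sketch of building such a request and interpreting a response shaped like the one above (the response values are copied from the log, not generated):

```python
import json

sample_id = 'C76780537'

# Build the v2-protocol request body sent to /v2/models/fraud/infer
request_body = json.dumps({'inputs': [[sample_id]]})

# A response shaped like the log output above; 'outputs' holds
# one prediction per input row
response = json.loads(
    '{"id": "90f4b67c-c9e0-4e35-917f-979b71c5ad75", '
    '"model_name": "fraud", "outputs": [0.0]}')

# An output of 0.0 means the sample was classified as not fraudulent
is_fraud = bool(response['outputs'][0])
print(is_fraud)  # False
```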