Part 4: Automated ML pipeline#
An MLRun project is a container for all your work on a particular activity: all of the associated code, functions,
jobs/workflows, and artifacts. Projects can be mapped to Git
repositories, which enables versioning, collaboration, and CI/CD.
You can create project definitions using the SDK or a YAML file and store them in the MLRun DB, a file, or an archive.
Once the project is loaded, you can run jobs/workflows that refer to any project element by name, allowing separation between configuration and code.
Projects contain workflows that execute the registered functions in a sequence/graph (DAG) and can reference project
parameters, secrets, and artifacts by name. This notebook demonstrates how to build an automated workflow with
feature selection, training, testing, and deployment.
Step 1: Setting up your project#
To run a pipeline, you first need to get or create a project object and define/import the required functions for its execution. See Create, save, and use projects for details.
The following code gets or creates a user project named "fraud-demo-<username>". The user_project=True argument appends the current user name to the base project name.
# Set the base project name
project_name = "fraud-demo"
import mlrun
# Initialize the MLRun project object
project = mlrun.get_or_create_project(project_name, context="./", user_project=True)
> 2023-02-15 14:52:09,517 [info] loaded project fraud-demo from MLRun DB
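The user-specific name is why the project YAML printed later in this notebook shows fraud-demo-dani rather than fraud-demo. The sketch below only illustrates that naming convention. The helper user_project_name is hypothetical (it is not an MLRun API), and falling back to the OS login name is an assumption.

```python
import getpass
from typing import Optional


def user_project_name(base_name: str, username: Optional[str] = None) -> str:
    """Return '<base_name>-<username>' (hypothetical helper, not an MLRun API)."""
    # Assumption: fall back to the OS login name when no user name is given.
    user = username or getpass.getuser()
    return f"{base_name}-{user}"


# The user name "dani" is taken from the project YAML shown in this notebook.
print(user_project_name("fraud-demo", "dani"))
```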
Step 2: Updating project and function definitions#
You need to save the definitions for the functions you use in the project. This enables automatically converting code to functions, or importing external functions, whenever you load new versions of your code or run automated CI/CD workflows. In addition, you may want to set other project attributes such as global parameters, secrets, and data.
Your code can be stored in Python files, notebooks, external repositories, packaged containers, etc. Use the project.set_function() method to register your code in the project. The definitions are saved to the project object, as well as in a YAML file in the root of your project.
Functions can also be imported from the MLRun marketplace (using the hub:// scheme).
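As a rough illustration of how a function URL separates its source from its name, the sketch below splits a URL into a scheme and a target using only the standard library. The helper split_function_url and the "local" label for paths without a scheme are assumptions made for this example. MLRun's own resolution logic is not shown here.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_function_url(url: str) -> Tuple[str, str]:
    """Split a function URL into (source, target). Hypothetical helper."""
    parts = urlsplit(url)
    # Assumption: a URL without a scheme is treated as a local file path.
    source = parts.scheme or "local"
    target = parts.netloc + parts.path
    return source, target


for url in ("hub://feature_selection", "hub://auto_trainer", "./train.py"):
    print(url, "->", split_function_url(url))
```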
This tutorial uses these functions:
- feature_selection: the first function, which determines the top features to be used for training.
- train: the model-training function.
- evaluate: the model-testing step, run with the evaluate handler of the train function.
- serving: the model-serving function.
Note
set_function uses the code_to_function and import_function methods under the hood (used in the previous notebooks), but in addition it saves the function configurations in the project spec for use in automated workflows and CI/CD.
Add the function definitions to the project along with parameters and data artifacts and save the project.
project.set_function("hub://feature_selection", "feature_selection")
project.set_function("hub://auto_trainer", "train")
project.set_function("hub://v2_model_server", "serving")
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing underscores with dashes.
<mlrun.runtimes.serving.ServingRuntime at 0x7f5701e79520>
# set project level parameters and save
project.spec.params = {"label_column": "label"}
project.save()
<mlrun.projects.project.MlrunProject at 0x7f5720229280>
When you save the project, its definitions are stored in the project.yaml file. This allows you to load the project
from source control (Git) and run it with a single command or API call.
The project YAML for this project can be printed using:
print(project.to_yaml())
kind: project
metadata:
  name: fraud-demo-dani
  created: '2023-02-15T14:40:29.807000'
spec:
  params:
    label_column: label
  functions:
  - url: hub://feature_selection
    name: feature_selection
  - url: hub://auto_trainer
    name: train
  - url: hub://v2_model_server
    name: serving
  workflows: []
  artifacts: []
  source: ''
  desired_state: online
  owner: dani
status:
  state: online
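The point of storing these definitions is that workflows can refer to functions and parameters by name. The sketch below mirrors part of the printed spec as a plain Python dict and resolves entries by name. The helpers function_url and get_param are hypothetical stand-ins written for this example, not the MLRun project methods.

```python
# Plain-dict mirror of part of the project spec (values copied from the YAML above).
project_spec = {
    "params": {"label_column": "label"},
    "functions": [
        {"url": "hub://feature_selection", "name": "feature_selection"},
        {"url": "hub://auto_trainer", "name": "train"},
        {"url": "hub://v2_model_server", "name": "serving"},
    ],
}


def function_url(spec, name):
    """Return the URL registered under a function name (hypothetical helper)."""
    for entry in spec["functions"]:
        if entry["name"] == name:
            return entry["url"]
    raise KeyError(f"function {name!r} is not registered")


def get_param(spec, key, default=None):
    """Return a project-level parameter, or the default if it is not set."""
    return spec.get("params", {}).get(key, default)


print(function_url(project_spec, "train"))
print(get_param(project_spec, "label_column", "label"))
```

Because the lookup goes through the name, the code that uses "train" does not change when the URL behind it does.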
Saving and loading projects from GIT#
After you save your project and its elements (functions, workflows, artifacts, etc.), you can commit all your changes to a
Git repository. This can be done using standard Git tools or using MLRun project
methods such as pull, push, and remote, which call the Git API for you.
Projects can then be loaded from Git using the MLRun load_project method, for example:
project = mlrun.load_project("./myproj", "git://github.com/mlrun/project-demo.git", name=project_name)
or using MLRun CLI:
mlrun project -n myproj -u "git://github.com/mlrun/project-demo.git" ./myproj
Read CI/CD integration for more details.
Using Kubeflow pipelines#
You're now ready to create a full ML pipeline. This is done by using Kubeflow Pipelines — an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers. MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.
Step 3: Defining and saving a pipeline workflow#
A pipeline is created by running an MLRun "workflow". The following code defines a workflow and writes it to a file in your local directory. (The file name is workflow.py.) The workflow describes a directed acyclic graph (DAG) for execution using Kubeflow Pipelines, and depicts the connections between the functions and the data as part of an end-to-end pipeline. The workflow file has a definition of a pipeline DSL for connecting the function inputs and outputs.
The defined pipeline includes the following steps:
- Perform feature selection (feature_selection).
- Train the model (train).
- Test the model with its test data set (evaluate).
- Deploy the model as a real-time serverless function (deploy).
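The steps above form a small DAG. The sketch below writes down the dependencies and derives a valid execution order with the standard library (Python 3.9 or later). The edges are an assumption read from how the workflow code passes outputs between steps: both evaluate and deploy consume outputs of train. Kubeflow Pipelines does its own scheduling, so this is only a model of the ordering.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps whose outputs it consumes.
dependencies = {
    "feature_selection": set(),
    "train": {"feature_selection"},
    "evaluate": {"train"},
    "deploy": {"train"},
}

# static_order() yields every step after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Note that evaluate and deploy do not depend on each other, so either may come first.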
Note
A pipeline can also include continuous integration and deployment (CI/CD) steps, such as building container images and deploying models.
%%writefile workflow.py
import mlrun
from kfp import dsl
from mlrun.model import HyperParamOptions

from mlrun import (
    build_function,
    deploy_function,
    import_function,
    run_function,
)


@dsl.pipeline(
    name="Fraud Detection Pipeline",
    description="Detecting fraud from a transactions dataset"
)
def kfpipeline(vector_name='transactions-fraud'):

    project = mlrun.get_current_project()

    # Feature selection
    feature_selection = run_function(
        "feature_selection",
        name="feature_selection",
        params={'output_vector_name': "short",
                "label_column": project.get_param('label_column', 'label'),
                "k": 18,
                "min_votes": 2,
                'ignore_type_errors': True
                },
        inputs={'df_artifact': project.get_artifact_uri(vector_name, 'feature-vector')},
        outputs=['feature_scores', 'selected_features_count', 'top_features_vector', 'selected_features'])

    # Train with hyperparameters
    train = run_function(
        "train",
        name="train",
        handler="train",
        params={"sample": -1,
                "label_column": project.get_param('label_column', 'label'),
                "test_size": 0.10},
        hyperparams={"model_name": ['transaction_fraud_rf',
                                    'transaction_fraud_xgboost',
                                    'transaction_fraud_adaboost'],
                     'model_class': ["sklearn.ensemble.RandomForestClassifier",
                                     "sklearn.linear_model.LogisticRegression",
                                     "sklearn.ensemble.AdaBoostClassifier"]},
        hyper_param_options=HyperParamOptions(selector="max.accuracy"),
        inputs={"dataset": feature_selection.outputs['top_features_vector']},
        outputs=['model', 'test_set'])

    # Test and visualize your model
    test = run_function(
        "train",
        name="evaluate",
        handler='evaluate',
        params={"label_columns": project.get_param('label_column', 'label'),
                "model": train.outputs["model"],
                "drop_columns": project.get_param('label_column', 'label')},
        inputs={"dataset": train.outputs["test_set"]})

    # Route your serving model to use enrichment
    funcs['serving'].set_topology('router',
                                  'mlrun.serving.routers.EnrichmentModelRouter',
                                  name='EnrichmentModelRouter',
                                  feature_vector_uri="transactions-fraud-short",
                                  impute_policy={"*": "$mean"},
                                  exist_ok=True)

    # Deploy your model as a serverless function; you can pass a list of models to serve
    deploy = deploy_function("serving", models=[{"key": 'fraud', "model_path": train.outputs["model"]}])
Writing workflow.py
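The train step in the workflow passes two hyperparameter lists of three values each, together with the selector "max.accuracy". Assuming a grid strategy, where every combination is tried, that gives nine iterations, from which the one with the highest accuracy is kept. The sketch below models that behavior with the standard library only. The helpers grid and select are hypothetical, and the accuracy values are made-up placeholders, not results from this tutorial.

```python
from itertools import product

hyperparams = {
    "model_name": [
        "transaction_fraud_rf",
        "transaction_fraud_xgboost",
        "transaction_fraud_adaboost",
    ],
    "model_class": [
        "sklearn.ensemble.RandomForestClassifier",
        "sklearn.linear_model.LogisticRegression",
        "sklearn.ensemble.AdaBoostClassifier",
    ],
}


def grid(space):
    """Expand a dict of value lists into one dict per combination."""
    keys = list(space)
    return [dict(zip(keys, combo)) for combo in product(*(space[k] for k in keys))]


def select(results, selector):
    """Pick one result using a '<max|min>.<metric>' selector string."""
    op, metric = selector.split(".", 1)
    chooser = {"max": max, "min": min}[op]
    return chooser(results, key=lambda r: r[metric])


iterations = grid(hyperparams)

# Placeholder accuracies, made up for illustration; a real run measures these.
fake_accuracy = [0.90, 0.92, 0.91, 0.93, 0.97, 0.94, 0.92, 0.95, 0.96]
results = [
    {"iteration": i, "accuracy": acc, **combo}
    for i, (combo, acc) in enumerate(zip(iterations, fake_accuracy), start=1)
]

best = select(results, "max.accuracy")
print(len(iterations), best["iteration"], best["model_class"])
```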
Step 4: Registering the workflow#
Use the set_workflow MLRun project method to register your workflow with MLRun.
The following code sets the name parameter to the selected workflow name ("main") and the code parameter to the name of
the workflow file that is found in your project directory (workflow.py).
# Register the workflow file as "main"
project.set_workflow("main", "workflow.py")
Step 5: Running a pipeline#
First run the following code to save your project:
project.save()
<mlrun.projects.project.MlrunProject at 0x7f5720229280>
Use the run MLRun project method to execute your workflow pipeline with Kubeflow Pipelines.
You can pass arguments or set the artifact_path to specify a unique path for storing the workflow artifacts.
run_id = project.run("main", arguments={}, dirty=True, watch=True)
Run Results
[info] Workflow 2e7556a2-c398-4134-8229-163bd7ee3ec3 finished, state=Succeeded
click the hyper links below to see detailed results
| uid | start | state | name | parameters | results |
|---|---|---|---|---|---|
| | Feb 15 14:53:55 | completed | evaluate | label_columns=label model=store://artifacts/fraud-demo-dani/transaction_fraud_adaboost:2e7556a2-c398-4134-8229-163bd7ee3ec3 drop_columns=label | evaluation_accuracy=0.991504247876062 evaluation_f1_score=0.4137931034482759 evaluation_precision_score=0.42857142857142855 evaluation_recall_score=0.4 |
| | Feb 15 14:53:00 | completed | train | sample=-1 label_column=label test_size=0.1 | best_iteration=9 accuracy=0.991504247876062 f1_score=0.4137931034482759 precision_score=0.42857142857142855 recall_score=0.4 |
| | Feb 15 14:52:23 | completed | feature_selection | output_vector_name=short label_column=label k=18 min_votes=2 ignore_type_errors=True | top_features_vector=store://feature-vectors/fraud-demo-dani/short |
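The reported scores are internally consistent: F1 is the harmonic mean of precision and recall. The short check below recomputes it from the precision and recall values in the run results above. The gap between an accuracy of about 0.99 and an F1 of about 0.41 is the kind of pattern that can appear when one class dominates the data, which is worth keeping in mind when reading accuracy alone.

```python
import math

# Values copied from the run results above.
precision = 0.42857142857142855
recall = 0.4
reported_f1 = 0.4137931034482759

# F1 is the harmonic mean of precision and recall.
f1 = 2 * precision * recall / (precision + recall)

print(f1)
print(math.isclose(f1, reported_f1, rel_tol=1e-9))
```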
Step 6: Test the model endpoint#
Now that your model is deployed using the pipeline, you can invoke it as usual:
# Define your serving function
serving_fn = project.get_function("serving")
# Choose an id for your test
sample_id = "C1000148617"
model_inference_path = "/v2/models/fraud/infer"
# Send the sample ID for prediction
serving_fn.invoke(path=model_inference_path, body={"inputs": [[sample_id]]})
> 2023-02-15 14:56:50,310 [info] invoking function: {'method': 'POST', 'path': 'http://nuclio-fraud-demo-dani-serving.default-tenant.svc.cluster.local:8080/v2/models/fraud/infer'}
{'id': 'dbc3b94e-367d-4970-8825-f99ebf76320b',
'model_name': 'fraud',
'outputs': [0]}
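The request above sends only an entity ID, since the workflow configures the serving function with an enrichment router and a feature vector. The sketch below shows how the path and body used in that call are put together for one or more IDs. The helpers inference_path and inference_body are hypothetical names for this example, and the one-ID-per-row layout is inferred from the call above.

```python
import json


def inference_path(model_name: str) -> str:
    """Build the inference path for a model key (format taken from the call above)."""
    return f"/v2/models/{model_name}/infer"


def inference_body(sample_ids):
    """Wrap each entity ID in its own input row (hypothetical helper)."""
    return {"inputs": [[sample_id] for sample_id in sample_ids]}


path = inference_path("fraud")
body = inference_body(["C1000148617"])

# The body must be JSON-serializable to be sent over HTTP.
print(path)
print(json.dumps(body))
```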