Add MLOps to existing code#
This tutorial showcases how easy it is to apply MLRun to your existing code. With only 7 lines of code, you get:
Experiment tracking — Track every single run of your experiment to learn what yielded the best results.
Automatic logging — Log datasets, metrics, results, and plots with one line of code. MLRun takes care of all the rest.
Parameterization — Enable running your code with different parameters, run hyperparameter tuning, and get the most out of your code.
Resource management — Control the amount of resources available for your experiment.
This tutorial uses this Kaggle code by Sylas as an example, part of the New York City Taxi Fare Prediction competition.
Get the data#
You can download the original data from Kaggle. However, since the original data is 5.7GB in size, this demo uses sampled data. Because this demo uses MLRun's DataItem to pass the datasets, the sampled data is downloaded automatically. If you still want to look at the data yourself, you can download it: training set, and testing set.
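If you just want a quick look at the sampled data, here is a minimal sketch using pandas (the URLs are the same ones passed to the MLRun function in the run step below):

import pandas as pd

# Peek at the sampled training set used throughout this tutorial:
train_sample = pd.read_csv("https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv")
print(train_sample.head())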
Code review#
Use the original code with the minimum changes required to apply MLRun to it. The code itself is straightforward:
Read the training data and perform feature engineering on it to preprocess it for training.
Train a LightGBM regression model using LightGBM's train function.
Read the testing data and save the contest's expected submission file.
You can download the script.py file, or copy / paste it from here:
import gc
import lightgbm as lgbm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# [MLRun] Import MLRun:
import mlrun
from mlrun.frameworks.lgbm import apply_mlrun
# [MLRun] Get MLRun's context:
context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")
# [MLRun] Reading train data from context instead of local file:
train_df = context.get_input("train_set", "./train.csv").as_df()
# train_df = pd.read_csv('./train.csv')
# Drop rows with null values
train_df = train_df.dropna(how="any", axis="rows")
def clean_df(df):
return df[
(df.fare_amount > 0)
& (df.fare_amount <= 500)
&
# (df.passenger_count >= 0) & (df.passenger_count <= 8) &
(
(df.pickup_longitude != 0)
& (df.pickup_latitude != 0)
& (df.dropoff_longitude != 0)
& (df.dropoff_latitude != 0)
)
]
train_df = clean_df(train_df)
# To Compute Haversine distance
def sphere_dist(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
"""
Return distance along great radius between pickup and dropoff coordinates.
"""
# Define earth radius (km)
R_earth = 6371
# Convert degrees to radians
pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
)
# Compute distances along lat, lon dimensions
dlat = dropoff_lat - pickup_lat
dlon = dropoff_lon - pickup_lon
# Compute haversine distance
a = (
np.sin(dlat / 2.0) ** 2
+ np.cos(pickup_lat) * np.cos(dropoff_lat) * np.sin(dlon / 2.0) ** 2
)
return 2 * R_earth * np.arcsin(np.sqrt(a))
def sphere_dist_bear(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
"""
Return the bearing between pickup and dropoff coordinates.
"""
# Convert degrees to radians
pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
)
# Compute distances along lat, lon dimensions
dlon = pickup_lon - dropoff_lon
# Compute bearing
a = np.arctan2(
np.sin(dlon * np.cos(dropoff_lat)),
np.cos(pickup_lat) * np.sin(dropoff_lat)
- np.sin(pickup_lat) * np.cos(dropoff_lat) * np.cos(dlon),
)
return a
def radian_conv(degree):
"""
Return radian.
"""
return np.radians(degree)
def add_airport_dist(dataset):
"""
Return the combined distance from the pickup and dropoff coordinates to each airport.
JFK: John F. Kennedy International Airport
EWR: Newark Liberty International Airport
LGA: LaGuardia Airport
SOL: Statue of Liberty
NYC: New York city center
"""
jfk_coord = (40.639722, -73.778889)
ewr_coord = (40.6925, -74.168611)
lga_coord = (40.77725, -73.872611)
sol_coord = (40.6892, -74.0445) # Statue of Liberty
nyc_coord = (40.7141667, -74.0063889)
pickup_lat = dataset["pickup_latitude"]
dropoff_lat = dataset["dropoff_latitude"]
pickup_lon = dataset["pickup_longitude"]
dropoff_lon = dataset["dropoff_longitude"]
pickup_jfk = sphere_dist(pickup_lat, pickup_lon, jfk_coord[0], jfk_coord[1])
dropoff_jfk = sphere_dist(jfk_coord[0], jfk_coord[1], dropoff_lat, dropoff_lon)
pickup_ewr = sphere_dist(pickup_lat, pickup_lon, ewr_coord[0], ewr_coord[1])
dropoff_ewr = sphere_dist(ewr_coord[0], ewr_coord[1], dropoff_lat, dropoff_lon)
pickup_lga = sphere_dist(pickup_lat, pickup_lon, lga_coord[0], lga_coord[1])
dropoff_lga = sphere_dist(lga_coord[0], lga_coord[1], dropoff_lat, dropoff_lon)
pickup_sol = sphere_dist(pickup_lat, pickup_lon, sol_coord[0], sol_coord[1])
dropoff_sol = sphere_dist(sol_coord[0], sol_coord[1], dropoff_lat, dropoff_lon)
pickup_nyc = sphere_dist(pickup_lat, pickup_lon, nyc_coord[0], nyc_coord[1])
dropoff_nyc = sphere_dist(nyc_coord[0], nyc_coord[1], dropoff_lat, dropoff_lon)
dataset["jfk_dist"] = pickup_jfk + dropoff_jfk
dataset["ewr_dist"] = pickup_ewr + dropoff_ewr
dataset["lga_dist"] = pickup_lga + dropoff_lga
dataset["sol_dist"] = pickup_sol + dropoff_sol
dataset["nyc_dist"] = pickup_nyc + dropoff_nyc
return dataset
def add_datetime_info(dataset):
# Convert to datetime format
dataset["pickup_datetime"] = pd.to_datetime(
dataset["pickup_datetime"], format="%Y-%m-%d %H:%M:%S UTC"
)
dataset["hour"] = dataset.pickup_datetime.dt.hour
dataset["day"] = dataset.pickup_datetime.dt.day
dataset["month"] = dataset.pickup_datetime.dt.month
dataset["weekday"] = dataset.pickup_datetime.dt.weekday
dataset["year"] = dataset.pickup_datetime.dt.year
return dataset
train_df = add_datetime_info(train_df)
train_df = add_airport_dist(train_df)
train_df["distance"] = sphere_dist(
train_df["pickup_latitude"],
train_df["pickup_longitude"],
train_df["dropoff_latitude"],
train_df["dropoff_longitude"],
)
train_df["bearing"] = sphere_dist_bear(
train_df["pickup_latitude"],
train_df["pickup_longitude"],
train_df["dropoff_latitude"],
train_df["dropoff_longitude"],
)
train_df["pickup_latitude"] = radian_conv(train_df["pickup_latitude"])
train_df["pickup_longitude"] = radian_conv(train_df["pickup_longitude"])
train_df["dropoff_latitude"] = radian_conv(train_df["dropoff_latitude"])
train_df["dropoff_longitude"] = radian_conv(train_df["dropoff_longitude"])
train_df.drop(columns=["key", "pickup_datetime"], inplace=True)
y = train_df["fare_amount"]
train_df = train_df.drop(columns=["fare_amount"])
print(train_df.head())
x_train, x_test, y_train, y_test = train_test_split(
train_df, y, random_state=123, test_size=0.10
)
del train_df
del y
gc.collect()
params = {
"boosting_type": "gbdt",
"objective": "regression",
"nthread": 4,
"num_leaves": 31,
"learning_rate": 0.05,
"max_depth": -1,
"subsample": 0.8,
"bagging_fraction": 1,
"max_bin": 5000,
"bagging_freq": 20,
"colsample_bytree": 0.6,
"metric": "rmse",
"min_split_gain": 0.5,
"min_child_weight": 1,
"min_child_samples": 10,
"scale_pos_weight": 1,
"zero_as_missing": True,
"seed": 0,
"num_rounds": 50000,
}
train_set = lgbm.Dataset(
x_train,
y_train,
silent=False,
categorical_feature=["year", "month", "day", "weekday"],
)
valid_set = lgbm.Dataset(
x_test,
y_test,
silent=False,
categorical_feature=["year", "month", "day", "weekday"],
)
# [MLRun] Apply MLRun on the LightGBM module:
apply_mlrun(context=context)
model = lgbm.train(
params,
train_set=train_set,
num_boost_round=10000,
early_stopping_rounds=500,
valid_sets=[valid_set],
)
del x_train
del y_train
del x_test
del y_test
gc.collect()
# [MLRun] Reading test data from context instead of local file:
test_df = context.get_input("test_set", "./test.csv").as_df()
# test_df = pd.read_csv('./test.csv')
print(test_df.head())
test_df = add_datetime_info(test_df)
test_df = add_airport_dist(test_df)
test_df["distance"] = sphere_dist(
test_df["pickup_latitude"],
test_df["pickup_longitude"],
test_df["dropoff_latitude"],
test_df["dropoff_longitude"],
)
test_df["bearing"] = sphere_dist_bear(
test_df["pickup_latitude"],
test_df["pickup_longitude"],
test_df["dropoff_latitude"],
test_df["dropoff_longitude"],
)
test_df["pickup_latitude"] = radian_conv(test_df["pickup_latitude"])
test_df["pickup_longitude"] = radian_conv(test_df["pickup_longitude"])
test_df["dropoff_latitude"] = radian_conv(test_df["dropoff_latitude"])
test_df["dropoff_longitude"] = radian_conv(test_df["dropoff_longitude"])
test_key = test_df["key"]
test_df = test_df.drop(columns=["key", "pickup_datetime"])
# Predict from test set
prediction = model.predict(test_df, num_iteration=model.best_iteration)
submission = pd.DataFrame({"key": test_key, "fare_amount": prediction})
# [MLRun] Log the submission instead of saving it locally:
context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")
# submission.to_csv('taxi_fare_submission.csv',index=False)
This demo focuses on reviewing the changes and additions made to the original code so that you can apply MLRun on top of it. Seven lines of code are added or replaced, as you can see in the sections below:
Initialization#
Imports#
On lines 9-10, add 2 imports:
mlrun — Import MLRun of course.
apply_mlrun — Use the apply_mlrun function from MLRun's frameworks, a sub-package of common ML/DL framework integrations with MLRun.
import mlrun
from mlrun.frameworks.lgbm import apply_mlrun
MLRun context#
To get parameters and inputs into the code, you need to get MLRun's context. Use the function get_or_create_ctx.
Line 13:
context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")
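The context is also how parameters reach the code. As a hedged sketch only (the original script hard-codes its values; the num_boost_round parameter here is an assumption for illustration):

# Hypothetical: read a run parameter with a default, so callers can override it per run:
num_boost_round = context.get_param("num_boost_round", 10000)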
Get Training Set#
In the original code the training set was read from a local file. Now you want to get it from the user who runs the code.
Use the context to get the "train_set" input by using the get_input method. To maintain the original logic, include the default path for when the training set is not provided by the user.
Line 16:
train_df = context.get_input("train_set", "./train.csv").as_df()
# Instead of: `train_df = pd.read_csv('./train.csv')`
Apply MLRun#
Now use the apply_mlrun function from MLRun's LightGBM framework integration. MLRun automatically wraps the LightGBM module and enables automatic logging and evaluation.
Line 219:
apply_mlrun(context=context)
Get the testing set#
Similar to the way you got the training set, you get the test dataset as an input from the MLRun context.
Line 235:
test_df = context.get_input("test_set", "./test.csv").as_df()
# Instead of: `test_df = pd.read_csv('./test.csv')`
Save the submission#
Finally, instead of saving the result locally, log the submission to MLRun.
Line 267:
context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")
# Instead of: `submission.to_csv('taxi_fare_submission.csv',index=False)`
Run the script with MLRun#
Now you can run the script and see MLRun in action.
import mlrun
Create a project#
Create a project using the function get_or_create_project. To read more about MLRun projects, see Projects.
project = mlrun.get_or_create_project(name="apply-mlrun-tutorial", context="./", user_project=True)
> 2022-08-09 18:21:26,785 [info] loaded project apply-mlrun-tutorial from MLRun DB
Create a function#
Create an MLRun function using the function code_to_function. To read more about MLRun functions, see Serverless functions.
script_function = mlrun.code_to_function(
filename="./src/script.py",
name="apply-mlrun-tutorial-function",
kind="job",
image="mlrun/ml-models"
)
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f20a5dfe250>
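This is also where the resource management bullet from the start of this tutorial comes into play. A minimal sketch, assuming a Kubernetes-backed MLRun deployment (the values below are illustrative, not recommendations):

# Illustrative: request and cap resources for the job before running it:
script_function.with_requests(mem="2G", cpu=2)
script_function.with_limits(mem="4G", cpu=4)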
Run the function#
Now you can run the function, providing it with the inputs you want. Use the dataset links to send them to the function. MLRun downloads and reads them into a pd.DataFrame automatically.
script_run = script_function.run(
inputs={
"train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
"test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv"
},
)
> 2022-08-09 18:21:26,851 [info] starting run apply-mlrun-tutorial-function uid=8d82ef16a15d4151a16060c13b133170 DB=http://mlrun-api:8080
> 2022-08-09 18:21:27,017 [info] handler was not provided running main (./script.py)
> 2022-08-09 18:21:39,330 [info] logging run results to: http://mlrun-api:8080
pickup_longitude pickup_latitude ... distance bearing
0 -1.288826 0.710721 ... 1.030764 -2.918897
1 -1.291824 0.710546 ... 8.450134 -0.375217
2 -1.291242 0.711418 ... 1.389525 2.599961
3 -1.291319 0.710927 ... 2.799270 0.133905
4 -1.290987 0.711536 ... 1.999157 -0.502703
[5 rows x 17 columns]
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Met categorical feature which contains sparse values. Consider renumbering to consecutive integers started from zero
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Auto-choosing col-wise multi-threading, the overhead of testing was 0.008352 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 55092
[LightGBM] [Info] Number of data points in the train set: 194071, number of used features: 17
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Start training from score 11.335573
key ... passenger_count
0 2015-01-27 13:08:24.0000002 ... 1
1 2015-01-27 13:08:24.0000003 ... 1
2 2011-10-08 11:53:44.0000002 ... 1
3 2012-12-01 21:12:12.0000002 ... 1
4 2012-12-01 21:12:12.0000003 ... 1
[5 rows x 7 columns]
| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| apply-mlrun-tutorial-guyl | ...3b133170 | 0 | Aug 09 18:21:39 | completed | apply-mlrun-tutorial-function | v3io_user=guyl, kind=, owner=guyl, host=jupyter-guyl-66857b7999-xnncv | train_set, test_set |  | valid_0_rmse=3.905279481685527 | valid_0_rmse_plot, valid_0-feature-importance, valid_0, model, taxi_fare_submission |
> 2022-08-09 18:22:02,987 [info] run executed, status=completed
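If you added the hypothetical num_boost_round parameter sketched earlier, the same call could override it per run via params (again, an assumption for illustration, not part of the original script):

script_run = script_function.run(
    inputs={
        "train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
        "test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv",
    },
    params={"num_boost_round": 5000},  # hypothetical override
)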
Review outputs#
To view the outputs yielded by the MLRun automatic logging and evaluation, call the outputs property on the run object:
script_run.outputs
{'valid_0_rmse': 3.905279481685527,
'valid_0_rmse_plot': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0_rmse_plot.html',
'valid_0-feature-importance': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0-feature-importance.html',
'valid_0': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_valid_0:8d82ef16a15d4151a16060c13b133170',
'model': 'store://artifacts/apply-mlrun-tutorial-guyl/model:8d82ef16a15d4151a16060c13b133170',
'taxi_fare_submission': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_taxi_fare_submission:8d82ef16a15d4151a16060c13b133170'}
MLRun automatically detects all the metrics calculated and collects the data along with the training. Here there was one validation set named valid_0, and the RMSE metric was calculated on it. You can see the RMSE values per iteration plot and the final score, including the feature importance plot. You can explore the different artifacts by calling the artifact function, like this:
script_run.artifact('valid_0_rmse_plot').show()
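You can also read a logged dataset artifact back into a DataFrame, using the same as_df method the script uses for its inputs. For example, the submission logged at the end of the run:

# Load the logged submission back as a pandas DataFrame:
submission_df = script_run.artifact("taxi_fare_submission").as_df()
print(submission_df.head())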