Add MLOps to existing code#

This tutorial showcases how easy it is to apply MLRun to your existing code. With only 7 lines of code, you get:

  • Experiment tracking — Track every single run of your experiment to learn what yielded the best results.

  • Automatic logging — Log datasets, metric results, and plots with one line of code. MLRun takes care of all the rest.

  • Parameterization — Enable running your code with different parameters, run hyperparameter tuning, and get the most out of your code.

  • Resource management — Control the amount of resources available for your experiment.

This tutorial uses the Kaggle code by Sylas, from the New York City Taxi Fare Prediction competition, as an example.

Get the data#

You can download the original data from Kaggle. However, since the original data is 5.7GB in size, this demo uses sampled data. Because this demo passes the datasets through MLRun's DataItem, the sampled data is downloaded automatically. If you want to look at the data yourself, you can download the training set and the testing set.
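
For example, if you just want a quick look at the sampled data, you can read it directly with pandas (the URLs are the same ones used to run the function later in this tutorial):

import pandas as pd

# Peek at the sampled training set straight from its URL:
sample_df = pd.read_csv("https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv")
print(sample_df.head())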

Code review#

Use the original code with the minimum changes required to apply MLRun to it. The code itself is straightforward:

  1. Read the training data and perform feature engineering on it to preprocess it for training.

  2. Train a LightGBM regression model using LightGBM’s train function.

  3. Read the testing data and save the submission file expected by the contest.

You can download the script.py file, or copy / paste it from here:

import gc

import lightgbm as lgbm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# [MLRun] Import MLRun:
import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

# [MLRun] Get MLRun's context:
context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")

# [MLRun] Reading train data from context instead of local file:
train_df = context.get_input("train_set", "./train.csv").as_df()
# train_df =  pd.read_csv('./train.csv')

# Drop rows with null values
train_df = train_df.dropna(how="any", axis="rows")


def clean_df(df):
    return df[
        (df.fare_amount > 0)
        & (df.fare_amount <= 500)
        &
        # (df.passenger_count >= 0) & (df.passenger_count <= 8)  &
        (
            (df.pickup_longitude != 0)
            & (df.pickup_latitude != 0)
            & (df.dropoff_longitude != 0)
            & (df.dropoff_latitude != 0)
        )
    ]


train_df = clean_df(train_df)


# Compute the Haversine distance
def sphere_dist(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return distance along great radius between pickup and dropoff coordinates.
    """
    # Define earth radius (km)
    R_earth = 6371
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute distances along lat, lon dimensions
    dlat = dropoff_lat - pickup_lat
    dlon = dropoff_lon - pickup_lon

    # Compute haversine distance
    a = (
        np.sin(dlat / 2.0) ** 2
        + np.cos(pickup_lat) * np.cos(dropoff_lat) * np.sin(dlon / 2.0) ** 2
    )
    return 2 * R_earth * np.arcsin(np.sqrt(a))


def sphere_dist_bear(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return distance along great radius between pickup and dropoff coordinates.
    """
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute distances along lat, lon dimensions
    dlon = pickup_lon - dropoff_lon

    # Compute bearing distance
    a = np.arctan2(
        np.sin(dlon * np.cos(dropoff_lat)),
        np.cos(pickup_lat) * np.sin(dropoff_lat)
        - np.sin(pickup_lat) * np.cos(dropoff_lat) * np.cos(dlon),
    )
    return a


def radian_conv(degree):
    """
    Return radian.
    """
    return np.radians(degree)


def add_airport_dist(dataset):
    """
    Return minumum distance from pickup or dropoff coordinates to each airport.
    JFK: John F. Kennedy International Airport
    EWR: Newark Liberty International Airport
    LGA: LaGuardia Airport
    SOL: Statue of Liberty
    NYC: Newyork Central
    """
    jfk_coord = (40.639722, -73.778889)
    ewr_coord = (40.6925, -74.168611)
    lga_coord = (40.77725, -73.872611)
    sol_coord = (40.6892, -74.0445)  # Statue of Liberty
    nyc_coord = (40.7141667, -74.0063889)

    pickup_lat = dataset["pickup_latitude"]
    dropoff_lat = dataset["dropoff_latitude"]
    pickup_lon = dataset["pickup_longitude"]
    dropoff_lon = dataset["dropoff_longitude"]

    pickup_jfk = sphere_dist(pickup_lat, pickup_lon, jfk_coord[0], jfk_coord[1])
    dropoff_jfk = sphere_dist(jfk_coord[0], jfk_coord[1], dropoff_lat, dropoff_lon)
    pickup_ewr = sphere_dist(pickup_lat, pickup_lon, ewr_coord[0], ewr_coord[1])
    dropoff_ewr = sphere_dist(ewr_coord[0], ewr_coord[1], dropoff_lat, dropoff_lon)
    pickup_lga = sphere_dist(pickup_lat, pickup_lon, lga_coord[0], lga_coord[1])
    dropoff_lga = sphere_dist(lga_coord[0], lga_coord[1], dropoff_lat, dropoff_lon)
    pickup_sol = sphere_dist(pickup_lat, pickup_lon, sol_coord[0], sol_coord[1])
    dropoff_sol = sphere_dist(sol_coord[0], sol_coord[1], dropoff_lat, dropoff_lon)
    pickup_nyc = sphere_dist(pickup_lat, pickup_lon, nyc_coord[0], nyc_coord[1])
    dropoff_nyc = sphere_dist(nyc_coord[0], nyc_coord[1], dropoff_lat, dropoff_lon)

    dataset["jfk_dist"] = pickup_jfk + dropoff_jfk
    dataset["ewr_dist"] = pickup_ewr + dropoff_ewr
    dataset["lga_dist"] = pickup_lga + dropoff_lga
    dataset["sol_dist"] = pickup_sol + dropoff_sol
    dataset["nyc_dist"] = pickup_nyc + dropoff_nyc

    return dataset


def add_datetime_info(dataset):
    # Convert to datetime format
    dataset["pickup_datetime"] = pd.to_datetime(
        dataset["pickup_datetime"], format="%Y-%m-%d %H:%M:%S UTC"
    )

    dataset["hour"] = dataset.pickup_datetime.dt.hour
    dataset["day"] = dataset.pickup_datetime.dt.day
    dataset["month"] = dataset.pickup_datetime.dt.month
    dataset["weekday"] = dataset.pickup_datetime.dt.weekday
    dataset["year"] = dataset.pickup_datetime.dt.year

    return dataset


train_df = add_datetime_info(train_df)
train_df = add_airport_dist(train_df)
train_df["distance"] = sphere_dist(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)

train_df["bearing"] = sphere_dist_bear(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)
train_df["pickup_latitude"] = radian_conv(train_df["pickup_latitude"])
train_df["pickup_longitude"] = radian_conv(train_df["pickup_longitude"])
train_df["dropoff_latitude"] = radian_conv(train_df["dropoff_latitude"])
train_df["dropoff_longitude"] = radian_conv(train_df["dropoff_longitude"])


train_df.drop(columns=["key", "pickup_datetime"], inplace=True)

y = train_df["fare_amount"]
train_df = train_df.drop(columns=["fare_amount"])


print(train_df.head())

x_train, x_test, y_train, y_test = train_test_split(
    train_df, y, random_state=123, test_size=0.10
)

del train_df
del y
gc.collect()

params = {
    "boosting_type": "gbdt",
    "objective": "regression",
    "nthread": 4,
    "num_leaves": 31,
    "learning_rate": 0.05,
    "max_depth": -1,
    "subsample": 0.8,
    "bagging_fraction": 1,
    "max_bin": 5000,
    "bagging_freq": 20,
    "colsample_bytree": 0.6,
    "metric": "rmse",
    "min_split_gain": 0.5,
    "min_child_weight": 1,
    "min_child_samples": 10,
    "scale_pos_weight": 1,
    "zero_as_missing": True,
    "seed": 0,
    "num_rounds": 50000,
}

train_set = lgbm.Dataset(
    x_train,
    y_train,
    silent=False,
    categorical_feature=["year", "month", "day", "weekday"],
)
valid_set = lgbm.Dataset(
    x_test,
    y_test,
    silent=False,
    categorical_feature=["year", "month", "day", "weekday"],
)

# [MLRun] Apply MLRun on the LightGBM module:
apply_mlrun(context=context)

model = lgbm.train(
    params,
    train_set=train_set,
    num_boost_round=10000,
    early_stopping_rounds=500,
    valid_sets=[valid_set],
)
del x_train
del y_train
del x_test
del y_test
gc.collect()

# [MLRun] Reading test data from context instead of local file:
test_df = context.get_input("test_set", "./test.csv").as_df()
# test_df =  pd.read_csv('./test.csv')
print(test_df.head())
test_df = add_datetime_info(test_df)
test_df = add_airport_dist(test_df)
test_df["distance"] = sphere_dist(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)

test_df["bearing"] = sphere_dist_bear(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)
test_df["pickup_latitude"] = radian_conv(test_df["pickup_latitude"])
test_df["pickup_longitude"] = radian_conv(test_df["pickup_longitude"])
test_df["dropoff_latitude"] = radian_conv(test_df["dropoff_latitude"])
test_df["dropoff_longitude"] = radian_conv(test_df["dropoff_longitude"])


test_key = test_df["key"]
test_df = test_df.drop(columns=["key", "pickup_datetime"])

# Predict from test set
prediction = model.predict(test_df, num_iteration=model.best_iteration)
submission = pd.DataFrame({"key": test_key, "fare_amount": prediction})

# [MLRun] Log the submission instead of saving it locally:
context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")
# submission.to_csv('taxi_fare_submission.csv',index=False)

This demo focuses on reviewing the changes and additions made to the original code so that you can apply MLRun on top of it. Seven lines of code are added or replaced, as you can see in the sections below:

Initialization#

Imports#

On lines 9-10, add 2 imports:

  • mlrun — The MLRun package itself.

  • apply_mlrun — The apply_mlrun function from MLRun's frameworks, a sub-package of integrations with common ML/DL frameworks.

import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

MLRun context#

To get parameters and inputs into the code, you need to get MLRun’s context. Use the function get_or_create_ctx.

Line 13:

context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")
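
The context is also how parameters reach your code. As a sketch (the parameter name learning_rate here is hypothetical, used only for illustration), you could read a run parameter with a default value:

# Read a hypothetical run parameter, falling back to a default value:
learning_rate = context.get_param("learning_rate", 0.05)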

Get the training set#

In the original code, the training set was read from a local file. Now you get it from the user who runs the code: use the context's get_input method to get the "train_set" input. To maintain the original logic, pass a default path for when the user does not provide a training set.

Line 16:

train_df = context.get_input("train_set", "./train.csv").as_df()  
# Instead of: `train_df =  pd.read_csv('./train.csv')`
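
get_input returns an MLRun DataItem, and as_df reads it into a pandas DataFrame. If you need a local file path instead (for example, for a library that only reads files), here is a sketch using the DataItem's local method:

# Sketch: download the input and get a local file path instead of a DataFrame:
local_path = context.get_input("train_set", "./train.csv").local()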

Apply MLRun#

Now use the apply_mlrun function from MLRun's LightGBM framework integration. MLRun wraps the LightGBM module, enabling automatic logging and evaluation during training.

Line 219:

apply_mlrun(context=context)
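
apply_mlrun also accepts optional arguments that control the logging. For example, a sketch naming the logged model explicitly (the model_name argument is an assumption here; check the frameworks documentation for the exact signature):

# Sketch: set an explicit name for the logged model (model_name is assumed):
apply_mlrun(context=context, model_name="taxi-fare-model")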

Get the testing set#

Similar to the way you got the training set, get the test dataset as an input from the MLRun context.

Line 235:

test_df = context.get_input("test_set", "./test.csv").as_df()
# Instead of: `test_df =  pd.read_csv('./test.csv')`

Save the submission#

Finally, instead of saving the result locally, log the submission to MLRun.

Line 267:

context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")  
# Instead of: `submission.to_csv('taxi_fare_submission.csv',index=False)`
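
log_dataset accepts further options as well. As a sketch (assuming the logging API's tag and index arguments), you could version the artifact and drop the DataFrame index:

# Sketch: tag the logged artifact and skip the index column (tag/index assumed):
context.log_dataset(
    key="taxi_fare_submission", df=submission, format="csv", tag="v1", index=False
)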

Run the script with MLRun#

Now you can run the script and see MLRun in action.

import mlrun

Create a project#

Create a project using the function get_or_create_project. To read more about MLRun projects, see Projects.

project = mlrun.get_or_create_project(name="apply-mlrun-tutorial", context="./", user_project=True)
> 2022-08-09 18:21:26,785 [info] loaded project apply-mlrun-tutorial from MLRun DB

Create a function#

Create an MLRun function using the function code_to_function. To read more about MLRun functions, see Serverless functions.

script_function = mlrun.code_to_function(
    filename="./src/script.py",
    name="apply-mlrun-tutorial-function",
    kind="job",
    image="mlrun/ml-models"
)
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f20a5dfe250>
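
Because the function runs as a Kubernetes job, this is also where the resource management mentioned at the top of this tutorial comes in. A sketch using the runtime's resource methods (the values are illustrative):

# Sketch: request and cap compute resources for the job (illustrative values):
script_function.with_requests(mem="2G", cpu=1)
script_function.with_limits(mem="4G", cpu=2)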

Run the function#

Now you can run the function, providing it with the inputs you want. Pass the dataset URLs as inputs: MLRun downloads and reads them into a pd.DataFrame automatically.

script_run = script_function.run(
    inputs={
        "train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
        "test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv"
    },
)
> 2022-08-09 18:21:26,851 [info] starting run apply-mlrun-tutorial-function uid=8d82ef16a15d4151a16060c13b133170 DB=http://mlrun-api:8080
> 2022-08-09 18:21:27,017 [info] handler was not provided running main (./script.py)
> 2022-08-09 18:21:39,330 [info] logging run results to: http://mlrun-api:8080
   pickup_longitude  pickup_latitude  ...  distance   bearing
0         -1.288826         0.710721  ...  1.030764 -2.918897
1         -1.291824         0.710546  ...  8.450134 -0.375217
2         -1.291242         0.711418  ...  1.389525  2.599961
3         -1.291319         0.710927  ...  2.799270  0.133905
4         -1.290987         0.711536  ...  1.999157 -0.502703

[5 rows x 17 columns]
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Met categorical feature which contains sparse values. Consider renumbering to consecutive integers started from zero
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Auto-choosing col-wise multi-threading, the overhead of testing was 0.008352 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 55092
[LightGBM] [Info] Number of data points in the train set: 194071, number of used features: 17
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Start training from score 11.335573
                           key  ... passenger_count
0  2015-01-27 13:08:24.0000002  ...               1
1  2015-01-27 13:08:24.0000003  ...               1
2  2011-10-08 11:53:44.0000002  ...               1
3  2012-12-01 21:12:12.0000002  ...               1
4  2012-12-01 21:12:12.0000003  ...               1

[5 rows x 7 columns]
project:    apply-mlrun-tutorial-guyl
uid:        ...3b133170
iter:       0
start:      Aug 09 18:21:39
state:      completed
name:       apply-mlrun-tutorial-function
labels:     v3io_user=guyl, kind=, owner=guyl, host=jupyter-guyl-66857b7999-xnncv
inputs:     train_set, test_set
results:    valid_0_rmse=3.905279481685527
artifacts:  valid_0_rmse_plot, valid_0-feature-importance, valid_0, model, taxi_fare_submission
> 2022-08-09 18:22:02,987 [info] run executed, status=completed
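
This is also where parameterization pays off: if the script read values through context.get_param, you could launch a hyperparameter run instead of a single one. A sketch, assuming a hypothetical learning_rate parameter and MLRun's hyperparams / hyper_param_options run arguments:

# Sketch: try several values of a hypothetical "learning_rate" parameter and
# select the iteration with the lowest validation RMSE:
tuning_run = script_function.run(
    inputs={
        "train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
        "test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv",
    },
    hyperparams={"learning_rate": [0.01, 0.05, 0.1]},
    hyper_param_options=mlrun.model.HyperParamOptions(selector="min.valid_0_rmse"),
)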

Review outputs#

To view the outputs yielded by MLRun's automatic logging and evaluation, use the outputs property of the run object:

script_run.outputs
{'valid_0_rmse': 3.905279481685527,
 'valid_0_rmse_plot': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0_rmse_plot.html',
 'valid_0-feature-importance': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0-feature-importance.html',
 'valid_0': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_valid_0:8d82ef16a15d4151a16060c13b133170',
 'model': 'store://artifacts/apply-mlrun-tutorial-guyl/model:8d82ef16a15d4151a16060c13b133170',
 'taxi_fare_submission': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_taxi_fare_submission:8d82ef16a15d4151a16060c13b133170'}

MLRun automatically detects all the metrics calculated during training and collects them along with the run. Here there was one validation set, named valid_0, and the RMSE metric was calculated on it. You can see the per-iteration RMSE plot, the final score, and the feature importance plot.

You can explore the different artifacts by calling the artifact method, like this:

script_run.artifact('valid_0_rmse_plot').show()
script_run.artifact('valid_0-feature-importance').show()

And of course, you can also see the submission that was logged:

script_run.artifact('taxi_fare_submission').show()
                               key  fare_amount
0      2015-01-27 13:08:24.0000002    10.281408
1      2015-01-27 13:08:24.0000003    11.019641
2      2011-10-08 11:53:44.0000002     4.898061
3      2012-12-01 21:12:12.0000002     7.758042
4      2012-12-01 21:12:12.0000003    15.298775
...                            ...           ...
9909   2015-05-10 12:37:51.0000002     9.117569
9910   2015-01-12 17:05:51.0000001    10.850885
9911   2015-04-19 20:44:15.0000001    55.048856
9912   2015-01-31 01:05:19.0000005    20.110280
9913   2015-01-18 14:06:23.0000006     7.081041

[9914 rows x 2 columns]
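
To keep working with the submission programmatically rather than just displaying it, you can read the artifact back as a DataFrame with the same as_df method shown earlier:

submission_df = script_run.artifact("taxi_fare_submission").as_df()
print(submission_df.describe())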