How to Apply MLRun on Existing Code?

In this tutorial, we show how easy it is to apply MLRun to your existing code. With only 7 lines of code, you get:

  • Experiment tracking - Track every single run of your experiment to learn what yielded the best results.

  • Automatic logging - Log datasets, metric results, and plots with one line of code. MLRun takes care of all the rest.

  • Parameterization - Run your code with different parameters, run hyperparameter tuning, and get the most out of your code.

  • Resource management - Control the amount of resources available for your experiment.
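Parameterization is what makes hyperparameter tuning possible: the same code runs once per combination of parameter values. As a plain-Python illustration of that idea (not MLRun's implementation), a grid of candidate values expands into one parameter set per run as follows. The `expand_grid` helper is hypothetical, the parameter names are borrowed from the LightGBM `params` in the script below, and the candidate values are made up:

```python
from itertools import product


def expand_grid(grid):
    """Hypothetical helper: expand {name: [values]} into one {name: value} dict per combination."""
    names = list(grid)
    # Cartesian product of all candidate value lists, in the order the names were given.
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


# Hypothetical search space for two of the script's LightGBM parameters.
grid = {"learning_rate": [0.05, 0.1], "num_leaves": [31, 63]}

for run_params in expand_grid(grid):
    print(run_params)
```

Two values for each of two parameters yield four runs. MLRun offers its own tuning support through the function's `run` method (its `hyperparams` argument); refer to the MLRun hyperparameter tuning documentation for the exact options.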

We use this Kaggle code by Sylas, part of the New York City Taxi Fare Prediction competition, as an example.

1. Get the Data

You can download the original data from Kaggle, but since it is 5.7GB in size, we sampled it for demonstration purposes. To check our sampled data, you can download it here: training set, testing set

Keep in mind that because we use MLRun's DataItem to pass the datasets, they are downloaded automatically, so download them yourself only if you want to look inside.

2. Code Review

We use the original code with the minimum changes required to apply MLRun to it. The code itself is straightforward:

  1. Read the training data and perform feature engineering on it to preprocess it for training.

  2. Train a LightGBM regression model using LightGBM’s train function.

  3. Read the testing data and save the submission file expected by the competition.

You can download the script.py file, or copy and paste it from here:

import gc

import lightgbm as lgbm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# [MLRun] Import MLRun:
import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

# [MLRun] Get MLRun's context:
context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")

# [MLRun] Reading train data from context instead of local file:
train_df = context.get_input("train_set", "./train.csv").as_df()
# train_df =  pd.read_csv('./train.csv')

# Drop rows with null values
train_df = train_df.dropna(how="any", axis="rows")


def clean_df(df):
    return df[
        (df.fare_amount > 0)
        & (df.fare_amount <= 500)
        &
        # (df.passenger_count >= 0) & (df.passenger_count <= 8)  &
        (
            (df.pickup_longitude != 0)
            & (df.pickup_latitude != 0)
            & (df.dropoff_longitude != 0)
            & (df.dropoff_latitude != 0)
        )
    ]


train_df = clean_df(train_df)


# To Compute Haversine distance
def sphere_dist(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return the great-circle (haversine) distance in km between pickup and dropoff coordinates.
    """
    # Define earth radius (km)
    R_earth = 6371
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute distances along lat, lon dimensions
    dlat = dropoff_lat - pickup_lat
    dlon = dropoff_lon - pickup_lon

    # Compute haversine distance
    a = (
        np.sin(dlat / 2.0) ** 2
        + np.cos(pickup_lat) * np.cos(dropoff_lat) * np.sin(dlon / 2.0) ** 2
    )
    return 2 * R_earth * np.arcsin(np.sqrt(a))


def sphere_dist_bear(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return the bearing (in radians) between pickup and dropoff coordinates.
    """
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute distances along lat, lon dimensions
    dlon = pickup_lon - dropoff_lon

    # Compute bearing distance
    a = np.arctan2(
        np.sin(dlon * np.cos(dropoff_lat)),
        np.cos(pickup_lat) * np.sin(dropoff_lat)
        - np.sin(pickup_lat) * np.cos(dropoff_lat) * np.cos(dlon),
    )
    return a


def radian_conv(degree):
    """
    Return radian.
    """
    return np.radians(degree)


def add_airport_dist(dataset):
    """
    Add, for each location below, the sum of the pickup and dropoff distances to it.
    JFK: John F. Kennedy International Airport
    EWR: Newark Liberty International Airport
    LGA: LaGuardia Airport
    SOL: Statue of Liberty
    NYC: New York City center
    """
    jfk_coord = (40.639722, -73.778889)
    ewr_coord = (40.6925, -74.168611)
    lga_coord = (40.77725, -73.872611)
    sol_coord = (40.6892, -74.0445)  # Statue of Liberty
    nyc_coord = (40.7141667, -74.0063889)

    pickup_lat = dataset["pickup_latitude"]
    dropoff_lat = dataset["dropoff_latitude"]
    pickup_lon = dataset["pickup_longitude"]
    dropoff_lon = dataset["dropoff_longitude"]

    pickup_jfk = sphere_dist(pickup_lat, pickup_lon, jfk_coord[0], jfk_coord[1])
    dropoff_jfk = sphere_dist(jfk_coord[0], jfk_coord[1], dropoff_lat, dropoff_lon)
    pickup_ewr = sphere_dist(pickup_lat, pickup_lon, ewr_coord[0], ewr_coord[1])
    dropoff_ewr = sphere_dist(ewr_coord[0], ewr_coord[1], dropoff_lat, dropoff_lon)
    pickup_lga = sphere_dist(pickup_lat, pickup_lon, lga_coord[0], lga_coord[1])
    dropoff_lga = sphere_dist(lga_coord[0], lga_coord[1], dropoff_lat, dropoff_lon)
    pickup_sol = sphere_dist(pickup_lat, pickup_lon, sol_coord[0], sol_coord[1])
    dropoff_sol = sphere_dist(sol_coord[0], sol_coord[1], dropoff_lat, dropoff_lon)
    pickup_nyc = sphere_dist(pickup_lat, pickup_lon, nyc_coord[0], nyc_coord[1])
    dropoff_nyc = sphere_dist(nyc_coord[0], nyc_coord[1], dropoff_lat, dropoff_lon)

    dataset["jfk_dist"] = pickup_jfk + dropoff_jfk
    dataset["ewr_dist"] = pickup_ewr + dropoff_ewr
    dataset["lga_dist"] = pickup_lga + dropoff_lga
    dataset["sol_dist"] = pickup_sol + dropoff_sol
    dataset["nyc_dist"] = pickup_nyc + dropoff_nyc

    return dataset


def add_datetime_info(dataset):
    # Convert to datetime format
    dataset["pickup_datetime"] = pd.to_datetime(
        dataset["pickup_datetime"], format="%Y-%m-%d %H:%M:%S UTC"
    )

    dataset["hour"] = dataset.pickup_datetime.dt.hour
    dataset["day"] = dataset.pickup_datetime.dt.day
    dataset["month"] = dataset.pickup_datetime.dt.month
    dataset["weekday"] = dataset.pickup_datetime.dt.weekday
    dataset["year"] = dataset.pickup_datetime.dt.year

    return dataset


train_df = add_datetime_info(train_df)
train_df = add_airport_dist(train_df)
train_df["distance"] = sphere_dist(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)

train_df["bearing"] = sphere_dist_bear(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)
train_df["pickup_latitude"] = radian_conv(train_df["pickup_latitude"])
train_df["pickup_longitude"] = radian_conv(train_df["pickup_longitude"])
train_df["dropoff_latitude"] = radian_conv(train_df["dropoff_latitude"])
train_df["dropoff_longitude"] = radian_conv(train_df["dropoff_longitude"])


train_df.drop(columns=["key", "pickup_datetime"], inplace=True)

y = train_df["fare_amount"]
train_df = train_df.drop(columns=["fare_amount"])


print(train_df.head())

x_train, x_test, y_train, y_test = train_test_split(
    train_df, y, random_state=123, test_size=0.10
)

del train_df
del y
gc.collect()

params = {
    "boosting_type": "gbdt",
    "objective": "regression",
    "nthread": 4,
    "num_leaves": 31,
    "learning_rate": 0.05,
    "max_depth": -1,
    "subsample": 0.8,
    "bagging_fraction": 1,
    "max_bin": 5000,
    "bagging_freq": 20,
    "colsample_bytree": 0.6,
    "metric": "rmse",
    "min_split_gain": 0.5,
    "min_child_weight": 1,
    "min_child_samples": 10,
    "scale_pos_weight": 1,
    "zero_as_missing": True,
    "seed": 0,
    "num_rounds": 50000,
}

train_set = lgbm.Dataset(
    x_train,
    y_train,
    silent=False,
    categorical_feature=["year", "month", "day", "weekday"],
)
valid_set = lgbm.Dataset(
    x_test,
    y_test,
    silent=False,
    categorical_feature=["year", "month", "day", "weekday"],
)

# [MLRun] Apply MLRun on the LightGBM module:
apply_mlrun(context=context)

model = lgbm.train(
    params,
    train_set=train_set,
    num_boost_round=10000,
    early_stopping_rounds=500,
    valid_sets=[valid_set],
)
del x_train
del y_train
del x_test
del y_test
gc.collect()

# [MLRun] Reading test data from context instead of local file:
test_df = context.get_input("test_set", "./test.csv").as_df()
# test_df =  pd.read_csv('./test.csv')
print(test_df.head())
test_df = add_datetime_info(test_df)
test_df = add_airport_dist(test_df)
test_df["distance"] = sphere_dist(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)

test_df["bearing"] = sphere_dist_bear(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)
test_df["pickup_latitude"] = radian_conv(test_df["pickup_latitude"])
test_df["pickup_longitude"] = radian_conv(test_df["pickup_longitude"])
test_df["dropoff_latitude"] = radian_conv(test_df["dropoff_latitude"])
test_df["dropoff_longitude"] = radian_conv(test_df["dropoff_longitude"])


test_key = test_df["key"]
test_df = test_df.drop(columns=["key", "pickup_datetime"])

# Predict from test set
prediction = model.predict(test_df, num_iteration=model.best_iteration)
submission = pd.DataFrame({"key": test_key, "fare_amount": prediction})

# [MLRun] Log the submission instead of saving it locally:
context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")
# submission.to_csv('taxi_fare_submission.csv',index=False)
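Much of the feature engineering above relies on `sphere_dist`, which implements the haversine formula with an Earth radius of 6371 km. As a quick sanity check, here is a stdlib-only sketch that mirrors the script's formula with `math` instead of NumPy (the scalar `haversine_km` function is a hypothetical stand-in, not part of the tutorial). One degree of latitude along a meridian should come out to 6371 × π / 180, about 111.19 km:

```python
import math

R_EARTH_KM = 6371  # same Earth radius as sphere_dist in the script


def haversine_km(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """Scalar version of the script's sphere_dist, using math instead of NumPy."""
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        math.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    dlat = dropoff_lat - pickup_lat
    dlon = dropoff_lon - pickup_lon
    a = (
        math.sin(dlat / 2.0) ** 2
        + math.cos(pickup_lat) * math.cos(dropoff_lat) * math.sin(dlon / 2.0) ** 2
    )
    return 2 * R_EARTH_KM * math.asin(math.sqrt(a))


# One degree of latitude along a meridian.
print(round(haversine_km(40.0, -74.0, 41.0, -74.0), 2))  # prints 111.19

# JFK to the script's "NYC" coordinate (both taken from add_airport_dist).
print(round(haversine_km(40.639722, -73.778889, 40.7141667, -74.0063889), 1))
```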

We now review the changes and additions made to the original code to apply MLRun on top of it. We added or replaced 7 lines of code, described in the sections below:

Initialization

Imports

On lines 9-10 of the script, we added two imports:

  • mlrun - MLRun itself.

  • apply_mlrun - We use the apply_mlrun function from MLRun's frameworks, a sub-package of MLRun integrations with common ML/DL frameworks.

import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

MLRun Context

To pass parameters and inputs into the code, we need MLRun's context, which we get with the get_or_create_ctx function.

Line 13:

context = mlrun.get_or_create_ctx("apply-mlrun-tutorial")

Get Training Set

In the original code, the training set was read from a local file. Now we want to get it from the user who runs the code, so we use the context's get_input method to get the "train_set" input. To keep the original logic intact, we also include a default path that is used when the user does not provide a training set.

Line 16:

train_df = context.get_input("train_set", "./train.csv").as_df()  
# Instead of: `train_df =  pd.read_csv('./train.csv')`
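The second argument to get_input acts as a fallback, which is why the script still runs locally when no train_set input is supplied. The plain-Python sketch below imitates that resolution logic with a hypothetical `resolve_input` helper and a dict of user-supplied inputs; it does not call MLRun and only illustrates the behavior described above. The URLs are placeholders:

```python
def resolve_input(inputs, key, default_path):
    """Hypothetical helper: return the user-supplied path for `key`, else the default path."""
    return inputs.get(key, default_path)


# Run with an explicit input, as in section 3.3 (placeholder URL).
print(resolve_input({"train_set": "https://example.com/train.csv"}, "train_set", "./train.csv"))

# Run without inputs: falls back to the local file, like the original script.
print(resolve_input({}, "train_set", "./train.csv"))

# A mismatched key (e.g. "training_set") also falls back to the default in this sketch.
print(resolve_input({"training_set": "https://example.com/train.csv"}, "train_set", "./train.csv"))
```

The last case is the one to watch for: the input name passed when running the function must match the key the script asks for.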

Apply MLRun

We’ll now use the apply_mlrun function from MLRun’s LightGBM framework integration. MLRun will automatically wrap the LightGBM module and enable automatic logging and evaluation.

Line 219:

apply_mlrun(context=context)
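The exact wrapping mechanism is internal to MLRun. The sketch below only illustrates the general idea of wrapping a module's function so that every call is recorded while the call site stays unchanged; it uses a stand-in `fake_lgbm` namespace and a hypothetical `apply_logging` helper instead of the real libraries:

```python
import functools
from types import SimpleNamespace


def apply_logging(module, func_name, log):
    """Hypothetical helper: replace module.<func_name> with a wrapper that records each call."""
    original = getattr(module, func_name)

    @functools.wraps(original)
    def wrapper(*args, **kwargs):
        result = original(*args, **kwargs)
        # Record what was called and what it returned, then hand the result back unchanged.
        log.append({"function": func_name, "kwargs": kwargs, "result": result})
        return result

    setattr(module, func_name, wrapper)


def fake_train(params, num_boost_round=100):
    """Stand-in for lgbm.train: returns a dummy model description."""
    return {"rounds": num_boost_round, "lr": params["learning_rate"]}


fake_lgbm = SimpleNamespace(train=fake_train)
run_log = []
apply_logging(fake_lgbm, "train", run_log)

# The training call itself is written exactly as before wrapping.
model = fake_lgbm.train({"learning_rate": 0.05}, num_boost_round=10)
print(model)
print(run_log)
```

In this sketch, wrapping has to happen before the training call, which matches the script calling apply_mlrun before lgbm.train.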

Get Test Set

Similar to the way we got the training set, we get the test dataset as an input from the MLRun context.

Line 235:

test_df = context.get_input("test_set", "./test.csv").as_df()
# Instead of: `test_df =  pd.read_csv('./test.csv')`

Save the Submission

Finally, instead of saving the result locally, we will log the submission to MLRun.

Line 267:

context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")  
# Instead of: `submission.to_csv('taxi_fare_submission.csv',index=False)`
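The submission DataFrame built in the script has two columns, key and fare_amount. For reference, a stdlib sketch of the equivalent CSV content, with a header row and no index column (as the commented-out to_csv(..., index=False) line would write). The `submission_csv` helper is hypothetical; the keys are copied from the test-set preview in section 3.3 and the fares are made-up values:

```python
import csv
import io


def submission_csv(keys, fares):
    """Render key/fare_amount pairs as CSV text with a header row and no index column."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["key", "fare_amount"])
    for key, fare in zip(keys, fares):
        writer.writerow([key, fare])
    return buffer.getvalue()


keys = ["2015-01-27 13:08:24.0000002", "2015-01-27 13:08:24.0000003"]
fares = [10.35, 11.02]  # hypothetical predictions
print(submission_csv(keys, fares))
```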

3. Run the Script with MLRun

We will now run the script and see MLRun in action.

import mlrun

3.1. Create a Project

We create a project using the get_or_create_project function. To read more about MLRun projects, see the MLRun projects documentation.

project = mlrun.get_or_create_project(name="apply-mlrun-tutorial", context="./", user_project=True)
> 2022-08-09 18:21:26,785 [info] loaded project apply-mlrun-tutorial from MLRun DB

3.2. Create a Function

We create an MLRun function using the code_to_function function. To read more about MLRun functions, see the MLRun functions documentation.

script_function = mlrun.code_to_function(
    filename="./script.py",
    name="apply-mlrun-tutorial-function",
    kind="job",
    image="mlrun/ml-models"
)
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f20a5dfe250>

3.3. Run the Function

Now we can run the function, providing it with the inputs we want. We pass the dataset links as inputs, and MLRun downloads them and reads them into pd.DataFrame objects automatically.

script_run = script_function.run(
    inputs={
        "train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
        "test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv"
    },
)
> 2022-08-09 18:21:26,851 [info] starting run apply-mlrun-tutorial-function uid=8d82ef16a15d4151a16060c13b133170 DB=http://mlrun-api:8080
> 2022-08-09 18:21:27,017 [info] handler was not provided running main (./script.py)
> 2022-08-09 18:21:39,330 [info] logging run results to: http://mlrun-api:8080
   pickup_longitude  pickup_latitude  ...  distance   bearing
0         -1.288826         0.710721  ...  1.030764 -2.918897
1         -1.291824         0.710546  ...  8.450134 -0.375217
2         -1.291242         0.711418  ...  1.389525  2.599961
3         -1.291319         0.710927  ...  2.799270  0.133905
4         -1.290987         0.711536  ...  1.999157 -0.502703

[5 rows x 17 columns]
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Met categorical feature which contains sparse values. Consider renumbering to consecutive integers started from zero
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Auto-choosing col-wise multi-threading, the overhead of testing was 0.008352 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 55092
[LightGBM] [Info] Number of data points in the train set: 194071, number of used features: 17
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Start training from score 11.335573
                           key  ... passenger_count
0  2015-01-27 13:08:24.0000002  ...               1
1  2015-01-27 13:08:24.0000003  ...               1
2  2011-10-08 11:53:44.0000002  ...               1
3  2012-12-01 21:12:12.0000002  ...               1
4  2012-12-01 21:12:12.0000003  ...               1

[5 rows x 7 columns]
project: apply-mlrun-tutorial-guyl
uid: ...3b133170
iter: 0
start: Aug 09 18:21:39
state: completed
name: apply-mlrun-tutorial-function
labels: v3io_user=guyl, kind=, owner=guyl, host=jupyter-guyl-66857b7999-xnncv
inputs: train_set, test_set
parameters: (none)
results: valid_0_rmse=3.905279481685527
artifacts: valid_0_rmse_plot, valid_0-feature-importance, valid_0, model, taxi_fare_submission
> 2022-08-09 18:22:02,987 [info] run executed, status=completed

4. Review Outputs

We can see the outputs produced by MLRun's automatic logging and evaluation by accessing the outputs property of the run object:

script_run.outputs
{'valid_0_rmse': 3.905279481685527,
 'valid_0_rmse_plot': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0_rmse_plot.html',
 'valid_0-feature-importance': 'v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0-feature-importance.html',
 'valid_0': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_valid_0:8d82ef16a15d4151a16060c13b133170',
 'model': 'store://artifacts/apply-mlrun-tutorial-guyl/model:8d82ef16a15d4151a16060c13b133170',
 'taxi_fare_submission': 'store://artifacts/apply-mlrun-tutorial-guyl/apply-mlrun-tutorial-function_taxi_fare_submission:8d82ef16a15d4151a16060c13b133170'}

MLRun automatically detects all the metrics calculated and collects the data during training. Here there was one validation set, named valid_0, and the RMSE metric was calculated on it. The outputs include a plot of the RMSE value per iteration, the final RMSE score, and a feature importance plot.
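The per-iteration RMSE plot reflects how LightGBM evaluates valid_0 after every boosting round. Because the script passes early_stopping_rounds=500, training stops once the validation RMSE has not improved for 500 consecutive rounds, and model.best_iteration (used in the script's predict call) points at the best round. A simplified pure-Python sketch of both ideas, using hypothetical per-round scores and a patience of 2 instead of 500; the helper names are illustrative, not LightGBM's API:

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def best_iteration_with_early_stopping(scores, patience):
    """Return (best_iteration, stopped_at), both 1-based, for lower-is-better scores.

    Training stops after `patience` consecutive rounds without a new best score.
    """
    best_score, best_iter = math.inf, 0
    for i, score in enumerate(scores, start=1):
        if score < best_score:
            best_score, best_iter = score, i
        elif i - best_iter >= patience:
            return best_iter, i
    return best_iter, len(scores)


print(rmse([10.0, 12.0], [11.0, 10.0]))  # sqrt((1 + 4) / 2)

# Hypothetical validation RMSE after each boosting round.
valid_rmse = [5.0, 4.2, 3.9, 3.95, 4.0, 3.8]
print(best_iteration_with_early_stopping(valid_rmse, patience=2))
```

With these scores, round 3 is the best and training stops at round 5, so the later 3.8 is never reached; a larger patience trades training time for a better chance of finding such late improvements.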

You can explore the different artifacts by calling the run's artifact method, like so:

script_run.artifact('valid_0_rmse_plot').show()
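The outputs dictionary mixes scalar results (such as valid_0_rmse) with artifact URIs (store:// and v3io:// paths). If you want to handle them separately, a small sketch like the following can split them by type. The `split_outputs` function is hypothetical, and the sample dict is abbreviated from the outputs printed above; with a live run you would pass script_run.outputs instead:

```python
def split_outputs(outputs):
    """Separate scalar results from artifact URIs in a run's outputs dict."""
    results, artifacts = {}, {}
    for key, value in outputs.items():
        # Treat any string that looks like a URI (scheme://...) as an artifact reference.
        if isinstance(value, str) and "://" in value:
            artifacts[key] = value
        else:
            results[key] = value
    return results, artifacts


outputs = {
    "valid_0_rmse": 3.905279481685527,
    "model": "store://artifacts/apply-mlrun-tutorial-guyl/model:8d82ef16a15d4151a16060c13b133170",
    "valid_0_rmse_plot": "v3io:///projects/apply-mlrun-tutorial-guyl/artifacts/apply-mlrun-tutorial-function/0/valid_0_rmse_plot.html",
}
results, artifacts = split_outputs(outputs)
print(results)
print(sorted(artifacts))
```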