Add MLOps to existing code

This tutorial shows how to apply MLRun to your existing code with only 7 lines of code. You get:

  • Experiment tracking: Track every run of your experiment to learn what yielded the best results.

  • Automatic logging: Log datasets, metric results, and plots with one line of code. MLRun takes care of the rest.

  • Parameterization: Run your code with different parameters, run hyperparameter tuning, and get the most out of your code.

  • Resource management: Control the amount of resources available to your experiment.

As an example, this tutorial uses this Kaggle code by Sylas from the New York City Taxi Fare Prediction competition.

Get the data

You can download the original data from Kaggle. However, since the original data is 5.7 GB, this demo uses sampled data. Because the demo passes the datasets through MLRun's DataItem, the sampled data is downloaded automatically. If you want to look at the data yourself, you can download the training set and the testing set.

Code review

Use the original code with the minimum changes required to apply MLRun to it. The code itself is straightforward:

  1. Read the training data and perform feature engineering on it to preprocess it for training.

  2. Train a LightGBM regression model using LightGBM's train function.

  3. Read the testing data and save the submission file that the competition expects.

You can download the script.py file, or copy and paste it from here:

import gc

import lightgbm as lgbm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# [MLRun] Import MLRun:
import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

# [MLRun] Create a project and get MLRun's context:
project = mlrun.get_or_create_project("my-project")
context = mlrun.get_or_create_ctx("my-context", project="my-project")

# [MLRun] Reading train data from context instead of local file:
train_df = context.get_input("train_set", "./train.csv").as_df()
# train_df =  pd.read_csv('./train.csv')

# Drop rows with null values
train_df = train_df.dropna(how="any", axis="rows")


def clean_df(df):
    return df[
        (df.fare_amount > 0)
        & (df.fare_amount <= 500)
        &
        # (df.passenger_count >= 0) & (df.passenger_count <= 8)  &
        (
            (df.pickup_longitude != 0)
            & (df.pickup_latitude != 0)
            & (df.dropoff_longitude != 0)
            & (df.dropoff_latitude != 0)
        )
    ]


train_df = clean_df(train_df)


# To Compute Haversine distance
def sphere_dist(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return the great-circle (haversine) distance in km between pickup and drop-off coordinates.
    """
    # Define earth radius (km)
    r_earth = 6371
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute distances along lat, lon dimensions
    dlat = dropoff_lat - pickup_lat
    dlon = dropoff_lon - pickup_lon

    # Compute haversine distance
    a = (
        np.sin(dlat / 2.0) ** 2
        + np.cos(pickup_lat) * np.cos(dropoff_lat) * np.sin(dlon / 2.0) ** 2
    )
    return 2 * r_earth * np.arcsin(np.sqrt(a))


def sphere_dist_bear(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return the bearing angle (radians) between pickup and drop-off coordinates.
    """
    # Convert degrees to radians
    pickup_lat, pickup_lon, dropoff_lat, dropoff_lon = map(
        np.radians, [pickup_lat, pickup_lon, dropoff_lat, dropoff_lon]
    )
    # Compute the longitude difference
    dlon = pickup_lon - dropoff_lon

    # Compute the bearing angle (note: differs from the textbook initial-bearing formula)
    a = np.arctan2(
        np.sin(dlon * np.cos(dropoff_lat)),
        np.cos(pickup_lat) * np.sin(dropoff_lat)
        - np.sin(pickup_lat) * np.cos(dropoff_lat) * np.cos(dlon),
    )
    return a


def radian_conv(degree):
    """
    Return radian.
    """
    return np.radians(degree)


def add_airport_dist(dataset):
    """
    Add columns with the sum of the pickup and drop-off distances (km) to each landmark.
    JFK: John F. Kennedy International Airport
    EWR: Newark Liberty International Airport
    LGA: LaGuardia Airport
    SOL: Statue of Liberty
    NYC: New York City center
    """
    jfk_coord = (40.639722, -73.778889)
    ewr_coord = (40.6925, -74.168611)
    lga_coord = (40.77725, -73.872611)
    sol_coord = (40.6892, -74.0445)  # Statue of Liberty
    nyc_coord = (40.7141667, -74.0063889)

    pickup_lat = dataset["pickup_latitude"]
    dropoff_lat = dataset["dropoff_latitude"]
    pickup_lon = dataset["pickup_longitude"]
    dropoff_lon = dataset["dropoff_longitude"]

    pickup_jfk = sphere_dist(pickup_lat, pickup_lon, jfk_coord[0], jfk_coord[1])
    dropoff_jfk = sphere_dist(jfk_coord[0], jfk_coord[1], dropoff_lat, dropoff_lon)
    pickup_ewr = sphere_dist(pickup_lat, pickup_lon, ewr_coord[0], ewr_coord[1])
    dropoff_ewr = sphere_dist(ewr_coord[0], ewr_coord[1], dropoff_lat, dropoff_lon)
    pickup_lga = sphere_dist(pickup_lat, pickup_lon, lga_coord[0], lga_coord[1])
    dropoff_lga = sphere_dist(lga_coord[0], lga_coord[1], dropoff_lat, dropoff_lon)
    pickup_sol = sphere_dist(pickup_lat, pickup_lon, sol_coord[0], sol_coord[1])
    dropoff_sol = sphere_dist(sol_coord[0], sol_coord[1], dropoff_lat, dropoff_lon)
    pickup_nyc = sphere_dist(pickup_lat, pickup_lon, nyc_coord[0], nyc_coord[1])
    dropoff_nyc = sphere_dist(nyc_coord[0], nyc_coord[1], dropoff_lat, dropoff_lon)

    dataset["jfk_dist"] = pickup_jfk + dropoff_jfk
    dataset["ewr_dist"] = pickup_ewr + dropoff_ewr
    dataset["lga_dist"] = pickup_lga + dropoff_lga
    dataset["sol_dist"] = pickup_sol + dropoff_sol
    dataset["nyc_dist"] = pickup_nyc + dropoff_nyc

    return dataset


def add_datetime_info(dataset):
    # Convert to datetime format
    dataset["pickup_datetime"] = pd.to_datetime(
        dataset["pickup_datetime"], format="%Y-%m-%d %H:%M:%S UTC"
    )

    dataset["hour"] = dataset.pickup_datetime.dt.hour
    dataset["day"] = dataset.pickup_datetime.dt.day
    dataset["month"] = dataset.pickup_datetime.dt.month
    dataset["weekday"] = dataset.pickup_datetime.dt.weekday
    dataset["year"] = dataset.pickup_datetime.dt.year

    return dataset


train_df = add_datetime_info(train_df)
train_df = add_airport_dist(train_df)
train_df["distance"] = sphere_dist(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)

train_df["bearing"] = sphere_dist_bear(
    train_df["pickup_latitude"],
    train_df["pickup_longitude"],
    train_df["dropoff_latitude"],
    train_df["dropoff_longitude"],
)
train_df["pickup_latitude"] = radian_conv(train_df["pickup_latitude"])
train_df["pickup_longitude"] = radian_conv(train_df["pickup_longitude"])
train_df["dropoff_latitude"] = radian_conv(train_df["dropoff_latitude"])
train_df["dropoff_longitude"] = radian_conv(train_df["dropoff_longitude"])


train_df.drop(columns=["key", "pickup_datetime"], inplace=True)

y = train_df["fare_amount"]
train_df = train_df.drop(columns=["fare_amount"])


print(train_df.head())

x_train, x_test, y_train, y_test = train_test_split(
    train_df, y, random_state=123, test_size=0.10
)

del train_df
del y
gc.collect()

params = {
    "boosting_type": "gbdt",
    "objective": "regression",
    "nthread": 4,
    "num_leaves": 31,
    "learning_rate": 0.05,
    "max_depth": -1,
    "subsample": 0.8,
    "bagging_fraction": 1,
    "max_bin": 5000,
    "bagging_freq": 20,
    "colsample_bytree": 0.6,
    "metric": "rmse",
    "min_split_gain": 0.5,
    "min_child_weight": 1,
    "min_child_samples": 10,
    "scale_pos_weight": 1,
    "zero_as_missing": True,
    "seed": 0,
    # "categorical_feature": "name:year,month,day,weekday",
}

train_set = lgbm.Dataset(x_train, y_train)
valid_set = lgbm.Dataset(x_test, y_test)

# [MLRun] Apply MLRun on the LightGBM module:
apply_mlrun(context=context)

model = lgbm.train(
    params,
    num_boost_round=10000,
    train_set=train_set,
    valid_sets=[valid_set],
    callbacks=[lgbm.early_stopping(stopping_rounds=500)],
)

del x_train
del y_train
del x_test
del y_test
gc.collect()

# [MLRun] Reading test data from context instead of local file:
test_df = context.get_input("test_set", "./test.csv").as_df()
# test_df =  pd.read_csv('./test.csv')
print(test_df.head())
test_df = add_datetime_info(test_df)
test_df = add_airport_dist(test_df)
test_df["distance"] = sphere_dist(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)

test_df["bearing"] = sphere_dist_bear(
    test_df["pickup_latitude"],
    test_df["pickup_longitude"],
    test_df["dropoff_latitude"],
    test_df["dropoff_longitude"],
)
test_df["pickup_latitude"] = radian_conv(test_df["pickup_latitude"])
test_df["pickup_longitude"] = radian_conv(test_df["pickup_longitude"])
test_df["dropoff_latitude"] = radian_conv(test_df["dropoff_latitude"])
test_df["dropoff_longitude"] = radian_conv(test_df["dropoff_longitude"])


test_key = test_df["key"]
test_df = test_df.drop(columns=["key", "pickup_datetime"])

# Predict from test set
prediction = model.predict(test_df, num_iteration=model.best_iteration)
submission = pd.DataFrame({"key": test_key, "fare_amount": prediction})

# [MLRun] Log the submission instead of saving it locally:
context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")
# submission.to_csv('taxi_fare_submission.csv',index=False)
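To sanity-check the distance features, here is a stdlib-only sketch of the same haversine formula that sphere_dist applies to pandas columns, evaluated on single coordinate pairs. haversine_km and landmark_feature are hypothetical helper names used only for illustration; the landmark coordinates are copied from add_airport_dist.

```python
import math

EARTH_RADIUS_KM = 6371  # same radius as r_earth in sphere_dist


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    # Convert degrees to radians, as sphere_dist does with np.radians
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Landmark coordinates copied from add_airport_dist
JFK = (40.639722, -73.778889)
LGA = (40.77725, -73.872611)


def landmark_feature(pickup, dropoff, landmark):
    """Sum of pickup->landmark and landmark->dropoff distances, like jfk_dist."""
    return haversine_km(*pickup, *landmark) + haversine_km(*landmark, *dropoff)


# One degree of longitude along the equator is about 111.19 km
print(round(haversine_km(0, 0, 0, 1), 2))
# Trip from LGA to JFK: the drop-off term is 0, so jfk_dist equals the LGA-JFK distance
print(landmark_feature(LGA, JFK, JFK))
```

Summing both legs, rather than taking a minimum, means a trip that starts or ends at a landmark gets a small value for that landmark's feature.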

The rest of this tutorial reviews the changes and additions made to the original code to apply MLRun on top of it. Seven lines of code are added or replaced, as described in the following sections:

Initialization

Imports

On lines 9-10, add two imports:

  • mlrun: Import MLRun itself.

  • apply_mlrun: Use the apply_mlrun function from MLRun's frameworks, a sub-package of integrations between MLRun and common ML/DL frameworks.

import mlrun
from mlrun.frameworks.lgbm import apply_mlrun

MLRun context

To get parameters and inputs into the code, you need MLRun's context. Use the function get_or_create_ctx.

Line 14:

context = mlrun.get_or_create_ctx("my-context", project="my-project")

Get the training set

In the original code, the training set was read from a local file. Now you want to get it from the user who runs the code. Use the context's get_input method to get the "train_set" input. To maintain the original logic, include the default path to use when the user does not provide a training set.

Line 17:

train_df = context.get_input("train_set", "./train.csv").as_df()  
# Instead of: `train_df =  pd.read_csv('./train.csv')`
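The default path acts as a fallback: if the runner supplies a train_set input, that input is used; otherwise the local ./train.csv is read. A minimal sketch of that resolution logic, assuming the inputs are a plain dict. HypotheticalContext is an illustrative stand-in, not MLRun code, and the real get_input returns an MLRun DataItem rather than a string.

```python
class HypotheticalContext:
    """Illustrative stand-in for MLRun's run context; NOT MLRun's implementation."""

    def __init__(self, inputs=None):
        # Inputs supplied by whoever runs the code, e.g. function.run(inputs={...})
        self._inputs = dict(inputs or {})

    def get_input(self, key, url=""):
        # Prefer the runner-supplied input; otherwise fall back to the default path.
        # (MLRun returns a DataItem here; this sketch returns the chosen URL string.)
        return self._inputs.get(key, url)


TRAIN_URL = "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv"

local_ctx = HypotheticalContext()  # no inputs supplied
job_ctx = HypotheticalContext({"train_set": TRAIN_URL})  # input supplied by the runner

print(local_ctx.get_input("train_set", "./train.csv"))  # ./train.csv
print(job_ctx.get_input("train_set", "./train.csv"))  # the TRAIN_URL value
```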

Apply MLRun

Now use the apply_mlrun function from MLRun's LightGBM framework integration. MLRun automatically wraps the LightGBM module and enables automatic logging and evaluation.

Line 210:

apply_mlrun(context=context)

Get the test set

Similar to the way you got the training set, get the test dataset as an input from the MLRun context.

Line 227:

test_df = context.get_input("test_set", "./test.csv").as_df()
# Instead of: `test_df =  pd.read_csv('./test.csv')`

Save the submission

Finally, instead of saving the result locally, log the submission to MLRun.

Line 259:

context.log_dataset(key="taxi_fare_submission", df=submission, format="csv")  
# Instead of: `submission.to_csv('taxi_fare_submission.csv',index=False)`

Run the script with MLRun

Now you can run the script and see MLRun in action.

import mlrun
> 2024-09-09 08:18:27,086 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=7, patch=0, prerelease='rc40', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=3, prerelease=None, build=None)}

Create a project

Create a project using the function get_or_create_project. To read more about MLRun projects, see Projects.

project = mlrun.get_or_create_project(
    name="tutorial", context="./", user_project=True, allow_cross_project=True
)
> 2024-09-09 08:18:27,158 [info] Loading project from path: {'project_name': 'tutorial', 'path': './'}
> 2024-09-09 08:18:41,988 [info] Project loaded successfully: {'project_name': 'tutorial', 'path': './', 'stored_in_db': True}

Create a function

Create an MLRun function using the function code_to_function. To read more about MLRun functions, see Functions.

script_function = mlrun.code_to_function(
    filename="src/script.py",
    name="tutorial-function",
    kind="job",
    image="mlrun/mlrun",
    requirements=["lightgbm"],
)
script_function.deploy()

Run the function

Now you can run the function and provide it with the inputs you want. Pass the dataset URLs as the function inputs: MLRun hands them to the script, where get_input(...).as_df() downloads and reads each one into a pd.DataFrame automatically.

script_run = script_function.run(
    inputs={
        "train_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/train.csv",
        "test_set": "https://s3.us-east-1.wasabisys.com/iguazio/data/nyc-taxi/test.csv",
    },
)
> 2024-09-09 08:21:43,661 [info] Storing function: {'name': 'tutorial-function', 'uid': '9941f4382bad40738bf6bf226b02f4c3', 'db': 'http://mlrun-api:8080'}
> 2024-09-09 08:21:44,080 [info] Job is running in the background, pod: tutorial-function-4s5p7
> 2024-09-09 08:21:49,093 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=7, patch=0, prerelease='rc40', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=3, prerelease=None, build=None)}
> 2024-09-09 08:21:49,191 [info] Handler was not provided running main (script.py)
Running: ['/opt/conda/bin/python', '-u', 'script.py']
> 2024-09-09 08:21:54,047 [info] Server and client versions are not the same but compatible: {'parsed_server_version': Version(major=1, minor=7, patch=0, prerelease='rc40', build=None), 'parsed_client_version': Version(major=1, minor=6, patch=3, prerelease=None, build=None)}
> 2024-09-09 08:21:54,463 [info] logging run results to: http://mlrun-api:8080
   pickup_longitude  pickup_latitude  ...  distance   bearing
0         -1.288826         0.710721  ...  1.030764 -2.918897
1         -1.291824         0.710546  ...  8.450134 -0.375217
2         -1.291242         0.711418  ...  1.389525  2.599961
3         -1.291319         0.710927  ...  2.799270  0.133905
4         -1.290987         0.711536  ...  1.999157 -0.502703

[5 rows x 17 columns]
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.012949 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 55091
[LightGBM] [Info] Number of data points in the train set: 194071, number of used features: 17
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Start training from score 11.335573
Training until validation scores don't improve for 500 rounds
Early stopping, best iteration is:
[353]	valid_0's rmse: 3.9286
                           key  ... passenger_count
0  2015-01-27 13:08:24.0000002  ...               1
1  2015-01-27 13:08:24.0000003  ...               1
2  2011-10-08 11:53:44.0000002  ...               1
3  2012-12-01 21:12:12.0000002  ...               1
4  2012-12-01 21:12:12.0000003  ...               1

[5 rows x 7 columns]
project: tutorial-jill
iter: 0
start: Sep 09 08:21:54
state: completed
name: tutorial-function
labels: v3io_user=Jill, kind=job, owner=Jill, mlrun/client_version=1.6.3, mlrun/client_python_version=3.9.18, host=tutorial-function-4s5p7
inputs: train_set, test_set
results: valid_0_rmse=3.941035163070338
artifacts: valid_0_rmse_plot, valid_0-feature-importance, valid_0, model, taxi_fare_submission

> to track results use the .show() or .logs() methods or click here to open in UI
> 2024-09-09 08:23:37,194 [info] Run execution finished: {'status': 'completed', 'name': 'tutorial-function'}
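In the log above, LightGBM stops early: validation RMSE did not improve for 500 rounds after the best iteration, 353. A simplified sketch of that rule, assuming a single lower-is-better metric and no minimum-improvement threshold. early_stopping_best is a hypothetical helper, not LightGBM's callback.

```python
def early_stopping_best(scores, stopping_rounds):
    """Return (best_iteration, rounds_run) for lower-is-better validation scores.

    Iterations are 1-based, matching how LightGBM reports the best iteration.
    """
    best_score = float("inf")
    best_iter = 0
    for i, score in enumerate(scores, start=1):
        if score < best_score:
            # New best score: reset the patience window
            best_score, best_iter = score, i
        elif i - best_iter >= stopping_rounds:
            # No improvement for `stopping_rounds` rounds: stop training here
            return best_iter, i
    # Ran out of boosting rounds before early stopping triggered
    return best_iter, len(scores)


# Scores improve for 3 rounds, then get worse; with patience 3, training stops at round 6
print(early_stopping_best([5.0, 4.0, 3.5, 3.6, 3.7, 3.8], stopping_rounds=3))  # (3, 6)
```

Under this rule, a best iteration of 353 with stopping_rounds=500 stops at round 853. The script then predicts with num_iteration=model.best_iteration, so the submission uses the best iteration rather than the last one.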

Review outputs

To view the outputs produced by MLRun's automatic logging and evaluation, access the outputs property of the run object:

script_run.outputs
{'valid_0_rmse': 3.941035163070338,
 'valid_0_rmse_plot': 'v3io:///projects/tutorial-jill/artifacts/tutorial-function/0/valid_0_rmse_plot.html',
 'valid_0-feature-importance': 'v3io:///projects/tutorial-jill/artifacts/tutorial-function/0/valid_0-feature-importance.html',
 'valid_0': 'store://artifacts/tutorial-jill/tutorial-function_valid_0@9941f4382bad40738bf6bf226b02f4c3',
 'model': 'store://artifacts/tutorial-jill/model@9941f4382bad40738bf6bf226b02f4c3',
 'taxi_fare_submission': 'store://artifacts/tutorial-jill/tutorial-function_taxi_fare_submission@9941f4382bad40738bf6bf226b02f4c3'}

MLRun automatically detects all the metrics calculated during training and collects their values. Here there was one validation set, named valid_0, and the RMSE metric was calculated on it. The outputs include a plot of the RMSE value per iteration, the final score, and a feature-importance plot.
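For reference, RMSE is the square root of the mean squared difference between the true and predicted fares, so it is expressed in the same unit as fare_amount. A small stdlib sketch; rmse is a hypothetical helper, not the LightGBM or MLRun implementation.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Errors of -3 and 4 give sqrt((9 + 16) / 2) = sqrt(12.5)
print(rmse([10.0, 20.0], [13.0, 16.0]))
```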

You can explore the different artifacts by calling the artifact method of the run object, like this:

script_run.artifact("valid_0_rmse_plot").show()
script_run.artifact("valid_0-feature-importance").show()