Refactored as an Operational Pipeline (with MLRun)

# Install prerequisites
%pip install mlrun lightgbm shapely

Create an MLRun project and configuration

from os import path
import mlrun

# Base project name; with user_project=True MLRun appends the running user's
# name to it (e.g. 'nyc-taxi-iguazio').
project_name_base = 'nyc-taxi'

# Register the project and get the default artifact path for this environment
project_name, artifact_path = mlrun.set_environment(project=project_name_base, user_project=True)

print(f'Project name: {project_name}')
print(f'Artifact path: {artifact_path}')
Project name: nyc-taxi-iguazio
Artifact path: /v3io/projects/{{run.project}}/artifacts

Define Nuclio and MLRun Functions

# nuclio: start-code
from os import path
import numpy as np 
import pandas as pd
import datetime as dt
from sklearn.model_selection import train_test_split
import lightgbm as lgbm
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem
from pickle import dumps
import shapely.wkt
def get_zones_dict(zones_url):
    """Load the taxi-zone CSV from a URL/path and build a zone lookup dict.

    Returns {OBJECTID: {'the_geom': wkt, 'long': x, 'lat': y}}.

    NOTE(review): this URL-based variant is shadowed by a later redefinition
    of get_zones_dict that takes a DataFrame instead; only the later one is
    actually used by the handlers.
    """
    zones_df = pd.read_csv(zones_url)
    
    # Remove unnecessary fields
    zones_df.drop(['Shape_Leng', 'Shape_Area', 'zone', 'LocationID', 'borough'], axis=1, inplace=True)
    
    # Convert DF to dictionary keyed by OBJECTID
    zones_dict = zones_df.set_index('OBJECTID').to_dict('index')
    
    # Add lat/long to each zone: parse the WKT geometry and take its centroid
    for zone in zones_dict:
        shape = shapely.wkt.loads(zones_dict[zone]['the_geom'])
        zones_dict[zone]['long'] = shape.centroid.x
        zones_dict[zone]['lat'] = shape.centroid.y
    
    return zones_dict
def get_zone_lat(zones_dict, zone_id):
    """Return the centroid latitude of the zone with the given id."""
    zone_info = zones_dict[zone_id]
    return zone_info['lat']
def get_zone_long(zones_dict, zone_id):
    """Return the centroid longitude of the zone with the given id."""
    zone_info = zones_dict[zone_id]
    return zone_info['long']
def clean_df(df):
    """Keep only rows with a plausible fare and valid NYC zone ids.

    Fares must be in (0, 500]; pickup/dropoff zone ids must be in [1, 263].
    """
    valid_fare = (df.fare_amount > 0) & (df.fare_amount <= 500)
    valid_pickup = (df.PULocationID > 0) & (df.PULocationID <= 263)
    valid_dropoff = (df.DOLocationID > 0) & (df.DOLocationID <= 263)
    return df[valid_fare & valid_pickup & valid_dropoff]
# To Compute Haversine distance
def sphere_dist(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon):
    """
    Return the great-circle (haversine) distance, in km, between pickup and
    dropoff coordinates given in degrees. Works element-wise on arrays/Series.
    """
    earth_radius_km = 6371

    # Convert all coordinates from degrees to radians
    lat1, lon1, lat2, lon2 = (np.radians(v) for v in
                              (pickup_lat, pickup_lon, dropoff_lat, dropoff_lon))

    # Haversine formula
    half_dlat = (lat2 - lat1) / 2.0
    half_dlon = (lon2 - lon1) / 2.0
    a = np.sin(half_dlat) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(half_dlon) ** 2
    return 2 * earth_radius_km * np.arcsin(np.sqrt(a))
def radian_conv(degree):
    """Convert degrees to radians (element-wise for arrays/Series)."""
    return np.radians(degree)
def add_airport_dist(dataset):
    """
    Add one column per NYC landmark holding the combined distance of the
    pickup leg (pickup -> landmark) and dropoff leg (landmark -> dropoff).

    JFK: John F. Kennedy International Airport
    EWR: Newark Liberty International Airport
    LGA: LaGuardia Airport
    SOL: Statue of Liberty
    NYC: New York City center
    """
    # (lat, lon) of each landmark; insertion order fixes the column order
    landmarks = {
        'jfk': (40.639722, -73.778889),
        'ewr': (40.6925, -74.168611),
        'lga': (40.77725, -73.872611),
        'sol': (40.6892, -74.0445),
        'nyc': (40.7141667, -74.0063889),
    }

    pickup_lat = dataset['pickup_latitude']
    dropoff_lat = dataset['dropoff_latitude']
    pickup_lon = dataset['pickup_longitude']
    dropoff_lon = dataset['dropoff_longitude']

    for name, (lat, lon) in landmarks.items():
        pickup_leg = sphere_dist(pickup_lat, pickup_lon, lat, lon)
        dropoff_leg = sphere_dist(lat, lon, dropoff_lat, dropoff_lon)
        dataset[name + '_dist'] = pickup_leg + dropoff_leg

    return dataset
def add_datetime_info(dataset):
    """Parse the pickup timestamp and add hour/day/month/weekday/year columns."""
    # Parse the raw string column into a proper datetime column
    dataset['pickup_datetime'] = pd.to_datetime(dataset['tpep_pickup_datetime'],format="%Y-%m-%d %H:%M:%S")

    # Extract each calendar component via the .dt accessor
    parsed = dataset.pickup_datetime.dt
    for part in ('hour', 'day', 'month', 'weekday', 'year'):
        dataset[part] = getattr(parsed, part)

    return dataset
def fetch_data(context : MLClientCtx, taxi_records_csv_path: DataItem, zones_csv_path: DataItem):
    """Read the raw taxi-record and zone CSVs and log both as MLRun artifacts."""
    context.logger.info('Reading taxi records data from {}'.format(taxi_records_csv_path))
    taxi_df = taxi_records_csv_path.as_df()

    context.logger.info('Reading zones data from {}'.format(zones_csv_path))
    zones_df = zones_csv_path.as_df()

    target_path = path.join(context.artifact_path, 'data')
    context.logger.info('Saving datasets to {} ...'.format(target_path))

    # Persist both raw datasets in the artifact store for the downstream steps
    for name, df in (('nyc-taxi-dataset', taxi_df), ('zones-dataset', zones_df)):
        context.log_dataset(name, df=df, format='csv',
                            index=False, artifact_path=target_path)
def get_zones_dict(zones_df):
    """Build a {OBJECTID: {'the_geom', 'long', 'lat'}} lookup from a zones DataFrame.

    Note: drops unused columns from zones_df in place.
    """
    # Drop fields that are not needed for the lookup (mutates zones_df)
    zones_df.drop(['Shape_Leng', 'Shape_Area', 'zone', 'LocationID', 'borough'], axis=1, inplace=True)

    # One dict entry per zone, keyed by OBJECTID
    zones = zones_df.set_index('OBJECTID').to_dict('index')

    # Attach the centroid of each zone's WKT geometry as long/lat
    for info in zones.values():
        centroid = shapely.wkt.loads(info['the_geom']).centroid
        info['long'] = centroid.x
        info['lat'] = centroid.y

    return zones
def get_zone_lat(zones_dict, zone_id):
    """Return the centroid latitude stored for the requested zone id."""
    return zones_dict[zone_id].get('lat')
def get_zone_long(zones_dict, zone_id):
    """Return the centroid longitude stored for the requested zone id."""
    return zones_dict[zone_id].get('long')
def transform_dataset(context : MLClientCtx, taxi_records_csv_path: DataItem, zones_csv_path: DataItem):
    """Clean and enrich the raw taxi records and log the result as an artifact.

    Joins zone-centroid coordinates onto the pickup/dropoff zone ids, adds
    datetime and landmark-distance features, converts coordinates to radians,
    and drops columns that are not used for training.
    """
    context.logger.info('Begin datasets transform')

    context.logger.info('zones_csv_path: ' + str(zones_csv_path))

    # Build the zone-id -> {lat, long} lookup from the zones dataset
    zones_dict = get_zones_dict(zones_csv_path.as_df())

    # Load the raw trip records and filter out implausible rows
    train_df = clean_df(taxi_records_csv_path.as_df())

    # Attach centroid coordinates for the pickup and dropoff zones
    coord_columns = [
        ('pickup_latitude', get_zone_lat, 'PULocationID'),
        ('pickup_longitude', get_zone_long, 'PULocationID'),
        ('dropoff_latitude', get_zone_lat, 'DOLocationID'),
        ('dropoff_longitude', get_zone_long, 'DOLocationID'),
    ]
    for col, getter, id_col in coord_columns:
        train_df[col] = train_df.apply(lambda row: getter(zones_dict, row[id_col]), axis=1)

    train_df = add_datetime_info(train_df)
    train_df = add_airport_dist(train_df)

    # The model consumes coordinates in radians
    for col, _, _ in coord_columns:
        train_df[col] = radian_conv(train_df[col])

    # Drop identifiers and fare components that are not model features
    unused_columns = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
                      'congestion_surcharge', 'improvement_surcharge', 'pickup_datetime',
                      'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'total_amount',
                      'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID',
                      'payment_type']
    train_df.drop(unused_columns, axis=1, inplace=True, errors='ignore')

    # Save dataset to artifact store
    target_path = path.join(context.artifact_path, 'data')
    context.log_dataset('nyc-taxi-dataset-transformed', df=train_df, artifact_path=target_path, format='csv')

    context.logger.info('End dataset transform')
# LightGBM training parameters shared by train_model (module-level global).
# NOTE(review): 'num_rounds' here is a LightGBM alias for the number of boosting
# rounds and overrides the num_boost_round argument passed to lgbm.train (the
# run log shows the "Found `num_rounds` in params" warning). Likewise
# 'bagging_fraction' (=1) overrides 'subsample' (=0.8) per the logged warnings.
params = {
        'boosting_type':'gbdt',
        'objective': 'regression',
        'nthread': 4,
        'num_leaves': 31,
        'learning_rate': 0.05,
        'max_depth': -1,  # no depth limit
        'subsample': 0.8,  # ignored: overridden by bagging_fraction below
        'bagging_fraction' : 1,
        'max_bin' : 5000 ,
        'bagging_freq': 20,
        'colsample_bytree': 0.6,
        'metric': 'rmse',
        'min_split_gain': 0.5,
        'min_child_weight': 1,
        'min_child_samples': 10,
        'scale_pos_weight':1,
        'zero_as_missing': True,
        'seed':0,
        'num_rounds':50000  # overrides num_boost_round in lgbm.train
    }
def train_model(context: MLClientCtx, input_ds: DataItem):
    """Train a LightGBM fare-prediction model on the transformed dataset.

    Uses the module-level `params` dict, holds out 10% for validation with
    early stopping, and logs the pickled model as the 'FareModel' artifact.
    """
    context.logger.info('Begin training')
    context.logger.info('LGBM version is ' + str(lgbm.__version__))
    
    train_df = input_ds.as_df()
    
    # Target column
    y = train_df['fare_amount']
  
    train_df = train_df.drop(columns=['fare_amount'])
    # Drop the first column as well — presumably the index column written by
    # log_dataset's CSV round-trip; TODO(review) confirm against the artifact
    train_df = train_df.drop(train_df.columns[[0]], axis=1)
    x_train,x_test,y_train,y_test = train_test_split(train_df,y,random_state=123,test_size=0.10)
    
    train_set = lgbm.Dataset(x_train, y_train, silent=False,categorical_feature=['year','month','day','weekday'])
    valid_set = lgbm.Dataset(x_test, y_test, silent=False,categorical_feature=['year','month','day','weekday'])
    # NOTE(review): 'num_rounds' in params overrides num_boost_round=10000
    # (LightGBM logs a warning to that effect); early stopping after 500
    # stagnant rounds keeps the actual round count far lower in practice.
    model = lgbm.train(params, train_set = train_set, num_boost_round=10000,early_stopping_rounds=500,verbose_eval=500, valid_sets=valid_set)
    
    # Log the pickled model to the artifact store under models/
    context.log_model('FareModel',
                     body=dumps(model),
                     artifact_path=context.artifact_subpath("models"),
                     model_file="FareModel.pkl")
    
    context.logger.info('End training')
# nuclio: end-code

Set Input Paths

# Public sample-data locations: raw trip records and the taxi-zone geometries
taxi_records_csv_path = 'https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv'
zones_csv_path = 'https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv'

Convert Code to a Function

# Package the nuclio-marked code above into an MLRun job function; the base
# image is extended with lightgbm and shapely when the function is deployed
taxi_func = mlrun.code_to_function(name='taxi',
                                   kind='job',
                                   image='mlrun/mlrun',
                                   requirements=['lightgbm', 'shapely'])

Run fetch_data Locally

We can test our code locally by calling the function with the `local` parameter set to `True`

# local=True executes the handler in this notebook's process (no cluster job)
fetch_data_run = taxi_func.run(handler='fetch_data',
                               inputs={'taxi_records_csv_path': taxi_records_csv_path,
                                       'zones_csv_path': zones_csv_path},
                               local=True)
> 2021-01-28 10:50:17,131 [info] starting run taxi-fetch_data uid=6a4667dc66684e96aa4792c109afa2e4 DB=http://mlrun-api:8080
> 2021-01-28 10:50:17,272 [info] Reading taxi records data from https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv
> 2021-01-28 10:50:20,868 [info] Reading zones data from https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv
> 2021-01-28 10:50:21,290 [info] Saving datasets to /v3io/projects/nyc-taxi-iguazio/artifacts/data ...
project uid iter start state name labels inputs parameters results artifacts
nyc-taxi-iguazio 0 Jan 28 10:50:17 completed taxi-fetch_data
v3io_user=iguazio
kind=
owner=iguazio
host=jupyter-b8fc56bc-cdkfx
taxi_records_csv_path
zones_csv_path
nyc-taxi-dataset
zones-dataset
to track results use .show() or .logs() or in CLI: 
!mlrun get run 6a4667dc66684e96aa4792c109afa2e4 --project nyc-taxi-iguazio , !mlrun logs 6a4667dc66684e96aa4792c109afa2e4 --project nyc-taxi-iguazio
> 2021-01-28 10:50:35,407 [info] run executed, status=completed
fetch_data_run.outputs
{'nyc-taxi-dataset': 'store://artifacts/nyc-taxi-iguazio/taxi-fetch_data_nyc-taxi-dataset:6a4667dc66684e96aa4792c109afa2e4',
 'zones-dataset': 'store://artifacts/nyc-taxi-iguazio/taxi-fetch_data_zones-dataset:6a4667dc66684e96aa4792c109afa2e4'}

Run on the Cluster

Prepare Cluster Function

Create an MLRun function and build a custom image for it (one that includes shapely).

from mlrun.platforms import auto_mount
# Mount the shared data volume, then build the function's container image
taxi_func.apply(auto_mount())
taxi_func.deploy()
> 2021-01-28 10:50:35,424 [info] starting remote build, image: .mlrun/func-nyc-taxi-iguazio-taxi-latest
INFO[0020] Retrieving image manifest mlrun/mlrun:0.6.0-rc11 
INFO[0020] Retrieving image manifest mlrun/mlrun:0.6.0-rc11 
INFO[0021] Built cross stage deps: map[]                
INFO[0021] Retrieving image manifest mlrun/mlrun:0.6.0-rc11 
INFO[0021] Retrieving image manifest mlrun/mlrun:0.6.0-rc11 
INFO[0021] Executing 0 build triggers                   
INFO[0021] Unpacking rootfs as cmd RUN python -m pip install lightgbm shapely requires it. 
INFO[0036] RUN python -m pip install lightgbm shapely   
INFO[0036] Taking snapshot of full filesystem...        
INFO[0048] cmd: /bin/sh                                 
INFO[0048] args: [-c python -m pip install lightgbm shapely] 
INFO[0048] Running: [/bin/sh -c python -m pip install lightgbm shapely] 
Collecting lightgbm
  Downloading lightgbm-3.1.1-py2.py3-none-manylinux1_x86_64.whl (1.8 MB)
Collecting shapely
  Downloading Shapely-1.7.1-cp37-cp37m-manylinux1_x86_64.whl (1.0 MB)
Requirement already satisfied: scikit-learn!=0.22.0 in /usr/local/lib/python3.7/site-packages (from lightgbm) (0.23.2)
Requirement already satisfied: numpy in /usr/local/lib/python3.7/site-packages (from lightgbm) (1.19.5)
Requirement already satisfied: scipy in /usr/local/lib/python3.7/site-packages (from lightgbm) (1.6.0)
Requirement already satisfied: wheel in /usr/local/lib/python3.7/site-packages (from lightgbm) (0.36.2)
Requirement already satisfied: threadpoolctl>=2.0.0 in /usr/local/lib/python3.7/site-packages (from scikit-learn!=0.22.0->lightgbm) (2.1.0)
Requirement already satisfied: joblib>=0.11 in /usr/local/lib/python3.7/site-packages (from scikit-learn!=0.22.0->lightgbm) (1.0.0)
Installing collected packages: lightgbm, shapely
Successfully installed lightgbm-3.1.1 shapely-1.7.1
WARNING: You are using pip version 20.2.4; however, version 21.0 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
INFO[0050] Taking snapshot of full filesystem...        
True
# Run fetch_data on the cluster using the image built by deploy()
fetch_data_run = taxi_func.run(name='fetch_data',
                               handler='fetch_data',
                               inputs={'taxi_records_csv_path': taxi_records_csv_path,
                                       'zones_csv_path': zones_csv_path})
> 2021-01-28 10:51:29,816 [info] starting run fetch_data uid=40a122c3066a479d9c09c865487e23ab DB=http://mlrun-api:8080
> 2021-01-28 10:51:30,197 [info] Job is running in the background, pod: fetch-data-f8p7f
> 2021-01-28 10:51:34,657 [info] Reading taxi records data from https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv
> 2021-01-28 10:51:38,368 [info] Reading zones data from https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv
> 2021-01-28 10:51:38,795 [info] Saving datasets to /v3io/projects/nyc-taxi-iguazio/artifacts/data ...
> 2021-01-28 10:51:51,082 [info] run executed, status=completed
final state: completed
project uid iter start state name labels inputs parameters results artifacts
nyc-taxi-iguazio 0 Jan 28 10:51:34 completed fetch_data
v3io_user=iguazio
kind=job
owner=iguazio
host=fetch-data-f8p7f
taxi_records_csv_path
zones_csv_path
nyc-taxi-dataset
zones-dataset
to track results use .show() or .logs() or in CLI: 
!mlrun get run 40a122c3066a479d9c09c865487e23ab --project nyc-taxi-iguazio , !mlrun logs 40a122c3066a479d9c09c865487e23ab --project nyc-taxi-iguazio
> 2021-01-28 10:51:59,497 [info] run executed, status=completed
fetch_data_run.outputs
{'nyc-taxi-dataset': 'store://artifacts/nyc-taxi-iguazio/fetch_data_nyc-taxi-dataset:40a122c3066a479d9c09c865487e23ab',
 'zones-dataset': 'store://artifacts/nyc-taxi-iguazio/fetch_data_zones-dataset:40a122c3066a479d9c09c865487e23ab'}

Transform the Dataset

# The transform step consumes the artifacts logged by the fetch_data run
transform_dataset_run = taxi_func.run(name='transform_dataset',
                                      handler='transform_dataset',
                                      inputs={'taxi_records_csv_path': fetch_data_run.outputs['nyc-taxi-dataset'],
                                              'zones_csv_path': fetch_data_run.outputs['zones-dataset']})
> 2021-01-28 10:51:59,516 [info] starting run transform_dataset uid=23089337d5d7483cb81c599da6e64e03 DB=http://mlrun-api:8080
> 2021-01-28 10:52:00,239 [info] Job is running in the background, pod: transform-dataset-5mhwx
> 2021-01-28 10:52:04,698 [info] Begin datasets transform
> 2021-01-28 10:52:04,698 [info] zones_csv_path: /v3io/projects/nyc-taxi-iguazio/artifacts/data/zones-dataset.csv
> 2021-01-28 10:53:06,205 [info] End dataset transform
> 2021-01-28 10:53:06,242 [info] run executed, status=completed
final state: completed
project uid iter start state name labels inputs parameters results artifacts
nyc-taxi-iguazio 0 Jan 28 10:52:04 completed transform_dataset
v3io_user=iguazio
kind=job
owner=iguazio
host=transform-dataset-5mhwx
taxi_records_csv_path
zones_csv_path
nyc-taxi-dataset-transformed
to track results use .show() or .logs() or in CLI: 
!mlrun get run 23089337d5d7483cb81c599da6e64e03 --project nyc-taxi-iguazio , !mlrun logs 23089337d5d7483cb81c599da6e64e03 --project nyc-taxi-iguazio
> 2021-01-28 10:53:09,631 [info] run executed, status=completed
transform_dataset_run.outputs
{'nyc-taxi-dataset-transformed': 'store://artifacts/nyc-taxi-iguazio/transform_dataset_nyc-taxi-dataset-transformed:23089337d5d7483cb81c599da6e64e03'}

Train Model

# Train the fare model on the transformed dataset artifact
train_model_run = taxi_func.run(name='train_model',
                                handler='train_model',
                                inputs={'input_ds': transform_dataset_run.outputs['nyc-taxi-dataset-transformed']})
> 2021-01-28 10:53:09,648 [info] starting run train_model uid=91bbaebdc91a43c9b8412a37b389bffa DB=http://mlrun-api:8080
> 2021-01-28 10:53:09,840 [info] Job is running in the background, pod: train-model-pn5jb
[LightGBM] [Warning] Met categorical feature which contains sparse values. Consider renumbering to consecutive integers started from zero
> 2021-01-28 10:53:14,273 [info] Begin training
> 2021-01-28 10:53:14,273 [info] LGBM version is 3.1.1
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Warning] Auto-choosing row-wise multi-threading, the overhead of testing was 0.006758 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 23961
[LightGBM] [Info] Number of data points in the train set: 879294, number of used features: 16
[LightGBM] [Warning] bagging_fraction is set=1, subsample=0.8 will be ignored. Current value: bagging_fraction=1
[LightGBM] [Info] Start training from score 12.418691
Training until validation scores don't improve for 500 rounds
[500]	valid_0's rmse: 3.01302
[1000]	valid_0's rmse: 2.98594
[1500]	valid_0's rmse: 2.98293
[2000]	valid_0's rmse: 2.98666
Early stopping, best iteration is:
[1551]	valid_0's rmse: 2.98227
> 2021-01-28 10:53:54,777 [info] End training
> 2021-01-28 10:53:54,845 [info] run executed, status=completed
/usr/local/lib/python3.7/site-packages/lightgbm/engine.py:151: UserWarning: Found `num_rounds` in params. Will use it instead of argument
  warnings.warn("Found `{}` in params. Will use it instead of argument".format(alias))
/usr/local/lib/python3.7/site-packages/lightgbm/basic.py:1551: UserWarning: Using categorical_feature in Dataset.
  warnings.warn('Using categorical_feature in Dataset.')
/usr/local/lib/python3.7/site-packages/lightgbm/basic.py:1286: UserWarning: Overriding the parameters from Reference Dataset.
  warnings.warn('Overriding the parameters from Reference Dataset.')
/usr/local/lib/python3.7/site-packages/lightgbm/basic.py:1098: UserWarning: categorical_column in param dict is overridden.
  warnings.warn('{} in param dict is overridden.'.format(cat_alias))
final state: completed
project uid iter start state name labels inputs parameters results artifacts
nyc-taxi-iguazio 0 Jan 28 10:53:14 completed train_model
v3io_user=iguazio
kind=job
owner=iguazio
host=train-model-pn5jb
input_ds
FareModel
to track results use .show() or .logs() or in CLI: 
!mlrun get run 91bbaebdc91a43c9b8412a37b389bffa --project nyc-taxi-iguazio , !mlrun logs 91bbaebdc91a43c9b8412a37b389bffa --project nyc-taxi-iguazio
> 2021-01-28 10:53:58,264 [info] run executed, status=completed
train_model_run.outputs
{'FareModel': 'store://artifacts/nyc-taxi-iguazio/train_model_FareModel:91bbaebdc91a43c9b8412a37b389bffa'}

Serving

The model serving class is in model-serving.ipynb.

# Load the serving-graph code from the companion notebook and mount data volumes
serving = mlrun.code_to_function(filename=path.abspath('model-serving.ipynb')).apply(auto_mount())

serving.spec.default_class = 'LGBMModel'
# Register the trained model artifact and deploy the serving endpoint
serving.add_model('taxi-serving', train_model_run.outputs['FareModel'])
serving_address = serving.deploy()
> 2021-01-28 10:54:04,257 [info] Starting remote function deploy
2021-01-28 10:54:04  (info) Deploying function
2021-01-28 10:54:04  (info) Building
2021-01-28 10:54:04  (info) Staging files and preparing base images
2021-01-28 10:54:04  (info) Building processor image
2021-01-28 10:54:07  (info) Build complete
> 2021-01-28 10:54:13,316 [info] function deployed, address=default-tenant.app.jinkwubtllaf.iguazio-cd1.com:30486
# One sample feature row matching the columns produced by transform_dataset
my_data = '''{"inputs":[[1,0.80,0.711950,-1.291073,0.712059,1.290988,13,1,1,1,2019,47.274013,40.386065,16.975747,26.587155,18.925788]]}'''
# POST the row to the deployed model's V2 predict endpoint
serving.invoke('/v2/models/taxi-serving/predict', my_data)
{'id': '985aaf56-fb29-436e-9d96-ef834b8ec12f',
 'model_name': 'taxi-serving',
 'outputs': [9.52302976897415]}

Kubeflow Pipeline

Create Project Object

# Create a project whose config lives under ./conf; user_project=True appends
# the running user's name to the project name
project_path = path.abspath('conf')
project = mlrun.new_project(project_name_base,
                            context=project_path,
                            init_git=True,
                            user_project=True)

# Reference the functions already saved in the MLRun DB by earlier cells
project.set_function(f'db://{project.name}/taxi')
project.set_function(f'db://{project.name}/model-serving')
<mlrun.runtimes.serving.ServingRuntime at 0x7f5509b74f90>

Create the Workflow

%%writefile {path.join(project_path, 'workflow.py')}
from kfp import dsl
from mlrun.platforms import auto_mount

funcs = {}
taxi_records_csv_path = 'https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv'
zones_csv_path = 'https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv'

# init functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    # Mount the shared data volume on every function in the pipeline
    for f in functions.values():
        f.apply(auto_mount())

@dsl.pipeline(
    name="NYC Taxi Demo",
    description="Convert ML script to MLRun"
)

def kfpipeline():
    """Fetch -> transform -> train -> deploy, each step chained on the
    previous step's declared outputs."""

    # build our ingestion function (container image); skip if already built
    builder = funcs['taxi'].deploy_step(skip_deployed=True)
    
    # run the ingestion function with the new image and params;
    # 'outputs' declares the artifact names downstream steps consume
    ingest = funcs['taxi'].as_step(
        name="fetch_data",
        handler='fetch_data',
        image=builder.outputs['image'],
        inputs={'taxi_records_csv_path': taxi_records_csv_path,
                'zones_csv_path': zones_csv_path},
        outputs=['nyc-taxi-dataset', 'zones-dataset'])

    # Join and transform the data sets 
    transform = funcs["taxi"].as_step(
        name="transform_dataset",
        handler='transform_dataset',
        inputs={"taxi_records_csv_path": ingest.outputs['nyc-taxi-dataset'],
                "zones_csv_path" : ingest.outputs['zones-dataset']},
        outputs=['nyc-taxi-dataset-transformed'])

    # Train the model
    train = funcs["taxi"].as_step(
        name="train",
        handler="train_model",
        inputs={"input_ds" : transform.outputs['nyc-taxi-dataset-transformed']},
        outputs=['FareModel'])
    
    # Deploy the model behind the serving function
    deploy = funcs["model-serving"].deploy_step(models={"taxi-serving_v1": train.outputs['FareModel']}, tag='v2')
Overwriting /User/demos/howto/converting-to-mlrun/conf/workflow.py
# Register the workflow (embed=True stores the code inside the project spec)
# and persist the project definition
project.set_workflow('main', 'workflow.py', embed=True)
project.save()

Run the Workflow

# Each workflow run writes its artifacts under a per-run directory
# ({{workflow.uid}} is substituted by the pipeline engine)
artifact_path = path.abspath('./pipe/{{workflow.uid}}')
run_id = project.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=True,
    watch=True)
> 2021-01-28 10:54:13,695 [info] using in-cluster config.
Experiment link here
Run link here
> 2021-01-28 10:54:14,163 [info] Pipeline run id=32e62e7f-424f-474b-b9f5-54ae1834d39f, check UI or DB for progress
> 2021-01-28 10:54:14,164 [info] waiting for pipeline run completion

Run Results

Workflow 32e62e7f-424f-474b-b9f5-54ae1834d39f finished, status=Succeeded
click the hyper links below to see detailed results
uid start state name results artifacts
Jan 28 10:56:15 completed train
FareModel
Jan 28 10:55:01 completed transform_dataset
nyc-taxi-dataset-transformed
Jan 28 10:54:26 completed fetch_data
nyc-taxi-dataset
zones-dataset