Kubernetes runtime#

This topic describes how to run a Kubernetes-based job using shared data, and how to build custom container images.


Define a new function and its dependencies#

Define a single serverless function with two handlers, one for training and one for validation.

import mlrun 
> 2021-01-24 00:04:38,841 [warning] Failed resolving version info. Ignoring and using defaults
> 2021-01-24 00:04:40,691 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': 'unstable', 'client_version': 'unstable'}
import time
import pandas as pd
from mlrun.artifacts import get_model, update_model

def training(
    context,
    p1: int = 1,
    p2: int = 2
) -> None:
    """Train a model.

    :param context: The runtime context object.
    :param p1: A model parameter.
    :param p2: Another model parameter.
    """
    # access input metadata, values, and inputs
    print(f'Run: {context.name} (uid={context.uid})')
    print(f'Params: p1={p1}, p2={p2}')
    context.logger.info('started training')
    
    # <insert training code here>
    
    # log the run results (scalar values)
    context.log_result('accuracy', p1 * 2)
    context.log_result('loss', p1 * 3)
    
    # add a label/tag to this run 
    context.set_label('category', 'tests')
    
    # log a simple artifact + label the artifact 
    # If you want to upload a local file to the artifact repo add src_path=<local-path>
    context.log_artifact('somefile', 
                          body=b'abc is 123', 
                          local_path='myfile.txt')
    
    # create a dataframe artifact 
    df = pd.DataFrame([{'A':10, 'B':100}, {'A':11,'B':110}, {'A':12,'B':120}])
    context.log_dataset('mydf', df=df)
    
    # Log an ML Model artifact, add metrics, params, and labels to it
    # and place it in a subdir ('models') under artifacts path 
    context.log_model('mymodel', body=b'abc is 123', 
                      model_file='model.txt', 
                      metrics={'accuracy':0.85}, parameters={'xx':'abc'},
                      labels={'framework': 'xgboost'},
                      artifact_path=context.artifact_subpath('models'))

def validation(
    context,
    model: mlrun.DataItem
) -> None:
    """Model validation.
    
    Dummy validation function.
    
    :param context: The runtime context object.
    :param model: The estimated model object.
    """
    # access input metadata, values, files, and secrets (passwords)
    print(f'Run: {context.name} (uid={context.uid})')
    context.logger.info('started validation')
    
    # get the model file, class (metadata), and extra_data (dict of key: DataItem)
    model_file, model_obj, _ = get_model(model)

    # update model object elements and data
    update_model(model_obj, parameters={'one_more': 5})

    print(f'path to local copy of model file - {model_file}')
    print('parameters:', model_obj.parameters)
    print('metrics:', model_obj.metrics)
    context.log_artifact('validation', 
                         body=b'<b> validated </b>', 
                         format='html')

The following end-code annotation tells MLRun to stop parsing the notebook from this cell. Do not remove this cell:

# mlrun: end-code

Convert the code to a serverless job#

Create a function that defines the runtime environment (type, code, image, …) and use run() to execute a job or experiment with that function. For each run, you can specify the handler, inputs, parameters/hyper-parameters, etc.

Use the job runtime for running container jobs, or alternatively use one of the distributed runners such as MPIJob, Spark, Dask, or Nuclio.
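As a rough sketch, the runtime kind is selected when the function object is created; the function names and images below are placeholders and not part of this example:

import mlrun

# a containerized batch job (the kind used throughout this topic)
fn = mlrun.new_function(name='my-job', kind='job', image='mlrun/mlrun')

# the same pattern selects a distributed runner instead, e.g.:
# mlrun.new_function(name='my-dask',   kind='dask',   image='mlrun/ml-models')
# mlrun.new_function(name='my-mpijob', kind='mpijob', image='mlrun/ml-models')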

Setting up the environment

project_name, artifact_path = mlrun.set_environment(project='jobs-demo', artifact_path='./data/{{run.uid}}')

Define the cluster jobs, build images, and set dependencies#

To use the function in a cluster you need to package the code and its dependencies.

The code_to_function call automatically generates a function object from the current notebook (or specified file) with its list of dependencies and runtime configuration. In this example the code depends on the pandas package, so it’s specified in the code_to_function call.

# create a job function object from the code in this notebook, with pandas as a dependency
trainer = mlrun.code_to_function(name='my-trainer', kind='job', image='mlrun/mlrun', requirements=['pandas'])

The functions need a shared storage medium (file or object store) to pass and store artifacts.

You can add Kubernetes resources such as volumes, environment variables, secrets, and CPU/memory/GPU requests to a function.

MLRun uses Kubeflow modifiers (applied with apply()) to configure resources. You can build your own modifiers or use predefined ones, e.g. for AWS resources.
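As a rough sketch (the resource values here are only examples, not recommendations), such settings can be applied directly to the trainer function object created above:

# request / limit Kubernetes resources for the job pods
trainer.with_requests(mem='1G', cpu=1)
trainer.with_limits(mem='2G', cpu=2)        # add gpus=1 here if a GPU is required

# add an environment variable to the job runtime
trainer.set_env('MY_PARAM', 'value')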

The example above uses built-in images. When you move to production, use specific tags. For more details on built-in and custom images, see MLRun images and external docker images.

Option 1: Using file volumes for artifacts#

MLRun automatically applies the most common storage configuration to functions. As a result, most cases do not require any additional storage configurations before executing a function. See more details in Attach storage to functions.

  • If you’re using the Iguazio MLOps platform and want to configure the mount manually, use the mount_v3io() modifier.
  • If you’re using another k8s PVC volume, use the mlrun.platforms.mount_pvc(..) modifier with the required parameters (see the sketch after this list).
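For example, a minimal sketch of the two manual options (the PVC name and mount paths are placeholders):

from mlrun.platforms import mount_v3io, mount_pvc

# Iguazio data fabric (v3io) mount
trainer.apply(mount_v3io())

# or a generic Kubernetes PVC mount
trainer.apply(mount_pvc(pvc_name='my-pvc', volume_name='data', volume_mount_path='/data'))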

This example uses the auto_mount() modifier. It auto-selects between the k8s PVC volume and the Iguazio data fabric. You can set the PVC volume configuration with the env var below or with the auto_mount params:

    MLRUN_PVC_MOUNT=<pvc-name>:<mount-path>

If you apply mount_v3io() or auto_mount() when running the function in the MLOps platform, it attaches the function to Iguazio’s real-time data fabric (mounted by default to the home directory of the running user).

Note: If the notebook is not running on the managed platform (i.e. it runs remotely), you might need to provide credentials through secrets or environment variables; see the sketch below.
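For example, a rough sketch of the environment variables a remote client typically needs before applying the data-fabric mount (the values are placeholders; storing them as secrets is preferable to hard-coding):

import os

# point the remote client at the MLRun API and the Iguazio data fabric
os.environ['MLRUN_DBPATH'] = 'https://<mlrun-api-url>'
os.environ['V3IO_USERNAME'] = '<platform-username>'
os.environ['V3IO_ACCESS_KEY'] = '<platform-access-key>'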

For the current training function, run:

# for PVC volumes set the env var for PVC: MLRUN_PVC_MOUNT=<pvc-name>:<mount-path>, pass the relevant parameters
from mlrun.platforms import auto_mount
trainer.apply(auto_mount())
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fdf29b1c190>

Option 2: Using AWS S3 for artifacts#

When using AWS, you can use S3. See more details in S3.
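A rough sketch of one way to do this; the bucket name and credentials below are placeholders, and project or Kubernetes secrets are generally preferable to hard-coded environment variables:

# store run artifacts in an S3 bucket instead of a mounted volume
project_name, artifact_path = mlrun.set_environment(
    project='jobs-demo', artifact_path='s3://<my-bucket>/jobs-demo')

# the job pods also need AWS credentials, e.g. as environment variables on the function
trainer.set_env('AWS_ACCESS_KEY_ID', '<access-key-id>')
trainer.set_env('AWS_SECRET_ACCESS_KEY', '<secret-access-key>')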

Deploy (build) the function container#

The deploy() command builds a custom container image (creates a cluster build job) from the outlined function dependencies.

If a pre-built container image already exists, pass the image name instead. The code and params can be updated per run without building a new image.
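For example, a minimal sketch of pointing the function at an existing image and skipping the build (the image name is a placeholder):

# use a pre-built image that already contains the required packages
trainer.spec.image = '<myrepo>/my-trainer:v1'

# the notebook code can still be refreshed per run without rebuilding (see below)
trainer.with_code()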

The image is stored in a container repository. By default it uses the repository configured on the MLRun API service. You can specify your own docker registry by first creating a secret, and adding that secret name to the build configuration:

kubectl create -n <namespace> secret docker-registry my-docker --docker-server=https://index.docker.io/v1/ --docker-username=<your-user> --docker-password=<your-password> --docker-email=<your-email>

And then run this:

trainer.build_config(image='target/image:tag', secret='my-docker')

trainer.deploy(with_mlrun=False)
> 2021-01-24 00:05:18,384 [info] starting remote build, image: .mlrun/func-jobs-demo-my-trainer-latest
INFO[0020] Retrieving image manifest mlrun/mlrun:unstable 
INFO[0020] Retrieving image manifest mlrun/mlrun:unstable 
INFO[0021] Built cross stage deps: map[]                
INFO[0021] Retrieving image manifest mlrun/mlrun:unstable 
INFO[0021] Retrieving image manifest mlrun/mlrun:unstable 
INFO[0021] Executing 0 build triggers                   
INFO[0021] Unpacking rootfs as cmd RUN pip install pandas requires it. 
INFO[0037] RUN pip install pandas                       
INFO[0037] Taking snapshot of full filesystem...        
INFO[0050] cmd: /bin/sh                                 
INFO[0050] args: [-c pip install pandas]                
INFO[0050] Running: [/bin/sh -c pip install pandas]     
Requirement already satisfied: pandas in /usr/local/lib/python3.7/site-packages (1.2.0)
Requirement already satisfied: pytz>=2017.3 in /usr/local/lib/python3.7/site-packages (from pandas) (2020.5)
Requirement already satisfied: python-dateutil>=2.7.3 in /usr/local/lib/python3.7/site-packages (from pandas) (2.8.1)
Requirement already satisfied: numpy>=1.16.5 in /usr/local/lib/python3.7/site-packages (from pandas) (1.19.5)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas) (1.15.0)
WARNING: You are using pip version 20.2.4; however, version 21.0 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
INFO[0051] Taking snapshot of full filesystem...        
True

Run the function on the cluster#

Use with_code to inject the latest code into the function (without requiring a new build).

trainer.with_code()
<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fdf29b1c190>
# run our training task with params
train_run = trainer.run(name='my-training', handler='training', params={'p1': 9})
> 2021-01-24 00:09:14,760 [info] starting run my-training uid=30b8131285a74f87b16d957fabc5fac3 DB=http://mlrun-api:8080
> 2021-01-24 00:09:14,928 [info] Job is running in the background, pod: my-training-lhtxt
> 2021-01-24 00:09:18,972 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': 'unstable', 'client_version': 'unstable'}
Run: my-training (uid=30b8131285a74f87b16d957fabc5fac3)
Params: p1=9, p2=2
> 2021-01-24 00:09:19,050 [info] started training
> 2021-01-24 00:09:19,299 [info] run executed, status=completed
final state: completed
project: jobs-demo | iter: 0 | start: Jan 24 00:09:19 | state: completed | name: my-training
labels: v3io_user=admin, kind=job, owner=admin, host=my-training-lhtxt, category=tests
parameters: p1=9
results: accuracy=18, loss=27
artifacts: somefile, mydf, mymodel
to track results use .show() or .logs() or in CLI: 
!mlrun get run 30b8131285a74f87b16d957fabc5fac3 --project jobs-demo , !mlrun logs 30b8131285a74f87b16d957fabc5fac3 --project jobs-demo
> 2021-01-24 00:09:21,253 [info] run executed, status=completed
# running validation, use the model result from the previous step 
model = train_run.outputs['mymodel']
validation_run = trainer.run(name='validation', handler='validation', inputs={'model': model}, watch=True)
> 2021-01-24 00:09:21,259 [info] starting run validation uid=c757ffcdc36d4412b4bcba1df75f079d DB=http://mlrun-api:8080
> 2021-01-24 00:09:21,536 [info] Job is running in the background, pod: validation-dwd78
> 2021-01-24 00:09:25,570 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': 'unstable', 'client_version': 'unstable'}
Run: validation (uid=c757ffcdc36d4412b4bcba1df75f079d)
> 2021-01-24 00:09:25,719 [info] started validation
path to local copy of model file - /User/data/30b8131285a74f87b16d957fabc5fac3/models/model.txt
parameters: {'xx': 'abc', 'one_more': 5}
metrics: {'accuracy': 0.85}
> 2021-01-24 00:09:25,873 [info] run executed, status=completed
final state: completed
project: jobs-demo | iter: 0 | start: Jan 24 00:09:25 | state: completed | name: validation
labels: v3io_user=admin, kind=job, owner=admin, host=validation-dwd78
inputs: model
artifacts: validation
to track results use .show() or .logs() or in CLI: 
!mlrun get run c757ffcdc36d4412b4bcba1df75f079d --project jobs-demo , !mlrun logs c757ffcdc36d4412b4bcba1df75f079d --project jobs-demo
> 2021-01-24 00:09:27,647 [info] run executed, status=completed

Create and run a Kubeflow pipeline#

Kubeflow pipelines are used for workflow automation, creating a graph of functions and their specified parameters, inputs, and outputs.

You can chain the outputs and inputs of the pipeline steps, as illustrated below.

import kfp
from kfp import dsl
from mlrun import run_pipeline
from mlrun import run_function, deploy_function
@dsl.pipeline(
    name = 'job test',
    description = 'demonstrating mlrun usage'
)
def job_pipeline(
   p1: int = 9
) -> None:
    """Define our pipeline.
    
    :param p1: A model parameter.
    """

    train = run_function('my-trainer',
                            handler='training',
                            params={'p1': p1},
                            outputs=['mymodel'])
    
    validate = run_function('my-trainer',
                               handler='validation',
                               inputs={'model': train.outputs['mymodel']},
                               outputs=['validation'])    

Running the pipeline#

Pipeline results are stored at the artifact_path location. The artifact path for workflows can be one of:

  • The project’s artifact_path (set by project.spec.artifact_path = '<some path>'; see the sketch after this list). MLRun adds /{{workflow.uid}} to the path if it does not already include it.

  • MLRun’s default artifact-path, if set. MLRun adds /{{workflow.uid}} to the path if it does not already include it.

  • The artifact_path as passed to the specific call for run(), as shown below. In this case, MLRun does not modify the user-provided path.
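A minimal sketch of the first option, assuming a project object is available (for example one created with mlrun.new_project; the path below is a placeholder):

# set a per-project artifact path; MLRun appends /{{workflow.uid}} for workflow runs
# unless the path already contains it
project = mlrun.new_project('jobs-demo', context='./')
project.spec.artifact_path = 'v3io:///projects/jobs-demo/artifacts'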

To customize the path per workflow, use:

artifact_path = 'v3io:///users/admin/kfp/{{workflow.uid}}/'
arguments = {'p1': 8}
run_id = run_pipeline(job_pipeline, arguments, experiment='my-job', artifact_path=artifact_path)
> 2021-01-24 00:09:46,670 [info] using in-cluster config.
Experiment link here
Run link here
> 2021-01-24 00:09:46,940 [info] Pipeline run id=26ac4209-8505-47a3-b807-e9c51061bf0d, check UI or DB for progress
from mlrun import wait_for_pipeline_completion, get_run_db

wait_for_pipeline_completion(run_id)
db = get_run_db()
db.list_runs(project=project_name, labels=f'workflow={run_id}').show()
project: jobs-demo | iter: 0 | start: Jan 24 00:10:05 | state: completed | name: my-trainer-validation
labels: v3io_user=admin, owner=admin, workflow=26ac4209-8505-47a3-b807-e9c51061bf0d, kind=job, host=my-trainer-validation-gkvjw
inputs: model
artifacts: validation

project: jobs-demo | iter: 0 | start: Jan 24 00:09:53 | state: completed | name: my-trainer-training
labels: v3io_user=admin, owner=admin, workflow=26ac4209-8505-47a3-b807-e9c51061bf0d, kind=job, host=my-trainer-training-jb6rz, category=tests
parameters: p1=8
results: accuracy=16, loss=24
artifacts: somefile, mydf, mymodel

Viewing the pipeline on the dashboard (UI)#

In the Projects > Jobs and Workflows > Monitor Workflows tab, press the workflow name to view a graph of the workflow. Press any step to open a pane with the step’s full details: either the job’s overview, inputs, and artifacts, or the deploy/build function’s overview, code, and log. The color of a step indicates its status; see the status description in the Log tab. The graph refreshes while the pipeline is running.

(Figure: the pipeline workflow graph in the UI)
