Hyper-Param and Iterative Jobs

MLRun supports iterative tasks for automatic and distributed execution of many tasks with variable parameters. This can be used for various purposes such as:

  • Parallel loading and preparation of many data objects

  • Model training with different parameter sets and/or algorithms

  • Parallel testing with many test vector options

MLRun iterations can be viewed as child runs under the main task/run. Each child run receives a set of parameters that is computed/selected from the input hyper parameters based on the chosen strategy (Grid, List, Random, or Custom).

The hyper parameters and options are specified in the task or in the run() command through the hyperparams (hyper parameter values) and hyper_param_options (HyperParamOptions) properties; see the examples below. Hyper parameters can also be loaded directly from a CSV or JSON file (by setting the param_file option).
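A minimal sketch of passing them directly to run() (assuming the hyper_func handler defined in the examples below; the commented file path is just a placeholder):

hyper_params = {"p1": [1, 2, 3], "p2": [10, 20]}
options = mlrun.model.HyperParamOptions(strategy="grid", selector="max.multiplier")
run = mlrun.new_function().run(
    handler=hyper_func,
    hyperparams=hyper_params,        # the hyper parameter value lists
    hyper_param_options=options,     # strategy, selector, parallelism, etc.
)

# or read the parameter combinations from a CSV/JSON file instead:
# options = mlrun.model.HyperParamOptions(param_file="hyper_params.csv")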

The hyper parameters are specified as a struct of key: list values, for example: {"p1": [1,2,3], "p2": [10,20]}. The values can be of any type (int, string, float, ...), and the lists are used to compute the parameter combinations using one of the following strategies:

  1. Grid Search (grid) - running all the parameter combinations

  2. Random (random) - running a sampled set from all the parameter combinations

  3. List (list) - running the first parameter from each list, followed by the second from each list, and so on. Note that all the lists must be of equal size.

MLRun also supports a fourth, custom, option that allows determining the parameter combination per run programmatically (see the Custom Iterator example below).
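A hedged sketch of selecting a strategy explicitly through with_hyper_params (max_iterations is an optional knob that limits how many combinations the random strategy samples):

list_params = {"p1": [2, 4, 1], "p2": [10, 20, 30]}    # with "list", all lists must be the same length
task = mlrun.new_task("strategy-demo").with_hyper_params(
    list_params,
    selector="max.multiplier",
    strategy="list",       # or "grid" / "random"
    # max_iterations=5,    # with strategy="random", caps the number of sampled runs
)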

You can specify selection criteria to select the best run among the different child runs by setting the selector option. This marks that result as the parent (iteration 0) result and marks the best result in the user interface.

You can also specify a stop_condition to stop the execution of child runs when some criteria, based on the returned results, is met (for example stop_condition="accuracy>=0.9"); see the sketch below.
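For example, a minimal sketch combining a selector with a stop condition ("accuracy" here is an assumed result name logged by the handler):

task = mlrun.new_task("early-stop-demo").with_hyper_params(
    {"p1": [2, 4, 8], "p2": [10, 20]},
    selector="max.accuracy",            # mark the child run with the highest accuracy as the best
    stop_condition="accuracy>=0.9",     # stop launching child runs once this condition is met
)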

Examples

Base dummy function:

import mlrun
def hyper_func(context, p1, p2):
    print(f"p1={p1}, p2={p2}, result={p1 * p2}")
    context.log_result("multiplier", p1 * p2)

Grid Search (default)

grid_params = {"p1": [2,4,1], "p2": [10,20]}
task = mlrun.new_task("grid-demo").with_hyper_params(grid_params, selector="max.multiplier")
run = mlrun.new_function().run(task, handler=hyper_func)
> 2021-03-08 11:10:29,854 [info] starting run grid-demo uid=d60f6f1a951945b98863e3784e0f03a6 DB=http://mlrun-api:8080
p1=2, p2=10, result=20
p1=4, p2=10, result=40
p1=1, p2=10, result=10
p1=2, p2=20, result=40
p1=4, p2=20, result=80
p1=1, p2=20, result=20
> 2021-03-08 11:10:30,941 [info] best iteration=5, used criteria max.multiplier
project: default | uid: d60f6f1a951945b98863e3784e0f03a6 | iter: 0 | start: Mar 08 11:10:29 | state: completed | name: grid-demo
labels: v3io_user=admin, kind=handler, owner=admin
results: best_iteration=5, multiplier=80
artifacts: iteration_results
to track results use .show() or .logs() or in CLI: 
!mlrun get run d60f6f1a951945b98863e3784e0f03a6 --project default , !mlrun logs d60f6f1a951945b98863e3784e0f03a6 --project default
> 2021-03-08 11:10:31,213 [info] run executed, status=completed

UI Screenshot:

[hyper-params run results screenshot]

Custom Iterator

We can define a child iteration context under the parent/main run; each child run is logged independently.

def handler(context: mlrun.MLClientCtx, param_list):
    best_multiplier = total = 0
    for param in param_list:
        # run the handler inside a child run context; each child is logged as its own iteration
        with context.get_child_context(**param) as child:
            hyper_func(child, **child.parameters)
            multiplier = child.results['multiplier']
            total += multiplier
            if multiplier > best_multiplier:
                # mark this child as the best iteration (its result is reflected in the parent)
                child.mark_as_best()
                best_multiplier = multiplier

    # log result at the parent
    context.log_result('avg_multiplier', total / len(param_list))
param_list = [{"p1":2, "p2":10}, {"p1":3, "p2":30}, {"p1":4, "p2":7}]
run = mlrun.new_function().run(handler=handler, params={"param_list": param_list})
> 2021-03-08 11:20:14,579 [info] starting run mlrun-ede940-handler uid=f42199eac35142268c253f72d3d12f52 DB=http://mlrun-api:8080
p1=2, p2=10, result=20
p1=3, p2=30, result=90
p1=4, p2=7, result=28
project: default | uid: f42199eac35142268c253f72d3d12f52 | iter: 0 | start: Mar 08 11:20:14 | state: completed | name: mlrun-ede940-handler
labels: v3io_user=admin, kind=handler, owner=admin, host=jupyter-65c78cc479-jtz4j
parameters: param_list=[{'p1': 2, 'p2': 10}, {'p1': 3, 'p2': 30}, {'p1': 4, 'p2': 7}]
results: best_iteration=2, multiplier=90, avg_multiplier=46.0
to track results use .show() or .logs() or in CLI: 
!mlrun get run f42199eac35142268c253f72d3d12f52 --project default , !mlrun logs f42199eac35142268c253f72d3d12f52 --project default
> 2021-03-08 11:20:15,726 [info] run executed, status=completed

Parallel Execution Over Containers

When working with compute-intensive or long-running tasks, we would like to run our iterations over a cluster of containers. At the same time, we don't want to bring up too many containers, so we would rather limit the number of parallel tasks.

MLRun supports distributing the child runs over a Dask cluster. This is handled automatically by MLRun; the user only needs to specify the Dask configuration and the level of parallelism. The execution can be controlled from the client/notebook, or it can be driven by a job (immediate or scheduled); a scheduled variant is sketched at the end of this section.

In the following example we create a new function and execute the parent/controller as an MLRun job, with the different child runs executing over a Dask cluster (an MLRun Dask function).

# mark the start of a code section which will be sent to the job
# nuclio: start-code
import socket
import pandas as pd
def hyper_func2(context, data, p1, p2, p3):
    print(data.as_df().head())
    print(f"p2={p2}, p3={p3}, r1={p2 * p3} at {socket.gethostname()}")
    context.log_result("r1", p2 * p3)
    raw_data = {
        "first_name": ["Jason", "Molly", "Tina", "Jake", "Amy"],
        "age": [42, 52, 36, 24, 73],
        "testScore": [25, 94, 57, 62, 70],
    }
    df = pd.DataFrame(raw_data, columns=["first_name", "age", "testScore"])
    context.log_dataset("mydf", df=df, stats=True)
# nuclio: end-code

Define a Dask Cluster (using MLRun serverless Dask)

dask_cluster = mlrun.new_function("dask-cluster", kind='dask', image='mlrun/ml-models')
dask_cluster.apply(mlrun.mount_v3io())        # add volume mounts
dask_cluster.spec.service_type = "NodePort"   # open interface to the dask UI dashboard
dask_cluster.spec.replicas = 2                # define two containers
uri = dask_cluster.save()
uri
'db://default/dask-cluster'

Define the Parallel Work

We set the parallel_runs attribute to indicate how many child tasks to run in parallel, and set dask_cluster_uri to point to our Dask cluster (if we don't set the cluster URI, a local Dask cluster is used). We can also set the teardown_dask flag to indicate that we want to free up all the Dask resources after completion.

grid_params = {"p2": [2,1,4,1], "p3": [10,20]}
task = mlrun.new_task(params={"p1": 8}, inputs={'data': 'https://s3.wasabisys.com/iguazio/data/iris/iris_dataset.csv'})
task.with_hyper_params(
    grid_params, selector="r1", strategy="grid", parallel_runs=4, dask_cluster_uri=uri, teardown_dask=True
)
<mlrun.model.RunTemplate at 0x7ff6c6ebf650>

Define a job that will take our code (using code_to_function) and run it over the cluster

fn = mlrun.code_to_function(name='hyper-tst', kind='job', image='mlrun/ml-models')
run = fn.run(task, handler=hyper_func2)
> 2021-03-08 11:36:00,054 [info] starting run hyper-tst-hyper_func2 uid=82f0325f52724b668f0305b740f6014e DB=http://mlrun-api:8080
> 2021-03-08 11:36:00,417 [info] Job is running in the background, pod: hyper-tst-hyper-func2-rgfxj
> 2021-03-08 11:36:03,804 [info] using in-cluster config.
> 2021-03-08 11:36:10,190 [info] trying dask client at: tcp://mlrun-dask-cluster-bb923447-f.default-tenant:8786
> 2021-03-08 11:36:10,234 [info] using remote dask scheduler (mlrun-dask-cluster-bb923447-f) at: tcp://mlrun-dask-cluster-bb923447-f.default-tenant:8786
> --------------- Iteration: (1) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=2, p3=10, r1=20 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (2) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=1, p3=10, r1=10 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (3) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=4, p3=10, r1=40 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (4) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=1, p3=10, r1=10 at mlrun-dask-cluster-bb923447-ftg25p
> --------------- Iteration: (5) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=2, p3=20, r1=40 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (6) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=1, p3=20, r1=20 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (7) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=4, p3=20, r1=80 at mlrun-dask-cluster-bb923447-flxf89
> --------------- Iteration: (8) ---------------
   sepal length (cm)  sepal width (cm)  ...  petal width (cm)  label
0                5.1               3.5  ...               0.2      0
1                4.9               3.0  ...               0.2      0
2                4.7               3.2  ...               0.2      0
3                4.6               3.1  ...               0.2      0
4                5.0               3.6  ...               0.2      0

[5 rows x 5 columns]
p2=1, p3=20, r1=20 at mlrun-dask-cluster-bb923447-ftg25p
final state: completed
project: default | uid: 82f0325f52724b668f0305b740f6014e | iter: 0 | start: Mar 08 11:36:03 | state: completed | name: hyper-tst-hyper_func2
labels: v3io_user=admin, kind=job, owner=admin
inputs: data
parameters: p1=8
results: best_iteration=7, r1=80
artifacts: mydf, iteration_results
to track results use .show() or .logs() or in CLI: 
!mlrun get run 82f0325f52724b668f0305b740f6014e --project default , !mlrun logs 82f0325f52724b668f0305b740f6014e --project default
> 2021-03-08 11:36:21,833 [info] run executed, status=completed
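
As noted earlier, the parent/controller job can also be scheduled instead of executed immediately; a hedged sketch reusing the same function and task (the cron string is just an example):

fn = mlrun.code_to_function(name="hyper-tst", kind="job", image="mlrun/ml-models")
fn.run(task, handler=hyper_func2, schedule="0 */6 * * *")   # run the controller every 6 hours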