Running Dask on the cluster with MLRun#
Note
Dask is supported at the Tech Preview level only.
The Dask framework enables you to parallelize your Python code and run it as a distributed process on an Iguazio cluster, dramatically accelerating performance.
In this notebook you’ll learn how to create a Dask cluster and then an MLRun function running as a Dask client.
It also demonstrates how to parallelize a custom algorithm using the Dask Delayed option.
For more information on Dask over Kubernetes: https://kubernetes.dask.org/en/latest/.
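As a quick illustration of the Dask Delayed option (independent of MLRun), the following minimal sketch builds a lazy task graph from plain Python functions and executes it in parallel; the function names are illustrative only:
from dask import delayed


def inc(x):
    return x + 1


def add(x, y):
    return x + y


# build a lazy task graph; nothing is computed yet
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

# execute the graph (tasks run in parallel where possible)
print(total.compute())  # -> 5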
Set up the environment#
# set mlrun api path and artifact path for logging
import mlrun
project = mlrun.get_or_create_project("dask-demo", "./")
> 2023-02-19 07:48:52,191 [info] Created and saved project dask-demo: {'from_template': None, 'overwrite': False, 'context': './', 'save': True}
> 2023-02-19 07:48:52,194 [info] created project dask-demo and saved in MLRun DB
Create and start Dask cluster#
Dask functions can be local (local workers) or remote (containers in the cluster). For remote functions you
can specify the number of replicas (optional) or leave it blank for auto-scale.
Use the new_function()
to define the Dask cluster and set the desired configuration of that clustered function.
If the Dask workers need to access the shared file system, apply a shared volume mount (e.g. via v3io mount).
The Dask function spec has several unique attributes (in addition to the standard job attributes):
.remote — bool, use local or clustered dask
.replicas — number of desired replicas, keep 0 for auto-scale
.min_replicas, .max_replicas — set replicas range for auto-scale
.scheduler_timeout — cluster is killed after timeout (inactivity), default is ‘60 minutes’
.nthreads — number of worker threads
To access the Dask dashboard or scheduler from remote, use the NodePort service type (set .service_type
to ‘NodePort’) and specify the external IP in the MLRun configuration (mlconf.remote_host). This is set automatically if you are running on an Iguazio cluster.
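If you are not running on an Iguazio cluster, you can set the external address yourself; a minimal sketch (the address below is a placeholder):
from mlrun import mlconf

# placeholder - replace with an externally reachable node IP or hostname
if not mlconf.remote_host:
    mlconf.remote_host = "<external-node-address>"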
Specify the kind (dask) and the container image:
# create an mlrun function that will init the dask cluster
dask_cluster_name = "dask-cluster"
dask_cluster = mlrun.new_function(dask_cluster_name, kind="dask", image="mlrun/mlrun")
dask_cluster.apply(mlrun.mount_v3io())
<mlrun.runtimes.daskjob.DaskCluster at 0x7f0dabf52460>
# set the range for the number of replicas with min_replicas and max_replicas
dask_cluster.spec.min_replicas = 1
dask_cluster.spec.max_replicas = 4
# set the use of dask remote cluster (distributed)
dask_cluster.spec.remote = True
dask_cluster.spec.service_type = "NodePort"
# set dask memory and cpu limits
dask_cluster.with_worker_requests(mem="2G", cpu="2")
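The remaining spec attributes listed above can be set in the same way; a minimal sketch with illustrative values:
# optional: tune worker threads and the inactivity timeout (illustrative values)
dask_cluster.spec.nthreads = 2
dask_cluster.spec.scheduler_timeout = "30 minutes"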
Initialize the Dask cluster#
When you request the dask cluster client
attribute, it verifies that the cluster is up and running:
# init dask client and use the scheduler address as param in the following cell
dask_cluster.client
> 2023-02-19 07:49:07,462 [info] trying dask client at: tcp://mlrun-dask-cluster-bae5cf76-0.default-tenant:8786
> 2023-02-19 07:49:07,516 [info] using remote dask scheduler (mlrun-dask-cluster-bae5cf76-0) at: tcp://mlrun-dask-cluster-bae5cf76-0.default-tenant:8786
Client: Client-e3759c00-b029-11ed-86b2-6684fa230d0c
Connection method: Direct
Dashboard: http://mlrun-dask-cluster-bae5cf76-0.default-tenant:8787/status
Scheduler: Scheduler-7641adf7-a399-4465-869c-d479318d6835
Comm: tcp://10.200.196.73:8786
Dashboard: http://10.200.196.73:8787/status
Workers: 0 | Total threads: 0 | Total memory: 0 B
Started: Just now
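Once connected, the client can also be used directly for ad-hoc work on the remote workers; a minimal sketch with an illustrative computation:
# illustrative: submit a simple task to the remote Dask workers through the client
client = dask_cluster.client
future = client.submit(lambda x: x * 2, 21)
print(future.result())  # -> 42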
Creating a function that runs over Dask#
# mlrun: start-code
Import mlrun and dask. The # mlrun: start-code and # mlrun: end-code markers delimit the code that is converted into the MLRun function.
import mlrun
from dask.distributed import Client
from dask import delayed
from dask import dataframe as dd
import warnings
import numpy as np
import os

warnings.filterwarnings("ignore")
Python function code#
This simple function reads a .csv file using a dask dataframe. It runs the describe and groupby functions on the dataset using Dask, and stores the describe statistics as a dataset artifact.
def test_dask(
    context, dataset: mlrun.DataItem, client=None, dask_function: str = None
) -> None:
    # setup dask client from the MLRun dask cluster function
    if dask_function:
        client = mlrun.import_function(dask_function).client
    elif not client:
        client = Client()

    # load the dataitem as a dask dataframe (dd)
    df = dataset.as_df(df_module=dd)

    # run describe (get statistics for the dataframe) with dask
    df_describe = df.describe().compute()

    # run groupby and count with dask (demonstrates a distributed aggregation;
    # df_grpby could be logged as an additional artifact in the same way)
    df_grpby = df.groupby("VendorID").count().compute()

    # store the describe statistics as a dataset artifact
    context.log_dataset("describe", df=df_describe, format="csv", index=True)
# mlrun: end-code
Test the function over Dask#
Load sample data#
DATA_URL = "/User/examples/ytrip.csv"
!mkdir -p /User/examples/
!curl -L "https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv" > {DATA_URL}
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 84.9M 100 84.9M 0 0 7136k 0 0:00:12 0:00:12 --:--:-- 6371k
Convert the code to MLRun function#
Use code_to_function to convert the code to an MLRun function and specify the configuration for the Dask process (e.g. replicas, memory, etc.).
Note that the resource configurations are per worker.
# mlrun transforms the code above (up to the mlrun: end-code cell) into a serverless function
# that runs in k8s pods
fn = mlrun.code_to_function("test_dask", kind="job", handler="test_dask").apply(
mlrun.mount_v3io()
)
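Optionally, you can also set resource requests for the job's own pod (separate from the Dask worker resources configured earlier); a minimal sketch with illustrative values:
# optional: resource requests for the job pod itself (illustrative values)
fn.with_requests(mem="1G", cpu=1)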
Run the function#
# function URI is db://<project>/<name>
dask_uri = f"db://{project.name}/{dask_cluster_name}"
r = fn.run(
handler=test_dask,
inputs={"dataset": DATA_URL},
params={"dask_function": dask_uri},
auto_build=True,
)
> 2023-02-19 07:49:27,208 [info] starting run test-dask-test_dask uid=a30942af70f347488daf4f653afd6c63 DB=http://mlrun-api:8080
> 2023-02-19 07:49:27,361 [info] Job is running in the background, pod: test-dask-test-dask-dqdln
Names with underscore '_' are about to be deprecated, use dashes '-' instead. Replacing underscores with dashes.
> 2023-02-19 07:49:35,137 [info] trying dask client at: tcp://mlrun-dask-cluster-bae5cf76-0.default-tenant:8786
> 2023-02-19 07:49:35,163 [info] using remote dask scheduler (mlrun-dask-cluster-bae5cf76-0) at: tcp://mlrun-dask-cluster-bae5cf76-0.default-tenant:8786
remote dashboard: default-tenant.app.vmdev94.lab.iguazeng.com:31886
> 2023-02-19 07:49:45,383 [info] To track results use the CLI: {'info_cmd': 'mlrun get run a30942af70f347488daf4f653afd6c63 -p dask-demo', 'logs_cmd': 'mlrun logs a30942af70f347488daf4f653afd6c63 -p dask-demo'}
> 2023-02-19 07:49:45,384 [info] Or click for UI: {'ui_url': 'https://dashboard.default-tenant.app.vmdev94.lab.iguazeng.com/mlprojects/dask-demo/jobs/monitor/a30942af70f347488daf4f653afd6c63/overview'}
> 2023-02-19 07:49:45,384 [info] run executed, status=completed
final state: completed
| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| dask-demo | | 0 | Feb 19 07:49:35 | completed | test-dask-test_dask | v3io_user=dani kind=job owner=dani mlrun/client_version=1.3.0-rc23 mlrun/client_python_version=3.9.16 host=test-dask-test-dask-dqdln | dataset | dask_function=db://dask-demo/dask-cluster | | describe |
> 2023-02-19 07:49:45,730 [info] run executed, status=completed
Track the progress in the UI#
You can view the progress and detailed information in the MLRun UI by clicking on the uid above.
To track the Dask progress, open the Dask dashboard using the dashboard link shown in the client output above.
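You can also inspect the run results programmatically; a minimal sketch using the run object r from the cells above:
# list the run outputs (results and artifact keys)
print(r.outputs)

# load the logged "describe" dataset artifact back as a dataframe
describe_df = r.artifact("describe").as_df()
print(describe_df.head())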