{ "cells": [ { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "# Pipelines using Dask, Kubeflow and MLRun" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Create a project to host functions, jobs and artifacts\n", "\n", "Projects are used to package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git archive.\n", "\n", "The following code creates a new project in a local dir and initializes git tracking on it." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "> 2022-09-27 17:26:14,808 [info] loaded project sk-project-dask from MLRun DB\n", "> 2022-09-27 17:26:14,839 [info] loaded project sk-project-dask from MLRun DB\n" ] } ], "source": [ "import os\n", "import mlrun\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")\n", "\n", "# set project name and dir\n", "project_name = 'sk-project-dask'\n", "project_dir = './'\n", "\n", "# specify artifacts target location\n", "_, artifact_path = mlrun.set_environment(project=project_name)\n", "\n", "# set project\n", "sk_dask_proj = mlrun.get_or_create_project(project_name, project_dir, init_git=True)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Init Dask cluster" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import mlrun\n", "# set up function from local file\n", "dsf = mlrun.new_function(name=\"mydask\", kind=\"dask\", image=\"mlrun/ml-models\")\n", "\n", "# set up function specs for dask\n", "dsf.spec.remote = True\n", "dsf.spec.replicas = 5\n", "dsf.spec.service_type = 'NodePort'\n", "dsf.with_limits(mem=\"6G\")\n", "dsf.spec.nthreads = 5" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "pycharm": { "name": 
"#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# apply mount_v3io over our function so that our k8s pod which run our function\n", "# will be able to access our data (shared data access)\n", "dsf.apply(mlrun.mount_v3io())" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "'db://sk-project-dask/mydask'" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dsf.save()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "> 2022-09-27 17:26:25,134 [info] trying dask client at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786\n", "> 2022-09-27 17:26:25,162 [info] using remote dask scheduler (mlrun-mydask-d7df9301-d) at: tcp://mlrun-mydask-d7df9301-d.default-tenant:8786\n" ] }, { "data": { "text/html": [ "dashboard link: default-tenant.app.alexp-edge.lab.iguazeng.com:32472" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client: Client-83392da2-3e89-11ed-b7e8-82a5d7054c46\n", "
Connection method: Direct\n", "
Dashboard: http://mlrun-mydask-d7df9301-d.default-tenant:8787/status\n", "
\n", "
Scheduler Info\n", "
Scheduler: Scheduler-b8468d53-b900-4041-9982-5e14d5e5eb81\n", "
Comm: tcp://10.200.152.178:8786\n", "
Dashboard: http://10.200.152.178:8787/status\n", "
Started: Just now\n", "
Workers: 0\n", "
Total threads: 0\n", "
Total memory: 0 B\n", "
" ], "text/plain": [ "" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# init dask cluster\n", "dsf.client" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Load and run a functions\n", "\n", "Load the function object from .py .yaml file or function hub (marketplace).
" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# load function from the marketplace\n", "sk_dask_proj.set_function(\"hub://describe\", name=\"describe\")\n", "sk_dask_proj.set_function(\"hub://sklearn_classifier_dask\", name=\"dask_classifier\")" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Create a fully automated ML pipeline\n", "\n", "### Add more functions to the project to be used in the pipeline (from the functions hub/marketplace)\n", "\n", "Describe data, train and eval model with dask." ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "### Define and save a pipeline\n", "\n", "The following workflow definition will be written into a file. It describes a Kubeflow execution graph (DAG) \n", "and how functions and data are connected to form an end to end pipeline. \n", "\n", "* Describe data.\n", "* Train, test and evaluate with dask.\n", "\n", "Check the code below to see how functions objects are initialized and used (by name) inside the workflow.
\n", "The `workflow.py` file has two parts, initialize the function objects and define pipeline dsl (connect the function inputs and outputs).\n", "\n", "> Note: The pipeline can include CI steps like building container images and deploying models as illustrated in the following example.\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting workflow.py\n" ] } ], "source": [ "%%writefile workflow.py\n", "import os\n", "from kfp import dsl\n", "import mlrun\n", "\n", "# params\n", "funcs = {}\n", "LABELS = \"label\"\n", "DROP = \"congestion_surcharge\"\n", "DATA_URL = mlrun.get_sample_path(\"data/iris/iris_dataset.csv\")\n", "DASK_CLIENT = \"db://sk-project-dask/mydask\"\n", "\n", "\n", "# init functions is used to configure function resources and local settings\n", "def init_functions(functions: dict, project=None, secrets=None):\n", " for f in functions.values():\n", " f.apply(mlrun.mount_v3io())\n", " pass\n", "\n", "\n", "@dsl.pipeline(name=\"Demo training pipeline\", description=\"Shows how to use mlrun\")\n", "def kfpipeline():\n", " # Describe the data\n", " describe = funcs[\"describe\"].as_step(\n", " inputs={\"table\": DATA_URL},\n", " params={\"dask_function\": DASK_CLIENT},\n", " )\n", "\n", " # Train, test and evaluate:\n", " train = funcs[\"dask_classifier\"].as_step(\n", " name=\"train\",\n", " handler=\"train_model\",\n", " inputs={\"dataset\": DATA_URL},\n", " params={\n", " \"label_column\": LABELS,\n", " \"dask_function\": DASK_CLIENT,\n", " \"test_size\": 0.10,\n", " \"model_pkg_class\": \"sklearn.ensemble.RandomForestClassifier\",\n", " \"drop_cols\": DROP,\n", " },\n", " outputs=[\"model\", \"test_set\"],\n", " )\n", " train.after(describe)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "# register the workflow file as \"main\", embed the 
workflow code into the project YAML\n", "sk_dask_proj.set_workflow('main', 'workflow.py', embed=False)" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo." ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sk_dask_proj.save()" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "\n", "## Run a pipeline workflow\n", "Use the `run` method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.
\n", "The workflow ID is returned and can be used to track the progress or you can use the hyperlinks.\n", "\n", "> Note: The same command can be issued through CLI commands:
\n", " `mlrun project my-proj/ -r main -p \"v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/\"`\n", "\n", "The `dirty` flag lets you run a project with uncommitted changes (when the notebook is in the same git dir it is always dirty)
\n", "The `watch` flag waits for the pipeline to complete and print results." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "data": { "text/html": [ "
Pipeline running (id=631ad0a3-19f1-4df0-bfa7-6c38c60275e0), click here to view the details in MLRun UI
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/svg+xml": [ "\n", "\n", "\n", "\n", "\n", "\n", "kfp\n", "\n", "\n", "\n", "demo-training-pipeline-48mrn-2447047633\n", "\n", "describe\n", "\n", "\n", "\n", "demo-training-pipeline-48mrn-366652794\n", "\n", "train\n", "\n", "\n", "\n", "demo-training-pipeline-48mrn-2447047633->demo-training-pipeline-48mrn-366652794\n", "\n", "\n", "\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "

Run Results\n", "
Workflow 631ad0a3-19f1-4df0-bfa7-6c38c60275e0 finished, state=Succeeded\n", "
\n", "
| uid | start | state | name | parameters | results |\n", "
| --- | --- | --- | --- | --- | --- |\n", "
| | Sep 27 17:27:09 | completed | train | label_column=label, dask_function=db://sk-project-dask/mydask, test_size=0.1, model_pkg_class=sklearn.ensemble.RandomForestClassifier, drop_cols=congestion_surcharge | micro=0.9944598337950138, macro=0.9945823158323159, precision-0=1.0, precision-1=0.9166666666666666, precision-2=0.8, recall-0=1.0, recall-1=0.7857142857142857, recall-2=0.9230769230769231, f1-0=1.0, f1-1=0.8461538461538461, f1-2=0.8571428571428571 |\n", "
| | Sep 27 17:26:42 | completed | describe | dask_function=db://sk-project-dask/mydask | |\n", "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "artifact_path = os.path.abspath('./pipe/{{workflow.uid}}')\n", "run_id = sk_dask_proj.run(\n", " 'main',\n", " arguments={}, \n", " artifact_path=artifact_path, \n", " dirty=False, \n", " watch=True\n", ")" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "**[back to top](#top)**" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "mlrun-extended", "language": "python", "name": "conda-env-mlrun-extended-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" } }, "nbformat": 4, "nbformat_minor": 4 }