{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Pipelines using Dask, Kubeflow and MLRun" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a project to host functions, jobs and artifacts\n", "\n", "Projects are used to package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git archive.\n", "\n", "The following code creates a new project in a local dir and initializes git tracking on it." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "> 2021-01-24 16:39:27,665 [warning] Failed resolving version info. Ignoring and using defaults\n", "> 2021-01-24 16:39:29,248 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': 'unstable', 'client_version': 'unstable'}\n" ] } ], "source": [ "import os\n", "import mlrun\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")\n", "\n", "# set project name and dir\n", "project_name = 'sk-project-dask'\n", "project_dir = './project'\n", "\n", "# specify artifacts target location\n", "_, artifact_path = mlrun.set_environment(project=project_name)\n", "\n", "# set project\n", "sk_dask_proj = mlrun.new_project(project_name, project_dir, init_git=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Init Dask cluster" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "> 2021-01-24 16:39:36,831 [info] using in-cluster config.\n" ] } ], "source": [ "import mlrun\n", "# set up function from local file\n", "dsf = mlrun.new_function(name=\"mydask\", kind=\"dask\", image=\"mlrun/ml-models\")\n", "\n", "# set up function specs for dask\n", "dsf.spec.remote = True\n", "dsf.spec.replicas = 5\n", "dsf.spec.service_type = 'NodePort'\n", "dsf.with_limits(mem=\"6G\")\n", "dsf.spec.nthreads = 5" ] }, { "cell_type": "code", 
"execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# apply mount_v3io to the function so that the k8s pods that run it\n", "# can access the shared data\n", "dsf.apply(mlrun.mount_v3io())" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'52f5dcddb916b12943e9d44e9e2b75f48e286ec7'" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dsf.save()" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "> 2021-01-24 20:15:37,716 [info] trying dask client at: tcp://mlrun-mydask-997e6385-a.default-tenant:8786\n", "> 2021-01-24 20:15:48,564 [warning] remote scheduler at tcp://mlrun-mydask-997e6385-a.default-tenant:8786 not ready, will try to restart Timed out trying to connect to 'tcp://mlrun-mydask-997e6385-a.default-tenant:8786' after 10 s: Timed out trying to connect to 'tcp://mlrun-mydask-997e6385-a.default-tenant:8786' after 10 s: [Errno -2] Name or service not known\n", "> 2021-01-24 20:15:54,442 [info] using remote dask scheduler (mlrun-mydask-b4eb4ec5-8) at: tcp://mlrun-mydask-b4eb4ec5-8.default-tenant:8786\n" ] }, { "data": { "text/html": [ "dashboard link: default-tenant.app.yh57.iguazio-cd0.com:30975" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
Dask Client / Cluster summary: Workers: 0, Cores: 0, Memory: 0 B" ], "text/plain": [ "" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# init dask cluster\n", "dsf.client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load and run functions\n", "\n", "Load the function objects from a .py or .yaml file, or from the function hub (marketplace).
" ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 63, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# load the functions from the marketplace (function hub)\n", "sk_dask_proj.set_function('hub://describe_dask', name='describe')\n", "sk_dask_proj.set_function('hub://sklearn_classifier_dask', name='dask_classifier')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# alternatively, load the describe function from a local file\n", "sk_dask_proj.set_function('/User/dask/04-describe.py', name='describe', kind='job', image='mlrun/ml-models')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a fully automated ML pipeline\n", "\n", "### Add more functions to the project to be used in the pipeline (from the function hub/marketplace)\n", "\n", "Describe the data, then train and evaluate the model with Dask." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define and save a pipeline\n", "\n", "The following workflow definition is written into a file. It describes a Kubeflow execution graph (DAG)\n", "and how functions and data are connected to form an end-to-end pipeline:\n", "\n", "* Ingest data\n", "* Describe data\n", "* Train, test, and evaluate with Dask\n", "\n", "Check the code below to see how function objects are initialized and used (by name) inside the workflow.
\n", "The `workflow.py` file has two parts: initializing the function objects, and defining the pipeline DSL (connecting the function inputs and outputs).\n", "\n", "> Note: The pipeline can also include CI steps, such as building container images and deploying models, as illustrated in the following example.\n" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting project/workflow.py\n" ] } ], "source": [ "%%writefile project/workflow.py\n", "from kfp import dsl\n", "from mlrun import mount_v3io\n", "\n", "# params\n", "funcs = {}\n", "LABELS = \"label\"\n", "DROP = 'congestion_surcharge'\n", "DATA_URL = \"/User/iris.csv\"\n", "DASK_CLIENT = \"db://sk-project-dask/mydask\"\n", "\n", "# init_functions is used to configure function resources and local settings\n", "def init_functions(functions: dict, project=None, secrets=None):\n", "    for f in functions.values():\n", "        f.apply(mount_v3io())\n", "\n", "@dsl.pipeline(\n", "    name=\"Demo training pipeline\",\n", "    description=\"Shows how to use mlrun\"\n", ")\n", "def kfpipeline():\n", "\n", "    # describe the data\n", "    describe = funcs['describe'].as_step(\n", "        params={\"dask_function\": DASK_CLIENT},\n", "        inputs={\"dataset\": DATA_URL}\n", "    )\n", "\n", "    # get data, train, test, and evaluate\n", "    train = funcs['dask_classifier'].as_step(\n", "        name=\"train\",\n", "        handler=\"train_model\",\n", "        params={\"label_column\": LABELS,\n", "                \"dask_function\": DASK_CLIENT,\n", "                \"test_size\": 0.10,\n", "                \"model_pkg_class\": \"sklearn.ensemble.RandomForestClassifier\",\n", "                \"drop_cols\": DROP},\n", "        inputs={\"dataset\": DATA_URL},\n", "        outputs=['model', 'test_set']\n", "    )\n", "\n", "    train.after(describe)" ] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [], "source": [ "# register the workflow file as \"main\" and embed the workflow code into the project YAML\n",
"sk_dask_proj.set_workflow('main', 'workflow.py', embed=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the project definitions to a file (project.yaml). It is recommended to commit all changes to a Git repo." ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [], "source": [ "sk_dask_proj.save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Run a pipeline workflow\n", "Use the `run` method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.
\n", "The workflow ID is returned and can be used to track progress, or you can follow the hyperlinks.\n", "\n", "> Note: The same command can be issued through the CLI:\n", "    `mlrun project my-proj/ -r main -p \"v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/\"`\n", "\n", "The `dirty` flag lets you run a project with uncommitted changes (when the notebook is in the same Git directory, the project is always dirty).\n", "The `watch` flag waits for the pipeline to complete and prints the results." ] }, { "cell_type": "code", "execution_count": 67, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Experiment link here" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "Run link here" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "> 2021-01-24 21:30:12,077 [info] Pipeline run id=c1b351fc-073b-4cdd-a6c3-fc167afbce8e, check UI or DB for progress\n", "> 2021-01-24 21:30:12,079 [info] waiting for pipeline run completion\n" ] }, { "data": { "text/html": [ "Run Results: Workflow c1b351fc-073b-4cdd-a6c3-fc167afbce8e finished, status=Succeeded\n", "train (Jan 24 21:30:37, completed) results: micro=0.9979224376731302, macro=1.0, precision-1=1.0, precision-0=1.0, precision-2=0.8571428571428571, recall-1=1.0, recall-0=0.8461538461538461, recall-2=1.0, f1-1=1.0, f1-0=0.9166666666666666, f1-2=0.923076923076923; artifacts: ROCAUC, ClassificationReport, ConfusionMatrix, FeatureImportances, model, standard_scaler, label_encoder, test_set\n", "describe-dask (Jan 24 21:30:20, completed) results: scale_pos_weight=1.00; artifacts: histograms, imbalance, correlation" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "artifact_path = os.path.abspath('./pipe/{{workflow.uid}}')\n", "run_id = sk_dask_proj.run(\n", "    'main',\n", "    arguments={},\n", "    artifact_path=artifact_path,\n", "    dirty=False, watch=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**[back to top](#top)**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }