MLRun cheat sheet#

MLRun setup#

Docs: Set up your client environment, Installation and setup guide

MLRun server/client overview#

MLRun has two main components, the service and the client (SDK+UI):

  • MLRun service runs over Kubernetes (can also be deployed using local Docker for demo and test purposes) - see installation documentation for more information

  • MLRun client SDK is installed in your development environment via pip and interacts with the service using REST API calls
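
For example, after installing the SDK (pip install mlrun), a quick way to verify that the client can reach the service is to list the projects on the server (a minimal sketch; the URL is a placeholder):

import mlrun

# Point the SDK at the MLRun API service
mlrun.set_environment("http://localhost:8080")

# The SDK interacts with the service via REST; listing projects verifies connectivity
db = mlrun.get_run_db()
print(db.list_projects())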

Remote connection (laptop, CI/CD, etc.)#

Docs: Configure remote environment

Localhost: Create an mlrun.env file for environment variables. MLRUN_DBPATH stores the URL of the MLRun API service. Since it is localhost, username and access_key are not required:

# Set the API endpoint via the CLI
mlrun config set -a http://localhost:8080

# Or set the MLRun DB path directly in the mlrun.env file
MLRUN_DBPATH=<URL of the MLRun API service; e.g., "https://mlrun-api.default-tenant.app.mycluster.iguazio.com">

Iguazio MLOps Platform (not MLRun CE):

mlrun config set -a https://mlrun-api.default-tenant.app.xxx.iguazio-cd1.com -u joe -k mykey -e 
# this is another env file
V3IO_USERNAME=joe
V3IO_ACCESS_KEY=mykey
MLRUN_DBPATH=https://mlrun-api.default-tenant.app.xxx.iguazio-cd1.com

Connect via MLRun Python SDK:

# Use local service
mlrun.set_environment("http://localhost:8080", artifact_path="./")
# Use remote service
mlrun.set_environment("<remote-service-url>", access_key="xyz", username="joe")

MLRun projects#

Docs: Projects and automation

General workflow#

Docs: Create, save, and use projects

# Create or load a project
project = mlrun.get_or_create_project(name="my-project", context="./")

# Add a function to the project
project.set_function(
    name="train_model", func="train_model.py", kind="job", image="mlrun/mlrun"
)

# Add a workflow (pipeline) to the project
project.set_workflow(name="training_pipeline", workflow_path="training_pipeline.py")

# Save the project and generate the project.yaml file
project.save()

# Run pipeline via project
project.run(name="training_pipeline", arguments={...})

Git integration#

Docs: Create and use functions

An MLRun project can be backed by a Git repo. Functions consume the repo and pull the code either once, when the Docker image is built (production workflow), or at runtime (development workflow).

Pull the repo code once (bake into Docker image)#

project.set_source(source="git://github.com/mlrun/project-archive.git")

fn = project.set_function(
    name="myjob",
    handler="job_func.job_handler",
    image="mlrun/mlrun",
    kind="job",
    with_repo=True,
)

project.build_function(fn)

Pull the repo code at runtime#

project.set_source(
    source="git://github.com/mlrun/project-archive.git", pull_at_runtime=True
)

fn = project.set_function(
    name="nuclio",
    handler="nuclio_func:nuclio_handler",
    image="mlrun/mlrun",
    kind="nuclio",
    with_repo=True,
)

CI/CD integration#

Overview#

Docs: CI/CD automation with Git, Run pipelines with GitHub Actions, GitLab

The best practice for working with CI/CD is to use MLRun projects in combination with the following:

  • Git: Single source of truth for source code and deployments via infrastructure as code. Allows for collaboration between multiple developers. An MLRun project can (and should) be tied to a Git repo. One project maps to one Git repo.

  • CI/CD: Main tool for orchestrating production deployments. The CI/CD system should be responsible for deploying latest code changes from Git onto the remote cluster via MLRun Python SDK or CLI.

  • Iguazio/MLRun: Kubernetes-based compute environment for running data analytics, model training, or model deployment tasks. Additionally, the cluster is where all experiment tracking, job information, logs, and more are located.

See MLRun Projects for more information on Git and CI/CD integration. In practice, this may look something like the following:

Example (GitHub Actions)#

Full example: MLRun project-demo

name: mlrun-project-workflow
on: [issue_comment]

jobs:
  submit-project:
    if: github.event.issue.pull_request != null && startsWith(github.event.comment.body, '/run')
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v3
    - name: Set up Python 3.9
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
        architecture: 'x64'
    
    - name: Install mlrun
      run: python -m pip install mlrun
    - name: Submit project
      run: python -m mlrun project ./ --watch --run main ${CMD:5}
      env:
        V3IO_USERNAME: ${{ secrets.V3IO_USERNAME }}
        V3IO_API: ${{ secrets.V3IO_API }}
        V3IO_ACCESS_KEY: ${{ secrets.V3IO_ACCESS_KEY }}
        MLRUN_DBPATH: ${{ secrets.MLRUN_DBPATH }}
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} 
        SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
        CMD: ${{ github.event.comment.body}}
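
The same submission can also be done from Python inside a CI/CD job. A rough SDK equivalent (the workflow name and optional repo URL are placeholders) is:

import mlrun

# Load the project from the repo that the CI runner checked out
# (alternatively, pass url="git://github.com/myorg/myrepo.git" to clone it)
project = mlrun.load_project(context="./")

# Run the "main" workflow on the remote cluster and wait for completion
run_id = project.run("main", arguments={}, watch=True)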

Secrets#

Docs: Working with secrets

# Add secrets to the project
project.set_secrets(secrets={"AWS_KEY": "111222333"}, provider="kubernetes")

# Run the job with all secrets (automatically injects all project secrets for non-local runtimes)
project.run_function(fn)

# Retrieve the secret within the job
context.get_secret("AWS_KEY")

MLRun functions#

Essential runtimes#

Docs: Kinds of functions (runtimes)

Job#

# Job - run once to completion
job = project.set_function(
    name="my-job", func="my_job.py", kind="job", image="mlrun/mlrun", handler="handler"
)
project.run_function(job)

Nuclio#

# Nuclio - generic real-time function to do something when triggered
nuclio = project.set_function(
    name="my-nuclio",
    func="my_nuclio.py",
    kind="nuclio",
    image="mlrun/mlrun",
    handler="handler",
)
project.deploy_function(nuclio)

Serving#

# Serving - specialized Nuclio function specifically for model serving
serving = project.set_function(
    name="my-serving",
    func="my_serving.py",
    kind="serving",
    image="mlrun/mlrun",
    handler="handler",
)
serving.add_model(
    key="iris",
    model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
    class_name="ClassifierModel",
)
project.deploy_function(serving)

Distributed runtimes#

Docs: Kinds of functions (runtimes)

MPIJob (Horovod)#

project = mlrun.get_or_create_project("mpijob")
mpijob = project.set_function(
    name="my-mpijob",
    func="my_mpijob.py",
    kind="mpijob",
    image="mlrun/mlrun",
    handler="handler",
)
mpijob.spec.replicas = 3
mpijob.run()

Dask#

project = mlrun.get_or_create_project("dask")
dask = project.set_function(name="my-dask", kind="dask", image="mlrun/ml-base")
dask.spec.remote = True
dask.spec.replicas = 5
dask.spec.service_type = "NodePort"
dask.with_worker_limits(mem="6G")
dask.with_scheduler_limits(mem="1G")
dask.spec.nthreads = 5
dask.apply(mlrun.mount_v3io())
dask.client

Spark Operator#

import os

read_csv_filepath = os.path.join(os.path.abspath("."), "spark_read_csv.py")
project = mlrun.get_or_create_project("spark")
spark = project.set_function(
    kind="spark", command=read_csv_filepath, name="sparkreadcsv"
)
spark.with_driver_limits(cpu="1300m")
spark.with_driver_requests(cpu=1, mem="512m")
spark.with_executor_limits(cpu="1400m")
spark.with_executor_requests(cpu=1, mem="512m")
spark.with_igz_spark()
spark.spec.replicas = 2

spark.deploy()  # build image
spark.run(artifact_path="/User")  # run spark job

Resource management#

Docs: Managing job resources

Requests/limits (MEM/CPU/GPU)#

# Requests - lower bound
fn.with_requests(mem="1G", cpu=1)

# Limits - upper bound
fn.with_limits(mem="2G", cpu=2, gpus=1)

Scaling and auto-scaling#

# Nuclio/serving scaling
fn.spec.replicas = 2
fn.spec.min_replicas = 1
fn.spec.max_replicas = 4

Scale to zero#

# Nuclio/serving scaling
fn.spec.min_replicas = 0  # zero value is mandatory for scale to zero
fn.spec.max_replicas = 2

# Scale to zero after 30 minutes of inactivity (idle-time duration)
fn.set_config(
    key="spec.scaleToZero.scaleResources",
    value=[
        {
            "metricName": "nuclio_processor_handled_events_total",
            "windowSize": "30m",  # default values are 1m, 2m, 5m, 10m, 30m
            "threshold": 0,
        }
    ],
)

Mount persistent storage#

# Mount Iguazio V3IO
fn.apply(mlrun.mount_v3io())

# Mount PVC
fn.apply(
    mlrun.platforms.mount_pvc(
        pvc_name="data-claim", volume_name="data", volume_mount_path="/data"
    )
)

Pod priority#

fn.with_priority_class(name="igz-workload-medium")

Node selection#

fn.with_node_selection(node_selector={"app.iguazio.com/lifecycle": "non-preemptible"})

Serving/Nuclio triggers#

Docs: Nuclio Triggers

By default, Nuclio deploys a default HTTP trigger if the function doesn't have one. This is because users typically want to invoke functions through HTTP. However, we provide a way to disable the default HTTP trigger using: function.disable_default_http_trigger()

Note

disable_default_http_trigger is supported from Nuclio 1.13.1.

Also, you can explicitly enable the default HTTP trigger creation with: function.enable_default_http_trigger()

If you didn't set this parameter explicitly, the value is taken from Nuclio platform configuration. Therefore, if you haven't disabled the default HTTP trigger, don't have a custom one, and are unable to invoke the function, we recommend checking the Nuclio platform configuration.
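
For example (fn here is any Nuclio or serving function object):

# Explicitly disable the automatically created HTTP trigger (Nuclio >= 1.13.1)
fn.disable_default_http_trigger()

# Or explicitly enable it
fn.enable_default_http_trigger()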

import nuclio

serve = mlrun.import_function("hub://v2_model_server")

# Set amount of workers
serve.with_http(workers=8, worker_timeout=10)

# V3IO stream trigger
serve.add_v3io_stream_trigger(
    stream_path="v3io:///projects/myproj/stream1",
    name="stream",
    group="serving",
    seek_to="earliest",
    shards=1,
)

# Kafka stream trigger
serve.add_trigger(
    name="kafka",
    spec=nuclio.KafkaTrigger(
        brokers=["192.168.1.123:39092"],
        topics=["TOPIC"],
        partitions=4,
        consumer_group="serving",
        initial_offset="earliest",
    ),
)

# Cron trigger
serve.add_trigger("cron_interval", spec=nuclio.CronTrigger(interval="10s"))
serve.add_trigger("cron_schedule", spec=nuclio.CronTrigger(schedule="0 9 * * *"))

Note

Each worker uses a separate worker scope. This means that each worker has its own copy of the variables, and all changes are kept within the worker (changes made by worker x do not affect worker y).
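
A minimal sketch of what that means for a Nuclio handler (variable names are illustrative):

# Module-level state is per worker: each worker keeps its own copy
counter = 0


def handler(context, event):
    global counter
    counter += 1  # visible only to the worker that handled this event
    return {"counter_in_this_worker": counter}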

Building Docker images#

Docs: Build function image, Images and their usage in MLRun

Manually build image#

project.set_function(
    "train_code.py",
    name="trainer",
    kind="job",
    image="mlrun/mlrun",
    handler="train_func",
    requirements=["pandas==1.3.5"],
)

project.build_function(
    "trainer",
    # Specify base image
    base_image="myrepo/base_image:latest",
    # Run arbitrary commands
    commands=[
        "pip install git+https://github.com/myusername/myrepo.git@mybranch",
        "mkdir -p /some/path && chmod 0777 /some/path",
    ],
)

Automatically build image#

project.set_function(
    "train_code.py",
    name="trainer",
    kind="job",
    image="mlrun/mlrun",
    handler="train_func",
    requirements=["pandas==1.3.5"],
)

# auto_build will trigger building the image before running,
# due to the additional requirements.
project.run_function("trainer", auto_build=True)

Multi-stage workflows (batch pipelines)#

Docs: Running a multi-stage workflow

Write a workflow#

# pipeline.py
from kfp import dsl
import mlrun
import nuclio


# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name="batch-pipeline",
    description="Example of batch pipeline for heart disease dataset",
)
def pipeline(source_url, label_column):

    # Get current project
    project = mlrun.get_current_project()

    # Ingest the data set
    ingest = mlrun.run_function(
        "get-data",
        handler="prep_data",
        inputs={"source_url": source_url},
        params={"label_column": label_column},
        outputs=["cleaned_data"],
    )

    # Train a model
    train = mlrun.run_function(
        "train",
        handler="train_model",
        inputs={"dataset": ingest.outputs["cleaned_data"]},
        params={"label_column": label_column},
        outputs=["model"],
    )

Add workflow to project#

# Functions within the workflow
project.set_function(
    name="get-data", func="get_data.py", kind="job", image="mlrun/mlrun"
)
project.set_function(name="train", func="train.py", kind="job", image="mlrun/mlrun")

# Workflow
project.set_workflow(name="main", workflow_path="pipeline.py")

project.save()

Run workflow#

Python SDK

run_id = project.run(
    name="main",
    arguments={
        "source_url": "store://feature-vectors/heart-disease-classifier/heart-disease-vec:latest",
        "label_column": "target",
    },
)

CLI

mlrun project --run main \
    --arguments source_url=store://feature-vectors/heart-disease-classifier/heart-disease-vec:latest \
    --arguments label_column=target

Schedule workflow#

run_id = project.run(
    name="main",
    arguments={
        "source_url": "store://feature-vectors/heart-disease-classifier/heart-disease-vec:latest",
        "label_column": "target",
    },
    schedule="0 * * * *",
)

Logging#

Docs: MLRun execution context

context.logger.debug(
    message="Debugging info"
)  # logging all (debug, info, warning, error)
context.logger.info(message="Something happened")  # logging info, warning and error
context.logger.warning(message="Something might go wrong")  # logging warning and error
context.logger.error(message="Something went wrong")  # logging only error

Note

Real-time (Nuclio) functions use the default logger level debug (logging all).

Experiment tracking#

Docs: MLRun execution context, Automated experiment tracking, Decorators and auto-logging

Manual logging#

context.log_result(key="accuracy", value=0.934)
context.log_model(key="model", model_file="model.pkl")
context.log_dataset(key="dataset", df=df, format="csv", index=False)

Track returning values using hints and returns#

  • Pass type hints into the inputs parameter of the run method. Inputs are automatically parsed to their hinted type. If type hints are not in code, they can be passed in the input keys. Hints use the structure: key : type_hint

  • Pass log hints: how to log the returning values from a handler. The log hints are passed via the returns parameter in the run method. A log hint can be passed as a string or a dictionary.

  • Use the returns argument to specify how to log a function's returned values.

def my_handler(df):
    ...
    return processed_df, result
    
log_with_returns_run = my_func.run(
    handler="my_handler",
    inputs={"df: pandas.DataFrame": DATA_PATH},
    returns=["processed_data", "sum"],
    local=True,
)
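
Log hints can also specify the artifact type, either inline in the string or in dictionary form (a sketch based on the same handler; the artifact types chosen here are assumptions):

log_hints_run = my_func.run(
    handler="my_handler",
    inputs={"df: pandas.DataFrame": DATA_PATH},
    returns=[
        "processed_data: dataset",  # string form - "key : artifact_type"
        {"key": "sum", "artifact_type": "result"},  # dictionary form
    ],
    local=True,
)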

Automatic logging#

# Auto logging for ML frameworks
from mlrun.frameworks.sklearn import apply_mlrun

apply_mlrun(model=model, model_name="my_model", x_test=X_test, y_test=y_test)
model.fit(X_train, y_train)


# MLRun decorator for input/output parsing
@mlrun.handler(
    labels={"framework": "scikit-learn"},
    outputs=["prediction:dataset"],
    inputs={"train_data": pd.DataFrame, "predict_input": pd.DataFrame},
)
def train_and_predict(train_data, predict_input, label_column="label"):

    x = train_data.drop(label_column, axis=1)
    y = train_data[label_column]

    clf = SVC()
    clf.fit(x, y)

    return list(clf.predict(predict_input))

Model inferencing and serving#

Docs: Deploy models and applications

Real-time serving#

Docs: Using built-in model serving classes, Build your own model serving class, Model serving API

serve = mlrun.import_function("hub://v2_model_server")
serve.add_model(
    key="iris", model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl"
)

# Deploy to local mock server (Development testing)
mock_server = serve.to_mock_server()

# Deploy to serverless function (Production K8s deployment)
addr = serve.deploy()

Batch inferencing#

Docs: Batch inference

batch_inference = mlrun.import_function("hub://batch_inference")
batch_run = project.run_function(
    batch_inference,
    inputs={"dataset": prediction_set_path},
    params={"model": model_artifact.uri},
)

Model monitoring and drift detection#

Docs: Model monitoring, Batch inference

Real-time drift detection#

# Log the model with training set
context.log_model("model", model_file="model.pkl", training_set=X_train)

# Enable tracking for the model server
serving_fn = mlrun.import_function("hub://v2_model_server", project=project_name).apply(
    mlrun.auto_mount()
)
serving_fn.add_model(
    "model", model_path="store://models/project-name/model:latest"
)  # Model path comes from experiment tracking DB

# Enable monitoring
serving_fn.set_tracking()

# Deploy the model server
serving_fn.deploy()

Batch drift detection#

batch_inference = mlrun.import_function("hub://batch_inference_v2")
batch_run = project.run_function(
    batch_inference,
    inputs={"dataset": prediction_set_path, "sample_set": training_set_path},
    params={
        "model": model_artifact.uri,
        "label_columns": "label",
        "perform_drift_analysis": True,
    },
)

Alerts and notifications#

Docs: Alerts, Notifications

Alerts#

import mlrun.common.schemas.alert as alert_objects

alert_data = mlrun.alerts.alert.AlertConfig(
    project=project_name,
    name=alert_name,
    summary=alert_summary,
    severity=alert_objects.AlertSeverity.LOW,
    entities=alert_objects.EventEntities(
        kind=entity_kind, project=project_name, ids=[result_endpoint]
    ),
    trigger=alert_objects.AlertTrigger(events=[event_name]),
    criteria=None,
    notifications=notifications,
)

Notifications#

notification = mlrun.model.Notification(
    kind="slack",
    name="slack_notification",
    message="A drift was detected",
    severity="warning",
    when=["now"],
    condition="failed",
    secret_params={
        "webhook": "https://hooks.slack.com/",
    },
).to_dict()

Sources and targets#

Sources and targets abstract the underlying storage so you can easily retrieve data from various sources and write it to various targets.

Docs: Ingest data using the feature store

Sources#

Docs: Sources

from mlrun.datastore.sources import (
    CSVSource,
    ParquetSource,
    BigQuerySource,
    KafkaSource,
    SnowflakeSource,
)

# CSV
csv_source = CSVSource(name="read", path="/User/getting_started/examples/demo.csv")
csv_df = csv_source.to_dataframe()

# Parquet
from pyspark.sql import SparkSession

session = SparkSession.builder.master("local").getOrCreate()
parquet_source = ParquetSource(
    name="read", path="v3io:///users/admin/getting_started/examples/userdata1.parquet"
)
spark_df = parquet_source.to_spark_df(session=session)

# BigQuery
bq_source = BigQuerySource(
    name="read", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
bq_df = bq_source.to_dataframe()

# Kafka
kafka_source = KafkaSource(
    name="read",
    brokers="localhost:9092",
    topics="topic",
    group="serving",
    initial_offset="earliest",
)
kafka_source.add_nuclio_trigger(function=fn)

# Snowflake
snowflake_source = SnowflakeSource(
    name="read",
    query="select * from customer limit 100000",
    url="<url>",
    user="<user>",
    password="<password>",
    database="SNOWFLAKE_SAMPLE_DATA",
    db_schema="TPCH_SF1",
    warehouse="compute_wh",
)
snowflake_df = snowflake_source.to_dataframe()

Targets#

Docs: Targets, Partitioning on Parquet target

from mlrun.datastore.targets import (
    CSVTarget,
    ParquetTarget,
    RedisNoSqlTarget,
    KafkaTarget,
)

# CSV
csv_target = CSVTarget(name="write", path="/User/test.csv")
csv_target.write_dataframe(df=csv_df, key_column="id")

# Parquet
pq_target = ParquetTarget(
    name="write",
    path="/User/test.parquet",
    partitioned=True,
    partition_cols=["country"],
)
pq_target.write_dataframe(df=pq_df, key_column="id")

# Redis (see docs for writing online features)
redis_target = RedisNoSqlTarget(name="write", path="redis://1.2.3.4:6379")
redis_target.write_dataframe(df=redis_df)

# Kafka (see docs for writing online features)
kafka_target = KafkaTarget(
    name="write",
    brokers="localhost:9092",
    path="topic",
)
kafka_target.write_dataframe(df=kafka_df)

Feature store#

Docs: Feature Store, Feature sets, Feature set transformations, Creating and using feature vectors, Feature store end-to-end demo

Definitions#

Docs: Feature store overview

  • Feature Set: A group of features that can be ingested together and stored in logical group (usually one-to-one with a dataset, stream, table, etc.)

  • Feature Vector: A group of features from different Feature Sets

Engines#

Docs: Feature store overview, Ingest features with Spark

  • storey engine (default) is designed for real-time data (e.g. individual records) that will be transformed using Python functions and classes.

  • pandas engine is designed for batch data that can fit into memory that will be transformed using Pandas dataframes. Pandas is used for testing, and is not recommended for production deployments

  • spark engine is designed for batch data.

Feature sets#

Docs: Feature sets

Basic ingestion#

Docs: Ingest data using the feature store

import mlrun.feature_store as fstore
from mlrun.datastore.sources import ParquetSource

categorical_fset = fstore.FeatureSet(
    name="heart-disease-categorical",
    entities=[fstore.Entity("patient_id")],
    description="Categorical columns for heart disease dataset",
)

categorical_fset.ingest(
    source=ParquetSource(path="./data/heart_disease_categorical.parquet")
)

Feature set per engine#

from mlrun.datastore.sources import CSVSource, DataFrameSource

# Storey engine
storey_set = fstore.FeatureSet(
    name="heart-disease-storey",
    entities=[fstore.Entity("patient_id")],
    description="Heart disease data via storey engine",
    engine="storey",
)
storey_set.ingest(source=DataFrameSource(df=data))

# Pandas engine
pandas_set = fstore.FeatureSet(
    name="heart-disease-pandas",
    entities=[fstore.Entity("patient_id")],
    description="Heart disease data via pandas engine",
    engine="pandas",
)
pandas_set.ingest(source=DataFrameSource(df=data))

# Spark engine
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark function").getOrCreate()

spark_set = fstore.FeatureSet(
    name="heart-disease-spark",
    entities=[fstore.Entity("patient_id")],
    description="Heart disease data via spark engine",
    engine="spark",
)
spark_set.ingest(source=CSVSource(path=v3io_data_path), spark_context=spark)

Ingestion methods#

Docs: Ingest data locally, Ingest data using an MLRun job, Real-time ingestion, Incremental ingestion, Feature store end-to-end demo

# Local
from mlrun.datastore.sources import CSVSource

fs = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
df = fs.ingest(source=CSVSource("mycsv", path="stocks.csv"))

# Job
from mlrun.datastore.sources import ParquetSource

fs = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
df = fs.ingest(
    source=ParquetSource("mypq", path="stocks.parquet"),
    run_config=fstore.RunConfig(image="mlrun/mlrun"),
)

# Real-Time
from mlrun.datastore.sources import HttpSource

fs = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
url, _ = fs.deploy_ingestion_service(
    source=HttpSource(key_field="ticker"),
    run_config=fstore.RunConfig(image="mlrun/mlrun", kind="serving"),
)

# Incremental
cron_trigger = "0 */1 * * *"  # will run every hour
fs = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
fs.ingest(
    source=ParquetSource(
        "mypq", path="stocks.parquet", time_field="time", schedule=cron_trigger
    ),
    run_config=fstore.RunConfig(image="mlrun/mlrun"),
)

Aggregations#

Docs: add_aggregation(), Aggregations

quotes_set = fstore.FeatureSet("stock-quotes", entities=[fstore.Entity("ticker")])
quotes_set.add_aggregation("bid", ["min", "max"], ["1h"], "10m")

Built-in transformations#

Docs: storey.transformations, Built-in transformations

quotes_set.graph.to("storey.Filter", "filter", _fn="(event['bid'] > 50)")

Custom transformations#

Docs: Custom transformations

Define transformation

# Storey
from storey import MapClass


class MyMapStorey(MapClass):
    def __init__(self, multiplier=1, **kwargs):
        super().__init__(**kwargs)
        self._multiplier = multiplier

    def do(self, event):
        event["multi"] = event["bid"] * self._multiplier
        return event


# Pandas
class MyMapPandas:
    def __init__(self, multiplier=1, **kwargs):
        self._multiplier = multiplier

    def do(self, df):
        df["multi"] = df["bid"] * self._multiplier
        return df


# Spark
class MyMapSpark:
    def __init__(self, multiplier=1, **kwargs):
        self._multiplier = multiplier

    def do(self, df):
        df = df.withColumn("multi", df["bid"] * self._multiplier)
        return df

Use in graph

quotes_set.graph.add_step("MyMapStorey", "multi", after="filter", multiplier=3)

Feature vectors#

Docs: Feature vectors

Basic retrieval#

import mlrun.feature_store as fstore
from mlrun.datastore.targets import ParquetTarget

fvec = fstore.FeatureVector(
    name="heart-disease-vector",
    features=["heart-disease-categorical.*", "heart-disease-continuous.*"],
    description="Heart disease dataset",
)
fvec.save()

# Instantiate feature-vector from mlrun DB
fvec = fstore.get_feature_vector("iguazio-academy/heart-disease-vector")

# Offline features for training
df = fvec.get_offline_features().to_dataframe()

# Materialize offline features to parquet
fvec.get_offline_features(target=ParquetTarget())

# Online features for serving
feature_service = fvec.get_online_feature_service()
features = feature_service.get(
    [
        {"patient_id": "e443544b-8d9e-4f6c-9623-e24b6139aae0"},
        {"patient_id": "8227d3df-16ab-4452-8ea5-99472362d982"},
    ]
)

Real-time pipelines#

Docs: Real-time serving pipelines, Real-time pipeline use cases, Graph concepts and state machine, Model serving graph, Writing custom steps

Definitions#

Graphs are composed of the following:

  • Step: A step runs a function or class handler or a REST API call

  • Router: A special type of step with routing logic and multiple child routes/models

  • Queue: A queue or stream that accepts data from one or more source steps and publishes to one or more output steps

Graphs have two modes (topologies):

  • Router topology (default): A minimal configuration with a single router and child tasks/routes

  • Flow topology: A full graph/DAG
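
A minimal sketch of selecting a topology on a serving function (assuming fn is a serving-kind function; the model path is a placeholder):

# Router topology (default): a single router with model routes
fn.set_topology("router")
fn.add_model("my-model", model_path="store://models/my-project/my-model:latest", class_name="ClassifierModel")

# Or: flow topology - a full graph/DAG of steps
graph = fn.set_topology("flow", engine="async")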

Simple graph#

Docs: Real-time serving pipelines getting started

Define Python file(s) to orchestrate

# graph.py
def inc(x):
    return x + 1


def mul(x):
    return x * 2


class WithState:
    def __init__(self, name, context, init_val=0):
        self.name = name
        self.context = context
        self.counter = init_val

    def do(self, x):
        self.counter += 1
        print(f"Echo: {self.name}, x: {x}, counter: {self.counter}")
        return x + self.counter

Define MLRun function and graph

import mlrun

fn = project.set_function(
    name="simple-graph", func="graph.py", kind="serving", image="mlrun/mlrun"
)
graph = fn.set_topology("flow")

# inc, mul, and WithState are all defined in graph.py
graph.to(name="+1", handler="inc").to(name="*2", handler="mul").to(
    name="(X+counter)", class_name="WithState"
).respond()

# Local testing
server = fn.to_mock_server()
server.test(body=5)

# K8s deployment
project.deploy_function(fn)

Simple model serving router#

Docs: Example of a simple model serving router

# load the sklearn model serving function and add models to it
fn = mlrun.import_function("hub://v2_model_server")
fn.add_model("model1", model_path="s3://...")
fn.add_model("model2", model_path="store://...")

# deploy the function to the cluster
project.deploy_function(fn)

# test the live model endpoint
fn.invoke("/v2/models/model1/infer", body={"inputs": [5]})

Custom model serving class#

Docs: Model serving graph

from cloudpickle import load
from typing import List
import numpy as np

import mlrun


class ClassifierModel(mlrun.serving.V2ModelServer):
    def load(self):
        """load and initialize the model and/or other elements"""
        model_file, extra_data = self.get_model(".pkl")
        self.model = load(open(model_file, "rb"))

    def predict(self, body: dict) -> List:
        """Generate model predictions from sample."""
        feats = np.asarray(body["inputs"])
        result: np.ndarray = self.model.predict(feats)
        return result.tolist()

Advanced data processing and serving ensemble#

Docs: Advanced model serving graph - notebook example

fn = project.set_function(
    name="advanced", func="demo.py", kind="serving", image="mlrun/mlrun"
)
graph = fn.set_topology("flow", engine="async")

# use built-in storey class or our custom Echo class to create and link Task steps. Add an error
# handling step that runs only if the "Echo" step fails
graph.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
    class_name="Echo", name="pre-process", some_arg="abc"
).error_handler(name="catcher", handler="handle_error", full_event=True)

# add an Ensemble router with two child models (routes), the "*" prefix marks it as router class
router = graph.add_step(
    "*mlrun.serving.VotingEnsemble", name="ensemble", after="pre-process"
)
router.add_route("m1", class_name="ClassifierModel", model_path=path1)
router.add_route("m2", class_name="ClassifierModel", model_path=path2)

# add the final step (after the router), which handles post-processing and response to the client
graph.add_step(class_name="Echo", name="final", after="ensemble").respond()

Hyperparameter tuning#

Docs: Hyperparameter tuning optimization

The following hyperparameter examples use this function:

# hp.py
def hyper_func(context, p1, p2):
    print(f"p1={p1}, p2={p2}, result={p1 * p2}")
    context.log_result("multiplier", p1 * p2)


# MLRun function in project
fn = project.set_function(
    name="hp", func="hp.py", image="mlrun/mlrun", kind="job", handler="hyper_func"
)

Note

The selector can be named any value that is logged - in this case multiplier

Grid search (default)#

Docs: Grid Search

Runs all parameter combinations

hp_tuning_run = project.run_function(
    "hp", hyperparams={"p1": [2, 4, 1], "p2": [10, 20]}, selector="max.multiplier"
)

Parallel executors#

Docs: Parallel execution over containers

Dask#

Docs: Running the workers using Dask

# Create Dask cluster
project = mlrun.get_or_create_project("dask-cluster")
dask_cluster = project.set_function(
    name="dask-cluster", kind="dask", image="mlrun/ml-base"
)
dask_cluster.apply(mlrun.mount_v3io())  # add volume mounts
dask_cluster.spec.service_type = "NodePort"  # open interface to the dask UI dashboard
dask_cluster.spec.replicas = 2  # define two containers
uri = dask_cluster.save()

# Run parallel hyperparameter trials
hp_tuning_run_dask = project.run_function(
    "hp",
    hyperparams={"p1": [2, 4, 1], "p2": [10, 20, 30]},
    selector="max.multiplier",
    hyper_param_options=mlrun.model.HyperParamOptions(
        strategy="grid",
        parallel_runs=4,
        dask_cluster_uri=uri,
        teardown_dask=True,
    ),
)

Nuclio#

Docs: Running the workers using Nuclio

# Create nuclio:mlrun function
fn = project.set_function(
    name="hyper-tst2", func="hp.py", kind="nuclio:mlrun", image="mlrun/mlrun"
)
# (replicas * workers) must be equal to or greater than parallel_runs
fn.spec.replicas = 2
fn.with_http(workers=2)
fn.deploy()

# Run the parallel tasks over the function
hp_tuning_run_nuclio = project.run_function(
    "hyper-tst2",
    hyperparams={"p1": [2, 4, 1], "p2": [10, 20, 30]},
    selector="max.multiplier",
    hyper_param_options=mlrun.model.HyperParamOptions(
        strategy="grid", parallel_runs=4, max_errors=3
    ),
    handler="hyper_func",
)