# Databricks runtime

The Databricks runtime runs the job on a Databricks cluster (not in the Iguazio cluster). MLRun starts a pod that communicates with the Databricks cluster: the requests originate in MLRun, and all computation runs on the Databricks cluster.

With the Databricks runtime, you can send your local file or code as a string to the job, and use a handler as the entry point for the user code. You can optionally send keyword arguments (kwargs) to this job.
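For example, user code sent as a string can define a handler that simply echoes the kwargs it receives. A minimal sketch (the handler name is your choice; the `return` is an illustrative addition, since the runtime only needs a callable entry point):

```python
# Minimal user-code sketch: the handler is the entry point for the job
def print_kwargs(**kwargs):
    # On the Databricks cluster, this printed output is captured
    # and returned in the run logs
    print(f"kwargs: {kwargs}")
    return kwargs
```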

You can run the function on:

  • An existing cluster, by including DATABRICKS_CLUSTER_ID

  • A job compute cluster, created and dedicated for this function only. Omit DATABRICKS_CLUSTER_ID to create a job compute cluster, and set the cluster specs by using the task parameters when running the function. For example:

    params['task_parameters'] = {'new_cluster_spec': {'node_type_id': 'm5d.xlarge'}, 'number_of_workers': 2, 'timeout_minutes': 15, 'token_key': 'non-default-value'}
    

Do not send parameters named task_parameters or context, since these are reserved for the runtime's internal processes.
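Putting this together, the parameters for a job compute cluster can be assembled as a plain dict before calling the function (a sketch; the node type and sizes are illustrative values, not defaults):

```python
# Illustrative params for a dedicated job compute cluster
# (omit DATABRICKS_CLUSTER_ID from the environment to trigger this mode)
params = {
    "param1": "value1",  # regular kwarg forwarded to the handler
    "task_parameters": {
        "new_cluster_spec": {"node_type_id": "m5d.xlarge"},  # cloud-specific
        "number_of_workers": 2,
        "timeout_minutes": 15,
    },
}
```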

Example of running a Databricks job from a local file on an existing cluster (identified by DATABRICKS_CLUSTER_ID):

```python
import os

import mlrun
from mlrun.runtimes.function_reference import FunctionReference

# If using a Databricks data store, for example, set the credentials:
os.environ["DATABRICKS_HOST"] = "DATABRICKS_HOST"
os.environ["DATABRICKS_TOKEN"] = "DATABRICKS_TOKEN"
os.environ["DATABRICKS_CLUSTER_ID"] = "DATABRICKS_CLUSTER_ID"

project = mlrun.get_or_create_project("project-name", context="./", user_project=False)

# Environment variables passed to the function pod
job_env = {
    "DATABRICKS_HOST": os.environ["DATABRICKS_HOST"],
    "DATABRICKS_CLUSTER_ID": os.environ.get("DATABRICKS_CLUSTER_ID"),
}
# The token is stored as a project secret rather than a plain env variable
secrets = {"DATABRICKS_TOKEN": os.environ["DATABRICKS_TOKEN"]}

project.set_secrets(secrets)

# The user code, sent to the job as a string
code = """
def print_kwargs(**kwargs):
    print(f"kwargs: {kwargs}")
"""

function_ref = FunctionReference(
    kind="databricks",
    code=code,
    image="mlrun/mlrun",
    name="databricks-function",
)

function = function_ref.to_function()

for name, val in job_env.items():
    function.spec.env.append({"name": name, "value": val})

run = function.run(
    handler="print_kwargs",
    project="project-name",
    params={
        "param1": "value1",
        "param2": "value2",
        "task_parameters": {"timeout_minutes": 15},
    },
)

# The handler's printed output is captured and returned in the run results
assert (
    run.status.results["databricks_runtime_task"]["logs"]
    == "kwargs: {'param1': 'value1', 'param2': 'value2'}\n"
)
```