mlrun.runtimes#
- class mlrun.runtimes.BaseRuntime(metadata=None, spec=None)[source]#
Bases:
ModelObj
- as_step(runspec: Optional[RunObject] = None, handler=None, name: str = '', project: str = '', params: Optional[dict] = None, hyperparams=None, selector='', hyper_param_options: Optional[HyperParamOptions] = None, inputs: Optional[dict] = None, outputs: Optional[dict] = None, workdir: str = '', artifact_path: str = '', image: str = '', labels: Optional[dict] = None, use_db=True, verbose=None, scrape_metrics=False, returns: Optional[List[Union[str, Dict[str, str]]]] = None, auto_build: bool = False)[source]#
Run a local or remote task.
- Parameters
runspec – run template object or dict (see RunTemplate)
handler – name of the function handler
name – execution name
project – project name
params – input parameters (dict)
hyperparams – hyper parameters
selector – selection criteria for hyper params
hyper_param_options – hyper param options (selector, early stop, strategy, ..) see:
HyperParamOptions
inputs – Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g.: “<key> : <type_hint>”.
outputs – list of outputs which can pass in the workflow
artifact_path – default artifact output path (replace out_path)
workdir – default input artifacts path
image – container image to use
labels – labels to tag the job/run with ({key:val, ..})
use_db – save function spec in the db (vs the workflow file)
verbose – add verbose prints/logs
scrape_metrics – whether to add the mlrun/scrape-metrics label to this run’s resources
returns –
List of configurations for how to log the returning values from the handler’s run (as artifacts or results). The list’s length must be equal to the number of returned objects. A configuration may be given as:
A string of the key to use to log the returning value as a result or as an artifact. To specify the artifact type, it is possible to pass a string in the following structure: “<key> : <type>”. Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object’s default artifact type will be used.
A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as “key”: “the_key”.
auto_build – when set to True and the function requires a build, it will be built on the first function run; use only if you don’t plan on changing the build config between runs
- Returns
KubeFlow containerOp
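For example, a hedged sketch of wiring a function into a Kubeflow pipeline via as_step (the function file, handler, and parameter names are illustrative, not part of this API):
import mlrun
from kfp import dsl

@dsl.pipeline(name="train-pipeline")
def train_pipeline(lr: float = 0.1):
    # assumed project function defined in trainer.py with a "train" handler
    fn = mlrun.code_to_function("trainer", filename="trainer.py", kind="job", image="mlrun/mlrun")
    # as_step returns a KubeFlow ContainerOp that becomes a pipeline step
    train = fn.as_step(handler="train", params={"lr": lr}, outputs=["model"])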
- export(target='', format='.yaml', secrets=None, strip=True)[source]#
save the function spec to a local/remote path (defaults to ./function.yaml)
- Parameters
target – target path/url
format – .yaml (default) or .json
secrets – optional secrets dict/object for target path (e.g. s3)
strip – strip status data
- Returns
self
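For example (a minimal sketch; the target paths and secret keys are illustrative assumptions):
fn.export()                                    # writes ./function.yaml
fn.export("s3://my-bucket/funcs/trainer.yaml",
          secrets={"AWS_ACCESS_KEY_ID": key, "AWS_SECRET_ACCESS_KEY": secret})   # key/secret assumed defined elsewhere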
- full_image_path(image=None, client_version: Optional[str] = None, client_python_version: Optional[str] = None)[source]#
- generate_runtime_k8s_env(runobj: Optional[RunObject] = None) List[Dict] [source]#
Prepares a runtime environment as expected by kubernetes.models.V1Container
- Parameters
runobj – Run context object (RunObject) with run metadata and status
- Returns
List of dicts with the structure {“name”: “var_name”, “value”: “var_value”}
- kind = 'base'#
- property metadata: BaseMetadata#
- prepare_image_for_deploy()[source]#
If a function has a ‘spec.image’ it is considered deployed, but because we allow the user to set ‘spec.image’ for usability purposes, we need to check whether this is a built image or whether it still needs to be built on top of.
- run(runspec: Optional[Union[RunTemplate, RunObject, dict]] = None, handler: Optional[Union[str, Callable]] = None, name: Optional[str] = '', project: Optional[str] = '', params: Optional[dict] = None, inputs: Optional[Dict[str, str]] = None, out_path: Optional[str] = '', workdir: Optional[str] = '', artifact_path: Optional[str] = '', watch: Optional[bool] = True, schedule: Optional[Union[str, ScheduleCronTrigger]] = None, hyperparams: Optional[Dict[str, list]] = None, hyper_param_options: Optional[HyperParamOptions] = None, verbose: Optional[bool] = None, scrape_metrics: Optional[bool] = None, local: Optional[bool] = False, local_code_path: Optional[str] = None, auto_build: Optional[bool] = None, param_file_secrets: Optional[Dict[str, str]] = None, notifications: Optional[List[Notification]] = None, returns: Optional[List[Union[str, Dict[str, str]]]] = None, **launcher_kwargs) RunObject [source]#
Run a local or remote task.
- Parameters
runspec – The run spec to generate the RunObject from. Can be RunTemplate | RunObject | dict.
handler – Pointer or name of a function handler.
name – Execution name.
project – Project name.
params – Input parameters (dict).
inputs – Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g.: “<key> : <type_hint>”.
out_path – Default artifact output path.
artifact_path – Default artifact output path (will replace out_path).
workdir – Default input artifacts path.
watch – Watch/follow run log.
schedule – ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor), see this link for help: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html#module-apscheduler.triggers.cron
hyperparams – Dict of parameter name to a list of values to enumerate, e.g. {“p1”: [1, 2, 3]}. The default strategy is grid search (the dict can also be specified as a JSON file). For the list strategy, all lists must be of equal length, e.g. {“p1”: [1], “p2”: [2]} (can be specified as a JSON file or as a CSV file listing the parameter values per iteration). The strategy (grid, list, random) and other options can be specified in the hyper_param_options parameter.
hyper_param_options – Dict or HyperParamOptions struct of hyperparameter options.
verbose – Add verbose prints/logs.
scrape_metrics – Whether to add the mlrun/scrape-metrics label to this run’s resources.
local – Run the function locally vs on the runtime/cluster.
local_code_path – Path of the code for local runs & debug.
auto_build – When set to True and the function requires a build, it will be built on the first function run; use only if you don’t plan on changing the build config between runs.
param_file_secrets – Dictionary of secrets to be used only for accessing the hyper-param parameter file. These secrets are only used locally and will not be stored anywhere
notifications – List of notifications to push when the run is completed
returns –
List of log hints - configurations for how to log the returning values from the handler’s run (as artifacts or results). The list’s length must be equal to the number of returned objects. A log hint may be given as:
A string of the key to use to log the returning value as a result or as an artifact. To specify the artifact type, it is possible to pass a string in the following structure: “<key> : <type>”. Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object’s default artifact type will be used.
A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as “key”: “the_key”.
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
- Returns
Run context object (RunObject) with run metadata, results and status
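A minimal sketch of a typical run call and a hyperparameter run (the handler, parameters, and data path are illustrative):
import mlrun

fn = mlrun.code_to_function("trainer", filename="trainer.py", kind="job", image="mlrun/mlrun")

# single run, executed in-process for quick debugging
run = fn.run(handler="train", params={"lr": 0.1}, inputs={"dataset": "s3://my-bucket/data.csv"}, local=True)
print(run.outputs)          # results/artifacts logged by the handler

# grid-search hyperparameter run (the default strategy)
fn.run(handler="train", hyperparams={"lr": [0.1, 0.01], "batch_size": [32, 64]})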
- property spec: FunctionSpec#
- property status: FunctionStatus#
- to_dict(fields=None, exclude=None, strip=False)[source]#
convert the object to a python dictionary
- Parameters
fields – list of fields to include in the dict
exclude – list of fields to exclude from the dict
- property uri#
- with_code(from_file='', body=None, with_doc=True)[source]#
Update the function code. This eliminates the need to build container images every time the code is edited.
- Parameters
from_file – blank for current notebook, or path to .py/.ipynb file
body – will use the body as the function code
with_doc – update the document of the function parameters
- Returns
function object
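For example (a hedged sketch; the file name is illustrative):
fn.with_code()                            # embed the code of the current notebook
fn.with_code(from_file="./trainer.py")    # embed code from a local .py file instead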
- with_commands(commands: List[str], overwrite: bool = False, verify_base_image: bool = False, prepare_image_for_deploy: bool = True)[source]#
add commands to build spec.
- Parameters
commands – list of commands to run during build
overwrite – overwrite existing commands
verify_base_image – verify that the base image is configured (deprecated, use prepare_image_for_deploy)
prepare_image_for_deploy – prepare the image/base_image spec for deployment
- Returns
function object
- with_requirements(requirements: Union[str, List[str]], overwrite: bool = False, verify_base_image: bool = False, prepare_image_for_deploy: bool = True, requirements_file: str = '')[source]#
add package requirements from file or list to build spec.
- Parameters
requirements – a list of python packages
requirements_file – a local python requirements file path
overwrite – overwrite existing requirements
verify_base_image – verify that the base image is configured (deprecated, use prepare_image_for_deploy)
prepare_image_for_deploy – prepare the image/base_image spec for deployment
- Returns
function object
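A brief sketch combining with_commands and with_requirements before a deploy, assuming a buildable runtime such as a job function (the commands and packages are illustrative):
fn.with_commands(["apt-get update && apt-get install -y git"])
fn.with_requirements(["pandas>=1.5", "scikit-learn"])
fn.deploy()     # the added commands/packages are applied when the image is built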
- class mlrun.runtimes.DaskCluster(spec=None, metadata=None)[source]#
Bases:
KubejobRuntime
- property client#
- deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: Optional[dict] = None, show_on_failure: bool = False)[source]#
deploy function, build container with dependencies
- Parameters
watch – wait for the deploy to complete (and print build logs)
with_mlrun – add the current mlrun package to the container build
skip_deployed – skip the build if we already have an image for the function
is_kfp – deploy as part of a kfp pipeline
mlrun_version_specifier – which mlrun package version to include (if not current)
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
show_on_failure – show logs only in case of build failure
- Returns
True if the function is ready (deployed)
- property initialized#
- kind = 'dask'#
- property spec: DaskSpec#
- property status: DaskStatus#
- with_limits(mem=None, cpu=None, gpus=None, gpu_type='nvidia.com/gpu', patch: bool = False)[source]#
Set pod cpu/memory/gpu limits (max values)
- Parameters
mem – set limit for memory e.g. ‘500M’, ‘2G’, etc.
cpu – set limit for cpu e.g. ‘0.5’, ‘2’, etc.
gpus – set limit for gpu
gpu_type – set gpu type e.g. “nvidia.com/gpu”
patch – by default the whole limits section is overridden; to patch specific resources, use patch=True
- with_requests(mem=None, cpu=None, patch: bool = False)[source]#
Set requested (desired) pod cpu/memory resources
- Parameters
mem – set request for memory e.g. ‘200M’, ‘1G’, etc.
cpu – set request for cpu e.g. ‘0.1’, ‘1’, etc.
patch – by default the whole requests section is overridden; to patch specific resources, use patch=True
- with_scheduler_limits(mem: Optional[str] = None, cpu: Optional[str] = None, gpus: Optional[int] = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#
Set scheduler pod resource limits. By default the whole limits section is overridden; to patch specific resources, use patch=True.
- with_scheduler_requests(mem: Optional[str] = None, cpu: Optional[str] = None, patch: bool = False)[source]#
Set scheduler pod resource requests. By default the whole requests section is overridden; to patch specific resources, use patch=True.
- with_worker_limits(mem: Optional[str] = None, cpu: Optional[str] = None, gpus: Optional[int] = None, gpu_type: str = 'nvidia.com/gpu', patch: bool = False)[source]#
Set worker pod resource limits. By default the whole limits section is overridden; to patch specific resources, use patch=True.
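A minimal sketch of configuring Dask scheduler/worker resources (the function name, image, and values are illustrative):
import mlrun

dask_fn = mlrun.new_function("my-dask", kind="dask", image="mlrun/ml-base")
dask_fn.with_scheduler_limits(mem="2G", cpu="1")
dask_fn.with_scheduler_requests(mem="1G", cpu="0.5")
dask_fn.with_worker_limits(mem="4G", cpu="2")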
- class mlrun.runtimes.DatabricksRuntime(spec=None, metadata=None)[source]#
Bases:
KubejobRuntime
- kind = 'databricks'#
- run(runspec: Optional[Union[RunTemplate, RunObject, dict]] = None, handler: Optional[Union[str, Callable]] = None, name: Optional[str] = '', project: Optional[str] = '', params: Optional[dict] = None, inputs: Optional[Dict[str, str]] = None, out_path: Optional[str] = '', workdir: Optional[str] = '', artifact_path: Optional[str] = '', watch: Optional[bool] = True, schedule: Optional[Union[str, ScheduleCronTrigger]] = None, hyperparams: Optional[Dict[str, list]] = None, hyper_param_options: Optional[HyperParamOptions] = None, verbose: Optional[bool] = None, scrape_metrics: Optional[bool] = None, local: Optional[bool] = False, local_code_path: Optional[str] = None, auto_build: Optional[bool] = None, param_file_secrets: Optional[Dict[str, str]] = None, notifications: Optional[List[Notification]] = None, returns: Optional[List[Union[str, Dict[str, str]]]] = None, **launcher_kwargs) RunObject [source]#
Run a local or remote task.
- Parameters
runspec – The run spec to generate the RunObject from. Can be RunTemplate | RunObject | dict.
handler – Pointer or name of a function handler.
name – Execution name.
project – Project name.
params – Input parameters (dict).
inputs – Input objects to pass to the handler. Type hints can be given so the input will be parsed during runtime from mlrun.DataItem to the given type hint. The type hint can be given in the key field of the dictionary after a colon, e.g.: “<key> : <type_hint>”.
out_path – Default artifact output path.
artifact_path – Default artifact output path (will replace out_path).
workdir – Default input artifacts path.
watch – Watch/follow run log.
schedule – ScheduleCronTrigger class instance or a standard crontab expression string (which will be converted to the class using its from_crontab constructor), see this link for help: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/cron.html#module-apscheduler.triggers.cron
hyperparams – Dict of parameter name to a list of values to enumerate, e.g. {“p1”: [1, 2, 3]}. The default strategy is grid search (the dict can also be specified as a JSON file). For the list strategy, all lists must be of equal length, e.g. {“p1”: [1], “p2”: [2]} (can be specified as a JSON file or as a CSV file listing the parameter values per iteration). The strategy (grid, list, random) and other options can be specified in the hyper_param_options parameter.
hyper_param_options – Dict or HyperParamOptions struct of hyperparameter options.
verbose – Add verbose prints/logs.
scrape_metrics – Whether to add the mlrun/scrape-metrics label to this run’s resources.
local – Run the function locally vs on the runtime/cluster.
local_code_path – Path of the code for local runs & debug.
auto_build – When set to True and the function requires a build, it will be built on the first function run; use only if you don’t plan on changing the build config between runs.
param_file_secrets – Dictionary of secrets to be used only for accessing the hyper-param parameter file. These secrets are only used locally and will not be stored anywhere
notifications – List of notifications to push when the run is completed
returns –
List of log hints - configurations for how to log the returning values from the handler’s run (as artifacts or results). The list’s length must be equal to the number of returned objects. A log hint may be given as:
A string of the key to use to log the returning value as a result or as an artifact. To specify the artifact type, it is possible to pass a string in the following structure: “<key> : <type>”. Available artifact types can be seen in mlrun.ArtifactType. If no artifact type is specified, the object’s default artifact type will be used.
A dictionary of configurations to use when logging. Further info per object type and artifact type can be given there. The artifact key must appear in the dictionary as “key”: “the_key”.
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
- Returns
Run context object (RunObject) with run metadata, results and status
- class mlrun.runtimes.HandlerRuntime(metadata=None, spec=None)[source]#
Bases:
BaseRuntime, ParallelRunner
- kind = 'handler'#
- class mlrun.runtimes.KubejobRuntime(spec=None, metadata=None)[source]#
Bases:
KubeResource
- build_config(image='', base_image=None, commands: Optional[list] = None, secret=None, source=None, extra=None, load_source_on_run=None, with_mlrun=None, auto_build=None, requirements=None, overwrite=False, verify_base_image=False, prepare_image_for_deploy=True, requirements_file=None, builder_env=None, extra_args=None)[source]#
specify builder configuration for the deploy operation
- Parameters
image – target image name/path
base_image – base image name/path
commands – list of docker build (RUN) commands e.g. [‘pip install pandas’]
secret – k8s secret for accessing the docker registry
source – source git/tar archive to load code from into the context/workdir e.g. git://github.com/mlrun/something.git#development
extra – extra Dockerfile lines
load_source_on_run – load the archive code into the container at runtime vs at build time
with_mlrun – add the current mlrun package to the container build
auto_build – when set to True and the function requires a build, it will be built on the first function run; use only if you don’t plan on changing the build config between runs
requirements – a list of packages to install
requirements_file – requirements file to install
overwrite – overwrite existing build configuration (currently applies to requirements and commands). False: the new params are merged with the existing ones; True: the existing params are replaced by the new ones
verify_base_image – verify that the base image is configured (deprecated, use prepare_image_for_deploy)
prepare_image_for_deploy – prepare the image/base_image spec for deployment
extra_args – A string containing additional builder arguments in the format of command-line options, e.g. extra_args=”–skip-tls-verify –build-arg A=val”
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
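A hedged sketch of a typical build configuration followed by a deploy (the base image, packages, source URL, and GIT_TOKEN variable are illustrative assumptions):
fn.build_config(
    base_image="mlrun/mlrun",
    requirements=["pandas", "lightgbm"],
    commands=["pip install sqlalchemy"],
    source="git://github.com/org/repo.git#main",
    builder_env={"GIT_TOKEN": token},    # token assumed to be defined elsewhere
)
fn.deploy(with_mlrun=False)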
- deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: Optional[dict] = None, show_on_failure: bool = False) bool [source]#
deploy function, build container with dependencies
- Parameters
watch – wait for the deploy to complete (and print build logs)
with_mlrun – add the current mlrun package to the container build
skip_deployed – skip the build if we already have an image for the function
is_kfp – deploy as part of a kfp pipeline
mlrun_version_specifier – which mlrun package version to include (if not current)
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
show_on_failure – show logs only in case of build failure
- Returns
True if the function is ready (deployed)
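For example (a minimal sketch; the handler name is illustrative):
if fn.deploy(skip_deployed=True, show_on_failure=True):
    fn.run(handler="train")     # only run once the image is ready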
- deploy_step(image=None, base_image=None, commands: Optional[list] = None, secret_name='', with_mlrun=True, skip_deployed=False)[source]#
- kind = 'job'#
- with_source_archive(source, workdir=None, handler=None, pull_at_runtime=True, target_dir=None)[source]#
load the code from git/tar/zip archive at runtime or build
- Parameters
source – valid absolute path or URL to a git, zip, or tar file, e.g. git://github.com/mlrun/something.git, http://some/url/file.zip. Note: a path source must exist on the image, or exist locally when the run is local (it is recommended to use ‘workdir’ instead when source is a file path)
handler – default function handler
workdir – working dir relative to the archive root (e.g. ‘./subdir’) or absolute to the image root
pull_at_runtime – load the archive into the container at job runtime vs on build/deploy
target_dir – target dir on the runtime pod for the repo clone / archive extraction
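A brief sketch of pulling code from git at runtime instead of baking it into the image (the repo URL, workdir, and handler are illustrative):
fn.with_source_archive(
    "git://github.com/org/repo.git#main",
    workdir="./src",
    handler="trainer:train",
    pull_at_runtime=True,     # clone when the job pod starts rather than at build time
)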
- class mlrun.runtimes.LocalRuntime(metadata=None, spec=None)[source]#
Bases:
BaseRuntime, ParallelRunner
- kind = 'local'#
- with_source_archive(source, workdir=None, handler=None, target_dir=None)[source]#
load the code from git/tar/zip archive at runtime or build
- Parameters
source – valid path to git, zip, or tar file, e.g. git://github.com/mlrun/something.git http://some/url/file.zip
handler – default function handler
workdir – working dir relative to the archive root (e.g. ‘./subdir’) or absolute
target_dir – local target dir for the repo clone (by default it’s <current-dir>/code)
- class mlrun.runtimes.RemoteRuntime(spec=None, metadata=None)[source]#
Bases:
KubeResource
- add_trigger(name, spec)[source]#
add a nuclio trigger object/dict
- Parameters
name – trigger name
spec – trigger object or dict
- add_v3io_stream_trigger(stream_path, name='stream', group='serving', seek_to='earliest', shards=1, extra_attributes=None, ack_window_size=None, **kwargs)[source]#
add v3io stream trigger to the function
- Parameters
stream_path – v3io stream path (e.g. ‘v3io:///projects/myproj/stream1’)
name – trigger name
group – consumer group
seek_to – start seek from: “earliest”, “latest”, “time”, “sequence”
shards – number of shards (used to set number of replicas)
extra_attributes – key/value dict with extra trigger attributes
ack_window_size – stream ack window size (the consumer group will be updated with event id minus ack_window_size; on failure, the events in the window will be retransmitted)
kwargs – extra V3IOStreamTrigger class attributes
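For example (a hedged sketch; the stream path and consumer group are illustrative):
fn.add_v3io_stream_trigger(
    "v3io:///projects/myproj/stream1",
    name="stream",
    group="serving",
    seek_to="earliest",
    shards=2,     # also sets the number of function replicas
)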
- deploy(dashboard='', project='', tag='', verbose=False, auth_info: Optional[AuthInfo] = None, builder_env: Optional[dict] = None)[source]#
Deploy the nuclio function to the cluster
- Parameters
dashboard – DEPRECATED. Keep empty to allow auto-detection by MLRun API
project – project name
tag – function tag
verbose – set True for verbose logging
auth_info – service AuthInfo
builder_env – env vars dict for source archive config/credentials e.g. builder_env={“GIT_TOKEN”: token}
- deploy_step(dashboard='', project='', models=None, env=None, tag=None, verbose=None, use_function_from_db=None)[source]#
return as a Kubeflow pipeline step (ContainerOp); it is recommended to use mlrun.deploy_function() instead
- Parameters
dashboard – DEPRECATED. Keep empty to allow auto-detection by MLRun API.
project – project name, defaults to function project
models – model name and paths
env – dict of environment variables
tag – version tag
verbose – verbose output
use_function_from_db – use the function from the DB instead of the local function object
- get_url(force_external_address: bool = False, auth_info: Optional[AuthInfo] = None)[source]#
This method returns the function’s URL.
- Parameters
force_external_address – use the external ingress URL
auth_info – service AuthInfo
- Returns
the function’s URL
- invoke(path: str, body: Optional[Union[str, bytes, dict]] = None, method: Optional[str] = None, headers: Optional[dict] = None, dashboard: str = '', force_external_address: bool = False, auth_info: Optional[AuthInfo] = None, mock: Optional[bool] = None, **http_client_kwargs)[source]#
Invoke the remote (live) function and return the results
example:
function.invoke("/api", body={"inputs": x})
- Parameters
path – request sub path (e.g. /images)
body – request body (str, bytes or a dict for json requests)
method – HTTP method (GET, PUT, ..)
headers – key/value dict with http headers
dashboard – nuclio dashboard address
force_external_address – use the external ingress URL
auth_info – service AuthInfo
mock – use mock server vs a real Nuclio function (for local simulations)
http_client_kwargs – allow the user to pass any parameter supported in requests.request method see this link for more information: https://requests.readthedocs.io/en/latest/api/#requests.request
- kind = 'remote'#
- property spec: NuclioSpec#
- property status: NuclioStatus#
- with_http(workers: Optional[int] = 8, port: Optional[int] = None, host: Optional[str] = None, paths: Optional[List[str]] = None, canary: Optional[float] = None, secret: Optional[str] = None, worker_timeout: Optional[int] = None, gateway_timeout: Optional[int] = None, trigger_name: Optional[str] = None, annotations: Optional[Mapping[str, str]] = None, extra_attributes: Optional[Mapping[str, str]] = None)[source]#
update/add nuclio HTTP trigger settings
Note: gateway timeout is the maximum request time before an error is returned, while worker timeout is the maximum time a request waits in the queue until processing starts; gateway_timeout must be greater than worker_timeout.
- Parameters
workers – number of worker processes (default=8). set 0 to use Nuclio’s default workers count
port – TCP port to listen on. By default, Nuclio will choose a random port as long as the function service is NodePort. If the function service is ClusterIP, the port is ignored.
host – Ingress hostname
paths – list of Ingress sub paths
canary – k8s ingress canary (% traffic value between 0 and 100)
secret – k8s secret name for SSL certificate
worker_timeout – worker wait timeout in sec (how long a message should wait in the worker queue before an error is returned)
gateway_timeout – nginx ingress timeout in sec (request timeout, after which the gateway returns an error)
trigger_name – alternative nuclio trigger name
annotations – key/value dict of ingress annotations
extra_attributes – key/value dict of extra nuclio trigger attributes
- Returns
function object (self)
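A minimal sketch of tuning the HTTP trigger (the host, timeouts, and annotation are illustrative assumptions):
fn.with_http(
    workers=4,
    worker_timeout=10,     # seconds a request may wait in the worker queue
    gateway_timeout=60,    # must be greater than worker_timeout
    host="myfunc.example.com",
    annotations={"nginx.ingress.kubernetes.io/proxy-body-size": "32m"},   # assumed ingress annotation
)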
- with_node_selection(**kwargs)[source]#
Enables control over which k8s node the job will run on
- Parameters
node_name – The name of the k8s node
node_selector – Label selector, only nodes with matching labels will be eligible to be picked
affinity – Expands the types of constraints you can express - see https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity for details
tolerations – Tolerations are applied to pods, and allow (but do not require) the pods to schedule onto nodes with matching taints - see https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration for details
- with_preemption_mode(**kwargs)[source]#
Preemption mode controls whether pods can be scheduled on preemptible nodes. Tolerations, node selector, and affinity are populated on preemptible nodes corresponding to the function spec.
The supported modes are:
allow - The function can be scheduled on preemptible nodes
constrain - The function can only run on preemptible nodes
prevent - The function cannot be scheduled on preemptible nodes
none - No preemptible configuration will be applied on the function
The default preemption mode is configurable in mlrun.mlconf.function_defaults.preemption_mode, by default it’s set to prevent
- Parameters
mode – allow | constrain | prevent | none defined in
PreemptionModes
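For example (a hedged sketch):
fn.with_preemption_mode("allow")      # allow scheduling on preemptible (spot) nodes
fn.with_preemption_mode("prevent")    # keep the function pods off preemptible nodes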
- with_priority_class(**kwargs)[source]#
Enables control of the pod priority. If not passed, defaults to mlrun.mlconf.default_function_priority_class_name
- Parameters
name – The name of the priority class
- with_service_type(service_type: str, add_templated_ingress_host_mode: Optional[str] = None)[source]#
Enables control of the pod’s service type and the addition of a templated ingress host
- Parameters
service_type – service type (ClusterIP, NodePort), defaults to mlrun.mlconf.httpdb.nuclio.service_type
add_templated_ingress_host_mode – add templated ingress host mode (never, always, onClusterIP), see mlrun.mlconf.httpdb.nuclio.add_templated_ingress_host_mode for the default and more information
- with_source_archive(source, workdir=None, handler=None, runtime='')[source]#
Load nuclio function from remote source
Note: a remote source may require credentials; these can be stored in the project secrets or passed to function.deploy() using the builder_env dict. Required credentials per source:
v3io - “V3IO_ACCESS_KEY”.
git - “GIT_USERNAME”, “GIT_PASSWORD”.
AWS S3 - “AWS_ACCESS_KEY_ID”, “AWS_SECRET_ACCESS_KEY” or “AWS_SESSION_TOKEN”.
- Parameters
source – a full path to the nuclio function source (code entry) to load the function from
handler – a path to the function’s handler, including path inside archive/git repo
workdir – working dir relative to the archive root (e.g. ‘subdir’)
runtime – (optional) the runtime of the function (defaults to python:3.7)
- Examples
git:
fn.with_source_archive("git://github.com/org/repo#my-branch", handler="main:handler", workdir="path/inside/repo")
s3:
fn.spec.nuclio_runtime = "golang"
fn.with_source_archive("s3://my-bucket/path/in/bucket/my-functions-archive", handler="my_func:Handler", workdir="path/inside/functions/archive", runtime="golang")
- class mlrun.runtimes.RemoteSparkRuntime(spec=None, metadata=None)[source]#
Bases:
KubejobRuntime
- default_image = '.remote-spark-default-image'#
- deploy(watch=True, with_mlrun=None, skip_deployed=False, is_kfp=False, mlrun_version_specifier=None, builder_env: Optional[dict] = None, show_on_failure: bool = False)[source]#
deploy function, build container with dependencies
- Parameters
watch – wait for the deploy to complete (and print build logs)
with_mlrun – add the current mlrun package to the container build
skip_deployed – skip the build if we already have an image for the function
is_kfp – deploy as part of a kfp pipeline
mlrun_version_specifier – which mlrun package version to include (if not current)
builder_env – Kaniko builder pod env vars dict (for config/credentials) e.g. builder_env={“GIT_TOKEN”: token}
show_on_failure – show logs only in case of build failure
- Returns
True if the function is ready (deployed)
- kind = 'remote-spark'#
- property spec: RemoteSparkSpec#
- with_security_context(security_context: V1SecurityContext)[source]#
Setting a security context is not supported for the spark runtime. Driver / executor processes run with uid / gid 1000 as long as no security context is defined. If setting a security context is supported in the future, it will work only from Spark version 3.2 onwards.
- class mlrun.runtimes.ServingRuntime(spec=None, metadata=None)[source]#
Bases:
RemoteRuntime
MLRun Serving Runtime
- add_child_function(name, url=None, image=None, requirements=None, kind=None)[source]#
Add a child function in a multi-function pipeline.
example:
fn.add_child_function('enrich', './enrich.ipynb', 'mlrun/mlrun')
- Parameters
name – child function name
url – function/code url, support .py, .ipynb, .yaml extensions
image – base docker image for the function
requirements – py package requirements file path OR list of packages
kind – mlrun function/runtime kind
- Returns
function object
- add_model(key: str, model_path: Optional[str] = None, class_name: Optional[str] = None, model_url: Optional[str] = None, handler: Optional[str] = None, router_step: Optional[str] = None, child_function: Optional[str] = None, **class_args)[source]#
add ml model and/or route to the function.
Example, create a function (from the notebook), add a model class, and deploy:
fn = code_to_function(kind='serving')
fn.add_model('boost', model_path, class_name='MyClass', my_arg=5)
fn.deploy()
Only works with the router topology; for nested topologies (model under router under flow) you need to add the router to the flow and use router.add_route().
- Parameters
key – model api key (or name:version), will determine the relative url/path
model_path – path to mlrun model artifact or model directory file/object path
class_name – V2 Model python class name or a model class instance (can also be module.submodule.class and it will be imported automatically)
model_url – url of a remote model serving endpoint (cannot be used with model_path)
handler – for advanced users! Override the default class handler name (do_event)
router_step – router step name (to determine which router we add the model to in graphs with multiple router steps)
child_function – child function name, when the model runs in a child function
class_args – extra kwargs to pass to the model serving class __init__ (can be read in the model using .get_param(key) method)
- deploy(dashboard='', project='', tag='', verbose=False, auth_info: Optional[AuthInfo] = None, builder_env: Optional[dict] = None)[source]#
deploy model serving function to a local/remote cluster
- Parameters
dashboard – DEPRECATED. Keep empty to allow auto-detection by MLRun API
project – optional, override function specified project name
tag – specify unique function tag (a different function service is created for every tag)
verbose – verbose logging
auth_info – The auth info to use to communicate with the Nuclio dashboard, required only when providing dashboard
builder_env – env vars dict for source archive config/credentials e.g. builder_env={“GIT_TOKEN”: token}
- kind = 'serving'#
- plot(filename=None, format=None, source=None, **kw)[source]#
plot/save graph using graphviz
example:
serving_fn = mlrun.new_function("serving", image="mlrun/mlrun", kind="serving")
serving_fn.add_model('my-classifier', model_path=model_path, class_name='mlrun.frameworks.sklearn.SklearnModelServer')
serving_fn.plot(rankdir="LR")
- Parameters
filename – target filepath for the image (None for the notebook)
format – The output format used for rendering ('pdf', 'png', etc.)
source – source step to add to the graph
kw – kwargs passed to graphviz, e.g. rankdir=”LR” (see: https://graphviz.org/doc/info/attrs.html)
- Returns
graphviz graph object
- remove_states(keys: list)[source]#
remove one, multiple, or all states/models from the spec (blank list for all)
- set_topology(topology=None, class_name=None, engine=None, exist_ok=False, **class_args) Union[RootFlowStep, RouterStep] [source]#
set the serving graph topology (router/flow) and root class or params
examples:
# simple model router topology
graph = fn.set_topology("router")
fn.add_model(name, class_name="ClassifierModel", model_path=model_uri)

# async flow topology
graph = fn.set_topology("flow", engine="async")
graph.to("MyClass").to(name="to_json", handler="json.dumps").respond()
topology options are:
router - root router + multiple child route states/models; the route is usually determined by the path (route key/name); a special router class and router arguments can be specified
flow - workflow (DAG) with a chain of states; flow supports "sync" and "async" engines; branches are not allowed in sync mode; when using async mode, calling state.respond() marks the state as the one which generates the (REST) call response
- Parameters
topology –
graph topology, router or flow
class_name –
optional for router, router class name/path or router object
engine –
optional for flow, sync or async engine (default to async)
exist_ok –
allow overriding existing topology
class_args –
optional, router/flow class init args
- Returns
graph object (fn.spec.graph)
- set_tracking(stream_path: Optional[str] = None, batch: Optional[int] = None, sample: Optional[int] = None, stream_args: Optional[dict] = None, tracking_policy: Optional[Union[TrackingPolicy, dict]] = None)[source]#
set tracking parameters:
- Parameters
stream_path – Path/url of the tracking stream e.g. v3io:///users/mike/mystream you can use the “dummy://” path for test/simulation.
batch – Micro batch size (send micro batches of N records at a time).
sample – Sample size (send only one of N records).
stream_args – Stream initialization parameters, e.g. shards, retention_in_hours, ..
tracking_policy –
Tracking policy object or a dictionary that will be converted into a tracking policy object. By using TrackingPolicy, the user can apply their model monitoring requirements, such as setting the scheduling policy of the model monitoring batch job or changing the image of the model monitoring stream.
example:
# initialize a new serving function
serving_fn = mlrun.import_function("hub://v2-model-server", new_name="serving")
# apply model monitoring and set the monitoring batch job to run every 3 hours
tracking_policy = {'default_batch_intervals': "0 */3 * * *"}
serving_fn.set_tracking(tracking_policy=tracking_policy)
- property spec: ServingSpec#
- to_mock_server(namespace=None, current_function='*', track_models=False, workdir=None, **kwargs) GraphServer [source]#
create mock server object for local testing/emulation
- Parameters
namespace – one or list of namespaces/modules to search the steps classes/functions in
log_level – log level (error | info | debug)
current_function – specify if you want to simulate a child function, * for all functions
track_models – allow model tracking (disabled by default in the mock server)
workdir – working directory to locate the source code (if not the current one)
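A minimal sketch of local testing with a mock server (the serving file, model key, class name, and request body are illustrative assumptions):
serving_fn = mlrun.code_to_function("serving", filename="serving.py", kind="serving", image="mlrun/mlrun")
serving_fn.add_model("my-model", model_path=model_path, class_name="MyModelClass")   # model_path assumed defined
server = serving_fn.to_mock_server()
result = server.test("/v2/models/my-model/infer", body={"inputs": [[1, 2, 3]]})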
- with_secrets(kind, source)[source]#
register a secrets source (file, env or dict)
read secrets from a source provider to be used in workflows, example:
task.with_secrets('file', 'file.txt')
task.with_secrets('inline', {'key': 'val'})
task.with_secrets('env', 'ENV1,ENV2')
task.with_secrets('vault', ['secret1', 'secret2'...])

# If using an empty secrets list [] then all accessible secrets will be available.
task.with_secrets('vault', [])

# To use with Azure key vault, a k8s secret must be created with the following keys:
# kubectl -n <namespace> create secret generic azure-key-vault-secret \
#     --from-literal=tenant_id=<service principal tenant ID> \
#     --from-literal=client_id=<service principal client ID> \
#     --from-literal=secret=<service principal secret key>
task.with_secrets('azure_vault', {
    'name': 'my-vault-name',
    'k8s_secret': 'azure-key-vault-secret',
    # An empty secrets list may be passed ('secrets': []) to access all vault secrets.
    'secrets': ['secret1', 'secret2'...]
})
- Parameters
kind – secret type (file, inline, env)
source – secret data or link (see example)
- Returns
The Runtime (function) object