# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import os
import pathlib
import typing
import nuclio
import nuclio.auth
import mlrun.common.constants
import mlrun.common.runtimes.validators
import mlrun.common.schemas as schemas
import mlrun.datastore
import mlrun.errors
import mlrun.run
import mlrun.runtimes.nuclio.api_gateway as nuclio_api_gateway
import mlrun.runtimes.nuclio.function as nuclio_function
from mlrun.common.runtimes.constants import (
NuclioIngressAddTemplatedIngressModes,
ProbeTimeConfig,
ProbeType,
)
from mlrun.utils import is_relative_path, is_valid_port, logger, update_in
class ApplicationSpec(nuclio_function.NuclioSpec):
    """Nuclio function spec extended with the application sidecar port settings.

    Two fields are serialized on top of the Nuclio spec fields:

    * ``internal_application_port`` - the sidecar port to which traffic is routed from the nuclio function.
    * ``application_ports`` - all ports exposed by the sidecar; the internal port is always the first entry.
    """

    _dict_fields = nuclio_function.NuclioSpec._dict_fields + [
        "internal_application_port",
        "application_ports",
    ]

    def __init__(
        self,
        command=None,
        args=None,
        image=None,
        mode=None,
        entry_points=None,
        description=None,
        replicas=None,
        min_replicas=None,
        max_replicas=None,
        volumes=None,
        volume_mounts=None,
        env=None,
        resources=None,
        config=None,
        base_spec=None,
        no_cache=None,
        source=None,
        image_pull_policy=None,
        function_kind=None,
        build=None,
        service_account=None,
        readiness_timeout=None,
        readiness_timeout_before_failure=None,
        default_handler=None,
        node_name=None,
        node_selector=None,
        affinity=None,
        disable_auto_mount=False,
        priority_class_name=None,
        pythonpath=None,
        workdir=None,
        image_pull_secret=None,
        tolerations=None,
        preemption_mode=None,
        security_context=None,
        service_type=None,
        add_templated_ingress_host_mode=None,
        state_thresholds=None,
        disable_default_http_trigger=None,
        custom_scaling_metric_specs=None,
        serving_spec=None,
        graph=None,
        parameters=None,
        track_models=None,
        internal_application_port=None,
        application_ports=None,
        model_endpoints_instructions=None,
        auth=None,
        env_from=None,
        mount_otlp_secret: bool = False,
    ):
        """Build the spec.

        Every argument except ``internal_application_port`` and ``application_ports`` is forwarded
        unchanged to the Nuclio spec constructor.

        :param internal_application_port: Sidecar port that receives the traffic. When omitted, the first entry
                                          of ``application_ports`` is used, and if that is empty too, the
                                          configured default sidecar internal port.
        :param application_ports:         Ports exposed by the sidecar - a list of ints, a single int or None.
        """
        super().__init__(
            command=command,
            args=args,
            image=image,
            mode=mode,
            entry_points=entry_points,
            description=description,
            replicas=replicas,
            min_replicas=min_replicas,
            max_replicas=max_replicas,
            volumes=volumes,
            volume_mounts=volume_mounts,
            env=env,
            env_from=env_from,
            resources=resources,
            config=config,
            base_spec=base_spec,
            no_cache=no_cache,
            source=source,
            image_pull_policy=image_pull_policy,
            function_kind=function_kind,
            build=build,
            service_account=service_account,
            readiness_timeout=readiness_timeout,
            readiness_timeout_before_failure=readiness_timeout_before_failure,
            default_handler=default_handler,
            node_name=node_name,
            node_selector=node_selector,
            affinity=affinity,
            disable_auto_mount=disable_auto_mount,
            priority_class_name=priority_class_name,
            pythonpath=pythonpath,
            workdir=workdir,
            image_pull_secret=image_pull_secret,
            tolerations=tolerations,
            preemption_mode=preemption_mode,
            security_context=security_context,
            service_type=service_type,
            add_templated_ingress_host_mode=add_templated_ingress_host_mode,
            serving_spec=serving_spec,
            graph=graph,
            parameters=parameters,
            track_models=track_models,
            state_thresholds=state_thresholds,
            disable_default_http_trigger=disable_default_http_trigger,
            custom_scaling_metric_specs=custom_scaling_metric_specs,
            model_endpoints_instructions=model_endpoints_instructions,
            auth=auth,
            mount_otlp_secret=mount_otlp_secret,
        )

        # Override default min/max replicas (don't assume application is stateless)
        self.min_replicas = min_replicas or 1
        self.max_replicas = max_replicas or 1

        # Backing fields must exist before the property setters run, since each setter reads the other field
        self._internal_application_port = None
        self._application_ports = []

        application_ports = application_ports or []
        # The application_ports setter accepts a single int, so the constructor must accept it as well;
        # without this normalization the len()/indexing below would raise TypeError for an int.
        if isinstance(application_ports, int):
            application_ports = [application_ports]

        # if internal_application_port is not provided, use the first application port
        if not internal_application_port and len(application_ports) > 0:
            internal_application_port = application_ports[0]

        # the port of application sidecar to which traffic will be routed from a nuclio function
        self.internal_application_port = (
            internal_application_port
            or mlrun.mlconf.function.application.default_sidecar_internal_port
        )

        # all exposed ports by the application sidecar
        self.application_ports = application_ports

    @property
    def application_ports(self):
        """All ports exposed by the application sidecar, internal application port first."""
        return self._application_ports

    @application_ports.setter
    def application_ports(self, ports):
        """
        Set the application ports for the application sidecar.
        The internal application port is always included and always first.

        :param ports: A list of ports, a single int port, or None (treated as an empty list)
        :raises mlrun.errors.MLRunInvalidArgumentError: if ``ports`` is neither None, an int nor a list
        """
        # Handle None / single int
        if ports is None:
            ports = []
        elif isinstance(ports, int):
            ports = [ports]
        elif not isinstance(ports, list):
            raise mlrun.errors.MLRunInvalidArgumentError(
                "Application ports must be a list of integers"
            )

        # Validate every port, and drop the internal port so it can be re-inserted at index 0
        cleaned_ports = []
        for port in ports:
            is_valid_port(port, raise_on_error=True)
            if port != self.internal_application_port:
                cleaned_ports.append(port)

        application_ports = [self.internal_application_port] + cleaned_ports

        # ensure multiple ports are supported in Nuclio
        if len(application_ports) > 1:
            nuclio_function.multiple_port_sidecar_is_supported()

        self._application_ports = application_ports

    @property
    def internal_application_port(self):
        """The sidecar port to which traffic is routed from the nuclio function."""
        return self._internal_application_port

    @internal_application_port.setter
    def internal_application_port(self, port):
        """Set the internal application port and keep ``application_ports`` consistent with it.

        :param port: The port; it is converted with ``int()`` and then validated
        """
        port = int(port)
        is_valid_port(port, raise_on_error=True)
        self._internal_application_port = port

        # If when internal application port is being set, length of self._application_ports is 1,
        # it means that it consists of [old_port] only
        # so in this case, we rewrite the list completely, by setting value to [new_value]
        if len(self.application_ports) == 1:
            self._application_ports = [port]
            return

        # when setting new internal application port, ensure that it is included in the application ports
        # it just triggers setter logic, so setting to the same value is a no-op
        self.application_ports = self._application_ports
class ApplicationStatus(nuclio_function.NuclioStatus):
    """Nuclio function status extended with application-specific fields.

    The state, name, address, invocation URL, build pod and container image arguments are handed to the
    Nuclio status constructor. The remaining arguments are stored on this object, with any falsy value
    normalized to ``None``.
    """

    def __init__(
        self,
        state=None,
        nuclio_name=None,
        address=None,
        internal_invocation_urls=None,
        external_invocation_urls=None,
        build_pod=None,
        container_image=None,
        application_image=None,
        application_source=None,
        sidecar_name=None,
        api_gateway_name=None,
        api_gateway=None,
        url=None,
    ):
        nuclio_status_fields = dict(
            state=state,
            nuclio_name=nuclio_name,
            address=address,
            internal_invocation_urls=internal_invocation_urls,
            external_invocation_urls=external_invocation_urls,
            build_pod=build_pod,
            container_image=container_image,
        )
        super().__init__(**nuclio_status_fields)

        # application-only fields; empty values collapse to None
        self.application_image = application_image if application_image else None
        self.application_source = application_source if application_source else None
        self.sidecar_name = sidecar_name if sidecar_name else None
        self.api_gateway_name = api_gateway_name if api_gateway_name else None
        self.api_gateway: nuclio_api_gateway.APIGateway | None = (
            api_gateway if api_gateway else None
        )
        self.url = url if url else None
[docs]
class ApplicationRuntime(nuclio_function.RemoteRuntime):
kind = "application"
reverse_proxy_image = None
@nuclio_function.min_nuclio_versions("1.13.1")
def __init__(self, spec=None, metadata=None):
    """Create the application runtime; construction is delegated entirely to the parent runtime.

    :param spec:     Runtime spec, passed through to the parent constructor
    :param metadata: Runtime metadata, passed through to the parent constructor
    """
    super().__init__(metadata=metadata, spec=spec)
@property
def spec(self) -> ApplicationSpec:
    """The runtime spec object stored on this runtime."""
    return self._spec

@spec.setter
def spec(self, spec):
    # The value goes through _verify_dict together with the attribute name and the expected spec class
    verified_spec = self._verify_dict(spec, "spec", ApplicationSpec)
    self._spec = verified_spec
@property
def status(self) -> ApplicationStatus:
    """The runtime status object stored on this runtime."""
    return self._status

@status.setter
def status(self, status):
    # The value goes through _verify_dict together with the attribute name and the expected status class
    verified_status = self._verify_dict(status, "status", ApplicationStatus)
    self._status = verified_status
@property
def api_gateway(self):
    """The API gateway object kept in ``status.api_gateway``."""
    status = self.status
    return status.api_gateway

@api_gateway.setter
def api_gateway(self, api_gateway: nuclio_api_gateway.APIGateway):
    # stored on the status object, no validation is performed here
    status = self.status
    status.api_gateway = api_gateway
@property
def url(self):
    """The invoke URL of the default API gateway, or ``None`` when no gateway is available.

    When ``status.api_gateway`` is not populated, a sync is attempted first. The sync may leave the
    gateway unset (for example when no gateway name is stored in the status), in which case ``None`` is
    returned instead of failing with an ``AttributeError`` on a ``None`` gateway.
    """
    if not self.status.api_gateway:
        self._sync_api_gateway()

    api_gateway = self.status.api_gateway
    if not api_gateway:
        return None
    return api_gateway.invoke_url

@url.setter
def url(self, url):
    # the URL is persisted on the status object
    self.status.url = url
[docs]
def set_internal_application_port(self, port: int):
    """Set the internal application port on the spec.

    :param port: The port to assign to ``spec.internal_application_port``
    """
    spec = self.spec
    spec.internal_application_port = port
[docs]
def set_source_target(self, target_dir: str):
    """
    Set the directory into which the application source code is extracted at runtime by the init container.

    :param target_dir: Absolute path (must start with "/") inside the runtime container
    :raises mlrun.errors.MLRunInvalidArgumentError: if ``target_dir`` is empty or not absolute
    """
    if not target_dir:
        raise mlrun.errors.MLRunInvalidArgumentError("target_dir is required")

    is_absolute = target_dir.startswith("/")
    if not is_absolute:
        raise mlrun.errors.MLRunInvalidArgumentError(
            f"target_dir must be an absolute path, got: {target_dir}"
        )

    build_spec = self.spec.build
    build_spec.source_code_target_dir = target_dir
[docs]
def set_probe(
    self,
    type: str,
    initial_delay_seconds: int | None = None,
    period_seconds: int | None = None,
    failure_threshold: int | None = None,
    timeout_seconds: int | None = None,
    http_path: str | None = None,
    http_port: int | None = None,
    http_scheme: str | None = None,
    config: dict | None = None,
):
    """Configure a Kubernetes probe on the sidecar container

    ``config`` is deep-copied and used as the starting point; every explicitly passed argument then
    overrides the matching entry. HTTP settings are applied only when ``http_path`` is given - without
    it, ``http_port`` and ``http_scheme`` are ignored.

    :param type:                  Probe type - one of "readiness", "liveness", "startup"
    :param initial_delay_seconds: Seconds to wait after container start before probing begins
    :param period_seconds:        Interval (in seconds) between probes
    :param failure_threshold:     Consecutive failures after which the probe is considered failed
    :param timeout_seconds:       Seconds after which a single probe times out
    :param http_path:             When given, an ``httpGet`` probe with this path is configured
    :param http_port:             Port of the ``httpGet`` probe; left unset here when not provided
    :param http_scheme:           Scheme of the ``httpGet`` probe. Falls back to the scheme already present
                                  in ``config`` and then to "HTTP"
    :param config:                Full probe configuration dict used as the base
    :return: function object (self)
    """
    # locals() has to be captured first, while it still holds nothing but the call arguments
    self._validate_set_probes_input(locals())
    probe_type = ProbeType(type)

    # never mutate the caller's dict
    probe_config = copy.deepcopy(config) if config else {}

    if http_path:
        http_get = probe_config.get("httpGet", {})
        http_get["path"] = http_path
        if http_port is not None:
            http_get["port"] = http_port
        http_get["scheme"] = http_scheme or http_get.get("scheme", "HTTP")
        probe_config["httpGet"] = http_get

    # explicit timing arguments win over whatever the base config carried
    explicit_timings = {
        ProbeTimeConfig.INITIAL_DELAY_SECONDS: initial_delay_seconds,
        ProbeTimeConfig.PERIOD_SECONDS: period_seconds,
        ProbeTimeConfig.FAILURE_THRESHOLD: failure_threshold,
        ProbeTimeConfig.TIMEOUT_SECONDS: timeout_seconds,
    }
    for timing_setting, timing_value in explicit_timings.items():
        if timing_value is not None:
            probe_config[timing_setting.value] = timing_value

    # validate first, so an invalid probe is never stored on the sidecar
    mlrun.common.runtimes.validators.validate_sidecar_probes(
        [{probe_type.key: probe_config}]
    )

    sidecar = self._set_sidecar(self._get_sidecar_name())
    sidecar[probe_type.key] = probe_config
    return self
[docs]
def delete_probe(
    self,
    type: str,
):
    """Remove a Kubernetes probe configuration from the sidecar container

    Nothing happens when there is no sidecar or the sidecar has no such probe.

    :param type: Probe type - one of "readiness", "liveness", "startup"
    :return: function object (self)
    """
    # rejects unknown probe types before anything is touched
    ProbeType.is_valid(type, raise_on_error=True)
    probe_type = ProbeType(type)

    sidecar = self._get_sidecar()
    if sidecar and probe_type.key in sidecar:
        del sidecar[probe_type.key]
    return self
[docs]
def with_sidecar(
    self,
    name: str | None = None,
    image: str | None = None,
    ports: typing.Union[int, list[int]] | None = None,
    command: str | None = None,
    args: list[str] | None = None,
):
    """Add a sidecar through the parent runtime and mirror its ports into the application spec.

    When ports are given, the first one becomes ``spec.internal_application_port`` (an info message is
    logged if that changes the current value) and the full list becomes ``spec.application_ports``.

    :param name:    Sidecar name, forwarded to the parent implementation
    :param image:   Sidecar image, forwarded to the parent implementation
    :param ports:   A single port or a list of ports exposed by the sidecar
    :param command: Sidecar command, forwarded to the parent implementation
    :param args:    Sidecar args, forwarded to the parent implementation
    """
    # wraps with_sidecar just to set the application ports
    super().with_sidecar(
        name=name,
        image=image,
        ports=ports,
        command=command,
        args=args,
    )

    if not ports:
        return

    # The signature allows a bare int, which cannot be indexed - wrap it before reading the first port
    port_list = [ports] if isinstance(ports, int) else ports
    first_port = port_list[0]

    if self.spec.internal_application_port != first_port:
        logger.info(
            f"Setting internal application port to the first port from the sidecar: {first_port}. "
            f"If this is not intended, please set the internal_application_port explicitly."
        )
        self.spec.internal_application_port = first_port
    self.spec.application_ports = port_list
[docs]
def pre_deploy_validation(self):
    """Run the parent validation, then verify the sidecar configuration of the application.

    Each entry under the ``spec.sidecars`` config key must carry an image and at least one port, each
    port needs both ``containerPort`` and ``name``, and ``args`` may only be set together with ``command``.

    :raises mlrun.errors.MLRunBadRequestError: on the first violated requirement
    """
    super().pre_deploy_validation()

    sidecars = self.spec.config.get("spec.sidecars")
    if not sidecars:
        raise mlrun.errors.MLRunBadRequestError(
            "Application spec must include a sidecar configuration"
        )

    for sidecar in sidecars:
        if not sidecar.get("image"):
            raise mlrun.errors.MLRunBadRequestError(
                "Application sidecar spec must include an image"
            )

        sidecar_ports = sidecar.get("ports")
        if not sidecar_ports:
            raise mlrun.errors.MLRunBadRequestError(
                "Application sidecar spec must include at least one port"
            )

        for port_spec in sidecar_ports:
            if not port_spec.get("containerPort"):
                raise mlrun.errors.MLRunBadRequestError(
                    "Application sidecar port spec must include a containerPort"
                )
            if not port_spec.get("name"):
                raise mlrun.errors.MLRunBadRequestError(
                    "Application sidecar port spec must include a name"
                )

        # args are meaningless without a command to pass them to
        args_without_command = sidecar.get("args") and not sidecar.get("command")
        if args_without_command:
            raise mlrun.errors.MLRunBadRequestError(
                "Application sidecar spec must include a command if args are provided"
            )
[docs]
def prepare_image_for_deploy(self):
    """Prepare the image for deployment; the work is done entirely by the parent implementation."""
    super().prepare_image_for_deploy()
[docs]
def requires_build(self) -> bool:
    """
    Tell whether the application image has to be built.

    A build is needed when build commands or requirements are set. A source requires a build only when
    it is not a store:// URI (those are loaded by the init container at runtime, not baked into the
    image) and ``load_source_on_run`` is off.
    """
    build_spec = self.spec.build
    if build_spec.commands or build_spec.requirements:
        return True

    source = build_spec.source
    # no source, or a store:// artifact - nothing to bake into the image
    if not source or mlrun.datastore.is_store_uri(source):
        return False

    # other sources (git, archives) are built in unless they are loaded on run
    return not build_spec.load_source_on_run
[docs]
def deploy(
    self,
    project="",
    tag="",
    verbose=False,
    builder_env: dict | None = None,
    force_build: bool = False,
    with_mlrun=None,
    skip_deployed=False,
    is_kfp=False,
    mlrun_version_specifier=None,
    show_on_failure: bool = False,
    create_default_api_gateway: bool = True,
    track_models: bool | None = None,
    wait: bool = True,
):
    """
    Deploy function, builds the application image if required (self.requires_build()) or force_build is True,
    Once the image is built, the function is deployed.

    :param project:                     Project name
    :param tag:                         Function tag
    :param verbose:                     Set True for verbose logging
    :param builder_env:                 Env vars dict for source archive config/credentials
                                        e.g. builder_env={"GIT_TOKEN": token}
    :param force_build:                 Set True to force rebuilding the application image.
                                        Use this when changing requirements, commands, or base image
                                        after the initial deployment.
                                        Code-only changes don't require force_build as the init container
                                        loads the new source at runtime.
    :param with_mlrun:                  Add the current mlrun package to the container build
    :param skip_deployed:               Skip the build if we already have an image for the function
    :param is_kfp:                      Deploy as part of a kfp pipeline
    :param mlrun_version_specifier:     Which mlrun package version to include (if not current)
    :param show_on_failure:             Show logs only in case of build failure
    :param create_default_api_gateway:  When deploy finishes the default API gateway will be created for the
                                        application. Disabling this flag means that the application will not be
                                        accessible until an API gateway is created for it.
    :param track_models:                override state of self.spec.track_models. If not provided, uses the spec
                                        value (False by default, True after setup_model_monitoring() is called).
                                        When True, model endpoints are created at deployment time.
    :param wait:                        must be True for application functions. Application deploy performs
                                        readiness-dependent post-deploy steps (API gateway and sidecar), so
                                        external wait orchestration is unsupported; passing ``False`` raises
                                        ``MLRunInvalidArgumentError``.
    :return: The default API gateway URL if created or True if the function is ready (deployed)
    :raises mlrun.errors.MLRunInvalidArgumentError: if ``wait`` is False
    """
    if not wait:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "ApplicationRuntime.deploy(wait=False) is not supported. "
            "Application deploy performs readiness-dependent post-deploy "
            "steps (API gateway, sidecar). Deploy with wait=True."
        )

    # Upload local source as artifact. The server needs the store:// URI to configure the init container, but we
    # restore the local path afterward, so subsequent deploys re-upload the file.
    original_local_source, artifact_uri = self._upload_source_as_artifact()

    try:
        # Check status.application_image because spec.image gets cleared after build to use
        # the reverse proxy image instead
        if (
            self.requires_build() and not self.status.application_image
        ) or force_build:
            self._fill_credentials()
            self._build_application_image(
                builder_env=builder_env,
                force_build=force_build,
                watch=True,
                with_mlrun=with_mlrun,
                skip_deployed=skip_deployed,
                is_kfp=is_kfp,
                mlrun_version_specifier=mlrun_version_specifier,
                show_on_failure=show_on_failure,
            )

        self._ensure_reverse_proxy_configurations()
        self._configure_application_sidecar()

        # We only allow accessing the application via the API Gateway
        self.spec.add_templated_ingress_host_mode = (
            NuclioIngressAddTemplatedIngressModes.never
        )

        self._enrich_sidecar_probe_ports()

        super().deploy(
            project=project,
            tag=tag,
            verbose=verbose,
            builder_env=builder_env,
            track_models=track_models,
        )
        logger.info(
            "Successfully deployed function.",
        )
    finally:
        # Restore the original local source path so subsequent deploys re-upload automatically
        if artifact_uri and original_local_source:
            self.spec.build.source = original_local_source

        # Restore the source in case it was removed to make nuclio not consider it when building
        if not self.spec.build.source and self.status.application_source:
            self.spec.build.source = self.status.application_source

        # NOTE(review): the save is assumed to belong to the finally clause - confirm against the original layout
        self.save(versioned=False)

    if create_default_api_gateway:
        try:
            api_gateway_name = self.resolve_default_api_gateway_name()
            return self.create_api_gateway(api_gateway_name, set_as_default=True)
        except Exception as exc:
            # Best effort: the function itself is deployed, so a gateway failure is only reported
            logger.warning(
                "Failed to create default API gateway, application may not be accessible. "
                "Use the `create_api_gateway` method to make it accessible",
                exc=mlrun.errors.err_to_str(exc),
            )
    elif not self.status.api_gateway:
        # The two sentences are separate string literals - keep the space that joins them
        logger.warning(
            "Application is online but may not be accessible since default gateway creation was not requested. "
            "Use the `create_api_gateway` method to make it accessible."
        )

    return True
[docs]
def with_source_archive(
    self,
    source,
    workdir=None,
    pull_at_runtime: bool = False,
    target_dir: str | None = None,
):
    """Load the application code from a git/tar/zip archive, at build time or at runtime

    The arguments are handed as-is to the build-with-source configuration helper.

    :param source:          valid absolute path or URL to git, zip, or tar file, e.g.
                            git://github.com/mlrun/something.git
                            http://some/url/file.zip
                            note path source must exist on the image or exist locally when run is local
                            (it is recommended to use 'workdir' when source is a filepath instead)
    :param workdir:         working dir relative to the archive root (e.g. './subdir') or absolute to the image root
    :param pull_at_runtime: load the archive into the container at runtime (via init container) vs on build
    :param target_dir:      target dir on runtime pod for repo clone / archive extraction
    """
    source_settings = {
        "source": source,
        "workdir": workdir,
        "pull_at_runtime": pull_at_runtime,
        "target_dir": target_dir,
    }
    self._configure_mlrun_build_with_source(**source_settings)
[docs]
def from_image(self, image):
    """
    Deploy the function with an existing nuclio processor image.
    The image is used for the reverse proxy only, not for the application itself.

    :param image: image name
    """
    super().from_image(image)

    build_spec = self.spec.build

    # nuclio implementation detail - when providing the image and emptying out the source code and build source,
    # nuclio skips rebuilding the image and simply takes the prebuilt image
    build_spec.functionSourceCode = ""
    # keep the build source in the status before blanking it, so it can be restored later
    self.status.application_source = build_spec.source
    build_spec.source = ""
    for stale_config_key in ("spec.build.functionSourceCode", "spec.build.source"):
        self.spec.config.pop(stale_config_key, None)

    # save the image in the status, so we won't repopulate the function source code
    self.status.container_image = image

    # ensure golang runtime and handler for the reverse proxy
    self.spec.nuclio_runtime = "golang"
    update_in(self.spec.base_spec, "spec.handler", "main:Handler")
[docs]
@staticmethod
def get_filename_and_handler() -> (str, str):
reverse_proxy_file_path = pathlib.Path(__file__).parent / "reverse_proxy.go"
return str(reverse_proxy_file_path), "Handler"
[docs]
def create_api_gateway(
    self,
    name: str | None = None,
    path: str | None = None,
    direct_port_access: bool = False,
    authentication_mode: schemas.APIGatewayAuthenticationMode = None,
    authentication_creds: tuple[str, str] | None = None,
    ssl_redirect: bool | None = None,
    set_as_default: bool = False,
    gateway_timeout: int | None = None,
    port: int | None = None,
):
    """
    Create the application API gateway. Once the application is deployed, the API gateway can be created.
    An application without an API gateway is not accessible.

    :param name:                    The name of the API gateway
    :param path:                    Optional path of the API gateway, default value is "/".
                                    The given path should be supported by the deployed application
    :param direct_port_access:      Set True to allow direct port access to the application sidecar
    :param authentication_mode:     API Gateway authentication mode. Defaults to
                                    mlrun.mlconf.function.application.default_authentication_mode
    :param authentication_creds:    API Gateway basic authentication credentials as a tuple (username, password)
    :param ssl_redirect:            Set True to force SSL redirect, False to disable. Defaults to
                                    mlrun.mlconf.force_api_gateway_ssl_redirect()
    :param set_as_default:          Set the API gateway as the default for the application (`status.api_gateway`)
    :param gateway_timeout:         nginx ingress timeout in sec (request timeout, when will the gateway return an
                                    error)
    :param port:                    The API gateway port, used only when direct_port_access=True
    :return: The API gateway URL
    :raises mlrun.errors.MLRunInvalidArgumentError: when the name is missing, when a non-default gateway uses
                                    the default gateway name, or when basic authentication is in effect
                                    without credentials
    """
    if not name:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "API gateway name must be specified."
        )

    if not set_as_default and name == self.resolve_default_api_gateway_name():
        raise mlrun.errors.MLRunInvalidArgumentError(
            f"Non-default API gateway cannot use the default gateway name, {name=}."
        )

    # Resolve the effective mode before validating credentials: when basic authentication comes from the
    # configured default rather than from the argument, missing credentials must still be rejected here,
    # otherwise unpacking None into with_basic_auth() below fails with a TypeError.
    authentication_mode = (
        authentication_mode
        or mlrun.mlconf.function.application.default_authentication_mode
    )
    if (
        authentication_mode == schemas.APIGatewayAuthenticationMode.basic
        and not authentication_creds
    ):
        raise mlrun.errors.MLRunInvalidArgumentError(
            "Authentication credentials not provided"
        )

    if not direct_port_access and port:
        logger.warning(
            "Ignoring 'port' because 'direct_port_access' is not enabled. "
            "The 'port' setting is only applicable when 'direct_port_access' is enabled."
        )

    # with direct port access, an explicit port wins over the internal application port
    ports = (
        (port or self.spec.internal_application_port) if direct_port_access else []
    )

    api_gateway = nuclio_api_gateway.APIGateway(
        nuclio_api_gateway.APIGatewayMetadata(
            name=name,
            namespace=self.metadata.namespace,
            labels=self.metadata.labels.copy(),
        ),
        nuclio_api_gateway.APIGatewaySpec(
            functions=[self],
            project=self.metadata.project,
            path=path,
            ports=mlrun.utils.helpers.as_list(ports) if ports else None,
        ),
    )

    api_gateway.with_gateway_timeout(gateway_timeout)
    if ssl_redirect is None:
        ssl_redirect = mlrun.mlconf.force_api_gateway_ssl_redirect()
    if ssl_redirect:
        # Force ssl redirect so that the application is only accessible via https
        api_gateway.with_force_ssl_redirect()

    # Add authentication if required
    if authentication_mode == schemas.APIGatewayAuthenticationMode.access_key:
        api_gateway.with_access_key_auth()
    elif authentication_mode == schemas.APIGatewayAuthenticationMode.basic:
        api_gateway.with_basic_auth(*authentication_creds)
    elif authentication_mode == schemas.APIGatewayAuthenticationMode.iguazio:
        api_gateway.with_iguazio_auth()

    db = self._get_db()
    api_gateway_scheme = db.store_api_gateway(
        api_gateway=api_gateway.to_scheme(), project=self.metadata.project
    )

    if set_as_default:
        # the default gateway is tracked on the status, by name and as an object
        self.status.api_gateway_name = api_gateway_scheme.metadata.name
        self.status.api_gateway = nuclio_api_gateway.APIGateway.from_scheme(
            api_gateway_scheme
        )
        self.status.api_gateway.wait_for_readiness()
        self.url = self.status.api_gateway.invoke_url
        url = self.url
    else:
        api_gateway = nuclio_api_gateway.APIGateway.from_scheme(api_gateway_scheme)
        api_gateway.wait_for_readiness()
        url = api_gateway.invoke_url
        # Update application status (enriches invocation url)
        self._get_state(raise_on_exception=False)

    logger.info("Successfully created API gateway", url=url)
    return url
[docs]
def delete_api_gateway(self, name: str):
    """
    Delete an API gateway by its name.
    When the deleted gateway is the one tracked in the status, the status references to it are cleared.
    The application state is refreshed afterwards.

    :param name: The API gateway name
    """
    db = self._get_db()
    db.delete_api_gateway(name=name, project=self.metadata.project)

    deleted_tracked_gateway = name == self.status.api_gateway_name
    if deleted_tracked_gateway:
        self.status.api_gateway_name = None
        self.status.api_gateway = None

    # NOTE(review): assumed to run regardless of which gateway was deleted - confirm against the original layout
    self._get_state()
[docs]
def invoke(
    self,
    path: str = "",
    body: typing.Union[str, bytes, dict, list] | None = None,
    method: str | None = None,
    headers: dict | None = None,
    force_external_address: bool = False,
    auth_info: schemas.AuthInfo = None,
    mock: bool | None = None,
    credentials: tuple[str, str] | None = None,
    **http_client_kwargs,
):
    """Invoke the application, through the default API gateway when one is set

    The API gateway is synced first. Without a gateway, the call falls back to the parent runtime
    invocation. With a gateway, the request is sent through it; ``force_external_address``,
    ``auth_info`` and ``mock`` are used by the fallback path only.

    :param path:                   Request path
    :param body:                   Request body
    :param method:                 HTTP method; through the gateway it defaults to "POST" when a body is
                                   given and "GET" otherwise
    :param headers:                Request headers
    :param force_external_address: Passed to the parent invocation
    :param auth_info:              Passed to the parent invocation
    :param mock:                   Passed to the parent invocation
    :param credentials:            (username, password) tuple
    :param http_client_kwargs:     Extra keyword arguments for the HTTP client
    :return: Whatever the parent invocation or the API gateway invocation returns
    """
    self._sync_api_gateway()

    default_gateway = self.status.api_gateway
    if default_gateway:
        effective_method = method or ("POST" if body else "GET")
        return default_gateway.invoke(
            method=effective_method,
            headers=headers,
            credentials=credentials,
            path=path,
            body=body,
            **http_client_kwargs,
        )

    # If the API Gateway is not ready or not set, try to invoke the function directly (without the API Gateway)
    logger.warning(
        "Default API gateway is not configured, invoking function invocation URL."
    )

    # an explicit "auth" client kwarg wins; otherwise build a requests auth object from the credentials
    auth = http_client_kwargs.pop("auth", None)
    if not auth:
        if credentials:
            auth = nuclio.auth.AuthInfo(
                username=credentials[0], password=credentials[1]
            ).to_requests_auth()
        else:
            auth = None

    return super().invoke(
        path,
        body,
        method,
        headers,
        force_external_address,
        auth_info,
        mock,
        auth=auth,
        **http_client_kwargs,
    )
[docs]
@classmethod
def deploy_reverse_proxy_image(cls):
    """
    Build the reverse proxy image once and remember it on the class (``reverse_proxy_image``).
    The reverse proxy image is used to route requests to the application sidecar.
    Building it ahead of time is useful for decreasing the build time of later deployments.

    A temporary function holding only the reverse proxy is deployed, its container image is recorded,
    and the function is then deleted.

    :raises mlrun.errors.MLRunMissingProjectError: if there is no active project
    """
    active_project = mlrun.get_current_project(silent=True)
    if not active_project:
        raise mlrun.errors.MLRunMissingProjectError(
            "An active project is required to run deploy_reverse_proxy_image(). "
            "Use `mlrun.get_or_create_project()` or set an active project first."
        )

    # a function that includes only the reverse proxy, without the application
    proxy_fn = mlrun.run.new_function(kind="remote", name="reverse-proxy-temp")

    # default max replicas is 4, we only need one replica for the reverse proxy
    proxy_fn.spec.max_replicas = 1

    # the reverse proxy image should not be based on another image
    proxy_fn.set_config("spec.build.baseImage", None)
    proxy_fn.spec.image = ""
    proxy_fn.spec.build.base_image = ""

    # called through the class, with the temporary function standing in for self
    cls._ensure_reverse_proxy_configurations(proxy_fn)
    proxy_fn.deploy()

    # save the created container image
    cls.reverse_proxy_image = proxy_fn.status.container_image

    # delete the function to avoid cluttering the project
    run_db = mlrun.get_run_db()
    run_db.delete_function(proxy_fn.metadata.name, proxy_fn.metadata.project)
[docs]
def resolve_default_api_gateway_name(self):
    """Return the default API gateway name: ``<name>-<tag>`` when the function has a tag, else ``<name>``."""
    metadata = self.metadata
    if metadata.tag:
        return f"{metadata.name}-{metadata.tag}"
    return metadata.name
@nuclio_function.min_nuclio_versions("1.13.1")
def disable_default_http_trigger(
    self,
):
    """Not supported by the application runtime.

    :raises mlrun.runtimes.RunError: always
    """
    unsupported_message = (
        "Application runtime does not support disabling the default HTTP trigger"
    )
    raise mlrun.runtimes.RunError(unsupported_message)
@nuclio_function.min_nuclio_versions("1.13.1")
def enable_default_http_trigger(
    self,
):
    """Do nothing; this runtime performs no action when asked to enable the default HTTP trigger."""
    return None
def _run(self, runobj: "mlrun.RunObject", execution):
    """Reject ``.run()``, which the application runtime does not support yet.

    :raises mlrun.runtimes.RunError: always
    """
    unsupported_message = (
        "Application runtime .run() is not yet supported. Use .invoke() instead."
    )
    raise mlrun.runtimes.RunError(unsupported_message)
def _enrich_command_from_status(self):
    """Do nothing; the application runtime performs no command enrichment here."""
    return None
def _build_application_image(
    self,
    builder_env: dict | None = None,
    force_build: bool = False,
    watch=True,
    with_mlrun=None,
    skip_deployed=False,
    is_kfp=False,
    mlrun_version_specifier=None,
    show_on_failure: bool = False,
):
    """Build the application image and return the result of the image build.

    A store:// source is taken off the build spec for the duration of the build and put back afterwards,
    whether or not the build succeeded. All arguments are passed on to the image build; ``with_mlrun``
    is first resolved through ``_resolve_build_with_mlrun``.
    """
    if not self.spec.command:
        logger.warning(
            "Building the application image without a command. "
            "Use spec.command and spec.args to specify the application entrypoint",
            command=self.spec.command,
            args=self.spec.args,
        )

    if self.spec.build.source in (".", "./"):
        logger.info(
            "The application is configured to use the project's source. "
            "Application runtime requires loading the source into the application image. "
            "Loading on build will be forced regardless of whether 'pull_at_runtime=True' was configured."
        )

    # The parent _build_image() would otherwise try to include the source in the Docker build context.
    # A store:// URI cannot be fetched during build (it requires runtime credentials/context), so it is
    # removed before the build and restored after it, letting the server configure the init container
    # for runtime loading.
    deferred_source = None
    configured_source = self.spec.build.source
    if configured_source and mlrun.datastore.is_store_uri(configured_source):
        deferred_source = configured_source
        self.spec.build.source = None
        logger.debug(
            "Source is a store:// artifact URI - excluding from build, "
            "init container will load it at runtime",
            source=deferred_source,
        )

    with_mlrun = self._resolve_build_with_mlrun(with_mlrun)
    try:
        return self._build_image(
            builder_env=builder_env,
            force_build=force_build,
            mlrun_version_specifier=mlrun_version_specifier,
            show_on_failure=show_on_failure,
            skip_deployed=skip_deployed,
            watch=watch,
            is_kfp=is_kfp,
            with_mlrun=with_mlrun,
        )
    finally:
        # Restore source for init container configuration by the server
        if deferred_source:
            self.spec.build.source = deferred_source
def _ensure_reverse_proxy_configurations(self):
    """Make sure the function carries the reverse proxy HTTP trigger, source code and logger settings.

    The code and logger part is skipped when function source code or a container image is already set.
    """
    # An HTTP trigger that is already in the config means the user explicitly defined a custom
    # configuration, so the automatic trigger is not added.
    has_http_trigger = any(
        key.startswith("spec.triggers")
        and isinstance(value, dict)
        and value.get("kind") == "http"
        for key, value in self.spec.config.items()
    )
    if not has_http_trigger:
        self.with_http(
            workers=mlrun.mlconf.function.application.default_worker_number,
            trigger_name="application-http",
        )

    if self.spec.build.functionSourceCode or self.status.container_image:
        return

    filename, handler = ApplicationRuntime.get_filename_and_handler()
    _, built_spec, _ = nuclio.build_file(
        filename,
        name=self.metadata.name,
        handler=handler,
    )

    # copy handler, source code and runtime out of the built nuclio spec
    get_in = mlrun.utils.get_in
    self.spec.function_handler = get_in(built_spec, "spec.handler")
    self.spec.build.functionSourceCode = get_in(
        built_spec, "spec.build.functionSourceCode"
    )
    self.spec.nuclio_runtime = get_in(built_spec, "spec.runtime")

    # default the reverse proxy logger level to info
    logger_sinks_key = "spec.loggerSinks"
    if self.spec.config.get(logger_sinks_key):
        return
    self.set_config(
        logger_sinks_key, [{"level": "info", "sink": "myStdoutLoggerSink"}]
    )
def _configure_application_sidecar(self):
# Save the application image in the status to allow overriding it with the reverse proxy entry point
if self.spec.image and (
not self.status.application_image
or self.spec.image != self.status.container_image
):
self.status.application_image = self.spec.image
self.spec.image = ""
# reuse the reverse proxy image if it was built before
if (
reverse_proxy_image := self.status.container_image
or self.reverse_proxy_image
):
self.from_image(reverse_proxy_image)
self.status.sidecar_name = f"{self.metadata.name}-sidecar"
self.with_sidecar(
name=self.status.sidecar_name,
image=self.status.application_image,
ports=self.spec.application_ports,
command=self.spec.command,
args=self.spec.args,
)
self.set_env("SIDECAR_PORT", self.spec.internal_application_port)
self.set_env("SIDECAR_HOST", "http://localhost")
# configure the sidecar container as the default container for logging purposes
self.metadata.annotations["kubectl.kubernetes.io/default-container"] = (
self.status.sidecar_name
)
def _sync_api_gateway(self):
if not self.status.api_gateway_name:
return
db = self._get_db()
api_gateway_scheme = db.get_api_gateway(
name=self.status.api_gateway_name, project=self.metadata.project
)
self.status.api_gateway = nuclio_api_gateway.APIGateway.from_scheme(
api_gateway_scheme
)
self.status.api_gateway.wait_for_readiness()
self.url = self.status.api_gateway.invoke_url
def _enrich_sidecar_probe_ports(self):
"""Enrich sidecar probe configurations with internal application port if needed
This method is called just before deployment to automatically enrich HTTP probes
in the sidecar container that were configured without an explicit port.
Enrichment logic:
- Only enriches HTTP probes (httpGet) that don't already have a port specified
- If the user explicitly provided http_port in set_probe(), enrichment is skipped
- If the user provided a port in the config dict, enrichment is skipped
- Enrichment happens just before deployment to ensure the latest internal_application_port
value is used, even if it was modified after set_probe() was called
"""
# Check each probe type and enrich missing HTTP ports
sidecar = self._get_sidecar()
if not sidecar:
return
for probe_type in ProbeType:
probe_config = sidecar.get(probe_type.key)
if probe_config and isinstance(probe_config, dict):
http_get = probe_config.get("httpGet")
if http_get and isinstance(http_get, dict) and "port" not in http_get:
if self.spec.internal_application_port is None:
raise ValueError(
f"Cannot enrich {probe_type.value} probe: HTTP probe requires a port, "
"but internal_application_port is not set. "
"Please set the internal_application_port or provide http_port in set_probe()."
)
http_get["port"] = self.spec.internal_application_port
logger.debug(
"Enriched sidecar probe port",
probe_type=probe_type.value,
port=http_get["port"],
application_name=self.metadata.name,
)
sidecar[probe_type.key] = probe_config
def _get_sidecar(self) -> dict | None:
"""Get the sidecar container for ApplicationRuntime
Returns the sidecar dict if found, None otherwise.
"""
sidecar_name = self._get_sidecar_name()
if not hasattr(self.spec, "config") or "spec.sidecars" not in self.spec.config:
return None
sidecars = self.spec.config["spec.sidecars"]
for sidecar in sidecars:
if sidecar.get("name") == sidecar_name:
return sidecar
return None
def _get_sidecar_name(self) -> str:
return f"{self.metadata.name}-sidecar"
@staticmethod
def _validate_set_probes_input(params: dict):
    """Validate the arguments collected for a probe configuration.

    :param params: mapping of argument name to value; the ``type`` entry is
        the probe type, and the ``type`` and ``self`` entries are excluded
        from the "at least one value" check.

    :raises ValueError: if every remaining argument is ``None`` (or there are
        none). An invalid probe type is reported by ``ProbeType.is_valid``,
        which is called with ``raise_on_error=True``.
    """
    # Validate the probe type first
    ProbeType.is_valid(params.get("type"), raise_on_error=True)

    # Besides the type, at least one setting has to be provided
    non_setting_keys = ("type", "self")
    has_any_setting = any(
        value is not None
        for name, value in params.items()
        if name not in non_setting_keys
    )
    if not has_any_setting:
        raise ValueError(
            "Empty probe configuration: at least one parameter must be set"
        )
def _upload_source_as_artifact(self) -> tuple[str | None, str | None]:
"""
Upload local single-file source as an MLRun artifact.
If spec.build.source is a local file path, upload it to the artifact store and update spec.build.source
with the artifact URI for the server.
:returns: (original_local_path, artifact_uri) if uploaded, (None, None) otherwise.
deploy() uses these to restore the local path in a finally block.
"""
source = self.spec.build.source
if not source or not self._is_local_path(source) or os.path.isdir(source):
return None, None
if not os.path.isfile(source):
# On redeploy the remote artifact from the previous deploy is still available
if not getattr(self.status, "application_source", None):
raise mlrun.errors.MLRunNotFoundError(
f"Source file not found: '{source}'. "
"The file must exist locally to be uploaded as a source artifact."
)
return None, None
project_name = self.metadata.project
if not project_name:
raise mlrun.errors.MLRunMissingProjectError(
"Project is required to upload source as artifact"
)
project = mlrun.get_or_create_project(project_name)
# Use function name as part of the artifact key for identification
artifact_key = f"{self.metadata.name}-source"
logger.info(
"Uploading local source file as artifact",
source=source,
artifact_key=artifact_key,
project=project_name,
)
# Upload the file as a code artifact to an internal path with system-generated label
try:
artifact = project.log_code_file(
key=artifact_key,
local_path=source,
code_type="function",
artifact_path=mlrun.common.constants.MLRUN_INTERNAL_ARTIFACT_PATH,
upload=True,
labels={
mlrun.common.constants.MLRunInternalLabels.function_name: self.metadata.name,
mlrun.common.constants.MLRunInternalLabels.system_generated: "true",
},
)
except Exception as exc:
raise mlrun.errors.MLRunRuntimeError(
f"Failed to upload source file '{source}' as artifact"
) from exc
self.spec.build.source = artifact.uri
return source, artifact.uri
@staticmethod
def _is_local_path(source: str) -> bool:
    """Tell whether ``source`` looks like a local filesystem path.

    A store URI is never local. Anything else counts as local when
    ``is_relative_path`` accepts it or when it is an absolute path.
    """
    is_store_reference = mlrun.datastore.is_store_uri(source)
    return not is_store_reference and (
        is_relative_path(source) or os.path.isabs(source)
    )