# Source code for mlrun.serving.otel

# Copyright 2026 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import socket
from typing import Literal

import storey

import mlrun
import mlrun.errors
import mlrun.utils.telemetry

logger = mlrun.utils.logger


# TODO move to storey
def _warmup_endpoint(endpoint: str, timeout: float = 5.0) -> None:
    """Open and immediately close a TCP connection to ``endpoint``.

    This is a best-effort probe: an endpoint that cannot be parsed is skipped
    silently, and a failed connection attempt is logged at debug level and
    never raised.

    Accepted endpoint forms:

    - ``host:port``
    - ``scheme://host:port`` with an optional trailing ``/path``
    - ``[ipv6-literal]:port`` (with or without a scheme)

    :param endpoint: Address of the OTLP receiver to probe.
    :param timeout:  Connection timeout in seconds.
    """
    # Drop a URL scheme and any path so only the "host:port" authority is left.
    _, scheme_sep, after_scheme = endpoint.partition("://")
    authority = (after_scheme if scheme_sep else endpoint).split("/", 1)[0]

    host, _, port = authority.rpartition(":")
    # A bracketed IPv6 literal must be unwrapped before it is resolved.
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]

    # ``str.isdigit`` alone also accepts characters such as "²" that ``int``
    # rejects, so restrict the port to ASCII digits.
    if not host or not (port.isascii() and port.isdigit()):
        return
    port_number = int(port)
    # Values above 65535 are not valid TCP ports; skip them rather than hand
    # them to the socket layer.
    if port_number > 65535:
        return

    try:
        with socket.create_connection((host, port_number), timeout=timeout):
            pass
    except OSError as exc:
        logger.debug(
            "OTel endpoint warmup probe failed",
            endpoint=endpoint,
            error=str(exc),
        )


# Recognised ``headers_source`` values, i.e. where the OTLP headers passed to
# the underlying gRPC channel are read from. The tuple holds every accepted
# value; the individual names below are unpacked from it in the same order.
_HEADERS_SOURCES = ("file", "project_secret", "none")
(
    _HEADERS_SOURCE_FILE,
    _HEADERS_SOURCE_PROJECT_SECRET,
    _HEADERS_SOURCE_NONE,
) = _HEADERS_SOURCES


class OTelMetricsExporter(storey.OTelMetricsExporter):
    """MLRun serving graph step that exports OTel metrics as a side-effect.

    Inherits from ``storey.OTelMetricsExporter`` (a pass-through ``Flow`` step
    that forwards each event downstream after recording the metric) and layers
    MLRun-aware defaults on top:

    - ``endpoint`` defaults to ``mlrun.mlconf.telemetry.otlp_endpoint`` — set
      by the operator on the API server and delivered to the SDK via
      ``/client-spec``. Pass explicitly to route metrics to a different OTLP
      receiver.
    - ``insecure`` defaults to ``mlrun.mlconf.telemetry.insecure``.
    - ``export_interval_millis`` defaults to
      ``mlrun.mlconf.telemetry.model_monitoring.interval`` (seconds) converted
      to milliseconds. When neither is set, ``flush_mode`` defaults to
      ``"immediate"`` unless the caller passed a ``flush_mode`` explicitly.
    - Headers come from one of three sources controlled by ``headers_source``:

      * ``"file"`` (default): read from the kubelet-mounted secret at
        ``mlrun.common.constants.MLRUN_TELEMETRY_OTLP_HEADERS_PATH``. The
        server-side runtime injector mounts the secret when the function's
        ``runtime.spec.mount_otlp_secret=True``. One file per header —
        filename = header name, contents = header value. Used by MLRun's
        internal Model Monitoring applications.
      * ``"project_secret"``: read from a single project secret whose value
        is a JSON dict of ``{header_name: header_value}``. Use this when app
        authors want to manage their own OTel auth headers without touching
        the operator's secret.
      * ``"none"``: no headers. Suitable for in-cluster collectors that don't
        require authentication.

    Example::

        flow = function.set_topology("flow", engine="async")
        flow.to(name="my_app", class_name="MyApp").to(
            class_name="mlrun.serving.OTelMetricsExporter",
            # endpoint, insecure default from mlconf.telemetry
            headers_source="file",
        )

    .. warning::

        When ``headers_source`` is ``"file"`` or ``"project_secret"``, headers
        are resolved **eagerly** in ``__init__``. Always add the step via the
        ``class_name="mlrun.serving.OTelMetricsExporter"`` form (above) so
        construction is deferred to function-pod startup, where the secret
        mount (or project-secret env) actually exists. Instantiating the class
        directly on the SDK side — e.g.
        ``flow.to(OTelMetricsExporter(headers_source="file"))`` — runs the
        resolver against a missing mount, silently returns an empty headers
        dict, and bakes that empty dict into the serialized graph.

    The OTLP endpoint is resolved at construction time; if neither passed nor
    present in ``mlconf.telemetry``, construction raises
    ``MLRunRuntimeError``. Call ``mlrun.get_run_db()`` (or
    ``mlrun.get_or_create_project(...)``) first in dev contexts so the SDK has
    synced ``/client-spec``.

    :param endpoint:               OTLP gRPC endpoint URL (e.g.
                                   ``"otel-collector.iguazio.svc.cluster.local:4317"``).
                                   Defaults to
                                   ``mlrun.mlconf.telemetry.otlp_endpoint``.
    :param insecure:               Use a plaintext (non-TLS) gRPC channel.
                                   Defaults to
                                   ``mlrun.mlconf.telemetry.insecure``.
    :param headers_source:         One of ``"file"``, ``"project_secret"``,
                                   ``"none"``. See class docstring above.
    :param project_secret_key:     Required when
                                   ``headers_source="project_secret"``. The
                                   secret's value must be a JSON object whose
                                   keys are header names and values are header
                                   values.
    :param export_interval_millis: Export interval in milliseconds. Defaults to
                                   ``mlrun.mlconf.telemetry.model_monitoring.interval``
                                   (seconds) times 1000 when that is set.

    All remaining keyword arguments are forwarded to
    ``storey.OTelMetricsExporter`` unchanged (``flush_mode``,
    ``export_interval_millis``, ``instrument_type``, ``metric_name_field``,
    ``value_field``, ``attribute_fields``, ``metrics_field``, etc.).

    :raises MLRunInvalidArgumentError: ``headers_source`` is not a recognised
        value, ``project_secret_key`` is missing for ``"project_secret"``, or
        the project secret does not hold a JSON dict.
    :raises MLRunRuntimeError: no OTLP endpoint could be resolved.
    """

    def __init__(
        self,
        endpoint: str | None = None,
        insecure: bool | None = None,
        headers_source: Literal["file", "project_secret", "none"] = "file",
        project_secret_key: str | None = None,
        export_interval_millis: int | None = None,
        **kwargs,
    ):
        if headers_source not in _HEADERS_SOURCES:
            raise mlrun.errors.MLRunInvalidArgumentError(
                f"Unknown headers_source {headers_source!r}; "
                f"use one of {_HEADERS_SOURCES}."
            )
        if headers_source == _HEADERS_SOURCE_PROJECT_SECRET and not project_secret_key:
            raise mlrun.errors.MLRunInvalidArgumentError(
                "project_secret_key is required when headers_source='project_secret'."
            )

        # An explicit argument wins; otherwise fall back to the synced config.
        resolved_endpoint = endpoint or mlrun.mlconf.telemetry.otlp_endpoint
        if not resolved_endpoint:
            raise mlrun.errors.MLRunRuntimeError(
                "OTLP endpoint unresolved: pass endpoint=..., or ensure the "
                "operator has set mlconf.telemetry.otlp_endpoint and the "
                "/client-spec sync has run (e.g. call mlrun.get_run_db() "
                "or mlrun.get_or_create_project() first)."
            )

        # ``insecure=False`` is a meaningful explicit value, so test for None
        # rather than truthiness.
        resolved_insecure = (
            insecure if insecure is not None else mlrun.mlconf.telemetry.insecure
        )

        # The configured model-monitoring interval is in seconds; the exporter
        # argument is in milliseconds.
        mm_interval_s = mlrun.mlconf.telemetry.model_monitoring.interval
        export_interval_millis = export_interval_millis or (
            int(mm_interval_s) * 1000 if mm_interval_s is not None else None
        )
        if export_interval_millis is None:
            # No interval available: default to immediate flushing, but keep a
            # ``flush_mode`` the caller passed explicitly (kwargs are documented
            # as forwarded unchanged).
            kwargs.setdefault("flush_mode", "immediate")
        else:
            kwargs["export_interval_millis"] = export_interval_millis

        super().__init__(
            endpoint=resolved_endpoint,
            headers=self._resolve_headers(headers_source, project_secret_key),
            insecure=bool(resolved_insecure),
            **kwargs,
        )

        # todo move to storey
        # Probe the endpoint only when NUCLIO_FUNCTION_NAME is set in the
        # environment (presumably inside a deployed Nuclio function — TODO
        # confirm).
        if os.environ.get("NUCLIO_FUNCTION_NAME"):
            _warmup_endpoint(resolved_endpoint)

        # Stash mlrun-specific config for introspection / serialization.
        self._mlrun_headers_source = headers_source
        self._mlrun_project_secret_key = project_secret_key

    @staticmethod
    def _resolve_headers(source: str, project_secret_key: str | None) -> dict[str, str]:
        """Build the OTLP headers dict for the given ``headers_source`` value.

        :param source:             One of the ``_HEADERS_SOURCE_*`` values.
                                   Any value other than ``"none"`` or
                                   ``"file"`` is handled as
                                   ``"project_secret"``.
        :param project_secret_key: Name of the project secret (or environment
                                   variable) holding a JSON dict of headers;
                                   only read for the project-secret source.

        :return: Mapping of header name to header value, both coerced to
                 ``str``. Empty for the ``"none"`` source and when the project
                 secret is missing or empty.
        :raises MLRunInvalidArgumentError: the project secret is not valid
            JSON or does not decode to a dict.
        """
        if source == _HEADERS_SOURCE_NONE:
            return {}
        if source == _HEADERS_SOURCE_FILE:
            return mlrun.utils.telemetry.resolve_otlp_headers()

        # project_secret
        raw = mlrun.get_secret_or_env(project_secret_key)
        if not raw:
            return {}
        try:
            headers = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise mlrun.errors.MLRunInvalidArgumentError(
                f"Project secret {project_secret_key!r} must contain a JSON dict "
                f"of {{header_name: header_value}}; failed to parse."
            ) from exc
        if not isinstance(headers, dict):
            raise mlrun.errors.MLRunInvalidArgumentError(
                f"Project secret {project_secret_key!r} must decode to a JSON dict; "
                f"got {type(headers).__name__}."
            )
        return {str(k): str(v) for k, v in headers.items()}