# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from typing import Any
import mlrun.errors
from mlrun.utils import get_in, update_in
# headers keys with underscore are getting ignored by werkzeug https://github.com/pallets/werkzeug/pull/2622
# to avoid conflicts with WGSI which converts all header keys to uppercase with underscores.
# more info https://github.com/benoitc/gunicorn/issues/2799, this comment can be removed once old keys are removed
event_id_key = "MLRUN-EVENT-ID"
event_path_key = "MLRUN-EVENT-PATH"
def _extract_input_data(input_path, body):
if input_path:
if not hasattr(body, "__getitem__"):
raise TypeError("input_path parameter supports only dict-like event bodies")
return get_in(body, input_path)
return body
def _update_result_body(result_path, event_body, result):
if result_path and event_body:
if not hasattr(event_body, "__getitem__"):
raise TypeError(
"result_path parameter supports only dict-like event bodies"
)
update_in(event_body, result_path, result)
else:
event_body = result
return event_body
class _RequestContext(dict):
"""Unified request context passed to handlers after API handler processing.
Merges parameters from body_map (JSONPath extraction), path templates, query
string, and system-injected URL info into a single dict. The original event
body is preserved as :attr:`original_body`.
When a downstream :class:`TaskStep` receives this object it calls the handler
as ``fn(original_body, **params)`` so handlers can declare named parameters::
def handler(body, model_name, version, **kwargs): ...
Priority order (highest wins): path > query > body_map.
Conflicts between path/query/body_map raise :exc:`MLRunBadRequestError`.
System-injected ``url_params`` (``mlrun_`` prefix) are merged last without
conflict checking.
"""
def __init__(
self,
original_body: Any = None,
path_params: dict[str, str] | None = None,
query_params: dict[str, str | list[str]] | None = None,
body_params: dict[str, Any] | None = None,
url_params: dict[str, Any] | None = None,
):
merged: dict[str, Any] = {}
sources = [
("body_map", body_params or {}),
("query", query_params or {}),
("path", path_params or {}),
]
param_sources: dict[str, list[str]] = {}
for source_name, params in sources:
for key, value in params.items():
if key in merged:
param_sources.setdefault(key, []).append(source_name)
else:
param_sources[key] = [source_name]
merged[key] = value
conflicts = {k: v for k, v in param_sources.items() if len(v) > 1}
if conflicts:
conflict_details = ", ".join(
f"{k} (from {' + '.join(srcs)})" for k, srcs in conflicts.items()
)
raise mlrun.errors.MLRunBadRequestError(
f"Parameter name conflict detected. Same parameter appears in multiple "
f"request sources: {conflict_details}. Parameters must be unique across "
f"path, query, and body_map."
)
if url_params:
merged.update(url_params)
super().__init__(merged)
self.original_body = original_body
[docs]
class StepToDict:
"""auto serialization of graph steps to a python dictionary"""
meta_keys = [
"context",
"name",
"input_path",
"result_path",
"full_event",
"kwargs",
]
[docs]
def to_dict(
self,
fields: list | None = None,
exclude: list | None = None,
strip: bool = False,
):
"""convert the step object to a python dictionary"""
fields = fields or getattr(self, "_dict_fields", None)
if not fields:
fields = list(inspect.signature(self.__init__).parameters.keys())
if exclude:
fields = [field for field in fields if field not in exclude]
args = {
key: getattr(self, key)
for key in fields
if getattr(self, key, None) is not None and key not in self.meta_keys
}
# add storey kwargs or extra kwargs
if "kwargs" in fields and (hasattr(self, "kwargs") or hasattr(self, "_kwargs")):
kwargs = getattr(self, "kwargs", {}) or getattr(self, "_kwargs", {})
for key, value in kwargs.items():
if key not in self.meta_keys:
args[key] = value
mod_name = self.__class__.__module__
class_path = self.__class__.__qualname__
if mod_name not in ["__main__", "builtins"]:
class_path = f"{mod_name}.{class_path}"
struct = {
"class_name": class_path,
"name": self.name
if hasattr(self, "name") and self.name
else self.__class__.__name__,
"class_args": args,
}
if hasattr(self, "_STEP_KIND"):
struct["kind"] = self._STEP_KIND
if hasattr(self, "_input_path") and self._input_path is not None:
struct["input_path"] = self._input_path
if hasattr(self, "_result_path") and self._result_path is not None:
struct["result_path"] = self._result_path
if hasattr(self, "_full_event") and self._full_event:
struct["full_event"] = self._full_event
return struct
class MonitoringApplicationToDict(StepToDict):
_STEP_KIND = "monitoring_application"
meta_keys = []
class RouterToDict(StepToDict):
_STEP_KIND = "router"
def to_dict(
self,
fields: list | None = None,
exclude: list | None = None,
strip: bool = False,
):
return super().to_dict(exclude=["routes"], strip=strip)