# Copyright 2023 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime
from typing import Annotated, Any, Callable, Optional, Union
import pydantic.v1
import mlrun.common.schemas.notification as notification_objects
from mlrun.common.types import StrEnum
class EventEntityKind(StrEnum):
    """Kinds of entities that an alert event can refer to."""

    # A result produced for a model endpoint (used by the drift, performance
    # and anomaly event kinds)
    MODEL_ENDPOINT_RESULT = "model-endpoint-result"
    # A model-monitoring application itself (used by the app-failure event)
    MODEL_MONITORING_APPLICATION = "model-monitoring-application"
    # A job (used by the generic failure event)
    JOB = "job"
class EventEntities(pydantic.v1.BaseModel):
    """Identifies the entity an event or alert configuration refers to."""

    # Which type of entity the ``ids`` below identify
    kind: EventEntityKind
    # Project the entity belongs to
    project: str
    # Entity identifiers; the constraint currently admits exactly one id
    ids: pydantic.v1.conlist(str, min_items=1, max_items=1)
class EventKind(StrEnum):
    """Kinds of events that can be raised and matched by alert triggers."""

    # Data drift
    DATA_DRIFT_DETECTED = "data-drift-detected"
    DATA_DRIFT_SUSPECTED = "data-drift-suspected"
    # Concept drift
    CONCEPT_DRIFT_DETECTED = "concept-drift-detected"
    CONCEPT_DRIFT_SUSPECTED = "concept-drift-suspected"
    # Model performance
    MODEL_PERFORMANCE_DETECTED = "model-performance-detected"
    MODEL_PERFORMANCE_SUSPECTED = "model-performance-suspected"
    # System performance
    SYSTEM_PERFORMANCE_DETECTED = "system-performance-detected"
    SYSTEM_PERFORMANCE_SUSPECTED = "system-performance-suspected"
    # Model-monitoring application results and failures
    MM_APP_ANOMALY_DETECTED = "mm-app-anomaly-detected"
    MM_APP_ANOMALY_SUSPECTED = "mm-app-anomaly-suspected"
    MM_APP_FAILED = "mm-app-failed"
    # Generic failure
    FAILED = "failed"
# Maps every EventKind to the entity kinds it may legally refer to
# (consulted by Event.is_valid). Each key gets its own list object and the
# key order matches the explicit ordering below.
_event_kind_entity_map = {
    **{
        model_endpoint_event_kind: [EventEntityKind.MODEL_ENDPOINT_RESULT]
        for model_endpoint_event_kind in (
            EventKind.DATA_DRIFT_SUSPECTED,
            EventKind.DATA_DRIFT_DETECTED,
            EventKind.CONCEPT_DRIFT_DETECTED,
            EventKind.CONCEPT_DRIFT_SUSPECTED,
            EventKind.MODEL_PERFORMANCE_DETECTED,
            EventKind.MODEL_PERFORMANCE_SUSPECTED,
            EventKind.SYSTEM_PERFORMANCE_DETECTED,
            EventKind.SYSTEM_PERFORMANCE_SUSPECTED,
            EventKind.MM_APP_ANOMALY_DETECTED,
            EventKind.MM_APP_ANOMALY_SUSPECTED,
        )
    },
    EventKind.MM_APP_FAILED: [EventEntityKind.MODEL_MONITORING_APPLICATION],
    EventKind.FAILED: [EventEntityKind.JOB],
}
class Event(pydantic.v1.BaseModel):
    """A single occurrence of an event that alerts can react to."""

    # What happened
    kind: EventKind
    # When it happened (occurrence time)
    timestamp: Union[str, datetime] = None
    # The entity the event refers to
    entity: EventEntities
    # Free-form payload attached to the event; defaults to a fresh empty dict
    value_dict: Optional[dict] = pydantic.v1.Field(default_factory=dict)

    def is_valid(self):
        """Check that the event's entity kind is allowed for its event kind.

        :returns: True if ``self.entity.kind`` is one of the entity kinds
                  registered for ``self.kind`` in ``_event_kind_entity_map``.
        """
        permitted_entity_kinds = _event_kind_entity_map[self.kind]
        return self.entity.kind in permitted_entity_kinds
class AlertActiveState(StrEnum):
    """Whether an alert is currently active."""

    ACTIVE = "active"
    INACTIVE = "inactive"
class AlertSeverity(StrEnum):
    """Severity levels an alert can be configured with."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
# What should trigger the alert: either at least one event, or a Prometheus query.
class AlertTrigger(pydantic.v1.BaseModel):
    """What triggers an alert: a list of event kinds or a Prometheus alert.

    Two triggers are equal when both ``prometheus_alert`` and ``events``
    match; ``events`` is compared as a list, so order matters.
    """

    # Event kinds that trigger the alert
    events: list[EventKind] = []
    # Prometheus alert used as the trigger, if any
    prometheus_alert: Optional[str] = None

    def __eq__(self, other):
        # Returning NotImplemented for foreign operands lets Python apply its
        # default fallback (e.g. ``trigger == None`` is False, ``!=`` is True)
        # instead of raising AttributeError on ``other.prometheus_alert``.
        if not isinstance(other, AlertTrigger):
            return NotImplemented
        return (
            self.prometheus_alert == other.prometheus_alert
            and self.events == other.events
        )
class AlertCriteria(pydantic.v1.BaseModel):
    """How many events, within which period, are needed to fire an alert.

    Two criteria are equal when both ``count`` and ``period`` match.
    """

    count: Annotated[
        int,
        pydantic.v1.Field(
            description="Number of events to wait until notification is sent"
        ),
    ] = 1
    period: Annotated[
        Optional[str],
        pydantic.v1.Field(
            description="Time period during which event occurred. e.g. 1d, 3h, 5m, 15s"
        ),
    ] = None

    def __eq__(self, other):
        # ``criteria`` is Optional on alert templates/configs, so this is
        # routinely compared against None (e.g. ``a.criteria != b.criteria``).
        # Returning NotImplemented lets Python fall back to its default
        # handling instead of raising AttributeError on ``other.count``.
        if not isinstance(other, AlertCriteria):
            return NotImplemented
        return self.count == other.count and self.period == other.period
class ResetPolicy(StrEnum):
    """How an activated alert gets reset: manually or automatically."""

    MANUAL = "manual"
    AUTO = "auto"
class AlertNotification(pydantic.v1.BaseModel):
    """A notification attached to an alert, with an optional cooldown."""

    # The notification to send when the alert fires
    notification: notification_objects.Notification
    cooldown_period: Annotated[
        str,
        pydantic.v1.Field(
            description=(
                "Period during which notifications will not be sent after initial send. "
                "The format of this would be in time. e.g. 1d, 3h, 5m, 15s"
            )
        ),
    ] = None
class AlertConfig(pydantic.v1.BaseModel):
    """Full configuration of an alert: what triggers it and whom to notify.

    Extra fields are allowed (see ``Config``).
    """

    project: str
    id: int = None
    name: str
    description: Optional[str] = ""
    summary: Annotated[
        str,
        pydantic.v1.Field(
            # The literals are joined implicitly, so each must end with a space
            # to keep the rendered description readable.
            description=(
                "String to be sent in the notifications generated. "
                "e.g. 'Model {{project}}/{{entity}} is drifting.' "
                "Supported variables: project, entity, name"
            )
        ),
    ]
    created: Union[str, datetime] = None
    severity: AlertSeverity
    # The entity the alert watches
    entities: EventEntities
    # What fires the alert
    trigger: AlertTrigger
    # Optional count/period threshold for firing
    criteria: Optional[AlertCriteria]
    reset_policy: ResetPolicy = ResetPolicy.AUTO
    # At least one notification must be configured
    notifications: pydantic.v1.conlist(AlertNotification, min_items=1)
    state: AlertActiveState = AlertActiveState.INACTIVE
    count: Optional[int] = 0
    updated: datetime = None

    class Config:
        # Accept fields beyond those declared above
        extra = pydantic.v1.Extra.allow

    def get_raw_notifications(self) -> list[notification_objects.Notification]:
        """Return the bare Notification objects, dropping cooldown wrappers."""
        return [
            alert_notification.notification for alert_notification in self.notifications
        ]
class AlertsModes(StrEnum):
    """On/off switch values for the alerts feature."""

    enabled = "enabled"
    disabled = "disabled"
class AlertTemplate(pydantic.v1.BaseModel):
    """A reusable blueprint from which alert configurations are created.

    The first group of fields belongs to the template itself; the second
    group pre-defines values for the AlertConfig fields of the same name.
    """

    # --- Template-only fields (not copied into created configs) ---
    template_id: int = None
    template_name: str
    template_description: Optional[str] = (
        "String explaining the purpose of this template"
    )
    # True for templates created by the system; those cannot be modified or
    # deleted by the user
    system_generated: bool = False

    # --- Pre-defined AlertConfig fields ---
    # NOTE(review): the two literals below join without a space; kept as-is
    # because this is the runtime default that templates_differ compares.
    summary: Optional[str] = (
        "String to be sent in the generated notifications e.g. 'Model {{project}}/{{entity}} is drifting.'"
        "See AlertConfig.summary description"
    )
    severity: AlertSeverity
    trigger: AlertTrigger
    criteria: Optional[AlertCriteria]
    reset_policy: ResetPolicy = ResetPolicy.AUTO

    def templates_differ(self, other):
        """Return True if any user-relevant field differs from ``other``.

        Unlike ``__eq__``, only the description and the pre-defined
        AlertConfig fields are compared; ``template_id``, ``template_name``
        and ``system_generated`` are ignored. Fields are checked in order and
        the check stops at the first difference.
        """
        compared_fields = (
            "template_description",
            "summary",
            "severity",
            "trigger",
            "reset_policy",
            "criteria",
        )
        return any(
            getattr(self, field_name) != getattr(other, field_name)
            for field_name in compared_fields
        )
class AlertActivation(pydantic.v1.BaseModel):
    """A record of one time an alert fired."""

    id: int
    name: str
    project: str
    severity: AlertSeverity
    activation_time: datetime
    # The entity that caused the activation
    entity_id: str
    entity_kind: EventEntityKind
    criteria: AlertCriteria
    event_kind: EventKind
    number_of_events: int
    notifications: list[notification_objects.NotificationState]
    reset_time: Optional[datetime] = None

    def group_key(self, attributes: list[str]) -> Union[Any, tuple]:
        """
        Build a grouping key from the named attributes of this activation.

        - Exactly one attribute: the attribute's value itself (no 1-tuple).
        - Any other number of attributes: a tuple of their values, in order.

        This keeps single-attribute grouping keys free of redundant
        ``(value,)`` wrappers.
        """
        if len(attributes) != 1:
            return tuple(getattr(self, attribute_name) for attribute_name in attributes)
        # Single attribute: unwrap so the key is e.g. ``high`` rather than ``(high,)``
        sole_attribute = attributes[0]
        return getattr(self, sole_attribute)
class AlertActivations(pydantic.v1.BaseModel):
    """A page of alert activations with sequence-like access and grouping helpers."""

    activations: list[AlertActivation]
    # Pagination metadata accompanying this page of results
    pagination: Optional[dict]

    def __iter__(self) -> Iterator[AlertActivation]:
        return iter(self.activations)

    def __getitem__(self, index: int) -> AlertActivation:
        return self.activations[index]

    def __len__(self) -> int:
        return len(self.activations)

    def group_by(self, *attributes: str) -> dict:
        """
        Group alert activations by the given attributes.

        :param attributes: Names of AlertActivation attributes to group by.
        :returns: A dict mapping each group key to the list of activations in
                  that group, in their original order. When exactly one
                  attribute is given, the key is that attribute's value;
                  otherwise it is a tuple of the attribute values (see
                  ``AlertActivation.group_key``).

        Example::

            # Group by project and severity -> keys like ("my-proj", "high")
            grouped = activations.group_by("project", "severity")
        """
        grouped = defaultdict(list)
        for activation in self.activations:
            key = activation.group_key(attributes)
            grouped[key].append(activation)
        return dict(grouped)

    def aggregate_by(
        self,
        group_by_attrs: list[str],
        aggregation_function: Callable[[list[AlertActivation]], Any],
    ) -> dict:
        """
        Group alert activations and reduce each group with a function.

        :param group_by_attrs: Names of AlertActivation attributes to group by.
        :param aggregation_function: Called once per group with the group's
                                     list of activations.
        :returns: A dict mapping each group key (same key shape as
                  ``group_by``: a bare value for one attribute, a tuple for
                  several) to the result of ``aggregation_function``.

        Example::

            # Count activations per (name, entity_id) pair
            activations.aggregate_by(["name", "entity_id"], len)
        """
        grouped = self.group_by(*group_by_attrs)
        return {
            key: aggregation_function(group_activations)
            for key, group_activations in grouped.items()
        }