# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Union
import mlrun
import mlrun.common.schemas.alert as alert_objects
from mlrun.model import ModelObj
class AlertConfig(ModelObj):
    """Client-side configuration object describing a single alert.

    Plain attributes listed in ``_dict_fields`` are (de)serialized by the ``ModelObj`` machinery, while the
    schema-object attributes (``entities``, ``notifications``, ``trigger``, ``criteria``) are converted to and
    from dictionaries explicitly in :py:meth:`_serialize_field` and :py:meth:`from_dict`.
    """

    _dict_fields = [
        "project",
        "name",
        "description",
        "summary",
        "severity",
        "reset_policy",
        "cooldown_period",
        "state",
        "count",
        "created",
        "updated",
    ]
    _fields_to_serialize = ModelObj._fields_to_serialize + [
        "entities",
        "notifications",
        "trigger",
        "criteria",
    ]

    def __init__(
        self,
        project: str | None = None,
        name: str | None = None,
        template: Union[alert_objects.AlertTemplate, str, None] = None,
        description: str | None = None,
        summary: str | None = None,
        severity: alert_objects.AlertSeverity | None = None,
        trigger: alert_objects.AlertTrigger | None = None,
        criteria: alert_objects.AlertCriteria | None = None,
        reset_policy: alert_objects.ResetPolicy | None = None,
        cooldown_period: str | None = None,
        notifications: list[alert_objects.AlertNotification] | None = None,
        entities: alert_objects.EventEntities | None = None,
        id: int | None = None,
        state: alert_objects.AlertActiveState | None = None,
        created: str | None = None,
        count: int | None = None,
        updated: str | None = None,
        **kwargs,
    ):
        """Alert config object

        Example::

            # create an alert on endpoint_id, which will be triggered to slack if there is a "data_drift_detected" event
            # 3 times in the next hour.

            from mlrun.alerts import AlertConfig
            import mlrun.common.schemas.alert as alert_objects

            entity_kind = alert_objects.EventEntityKind.MODEL_ENDPOINT_RESULT
            entity_id = get_default_result_instance_fqn(endpoint_id)
            event_name = alert_objects.EventKind.DATA_DRIFT_DETECTED
            notification = mlrun.model.Notification(
                kind="slack",
                name="slack_notification",
                message="drift was detected",
                severity="warning",
                when=["now"],
                condition="failed",
                secret_params={
                    "webhook": "https://hooks.slack.com/",
                },
            ).to_dict()

            alert_data = AlertConfig(
                project="my-project",
                name="drift-alert",
                summary="a drift was detected",
                severity=alert_objects.AlertSeverity.LOW,
                entities=alert_objects.EventEntities(
                    kind=entity_kind, project="my-project", ids=[entity_id]
                ),
                trigger=alert_objects.AlertTrigger(events=[event_name]),
                criteria=alert_objects.AlertCriteria(count=3, period="1h"),
                notifications=[
                    alert_objects.AlertNotification(notification=notification)
                ],
            )
            project.store_alert_config(alert_data)

        :param project:         Name of the project to associate the alert with
        :param name:            Name of the alert
        :param template:        Optional parameter that allows creating an alert based on a predefined template.
                                You can pass either an AlertTemplate object or a string (the template name).
                                If a template is used, many fields of the alert will be auto-generated based on the
                                template. However, you still need to provide the following fields:
                                `name`, `project`, `entities`, `notifications`
        :param description:     Description of the alert
        :param summary:         Summary of the alert, will be sent in the generated notifications
        :param severity:        Severity of the alert
        :param trigger:         The events that will trigger this alert, may be a simple trigger based on events or
                                complex trigger which is based on a prometheus alert
        :param criteria:        When the alert will be triggered based on the specified number of events within the
                                defined time period.
        :param reset_policy:    When to clear the alert. "manual" means the alert stays active after triggering
                                and must be reset explicitly. "auto" means the alert is reset automatically
                                after triggering and sending notifications (immediately if cooldown_period is
                                not set, or after the cooldown period elapses if it is set).
        :param cooldown_period: Period during which the alert remains active after being triggered before it
                                is automatically reset. Only applicable when reset_policy=auto and
                                cooldown_period > 0. If not set or set to zero, the alert resets
                                immediately. Format: e.g. 1d, 3h, 5m, 15s.
        :param notifications:   List of notifications to invoke once the alert is triggered
        :param entities:        Entities that the event relates to. The entity object will contain fields that
                                uniquely identify a given entity in the system
        :param id:              Internal id of the alert (user should not supply it)
        :param state:           State of the alert, may be active/inactive (user should not supply it)
        :param created:         When the alert is created (user should not supply it)
        :param count:           Internal counter of the alert (user should not supply it)
        :param updated:         The last update time of the alert (user should not supply it)
        :param kwargs:          Additional keyword arguments; accepted but not used by this constructor
        """
        self.project = project
        self.name = name
        self.description = description
        self.summary = summary
        self.severity = severity
        self.trigger = trigger
        self.criteria = criteria
        self.reset_policy = reset_policy
        self.cooldown_period = cooldown_period
        # Always hold a list so that with_notifications() can extend it safely
        self.notifications = notifications or []
        self.entities = entities
        self.id = id
        self.state = state
        # Raw values are kept privately; the `created`/`updated` properties parse them on access
        self._created = created
        self.count = count
        self._updated = updated

        # Applied last so that explicitly supplied values win over the template's values
        if template:
            self._apply_template(template)

    @property
    def created(self) -> datetime | None:
        """
        Get the `created` field as a datetime object.

        A string value is parsed with ``datetime.fromisoformat``; any other stored value (including None)
        is returned as is.
        """
        if isinstance(self._created, str):
            return datetime.fromisoformat(self._created)
        return self._created

    @created.setter
    def created(self, created):
        self._created = created

    @property
    def updated(self) -> datetime | None:
        """
        Get the `updated` field as a datetime object.

        A string value is parsed with ``datetime.fromisoformat``; any other stored value (including None)
        is returned as is.
        """
        if isinstance(self._updated, str):
            return datetime.fromisoformat(self._updated)
        return self._updated

    @updated.setter
    def updated(self, updated):
        self._updated = updated

    def validate_required_fields(self):
        """
        Validate that the alert has a name.

        :raises MLRunInvalidArgumentError: If `name` is empty or not set.
        """
        if not self.name:
            raise mlrun.errors.MLRunInvalidArgumentError("Alert name must be provided")

    @staticmethod
    def _schema_value_to_dict(value):
        """Return `value` unchanged if it is already a dict, otherwise the result of its ``dict()`` method."""
        return value if isinstance(value, dict) else value.dict()

    def _serialize_field(
        self, struct: dict, field_name: str | None = None, strip: bool = False
    ):
        """
        Serialize a single field of the alert.

        The schema-object fields (`entities`, `notifications`, `trigger`, `criteria`) are converted to plain
        dictionaries (a list of dictionaries for `notifications`); a falsy value is serialized as None.
        Every other field is delegated to the parent implementation.

        :param struct:     The dictionary being built (passed through to the parent implementation).
        :param field_name: Name of the field to serialize.
        :param strip:      Passed through to the parent implementation.

        :returns: The serialized value of the field.
        """
        if field_name == "notifications":
            if not self.notifications:
                return None
            return [
                self._schema_value_to_dict(notification_data)
                for notification_data in self.notifications
            ]

        if field_name in ("entities", "trigger", "criteria"):
            value = getattr(self, field_name)
            return self._schema_value_to_dict(value) if value else None

        return super()._serialize_field(struct, field_name, strip)

    def to_dict(
        self,
        fields: list | None = None,
        exclude: list | None = None,
        strip: bool = False,
    ):
        """
        Convert the alert config to a dictionary, after verifying it has entities and notifications.

        :param fields:  Fields to include. Defaults to the class `_dict_fields`.
        :param exclude: Fields to exclude, forwarded to the parent implementation.
        :param strip:   Forwarded to the parent implementation.

        :returns: The dictionary produced by the parent implementation.

        :raises MLRunBadRequestError: If `entities` is None or there are no notifications.
        """
        if self.entities is None:
            raise mlrun.errors.MLRunBadRequestError("Alert entity field is missing")
        if not self.notifications:
            raise mlrun.errors.MLRunBadRequestError(
                "Alert must have at least one notification"
            )
        # Honor the caller's arguments instead of silently discarding them; with the default
        # arguments this is the same call as before.
        return super().to_dict(
            fields or self._dict_fields, exclude=exclude, strip=strip
        )

    @classmethod
    def from_dict(cls, struct=None, fields=None, deprecated_fields: dict | None = None):
        """
        Create an alert config from a dictionary.

        The plain fields are loaded by the parent implementation, then the `entities`, `notifications`,
        `trigger` and `criteria` entries (when present and non-empty) are parsed into their schema objects.

        :param struct:            The dictionary to load from. None is treated as an empty dictionary for the
                                  schema-object fields.
        :param fields:            Forwarded to the parent implementation.
        :param deprecated_fields: Forwarded to the parent implementation.

        :returns: The new alert config object.
        """
        new_obj = super().from_dict(
            struct, fields=fields, deprecated_fields=deprecated_fields
        )

        # `struct` defaults to None; use an empty mapping so the lookups below do not raise AttributeError
        data = struct or {}

        entity_data = data.get("entities")
        if entity_data:
            new_obj.entities = alert_objects.EventEntities.parse_obj(entity_data)

        notifications_data = data.get("notifications")
        if notifications_data:
            new_obj.notifications = [
                alert_objects.AlertNotification.parse_obj(notification)
                for notification in notifications_data
            ]

        trigger_data = data.get("trigger")
        if trigger_data:
            new_obj.trigger = alert_objects.AlertTrigger.parse_obj(trigger_data)

        criteria_data = data.get("criteria")
        if criteria_data:
            new_obj.criteria = alert_objects.AlertCriteria.parse_obj(criteria_data)

        return new_obj

    def with_notifications(self, notifications: list[alert_objects.AlertNotification]):
        """
        Add notifications to the alert (existing notifications are kept).

        :param notifications: List of AlertNotification objects to add.

        :returns: The alert config object itself, to allow call chaining.

        :raises ValueError: If `notifications` is not a list of AlertNotification objects.
        """
        if not isinstance(notifications, list) or not all(
            isinstance(item, alert_objects.AlertNotification) for item in notifications
        ):
            raise ValueError(
                "Notifications parameter must be a list of AlertNotification"
            )
        self.notifications.extend(notifications)
        return self

    def with_entities(self, entities: alert_objects.EventEntities):
        """
        Set the entities of the alert, replacing any previous value.

        :param entities: The EventEntities object to set.

        :returns: The alert config object itself, to allow call chaining.

        :raises ValueError: If `entities` is not an EventEntities object.
        """
        if not isinstance(entities, alert_objects.EventEntities):
            raise ValueError("Entities parameter must be of type: EventEntities")
        self.entities = entities
        return self

    def _apply_template(self, template):
        """
        Fill unset fields of the alert from a template.

        :param template: An AlertTemplate object, or a template name which is fetched from the run DB.
        """
        if isinstance(template, str):
            db = mlrun.get_run_db()
            template = db.get_alert_template(template)

        # Apply parameters from the template to the AlertConfig object only if they are not already specified by the
        # user in the current configuration.
        # User-provided parameters will take precedence over corresponding template values
        self.summary = self.summary or template.summary
        self.severity = self.severity or template.severity
        self.criteria = self.criteria or template.criteria
        self.trigger = self.trigger or template.trigger
        self.reset_policy = self.reset_policy or template.reset_policy
        self.cooldown_period = self.cooldown_period or template.cooldown_period

    def list_activations(
        self,
        since: datetime | None = None,
        until: datetime | None = None,
        from_last_update: bool = False,
    ) -> list[alert_objects.AlertActivation]:
        """
        Retrieve a list of all alert activations.

        :param since:            Filters for alert activations occurring after this timestamp.
        :param until:            Filters for alert activations occurring before this timestamp.
        :param from_last_update: If set to True, retrieves alert activations since the alert's last update time.
                                 if both since and from_last_update=True are provided, from_last_update takes precedence
                                 and the since value will be overridden by the alert's last update timestamp.
                                 Has no effect when the alert has no `updated` value.

        :returns: A list of alert activations matching the provided filters.
        """
        db = mlrun.get_run_db()
        if from_last_update and self._updated:
            since = self.updated

        return db.list_alert_activations(
            project=self.project,
            name=self.name,
            since=since,
            until=until,
        )

    def paginated_list_activations(
        self,
        *args,
        page: int | None = None,
        page_size: int | None = None,
        page_token: str | None = None,
        from_last_update: bool = False,
        **kwargs,
    ) -> tuple[list[alert_objects.AlertActivation], str | None]:
        """
        List alerts activations with support for pagination and various filtering options.

        This method retrieves a paginated list of alert activations based on the specified filter parameters.
        Pagination is controlled using the `page`, `page_size`, and `page_token` parameters. The method
        will return a list of alert activations that match the filtering criteria provided.

        For detailed information about the parameters, refer to the list_activations method:
            See :py:func:`~list_activations` for more details.

        Examples::

            # Fetch first page of alert activations with page size of 5
            alert_activations, token = alert_config.paginated_list_activations(
                page_size=5
            )
            # Fetch next page using the pagination token from the previous response
            alert_activations, token = alert_config.paginated_list_activations(
                page_token=token
            )
            # Fetch alert activations for a specific page (e.g., page 3)
            alert_activations, token = alert_config.paginated_list_activations(
                page=3, page_size=5
            )

            # Automatically iterate over all pages without explicitly specifying the page number
            alert_activations = []
            token = None
            while True:
                page_alert_activations, token = alert_config.paginated_list_activations(
                    page_token=token, page_size=5
                )
                alert_activations.extend(page_alert_activations)

                # If token is None and page_alert_activations is empty, we've reached the end (no more activations).
                # If token is None and page_alert_activations is not empty, we've fetched the last page of activations.
                if not token:
                    break
            print(f"Total alert activations retrieved: {len(alert_activations)}")

        :param page:             The page number to retrieve. If not provided, the next page will be retrieved.
        :param page_size:        The number of items per page to retrieve. Up to `page_size` responses are expected.
        :param page_token:       A pagination token used to retrieve the next page of results. Should not be provided
                                 for the first request.
        :param from_last_update: If set to True, retrieves alert activations since the alert's last update time.
                                 Overrides a `since` keyword argument, and has no effect when the alert has no
                                 `updated` value.

        :returns: A tuple containing the list of alert activations and an optional `page_token` for pagination.
        """
        if from_last_update and self._updated:
            kwargs["since"] = self.updated

        db = mlrun.get_run_db()
        return db.paginated_list_alert_activations(
            *args,
            project=self.project,
            name=self.name,
            page=page,
            page_size=page_size,
            page_token=page_token,
            **kwargs,
        )