Source code for mlrun.alerts.alert

# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Union

import mlrun
import mlrun.common.schemas.alert as alert_objects
from mlrun.model import ModelObj


[docs] class AlertConfig(ModelObj): _dict_fields = [ "project", "name", "description", "summary", "severity", "reset_policy", "cooldown_period", "state", "count", "created", "updated", ] _fields_to_serialize = ModelObj._fields_to_serialize + [ "entities", "notifications", "trigger", "criteria", ] def __init__( self, project: str | None = None, name: str | None = None, template: Union[alert_objects.AlertTemplate, str] = None, description: str | None = None, summary: str | None = None, severity: alert_objects.AlertSeverity = None, trigger: alert_objects.AlertTrigger = None, criteria: alert_objects.AlertCriteria = None, reset_policy: alert_objects.ResetPolicy = None, cooldown_period: str | None = None, notifications: list[alert_objects.AlertNotification] | None = None, entities: alert_objects.EventEntities = None, id: int | None = None, state: alert_objects.AlertActiveState = None, created: str | None = None, count: int | None = None, updated: str | None = None, **kwargs, ): """Alert config object Example:: # create an alert on endpoint_id, which will be triggered to slack if there is a "data_drift_detected" event # 3 times in the next hour. from mlrun.alerts import AlertConfig import mlrun.common.schemas.alert as alert_objects entity_kind = alert_objects.EventEntityKind.MODEL_ENDPOINT_RESULT entity_id = get_default_result_instance_fqn(endpoint_id) event_name = alert_objects.EventKind.DATA_DRIFT_DETECTED notification = mlrun.model.Notification( kind="slack", name="slack_notification", message="drift was detected", severity="warning", when=["now"], condition="failed", secret_params={ "webhook": "https://hooks.slack.com/", }, ).to_dict() alert_data = AlertConfig( project="my-project", name="drift-alert", summary="a drift was detected", severity=alert_objects.AlertSeverity.LOW, entities=alert_objects.EventEntities( kind=entity_kind, project="my-project", ids=[entity_id] ), trigger=alert_objects.AlertTrigger(events=[event_name]), criteria=alert_objects.AlertCriteria(count=3, period="1h"), notifications=[ alert_objects.AlertNotification(notification=notification) ], ) project.store_alert_config(alert_data) :param project: Name of the project to associate the alert with :param name: Name of the alert :param template: Optional parameter that allows creating an alert based on a predefined template. You can pass either an AlertTemplate object or a string (the template name). If a template is used, many fields of the alert will be auto-generated based on the template.However, you still need to provide the following fields: `name`, `project`, `entity`, `notifications` :param description: Description of the alert :param summary: Summary of the alert, will be sent in the generated notifications :param severity: Severity of the alert :param trigger: The events that will trigger this alert, may be a simple trigger based on events or complex trigger which is based on a prometheus alert :param criteria: When the alert will be triggered based on the specified number of events within the defined time period. :param reset_policy: When to clear the alert. "manual" means the alert stays active after triggering and must be reset explicitly. "auto" means the alert is reset automatically after triggering and sending notifications (immediately if cooldown_period is not set, or after the cooldown period elapses if it is set). :param cooldown_period: Period during which the alert remains active after being triggered before it is automatically reset. Only applicable when reset_policy=auto and cooldown_period > 0. If not set or set to zero, the alert resets immediately. Format: e.g. 1d, 3h, 5m, 15s. :param notifications: List of notifications to invoke once the alert is triggered :param entities: Entities that the event relates to. The entity object will contain fields that uniquely identify a given entity in the system :param id: Internal id of the alert (user should not supply it) :param state: State of the alert, may be active/inactive (user should not supply it) :param created: When the alert is created (user should not supply it) :param count: Internal counter of the alert (user should not supply it) :param updated: The last update time of the alert (user should not supply it) """ self.project = project self.name = name self.description = description self.summary = summary self.severity = severity self.trigger = trigger self.criteria = criteria self.reset_policy = reset_policy self.cooldown_period = cooldown_period self.notifications = notifications or [] self.entities = entities self.id = id self.state = state self._created = created self.count = count self._updated = updated if template: self._apply_template(template) @property def created(self) -> datetime: """ Get the `created` field as a datetime object. """ if isinstance(self._created, str): return datetime.fromisoformat(self._created) return self._created @created.setter def created(self, created): self._created = created @property def updated(self) -> datetime: """ Get the `updated` field as a datetime object. """ if isinstance(self._updated, str): return datetime.fromisoformat(self._updated) return self._updated @updated.setter def updated(self, updated): self._updated = updated
[docs] def validate_required_fields(self): if not self.name: raise mlrun.errors.MLRunInvalidArgumentError("Alert name must be provided")
def _serialize_field( self, struct: dict, field_name: str | None = None, strip: bool = False ): if field_name == "entities": if self.entities: return ( self.entities.dict() if not isinstance(self.entities, dict) else self.entities ) return None if field_name == "notifications": if self.notifications: return [ notification_data.dict() if not isinstance(notification_data, dict) else notification_data for notification_data in self.notifications ] return None if field_name == "trigger": if self.trigger: return ( self.trigger.dict() if not isinstance(self.trigger, dict) else self.trigger ) return None if field_name == "criteria": if self.criteria: return ( self.criteria.dict() if not isinstance(self.criteria, dict) else self.criteria ) return None return super()._serialize_field(struct, field_name, strip)
[docs] def to_dict( self, fields: list | None = None, exclude: list | None = None, strip: bool = False, ): if self.entities is None: raise mlrun.errors.MLRunBadRequestError("Alert entity field is missing") if not self.notifications: raise mlrun.errors.MLRunBadRequestError( "Alert must have at least one notification" ) return super().to_dict(self._dict_fields)
[docs] @classmethod def from_dict(cls, struct=None, fields=None, deprecated_fields: dict | None = None): new_obj = super().from_dict(struct, fields=fields) entity_data = struct.get("entities") if entity_data: entity_obj = alert_objects.EventEntities.parse_obj(entity_data) new_obj.entities = entity_obj notifications_data = struct.get("notifications") if notifications_data: notifications_objs = [ alert_objects.AlertNotification.parse_obj(notification) for notification in notifications_data ] new_obj.notifications = notifications_objs trigger_data = struct.get("trigger") if trigger_data: trigger_obj = alert_objects.AlertTrigger.parse_obj(trigger_data) new_obj.trigger = trigger_obj criteria_data = struct.get("criteria") if criteria_data: criteria_obj = alert_objects.AlertCriteria.parse_obj(criteria_data) new_obj.criteria = criteria_obj return new_obj
[docs] def with_notifications(self, notifications: list[alert_objects.AlertNotification]): if not isinstance(notifications, list) or not all( isinstance(item, alert_objects.AlertNotification) for item in notifications ): raise ValueError( "Notifications parameter must be a list of AlertNotification" ) for notification_data in notifications: self.notifications.append(notification_data) return self
[docs] def with_entities(self, entities: alert_objects.EventEntities): if not isinstance(entities, alert_objects.EventEntities): raise ValueError("Entities parameter must be of type: EventEntities") self.entities = entities return self
def _apply_template(self, template): if isinstance(template, str): db = mlrun.get_run_db() template = db.get_alert_template(template) # Apply parameters from the template to the AlertConfig object only if they are not already specified by the # user in the current configuration. # User-provided parameters will take precedence over corresponding template values self.summary = self.summary or template.summary self.severity = self.severity or template.severity self.criteria = self.criteria or template.criteria self.trigger = self.trigger or template.trigger self.reset_policy = self.reset_policy or template.reset_policy self.cooldown_period = self.cooldown_period or template.cooldown_period
[docs] def list_activations( self, since: datetime | None = None, until: datetime | None = None, from_last_update: bool = False, ) -> list[mlrun.common.schemas.alert.AlertActivation]: """ Retrieve a list of all alert activations. :param since: Filters for alert activations occurring after this timestamp. :param until: Filters for alert activations occurring before this timestamp. :param from_last_update: If set to True, retrieves alert activations since the alert's last update time. if both since and from_last_update=True are provided, from_last_update takes precedence and the since value will be overridden by the alert's last update timestamp. :returns: A list of alert activations matching the provided filters. """ db = mlrun.get_run_db() if from_last_update and self._updated: since = self.updated return db.list_alert_activations( project=self.project, name=self.name, since=since, until=until, )
[docs] def paginated_list_activations( self, *args, page: int | None = None, page_size: int | None = None, page_token: str | None = None, from_last_update: bool = False, **kwargs, ) -> tuple[mlrun.common.schemas.alert.AlertActivation, str | None]: """ List alerts activations with support for pagination and various filtering options. This method retrieves a paginated list of alert activations based on the specified filter parameters. Pagination is controlled using the `page`, `page_size`, and `page_token` parameters. The method will return a list of alert activations that match the filtering criteria provided. For detailed information about the parameters, refer to the list_activations method: See :py:func:`~list_activations` for more details. Examples:: # Fetch first page of alert activations with page size of 5 alert_activations, token = alert_config.paginated_list_activations( page_size=5 ) # Fetch next page using the pagination token from the previous response alert_activations, token = alert_config.paginated_list_activations( page_token=token ) # Fetch alert activations for a specific page (e.g., page 3) alert_activations, token = alert_config.paginated_list_activations( page=3, page_size=5 ) # Automatically iterate over all pages without explicitly specifying the page number alert_activations = [] token = None while True: page_alert_activations, token = alert_config.paginated_list_activations( page_token=token, page_size=5 ) alert_activations.extend(page_alert_activations) # If token is None and page_alert_activations is empty, we've reached the end (no more activations). # If token is None and page_alert_activations is not empty, we've fetched the last page of activations. if not token: break print(f"Total alert activations retrieved: {len(alert_activations)}") :param page: The page number to retrieve. If not provided, the next page will be retrieved. :param page_size: The number of items per page to retrieve. Up to `page_size` responses are expected. :param page_token: A pagination token used to retrieve the next page of results. Should not be provided for the first request. :param from_last_update: If set to True, retrieves alert activations since the alert's last update time. :returns: A tuple containing the list of alert activations and an optional `page_token` for pagination. """ if from_last_update and self._updated: kwargs["since"] = self.updated db = mlrun.get_run_db() return db.paginated_list_alert_activations( *args, project=self.project, name=self.name, page=page, page_size=page_size, page_token=page_token, **kwargs, )