Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ELE-2398 - move sort_alerts from API to data monitoring alerts flow #1387

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 12 additions & 96 deletions elementary/monitor/api/alerts/alerts.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from collections import defaultdict
from datetime import datetime
from typing import DefaultDict, Dict, List, Optional
from typing import Dict, List

from elementary.clients.api.api_client import APIClient
from elementary.clients.dbt.dbt_runner import DbtRunner
from elementary.config.config import Config
from elementary.monitor.api.alerts.schema import SortedAlertsSchema
from elementary.monitor.fetchers.alerts.alerts import AlertsFetcher
from elementary.monitor.fetchers.alerts.schema.pending_alerts import PendingAlertSchema
from elementary.utils.log import get_logger
Expand All @@ -19,8 +17,6 @@ def __init__(
dbt_runner: DbtRunner,
config: Config,
elementary_database_and_schema: str,
global_suppression_interval: int,
override_meta_suppression_interval: bool = False,
):
super().__init__(dbt_runner)
self.config = config
Expand All @@ -30,16 +26,21 @@ def __init__(
config=self.config,
elementary_database_and_schema=self.elementary_database_and_schema,
)
self.global_suppression_interval = global_suppression_interval
self.override_meta_suppression_interval = override_meta_suppression_interval

def get_new_alerts(self, days_back: int) -> SortedAlertsSchema:
def get_new_alerts(self, days_back: int) -> List[PendingAlertSchema]:
pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back)
last_alert_sent_times = self.alerts_fetcher.query_last_alert_times(
return pending_alerts

def get_alerts_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
alerts_last_sent_times = self.alerts_fetcher.query_last_alert_times(
days_back=days_back
)
sorted_alerts = self._sort_alerts(pending_alerts, last_alert_sent_times)
return sorted_alerts
last_sent_times = dict()
for alert_class_id, last_sent_time_as_string in alerts_last_sent_times.items():
last_sent_times.update(
{alert_class_id: datetime.fromisoformat(last_sent_time_as_string)}
)
return last_sent_times

def skip_alerts(
self,
Expand All @@ -49,88 +50,3 @@ def skip_alerts(

def update_sent_alerts(self, alert_ids: List[str]) -> None:
self.alerts_fetcher.update_sent_alerts(alert_ids=alert_ids)

def _sort_alerts(
    self,
    pending_alerts: List[PendingAlertSchema],
    last_alert_sent_times: Dict[str, str],
) -> SortedAlertsSchema:
    """Split pending alerts into the ones to send and the ones to skip.

    An alert is skipped when its id is reported by
    ``_get_suppressed_alerts`` or when it is not among the ids returned by
    ``_get_latest_alerts``; every other alert is sent. The relative order
    of ``pending_alerts`` is kept in both result lists.

    Args:
        pending_alerts: Alerts to classify.
        last_alert_sent_times: Mapping of alert class id to the last sent
            time (ISO string); forwarded unchanged to
            ``_get_suppressed_alerts``.

    Returns:
        A ``SortedAlertsSchema`` built from the ``send`` and ``skip`` lists.
    """
    # The helpers return lists; turning them into sets once keeps the
    # membership tests below O(1) instead of O(len(pending_alerts)) each.
    suppressed_ids = set(
        self._get_suppressed_alerts(pending_alerts, last_alert_sent_times)
    )
    latest_ids = set(self._get_latest_alerts(pending_alerts))

    alerts_to_send = []
    alerts_to_skip = []
    for alert in pending_alerts:
        # Send only the latest alert of a class, and only if not suppressed.
        if alert.id in latest_ids and alert.id not in suppressed_ids:
            alerts_to_send.append(alert)
        else:
            alerts_to_skip.append(alert)
    return SortedAlertsSchema(send=alerts_to_send, skip=alerts_to_skip)

def _get_suppressed_alerts(
    self,
    alerts: List[PendingAlertSchema],
    last_alert_sent_times: Dict[str, str],
) -> List[str]:
    """Return the ids of alerts that are still inside their suppression window.

    For every alert, the suppression interval is obtained from
    ``alert.data.get_suppression_interval`` (given the global interval and
    the override flag stored on ``self``). The alert is suppressed when the
    time elapsed since the last sent time recorded for its
    ``alert_class_id`` is, in hours, less than or equal to that interval.
    Alerts whose class has no recorded sent time are never suppressed.

    Args:
        alerts: Alerts to check.
        last_alert_sent_times: Mapping of alert class id to the last sent
            time as an ISO-format string.

    Returns:
        The ``id`` of every suppressed alert, in input order.
    """
    suppressed_alerts = []
    # Naive UTC "now"; recorded times are normalised to naive UTC below so
    # the subtraction is always between comparable values.
    current_time_utc = datetime.utcnow()
    for alert in alerts:
        alert_class_id = alert.alert_class_id
        if alert_class_id is None:
            # Shouldn't happen, but logging in any case
            logger.debug("Alert without an id detected!")
            continue

        suppression_interval = alert.data.get_suppression_interval(
            self.global_suppression_interval,
            self.override_meta_suppression_interval,
        )

        last_sent_raw = last_alert_sent_times.get(alert_class_id)
        if not last_sent_raw:
            # No (or empty) recorded sent time: nothing to suppress.
            continue

        last_sent_time = datetime.fromisoformat(last_sent_raw)
        utc_offset = last_sent_time.utcoffset()
        if utc_offset is not None:
            # Timestamp carries an offset: convert to naive UTC, otherwise
            # subtracting it from the naive "now" raises TypeError.
            last_sent_time = (last_sent_time - utc_offset).replace(tzinfo=None)

        hours_since_last_sent = (
            current_time_utc - last_sent_time
        ).total_seconds() / 3600
        if hours_since_last_sent <= suppression_interval:
            suppressed_alerts.append(alert.id)

    return suppressed_alerts

@staticmethod
def _get_latest_alerts(
    alerts: List[PendingAlertSchema],
) -> List[str]:
    """Return the id of the most recently detected alert of each alert class.

    Alerts are grouped by ``alert_class_id``; within a group the alert with
    the greatest ``detected_at`` is kept. Alerts without a class id are
    logged and ignored.

    Args:
        alerts: Alerts to scan.

    Returns:
        One alert ``id`` per alert class, ordered by the first appearance of
        each class in ``alerts``.
    """
    latest_by_class: Dict[str, PendingAlertSchema] = {}
    for alert in alerts:
        alert_class_id = alert.alert_class_id
        if alert_class_id is None:
            # Shouldn't happen, but logging in any case
            logger.debug("Alert without an id detected!")
            continue

        current_latest = latest_by_class.get(alert_class_id)
        # Strict "<": when detection times tie, the first alert seen wins.
        if (
            current_latest is None
            or current_latest.detected_at < alert.detected_at
        ):
            latest_by_class[alert_class_id] = alert

    return [latest.id for latest in latest_by_class.values()]
95 changes: 84 additions & 11 deletions elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from collections import defaultdict
from datetime import datetime
from typing import List, Optional, Union
from typing import DefaultDict, Dict, List, Optional, Union

from alive_progress import alive_it

Expand All @@ -12,13 +12,13 @@
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.api.alerts.alert_filters import filter_alerts
from elementary.monitor.api.alerts.alerts import AlertsAPI
from elementary.monitor.api.alerts.schema import SortedAlertsSchema
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
BaseIntegration,
)
from elementary.monitor.data_monitoring.alerts.integrations.integrations import (
Integrations,
)
from elementary.monitor.data_monitoring.alerts.schema import SortedAlertsSchema
from elementary.monitor.data_monitoring.data_monitoring import DataMonitoring
from elementary.monitor.data_monitoring.schema import FiltersSchema
from elementary.monitor.fetchers.alerts.schema.pending_alerts import PendingAlertSchema
Expand Down Expand Up @@ -50,8 +50,6 @@ def __init__(
self.internal_dbt_runner,
self.config,
self.elementary_database_and_schema,
self.global_suppression_interval,
self.override_config,
)
self.sent_alert_count = 0
self.send_test_message_on_success = send_test_message_on_success
Expand Down Expand Up @@ -82,17 +80,88 @@ def _populate_data(

return success

def _fetch_data(self, days_back: int) -> SortedAlertsSchema:
def _fetch_data(self, days_back: int) -> List[PendingAlertSchema]:
return self.alerts_api.get_new_alerts(
days_back=days_back,
)

def _filter_data(self, data: SortedAlertsSchema) -> SortedAlertsSchema:
return SortedAlertsSchema(
send=filter_alerts(alerts=data.send, alerts_filter=self.selector_filter),
skip=filter_alerts(alerts=data.skip, alerts_filter=self.selector_filter),
def _filter_data(self, data: List[PendingAlertSchema]) -> List[PendingAlertSchema]:
return filter_alerts(data, alerts_filter=self.selector_filter)

def _fetch_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
return self.alerts_api.get_alerts_last_sent_times(
days_back=days_back,
)
Comment on lines +83 to 94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we really need these functions.


def _sort_alerts(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same code as it was in the alerts API.

self,
alerts: List[PendingAlertSchema],
alerts_last_sent_times: Dict[str, datetime],
) -> SortedAlertsSchema:
suppressed_alerts = self._get_suppressed_alerts(alerts, alerts_last_sent_times)
latest_alert_ids = self._get_latest_alerts(alerts)
alerts_to_skip = []
alerts_to_send = []

for valid_alert in alerts:
if (
valid_alert.id in suppressed_alerts
or valid_alert.id not in latest_alert_ids
):
alerts_to_skip.append(valid_alert)
else:
alerts_to_send.append(valid_alert)
return SortedAlertsSchema(send=alerts_to_send, skip=alerts_to_skip)

def _get_suppressed_alerts(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same code as it was in the alerts API.

self,
alerts: List[PendingAlertSchema],
alerts_last_sent_times: Dict[str, datetime],
) -> List[str]:
suppressed_alerts = []
current_time_utc = datetime.utcnow()
for alert in alerts:
alert_class_id = alert.alert_class_id
suppression_interval = alert.data.get_suppression_interval(
self.global_suppression_interval,
self.override_config,
)
last_sent_time = alerts_last_sent_times.get(alert_class_id)
is_alert_in_suppression = (
(current_time_utc - last_sent_time).total_seconds() / 3600
<= suppression_interval
if last_sent_time
else False
)
if is_alert_in_suppression:
suppressed_alerts.append(alert.id)

return suppressed_alerts

@staticmethod
def _get_latest_alerts(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same code as it was in the alerts API.

alerts: List[PendingAlertSchema],
) -> List[str]:
alert_last_times: DefaultDict[
str,
Optional[PendingAlertSchema],
] = defaultdict(lambda: None)
latest_alert_ids = []
for alert in alerts:
alert_class_id = alert.alert_class_id
current_last_alert = alert_last_times[alert_class_id]
alert_detected_at = alert.detected_at
if (
not current_last_alert
or current_last_alert.detected_at < alert_detected_at
):
alert_last_times[alert_class_id] = alert

for alert_last_time in alert_last_times.values():
if alert_last_time:
latest_alert_ids.append(alert_last_time.id)
return latest_alert_ids

def _format_alerts(
self,
alerts: List[PendingAlertSchema],
Expand Down Expand Up @@ -221,8 +290,12 @@ def run_alerts(
# Fetch and filter data
alerts = self._fetch_data(days_back)
alerts = self._filter_data(alerts)
alerts_to_skip = alerts.skip
alerts_to_send = alerts.send
alerts_last_sent_times = self._fetch_last_sent_times(days_back)
sorted_alerts = self._sort_alerts(
alerts=alerts, alerts_last_sent_times=alerts_last_sent_times
)
alerts_to_skip = sorted_alerts.skip
alerts_to_send = sorted_alerts.send

# Skip alerts
self._skip_alerts(alerts_to_skip)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
('updated_at','timestamp'),
('status','string'),
('data', 'long_string'),
('sent_at', 'string')
('sent_at', 'timestamp')
]) }}
{% endmacro %}
2 changes: 2 additions & 0 deletions elementary/monitor/fetchers/alerts/schema/alert_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def get_suppression_interval(


class TestAlertDataSchema(BaseAlertDataSchema):
__test__ = False # Mark for pytest - The class name starts with "Test" which throws warnings on pytest runs

test_unique_id: str
table_name: Optional[str] = None
column_name: Optional[str] = None
Expand Down
Empty file.
69 changes: 0 additions & 69 deletions tests/integration/monitor/api/alerts/test_alerts_fetcher.py

This file was deleted.

1 change: 0 additions & 1 deletion tests/mocks/api/alerts_api_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def __init__(self, *args, **kwargs):
super().__init__(
mock_dbt_runner,
config,
global_suppression_interval=0,
elementary_database_and_schema="test.test",
)
self.alerts_fetcher = MockAlertsFetcher()
Loading