Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ele 3893 alert rules operators #1768

Merged
merged 14 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,29 @@
"module": "elementary.cli.cli",
"console": "integratedTerminal",
"args": "${command:pickArgs}"
},
{
"name": "pytest: Current File",
"type": "debugpy",
"request": "launch",
"module": "pytest",
"args": ["-vvv", "-s", "${file}"],
"console": "integratedTerminal"
},
{
"name": "pytest: Selector",
"type": "debugpy",
"request": "launch",
"module": "pytest",
"args": ["-vvv", "-s", "${file}::${input:selector}"],
"console": "integratedTerminal"
}
],
"inputs": [
{
"id": "selector",
"type": "promptString",
"description": "Selector"
}
]
}
251 changes: 61 additions & 190 deletions elementary/monitor/api/alerts/alert_filters.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from functools import reduce
from typing import List
from typing import List, Optional

from elementary.monitor.data_monitoring.schema import (
FilterSchema,
FiltersSchema,
ResourceTypeFilterSchema,
StatusFilterSchema,
ResourceType,
Status,
)
from elementary.monitor.fetchers.alerts.schema.pending_alerts import (
AlertTypes,
Expand All @@ -16,6 +14,61 @@
logger = get_logger(__name__)


def get_string_ends(input_string: str, splitter: str) -> List[str]:
parts = input_string.split(splitter)
result = []

for i in range(len(parts)):
result.append(splitter.join(parts[i:]))

return result


def _get_alert_node_name(alert: PendingAlertSchema) -> Optional[str]:
alert_node_name = None
alert_type = AlertTypes(alert.type)
if alert_type is AlertTypes.TEST:
alert_node_name = alert.data.test_name # type: ignore[union-attr]
elif alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you can remove this type ignore by adding type hints in line 32: alert_node_name: Optional[str] = None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently not :(

def _get_alert_node_name(alert: PendingAlertSchema) -> Optional[str]:
    alert_node_name: Optional[str] = None
    alert_type = AlertTypes(alert.type)
    if alert_type is AlertTypes.TEST:
        alert_node_name = alert.data.test_name
elementary/monitor/api/alerts/alert_filters.py:31: error: Item "ModelAlertDataSchema" of "Union[TestAlertDataSchema, ModelAlertDataSchema, SourceFreshnessAlertDataSchema]" has no attribute "test_name"  [union-attr]
elementary/monitor/api/alerts/alert_filters.py:31: error: Item "SourceFreshnessAlertDataSchema" of "Union[TestAlertDataSchema, ModelAlertDataSchema, SourceFreshnessAlertDataSchema]" has no attribute "test_name"  [union-attr]

It's from the earlier version.

alert_node_name = alert.data.model_unique_id
else:
raise ValueError(f"Unexpected alert type: {alert_type}")
return alert_node_name


def apply_filters_schema_on_alert(
alert: PendingAlertSchema, filters_schema: FiltersSchema
) -> bool:
tags = alert.data.tags or []
models = (
[
alert.data.model_unique_id,
*get_string_ends(alert.data.model_unique_id, "."),
]
if alert.data.model_unique_id
else []
)
owners = alert.data.unified_owners or []
status = Status(alert.data.status)
resource_type = ResourceType(alert.data.resource_type)

alert_node_name = _get_alert_node_name(alert)
node_names = (
[alert_node_name, *get_string_ends(alert_node_name, ".")]
if alert_node_name
else []
)

return filters_schema.apply(
tags=tags,
models=models,
owners=owners,
statuses=[status],
resource_types=[resource_type],
node_names=node_names,
)


def filter_alerts(
alerts: List[PendingAlertSchema],
alerts_filter: FiltersSchema = FiltersSchema(),
Expand All @@ -29,188 +82,6 @@ def filter_alerts(
logger.warning("Invalid filter for alerts: %s", alerts_filter.selector)
return [] # type: ignore[return-value]

# If the filter is empty, we want to return all of the alerts
filtered_alerts = alerts
filtered_alerts = _filter_alerts_by_tags(filtered_alerts, alerts_filter.tags)
filtered_alerts = _filter_alerts_by_models(filtered_alerts, alerts_filter.models)
filtered_alerts = _filter_alerts_by_owners(filtered_alerts, alerts_filter.owners)
filtered_alerts = _filter_alerts_by_statuses(
filtered_alerts, alerts_filter.statuses
)
filtered_alerts = _filter_alerts_by_resource_types(
filtered_alerts, alerts_filter.resource_types
)
if alerts_filter.node_names:
filtered_alerts = _filter_alerts_by_node_names(
filtered_alerts, alerts_filter.node_names
)

return filtered_alerts


def _find_common_alerts(
first_alerts: List[PendingAlertSchema],
second_alerts: List[PendingAlertSchema],
) -> List[PendingAlertSchema]:
first_alert_ids = [alert.id for alert in first_alerts]
second_alert_ids = [alert.id for alert in second_alerts]
common_alert_ids = list(set(first_alert_ids) & set(second_alert_ids))

common_alerts = []
# To handle dedupping common alerts
alert_ids_already_handled = []

for alert in [*first_alerts, *second_alerts]:
if alert.id in common_alert_ids and alert.id not in alert_ids_already_handled:
common_alerts.append(alert)
alert_ids_already_handled.append(alert.id)
return common_alerts


def _filter_alerts_by_tags(
alerts: List[PendingAlertSchema],
tags_filters: List[FilterSchema],
) -> List[PendingAlertSchema]:
if not tags_filters:
return [*alerts]

grouped_filtered_alerts_by_tags = []

# OR filter for each tags_filter's values
for tags_filter in tags_filters:
filtered_alerts_by_tags = []
for alert in alerts:
if any(tag in (alert.data.tags or []) for tag in tags_filter.values):
filtered_alerts_by_tags.append(alert)
grouped_filtered_alerts_by_tags.append(filtered_alerts_by_tags)

# AND filter between all tags_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_tags)


def _filter_alerts_by_owners(
alerts: List[PendingAlertSchema],
owners_filters: List[FilterSchema],
) -> List[PendingAlertSchema]:
if not owners_filters:
return [*alerts]

grouped_filtered_alerts_by_owners = []

# OR filter for each owners_filter's values
for owners_filter in owners_filters:
filtered_alerts_by_owners = []
for alert in alerts:
if any(
owner in alert.data.unified_owners for owner in owners_filter.values
):
filtered_alerts_by_owners.append(alert)
grouped_filtered_alerts_by_owners.append(filtered_alerts_by_owners)

# AND filter between all owners_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_owners)


def _filter_alerts_by_models(
alerts: List[PendingAlertSchema],
models_filters: List[FilterSchema],
) -> List[PendingAlertSchema]:
if not models_filters:
return [*alerts]

grouped_filtered_alerts_by_models = []

# OR filter for each models_filter's values
for models_filter in models_filters:
filtered_alerts_by_models = []
for alert in alerts:
if any(
(
alert.data.model_unique_id
and alert.data.model_unique_id.endswith(model)
)
for model in models_filter.values
):
filtered_alerts_by_models.append(alert)
grouped_filtered_alerts_by_models.append(filtered_alerts_by_models)

# AND filter between all models_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_models)


def _filter_alerts_by_node_names(
alerts: List[PendingAlertSchema],
node_names_filters: List[str],
) -> List[PendingAlertSchema]:
if not node_names_filters:
return [*alerts]

filtered_alerts = []
for alert in alerts:
alert_node_name = None
alert_type = AlertTypes(alert.type)
if alert_type is AlertTypes.TEST:
alert_node_name = alert.data.test_name # type: ignore[union-attr]
elif (
alert_type is AlertTypes.MODEL or alert_type is AlertTypes.SOURCE_FRESHNESS
):
alert_node_name = alert.data.model_unique_id
else:
# Shouldn't happen
raise Exception(f"Unexpected alert type: {type(alert)}")

if alert_node_name:
for node_name in node_names_filters:
if alert_node_name.endswith(node_name) or node_name.endswith(
alert_node_name
):
filtered_alerts.append(alert)
break
return filtered_alerts # type: ignore[return-value]


def _filter_alerts_by_statuses(
alerts: List[PendingAlertSchema],
statuses_filters: List[StatusFilterSchema],
) -> List[PendingAlertSchema]:
if not statuses_filters:
return [*alerts]

grouped_filtered_alerts_by_statuses = []

# OR filter for each statuses_filter's values
for statuses_filter in statuses_filters:
filtered_alerts_by_statuses = []
for alert in alerts:
if any(status == alert.data.status for status in statuses_filter.values):
filtered_alerts_by_statuses.append(alert)
grouped_filtered_alerts_by_statuses.append(filtered_alerts_by_statuses)

# AND filter between all statuses_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_statuses)


def _filter_alerts_by_resource_types(
alerts: List[PendingAlertSchema],
resource_types_filters: List[ResourceTypeFilterSchema],
) -> List[PendingAlertSchema]:
if not resource_types_filters:
return [*alerts]

grouped_filtered_alerts_by_resource_types = []

# OR filter for each resource_types_filter's values
for resource_types_filter in resource_types_filters:
filtered_alerts_by_resource_types = []
for alert in alerts:
if any(
resource_type == alert.data.resource_type.value
for resource_type in resource_types_filter.values
):
filtered_alerts_by_resource_types.append(alert)
grouped_filtered_alerts_by_resource_types.append(
filtered_alerts_by_resource_types
)

# AND filter between all resource_types_filters
return reduce(_find_common_alerts, grouped_filtered_alerts_by_resource_types)
return [
alert for alert in alerts if apply_filters_schema_on_alert(alert, alerts_filter)
]
Loading
Loading