Prepare insight metrics structure for adding service_name label (#4227)
# What this PR does
Prepares insight metrics for adding a `service_name` label.
This PR updates the metrics cache structure so that both the old and the new
cache formats are supported.
The `service_name` label itself can be added in a follow-up PR once the entire
metrics cache has been updated.
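
As a rough illustration of the structural change (a sketch inferred from the diff below; the integration data and counter values are invented), a per-integration entry in the alert groups total cache moves from flat state counters to counters nested per service:

```python
# Hypothetical cache entries; names, IDs, and counts are examples only.

# Old format: state counters live directly on the integration entry.
old_entry = {
    "integration_name": "Grafana Alerting",
    "team_name": "No team",
    "team_id": 0,
    "org_id": 1,
    "slug": "stack",
    "id": 1,
    "firing": 2,
    "acknowledged": 1,
    "silenced": 0,
    "resolved": 5,
}

# New format: the same counters are grouped under "services", keyed by service
# name (falling back to NO_SERVICE_VALUE), so a `service_name` label can be
# attached later without another cache migration.
new_entry = {
    "integration_name": "Grafana Alerting",
    "team_name": "No team",
    "team_id": 0,
    "org_id": 1,
    "slug": "stack",
    "id": 1,
    "services": {
        "No service": {"firing": 2, "acknowledged": 1, "silenced": 0, "resolved": 5},
    },
}
```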

## Which issue(s) this PR closes
grafana/oncall-private#2610

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
Ferril authored Apr 29, 2024
1 parent 6ed7a1e commit d1085b7
Showing 9 changed files with 697 additions and 122 deletions.
17 changes: 12 additions & 5 deletions engine/apps/metrics_exporter/constants.py
@@ -4,17 +4,21 @@
from django.conf import settings


class AlertGroupStateDict(typing.TypedDict):
firing: int
acknowledged: int
silenced: int
resolved: int


class AlertGroupsTotalMetricsDict(typing.TypedDict):
integration_name: str
team_name: str
team_id: int
org_id: int
slug: str
id: int
firing: int
acknowledged: int
silenced: int
resolved: int
services: typing.Dict[str, AlertGroupStateDict]


class AlertGroupsResponseTimeMetricsDict(typing.TypedDict):
@@ -24,7 +28,7 @@ class AlertGroupsResponseTimeMetricsDict(typing.TypedDict):
org_id: int
slug: str
id: int
response_time: list
services: typing.Dict[str, list]


class UserWasNotifiedOfAlertGroupsMetricsDict(typing.TypedDict):
@@ -61,3 +65,6 @@ class RecalculateOrgMetricsDict(typing.TypedDict):

METRICS_ORGANIZATIONS_IDS = "metrics_organizations_ids"
METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT = 3600 # 1 hour

SERVICE_LABEL = "service_name"
NO_SERVICE_VALUE = "No service"
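
For context, the alert groups response time cache gets the same treatment: the flat `response_time` list becomes a per-service mapping. A sketch of a new-format entry matching `AlertGroupsResponseTimeMetricsDict` above (all field values are invented):

```python
# Illustrative new-format response time entry; identifiers, service names, and
# response times (in seconds) are made up.
response_time_entry = {
    "integration_name": "Webhook",
    "team_name": "No team",
    "team_id": 0,
    "org_id": 1,
    "slug": "stack",
    "id": 1,
    # previously a flat "response_time": [12, 45]; now keyed by service name,
    # with NO_SERVICE_VALUE ("No service") used when no service is known
    "services": {
        "No service": [12, 45],
        "checkout-service": [7],
    },
}
```
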
91 changes: 72 additions & 19 deletions engine/apps/metrics_exporter/helpers.py
@@ -16,8 +16,10 @@
METRICS_RECALCULATION_CACHE_TIMEOUT,
METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE,
METRICS_RESPONSE_TIME_CALCULATION_PERIOD,
NO_SERVICE_VALUE,
USER_WAS_NOTIFIED_OF_ALERT_GROUPS,
AlertGroupsResponseTimeMetricsDict,
AlertGroupStateDict,
AlertGroupsTotalMetricsDict,
RecalculateMetricsTimer,
UserWasNotifiedOfAlertGroupsMetricsDict,
@@ -126,6 +128,15 @@ def get_metric_calculation_started_key(metric_name) -> str:
return f"calculation_started_for_{metric_name}"


def get_default_states_dict() -> AlertGroupStateDict:
return {
AlertGroupState.FIRING.value: 0,
AlertGroupState.ACKNOWLEDGED.value: 0,
AlertGroupState.RESOLVED.value: 0,
AlertGroupState.SILENCED.value: 0,
}


def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None:
"""Update integration data in metrics cache"""
metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
@@ -185,10 +196,7 @@ def metrics_add_integrations_to_cache(integrations: list["AlertReceiveChannel"],
"org_id": grafana_org_id,
"slug": instance_slug,
"id": instance_id,
AlertGroupState.FIRING.value: 0,
AlertGroupState.ACKNOWLEDGED.value: 0,
AlertGroupState.RESOLVED.value: 0,
AlertGroupState.SILENCED.value: 0,
"services": {NO_SERVICE_VALUE: get_default_states_dict()},
},
)
cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)
@@ -208,13 +216,13 @@ def metrics_add_integrations_to_cache(integrations: list["AlertReceiveChannel"],
"org_id": grafana_org_id,
"slug": instance_slug,
"id": instance_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: []},
},
)
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)


def metrics_bulk_update_team_label_cache(teams_updated_data, organization_id):
def metrics_bulk_update_team_label_cache(teams_updated_data: dict, organization_id: int):
"""Update team related data in metrics cache for each team in `teams_updated_data`"""
if not teams_updated_data:
return
@@ -243,8 +251,29 @@ def metrics_bulk_update_team_label_cache(teams_updated_data, organization_id):
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)


def metrics_update_alert_groups_state_cache(states_diff, organization_id):
"""Update alert groups state metric cache for each integration in states_diff dict."""
def metrics_update_alert_groups_state_cache(states_diff: dict, organization_id: int):
"""
Update alert groups state metric cache for each integration in states_diff dict.
states_diff example:
{
<integration_id>: {
<service name>: {
"previous_states": {
firing: 1,
acknowledged: 0,
resolved: 0,
silenced: 0,
},
"new_states": {
firing: 0,
acknowledged: 1,
resolved: 0,
silenced: 0,
}
}
}
}
"""
if not states_diff:
return

@@ -253,23 +282,40 @@ def metrics_update_alert_groups_state_cache(states_diff, organization_id):
metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {})
if not metric_alert_groups_total:
return
for integration_id, integration_states_diff in states_diff.items():
for integration_id, service_data in states_diff.items():
integration_alert_groups = metric_alert_groups_total.get(int(integration_id))
if not integration_alert_groups:
continue
for previous_state, counter in integration_states_diff["previous_states"].items():
if integration_alert_groups[previous_state] - counter > 0:
integration_alert_groups[previous_state] -= counter
for service_name, service_state_diff in service_data.items():
if "services" in integration_alert_groups:
states_to_update = integration_alert_groups["services"].setdefault(
service_name, get_default_states_dict()
)
else:
integration_alert_groups[previous_state] = 0
for new_state, counter in integration_states_diff["new_states"].items():
integration_alert_groups[new_state] += counter
# support version of metrics cache without service name. This clause can be removed when all metrics
# cache is updated on prod (~2 days after release)
states_to_update = integration_alert_groups
for previous_state, counter in service_state_diff["previous_states"].items():
if states_to_update[previous_state] - counter > 0:
states_to_update[previous_state] -= counter
else:
states_to_update[previous_state] = 0
for new_state, counter in service_state_diff["new_states"].items():
states_to_update[new_state] += counter

cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)


def metrics_update_alert_groups_response_time_cache(integrations_response_time, organization_id):
"""Update alert groups response time metric cache for each integration in `integrations_response_time` dict."""
def metrics_update_alert_groups_response_time_cache(integrations_response_time: dict, organization_id: int):
"""
Update alert groups response time metric cache for each integration in `integrations_response_time` dict.
integrations_response_time dict example:
{
<integration_id>: {
<service name>: [10],
}
}
"""
if not integrations_response_time:
return

@@ -278,11 +324,18 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time,
metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {})
if not metric_alert_groups_response_time:
return
for integration_id, integration_response_time in integrations_response_time.items():
for integration_id, service_data in integrations_response_time.items():
integration_response_time_metrics = metric_alert_groups_response_time.get(int(integration_id))
if not integration_response_time_metrics:
continue
integration_response_time_metrics["response_time"].extend(integration_response_time)
for service_name, response_time_values in service_data.items():
if "services" in integration_response_time_metrics:
integration_response_time_metrics["services"].setdefault(service_name, [])
integration_response_time_metrics["services"][service_name].extend(response_time_values)
else:
# support version of metrics cache without service name. This clause can be removed when all metrics
# cache is updated on prod (~2 days after release)
integration_response_time_metrics["response_time"].extend(response_time_values)
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)


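To make the dual-format handling in `metrics_update_alert_groups_state_cache` concrete, here is a minimal standalone sketch of the per-integration branching (not the helper itself, which also reads from and writes back to the Django cache), with the subtraction clamped at zero as in the helper:

```python
# Standalone sketch of the compatibility branch applied to one cache entry.
# `entry` is either an old-format dict (counters at the top level) or a
# new-format dict (counters nested under entry["services"][service_name]).

DEFAULT_STATES = {"firing": 0, "acknowledged": 0, "resolved": 0, "silenced": 0}


def apply_state_diff(entry: dict, service_name: str, diff: dict) -> None:
    if "services" in entry:
        # New cache format: counters live per service.
        states = entry["services"].setdefault(service_name, dict(DEFAULT_STATES))
    else:
        # Old cache format: counters live directly on the entry; the service
        # name is ignored until the whole cache has been rewritten.
        states = entry

    for state, counter in diff["previous_states"].items():
        # never let a counter go negative
        states[state] = max(states[state] - counter, 0)
    for state, counter in diff["new_states"].items():
        states[state] += counter
```
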
46 changes: 25 additions & 21 deletions engine/apps/metrics_exporter/metrics_cache_manager.py
@@ -40,56 +40,60 @@ def get_default_states_diff_dict():
return default_dict

@staticmethod
def update_integration_states_diff(metrics_dict, integration_id, previous_state=None, new_state=None):
metrics_dict.setdefault(integration_id, MetricsCacheManager.get_default_states_diff_dict())
def update_integration_states_diff(metrics_dict, integration_id, service_name, previous_state=None, new_state=None):
state_per_service = metrics_dict.setdefault(
integration_id, {service_name: MetricsCacheManager.get_default_states_diff_dict()}
)
if previous_state:
state_value = previous_state
metrics_dict[integration_id]["previous_states"][state_value] += 1
state_per_service[service_name]["previous_states"][state_value] += 1
if new_state:
state_value = new_state
metrics_dict[integration_id]["new_states"][state_value] += 1
return metrics_dict

@staticmethod
def update_integration_response_time_diff(metrics_dict, integration_id, response_time_seconds):
metrics_dict.setdefault(integration_id, [])
metrics_dict[integration_id].append(response_time_seconds)
state_per_service[service_name]["new_states"][state_value] += 1
return metrics_dict

@staticmethod
def metrics_update_state_cache_for_alert_group(integration_id, organization_id, old_state=None, new_state=None):
def metrics_update_state_cache_for_alert_group(
integration_id, organization_id, service_name, old_state=None, new_state=None
):
"""
Update state metric cache for one alert group.
Run the task to update async if organization_id is None due to an additional request to db
"""
metrics_state_diff = MetricsCacheManager.update_integration_states_diff(
{}, integration_id, previous_state=old_state, new_state=new_state
{}, integration_id, service_name, previous_state=old_state, new_state=new_state
)
metrics_update_alert_groups_state_cache(metrics_state_diff, organization_id)

@staticmethod
def metrics_update_response_time_cache_for_alert_group(integration_id, organization_id, response_time_seconds):
def metrics_update_response_time_cache_for_alert_group(
integration_id, organization_id, response_time_seconds, service_name
):
"""
Update response time metric cache for one alert group.
Run the task to update async if organization_id is None due to an additional request to db
"""
metrics_response_time = MetricsCacheManager.update_integration_response_time_diff(
{}, integration_id, response_time_seconds
)
metrics_response_time: typing.Dict[int, typing.Dict[str, typing.List[int]]] = {
integration_id: {service_name: [response_time_seconds]}
}
metrics_update_alert_groups_response_time_cache(metrics_response_time, organization_id)

@staticmethod
def metrics_update_cache_for_alert_group(
integration_id, organization_id, old_state=None, new_state=None, response_time=None, started_at=None
integration_id,
organization_id,
old_state=None,
new_state=None,
response_time=None,
started_at=None,
service_name=None,
):
"""Call methods to update state and response time metrics cache for one alert group."""

if response_time and old_state == AlertGroupState.FIRING and started_at > get_response_time_period():
response_time_seconds = int(response_time.total_seconds())
MetricsCacheManager.metrics_update_response_time_cache_for_alert_group(
integration_id, organization_id, response_time_seconds
integration_id, organization_id, response_time_seconds, service_name
)
if old_state or new_state:
MetricsCacheManager.metrics_update_state_cache_for_alert_group(
integration_id, organization_id, old_state, new_state
integration_id, organization_id, service_name, old_state, new_state
)
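
A hypothetical call site for the updated entry point, to show how the new `service_name` argument threads through (the IDs, timings, and import paths below are assumptions for illustration, not taken from this diff):

```python
# Hypothetical invocation; integration/organization IDs, timings, and the
# service name are invented, and import paths are assumed.
from datetime import timedelta

from django.utils import timezone

from apps.alerts.constants import AlertGroupState  # assumed module path
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager

MetricsCacheManager.metrics_update_cache_for_alert_group(
    integration_id=42,
    organization_id=1,
    old_state=AlertGroupState.FIRING,
    new_state=AlertGroupState.ACKNOWLEDGED,
    response_time=timedelta(seconds=35),  # time from firing to acknowledgement
    started_at=timezone.now(),            # must fall within the response time window
    service_name="checkout-service",      # callers without a service pass None
)
```
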
53 changes: 41 additions & 12 deletions engine/apps/metrics_exporter/metrics_collectors.py
@@ -46,10 +46,14 @@ def __init__(self):
"slug",
"id",
]
self._integration_labels = [
"integration",
"team",
] + self._stack_labels
self._integration_labels = (
[
"integration",
"team",
]
+ self._stack_labels
# + [SERVICE_LABEL] # todo:metrics: uncomment when all metric cache is updated (~2 days after release)
)
self._integration_labels_with_state = self._integration_labels + ["state"]
self._user_labels = ["username"] + self._stack_labels

@@ -96,8 +100,19 @@ def _get_alert_groups_total_metric(self, org_ids):
integration_data["id"], # grafana instance id
]
labels_values = list(map(str, labels_values))
for state in AlertGroupState:
alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
# clause below is needed for compatibility with the old metric cache during the rollout of metrics with services
if "services" in integration_data:
for service_name in integration_data["services"]:
for state in AlertGroupState:
alert_groups_total.add_metric(
labels_values + [state.value],
# todo:metrics: replace [state.value] when all metric cache is updated
# + [service_name, state.value],
integration_data["services"][service_name][state.value],
)
else:
for state in AlertGroupState:
alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0]
processed_org_ids.add(int(org_id_from_key))
missing_org_ids = org_ids - processed_org_ids
@@ -126,12 +141,26 @@ def _get_response_time_metric(self, org_ids):
]
labels_values = list(map(str, labels_values))

response_time_values = integration_data["response_time"]
if not response_time_values:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time_values)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
# clause below is needed for compatibility with the old metric cache during the rollout of metrics with services
if "services" in integration_data:
# todo:metrics: for service_name, response_time
for _, response_time in integration_data["services"].items():
if not response_time:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(
labels_values, # + [service_name] todo:metrics: uncomment when all metric cache is updated
buckets=buckets,
sum_value=sum_value,
)
else:
response_time_values = integration_data["response_time"]
if not response_time_values:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time_values)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0]
processed_org_ids.add(int(org_id_from_key))
missing_org_ids = org_ids - processed_org_ids