Skip to content

Commit

Permalink
feat(ddm): Extend billing consumer to update flag on project model (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Jan 2, 2024
1 parent 61ba090 commit 24f4a83
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 78 deletions.
101 changes: 69 additions & 32 deletions src/sentry/ingest/billing_metrics_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timezone
from typing import Any, Mapping, Optional, TypedDict, Union, cast
from typing import Any, Mapping, Optional, cast

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
Expand All @@ -9,17 +9,29 @@
ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition
from typing_extensions import NotRequired
from django.core.cache import cache
from django.db.models import F
from sentry_kafka_schemas.schema_types.snuba_generic_metrics_v1 import GenericMetric

from sentry.constants import DataCategory
from sentry.models.project import Project
from sentry.sentry_metrics.indexer.strings import SHARED_TAG_STRINGS, TRANSACTION_METRICS_NAMES
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.sentry_metrics.utils import reverse_resolve_tag_value
from sentry.snuba.metrics import parse_mri
from sentry.snuba.metrics.naming_layer.mri import is_custom_metric
from sentry.utils import json
from sentry.utils.outcomes import Outcome, track_outcome

logger = logging.getLogger(__name__)

# 7 days of TTL.
CACHE_TTL_IN_SECONDS = 60 * 60 * 24 * 7


def _get_project_flag_updated_cache_key(org_id: int, project_id: int) -> str:
return f"has-custom-metrics-flag-updated:{org_id}:{project_id}"


class BillingMetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def create_with_partitions(
Expand All @@ -30,22 +42,6 @@ def create_with_partitions(
return BillingTxCountMetricConsumerStrategy(CommitOffsets(commit))


class MetricsBucket(TypedDict):
"""
Metrics bucket as decoded from kafka.
Only defines the fields that are relevant for this consumer."""

org_id: int
project_id: int
metric_id: int
timestamp: int
value: Any
tags: Union[Mapping[str, str], Mapping[str, int]]
# not used here but allows us to use the TypedDict for assignments
type: NotRequired[str]


class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):
"""A metrics consumer that generates a billing outcome for each processed
transaction, processing a bucket at a time. The transaction count is
Expand Down Expand Up @@ -74,45 +70,51 @@ def submit(self, message: Message[KafkaPayload]) -> None:
assert not self.__closed

payload = self._get_payload(message)

self._produce_billing_outcomes(payload)
self._flag_metric_received_for_project(payload)

self.__next_step.submit(message)

def _get_payload(self, message: Message[KafkaPayload]) -> MetricsBucket:
def _get_payload(self, message: Message[KafkaPayload]) -> GenericMetric:
payload = json.loads(message.payload.value.decode("utf-8"), use_rapid_json=True)
return cast(MetricsBucket, payload)
return cast(GenericMetric, payload)

def _count_processed_items(self, bucket_payload: MetricsBucket) -> Mapping[DataCategory, int]:
if bucket_payload["metric_id"] != self.metric_id:
def _count_processed_items(self, generic_metric: GenericMetric) -> Mapping[DataCategory, int]:
if generic_metric["metric_id"] != self.metric_id:
return {}
value = bucket_payload["value"]

value = generic_metric["value"]
try:
quantity = max(int(value), 0)
quantity = max(int(value), 0) # type:ignore
except TypeError:
# Unexpected value type for this metric ID, skip.
return {}

items = {DataCategory.TRANSACTION: quantity}

if self._has_profile(bucket_payload):
if self._has_profile(generic_metric):
# The bucket is tagged with the "has_profile" tag,
# so we also count the quantity of this bucket towards profiles.
# This assumes a "1 to 0..1" relationship between transactions and profiles.
items[DataCategory.PROFILE] = quantity

return items

def _has_profile(self, bucket: MetricsBucket) -> bool:
def _has_profile(self, generic_metric: GenericMetric) -> bool:
return bool(
(tag_value := bucket["tags"].get(self.profile_tag_key))
(tag_value := generic_metric["tags"].get(self.profile_tag_key))
and "true"
== reverse_resolve_tag_value(UseCaseID.TRANSACTIONS, bucket["org_id"], tag_value)
== reverse_resolve_tag_value(
UseCaseID.TRANSACTIONS, generic_metric["org_id"], tag_value
)
)

def _produce_billing_outcomes(self, payload: MetricsBucket) -> None:
for category, quantity in self._count_processed_items(payload).items():
def _produce_billing_outcomes(self, generic_metric: GenericMetric) -> None:
for category, quantity in self._count_processed_items(generic_metric).items():
self._produce_billing_outcome(
org_id=payload["org_id"],
project_id=payload["project_id"],
org_id=generic_metric["org_id"],
project_id=generic_metric["project_id"],
category=category,
quantity=quantity,
)
Expand Down Expand Up @@ -141,5 +143,40 @@ def _produce_billing_outcome(
quantity=quantity,
)

def _flag_metric_received_for_project(self, generic_metric: GenericMetric) -> None:
    """Mark the project as having received custom metrics.

    When a custom metric arrives for a project whose `has_custom_metrics`
    flag is not yet set, set the flag on the project model and remember the
    fact in the cache (with a 7-day TTL) so subsequent buckets skip the
    project load entirely.
    """
    org_id = generic_metric["org_id"]
    project_id = generic_metric["project_id"]
    metric_mri = self._resolve(generic_metric["mapping_meta"], generic_metric["metric_id"])

    parsed_mri = parse_mri(metric_mri)
    # If the metric is not custom, we don't want to perform any work.
    if parsed_mri is None or not is_custom_metric(parsed_mri):
        return

    # If the cache key is there, we don't want to load the project at all.
    cache_key = _get_project_flag_updated_cache_key(org_id, project_id)
    if cache.get(cache_key) is not None:
        return

    # Keep the try minimal: only the project lookup can raise DoesNotExist.
    try:
        project = Project.objects.get_from_cache(id=project_id)
    except Project.DoesNotExist:
        # The project may have been deleted since the metric was produced;
        # nothing to flag in that case.
        return

    if not project.flags.has_custom_metrics:
        # We assume that the flag update is reflected in the cache, so that upcoming calls
        # will get the up-to-date project with the `has_custom_metrics` flag set to true.
        project.update(flags=F("flags").bitor(Project.flags.has_custom_metrics))

    # If we are here, it means that we received a custom metric, and we didn't have it
    # reflected in the cache, so we update the cache.
    cache.set(cache_key, "1", CACHE_TTL_IN_SECONDS)

def _resolve(self, mapping_meta: Mapping[str, Any], indexed_value: int) -> Optional[str]:
for _, inner_meta in mapping_meta.items():
if (string_value := inner_meta.get(str(indexed_value))) is not None:
return string_value

return None

def join(self, timeout: Optional[float] = None) -> None:
    """Wait for the downstream step to finish, passing the optional timeout through."""
    downstream = self.__next_step
    downstream.join(timeout)
Loading

0 comments on commit 24f4a83

Please sign in to comment.