Skip to content

Commit

Permalink
feat(uptime): Propagate checker configs to redis as well as kafka (#8…
Browse files Browse the repository at this point in the history
…4151)

This propagates our checker configs to our new redis config store. For
the moment we're dual writing, but we'll remove the Kafka code once it
is stable and running in prod.

<!-- Describe your PR here. -->
  • Loading branch information
wedamija authored Jan 28, 2025
1 parent 4283c5d commit 6356183
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3473,6 +3473,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
enabled=True,
),
]
UPTIME_CONFIG_PARTITIONS = 128

MARKETO: Mapping[str, Any] = {
"base-url": os.getenv("MARKETO_BASE_URL"),
Expand Down
61 changes: 61 additions & 0 deletions src/sentry/uptime/config_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import logging
from uuid import UUID

import msgpack
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from redis import StrictRedis
from rediscluster import RedisCluster
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.uptime_configs_v1 import CheckConfig

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.uptime.subscriptions.regions import get_region_config
from sentry.utils import redis
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

Expand All @@ -35,10 +40,16 @@ def produce_config(destination_region_slug: str, config: CheckConfig):
UUID(config["subscription_id"]),
UPTIME_CONFIGS_CODEC.encode(config),
)
_send_to_redis(
destination_region_slug,
UUID(config["subscription_id"]),
UPTIME_CONFIGS_CODEC.encode(config),
)


def produce_config_removal(destination_region_slug: str, subscription_id: str):
_produce_to_kafka(destination_region_slug, UUID(subscription_id), None)
_send_to_redis(destination_region_slug, UUID(subscription_id), None)


def _produce_to_kafka(
Expand All @@ -62,3 +73,53 @@ def _produce_to_kafka(
)
result = _configs_producer.produce(ArroyoTopic(topic), payload)
result.result()


def get_partition_from_subscription_id(subscription_id: UUID) -> int:
return int(subscription_id) % settings.UPTIME_CONFIG_PARTITIONS


def get_partition_keys(subscription_id: UUID) -> tuple[str, str]:
partition = get_partition_from_subscription_id(subscription_id)
return f"uptime:configs:{partition}", f"uptime:updates:{partition}"


def _send_to_redis(
destination_region_slug: str, subscription_id: UUID, value: bytes | None
) -> None:
region_config = get_region_config(destination_region_slug)
if region_config is None:
logger.error(
"Attempted to create uptime subscription with invalid region slug",
extra={"region_slug": destination_region_slug, "subscription_id": subscription_id},
)
return

partition = get_partition_from_subscription_id(subscription_id)
key = subscription_id.hex

config_key = f"uptime:configs:{partition}"
update_key = f"uptime:updates:{partition}"

cluster: RedisCluster | StrictRedis = redis.redis_clusters.get_binary(
region_config.config_redis_cluster
)
pipe = cluster.pipeline()
if value is None:
pipe.hdel(config_key, key)
action = "delete"
else:
pipe.hset(config_key, key, value)
action = "upsert"

pipe.hset(
update_key,
subscription_id.hex,
msgpack.packb(
{
"action": action,
"subscription_id": subscription_id.hex,
}
),
)
pipe.execute()
19 changes: 13 additions & 6 deletions tests/sentry/uptime/consumers/test_results_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import BrokerValue, Partition, Topic
from django.conf import settings
from django.test import override_settings
from sentry_kafka_schemas.schema_types.uptime_results_v1 import (
CHECKSTATUS_FAILURE,
Expand Down Expand Up @@ -41,10 +42,10 @@
UptimeSubscription,
)
from sentry.utils import json
from tests.sentry.uptime.subscriptions.test_tasks import ProducerTestMixin
from tests.sentry.uptime.subscriptions.test_tasks import ConfigPusherTestMixin


class ProcessResultTest(ProducerTestMixin):
class ProcessResultTest(ConfigPusherTestMixin):
def setUp(self):
super().setUp()
self.partition = Partition(Topic("test"), 0)
Expand Down Expand Up @@ -434,7 +435,7 @@ def test_no_subscription(self):
)
]
)
self.assert_producer_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS))
self.assert_config_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS))

def test_multiple_project_subscriptions_with_disabled(self):
"""
Expand Down Expand Up @@ -849,12 +850,14 @@ def test_check_and_update_regions(self, mock_random):
slug="region1",
name="Region 1",
config_topic=KafkaTopic.UPTIME_CONFIGS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
UptimeRegionConfig(
slug="region2",
name="Region 2",
config_topic=KafkaTopic.UPTIME_RESULTS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
]
Expand All @@ -873,7 +876,7 @@ def test_check_and_update_regions(self, mock_random):
self.send_result(result)
sub.refresh_from_db()
assert {r.region_slug for r in sub.regions.all()} == {"region1", "region2"}
self.assert_producer_calls(
self.assert_config_calls(
(sub, kafka_definition.Topic.UPTIME_CONFIGS),
(sub, kafka_definition.Topic.UPTIME_RESULTS),
)
Expand All @@ -890,12 +893,14 @@ def test_check_and_update_regions_removes_disabled(self, mock_random):
slug="region1",
name="Region 1",
config_topic=KafkaTopic.UPTIME_CONFIGS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
UptimeRegionConfig(
slug="region2",
name="Region 2",
config_topic=KafkaTopic.UPTIME_RESULTS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=False,
),
]
Expand All @@ -910,9 +915,10 @@ def test_check_and_update_regions_removes_disabled(self, mock_random):
sub.refresh_from_db()
assert {r.region_slug for r in sub.regions.all()} == {"region1"}
assert sub.subscription_id
self.assert_producer_calls(
self.assert_config_calls(
(sub.subscription_id, kafka_definition.Topic.UPTIME_RESULTS),
(sub, kafka_definition.Topic.UPTIME_CONFIGS),
check_redis=False,
)
assert sub.status == UptimeSubscription.Status.ACTIVE.value

Expand All @@ -926,6 +932,7 @@ def test_check_and_update_regions_random_skip(self, mock_random):
slug="region1",
name="Region 1",
config_topic=KafkaTopic.UPTIME_CONFIGS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
]
Expand All @@ -940,4 +947,4 @@ def test_check_and_update_regions_random_skip(self, mock_random):
self.send_result(result)
sub.refresh_from_db()
assert {r.region_slug for r in sub.regions.all()} == set()
self.assert_producer_calls()
self.assert_config_calls()
4 changes: 4 additions & 0 deletions tests/sentry/uptime/subscriptions/test_regions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.conf import settings
from django.test import TestCase, override_settings

from sentry.conf.types.kafka_definition import Topic
Expand All @@ -12,18 +13,21 @@ def setUp(self):
slug="us",
name="United States",
config_topic=Topic("uptime-results"),
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
UptimeRegionConfig(
slug="eu",
name="Europe",
config_topic=Topic("uptime-configs"),
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=False,
),
UptimeRegionConfig(
slug="ap",
name="Asia Pacific",
config_topic=Topic("monitors-clock-tasks"),
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
]
Expand Down
4 changes: 4 additions & 0 deletions tests/sentry/uptime/subscriptions/test_subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest import mock

import pytest
from django.conf import settings
from django.test import override_settings
from pytest import raises

Expand Down Expand Up @@ -242,18 +243,21 @@ def test_auto_associates_active_regions(self):
slug="region1",
name="Region 1",
config_topic=Topic.UPTIME_CONFIGS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
UptimeRegionConfig(
slug="region2",
name="Region 2",
config_topic=Topic.UPTIME_RESULTS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=True,
),
UptimeRegionConfig(
slug="region3",
name="Region 3",
config_topic=Topic.MONITORS_CLOCK_TASKS,
config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER,
enabled=False, # This one shouldn't be associated
),
]
Expand Down
Loading

0 comments on commit 6356183

Please sign in to comment.