From 6356183e34ae9b037212f918e50407ec2d23e1f6 Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Tue, 28 Jan 2025 13:41:46 -0800 Subject: [PATCH] feat(uptime): Propagate checker configs to redis as well as kafka (#84151) This propagates our checker configs to our new redis config store. For the moment we're dual writing, but we'll remove the Kafka code once it is stable and running in prod. --- src/sentry/conf/server.py | 1 + src/sentry/uptime/config_producer.py | 61 +++++++++ .../uptime/consumers/test_results_consumer.py | 19 ++- .../uptime/subscriptions/test_regions.py | 4 + .../subscriptions/test_subscriptions.py | 4 + .../sentry/uptime/subscriptions/test_tasks.py | 119 ++++++++++++------ 6 files changed, 165 insertions(+), 43 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 2e9f2b88253a63..1a517b860089d9 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3473,6 +3473,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: enabled=True, ), ] +UPTIME_CONFIG_PARTITIONS = 128 MARKETO: Mapping[str, Any] = { "base-url": os.getenv("MARKETO_BASE_URL"), diff --git a/src/sentry/uptime/config_producer.py b/src/sentry/uptime/config_producer.py index dbf486fa188cf1..60bc54248361a4 100644 --- a/src/sentry/uptime/config_producer.py +++ b/src/sentry/uptime/config_producer.py @@ -3,13 +3,18 @@ import logging from uuid import UUID +import msgpack from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration +from django.conf import settings +from redis import StrictRedis +from rediscluster import RedisCluster from sentry_kafka_schemas.codecs import Codec from sentry_kafka_schemas.schema_types.uptime_configs_v1 import CheckConfig from sentry.conf.types.kafka_definition import Topic, get_topic_codec from sentry.uptime.subscriptions.regions import get_region_config +from sentry.utils import redis from sentry.utils.arroyo_producer import SingletonProducer from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -35,10 +40,16 @@ def produce_config(destination_region_slug: str, config: CheckConfig): UUID(config["subscription_id"]), UPTIME_CONFIGS_CODEC.encode(config), ) + _send_to_redis( + destination_region_slug, + UUID(config["subscription_id"]), + UPTIME_CONFIGS_CODEC.encode(config), + ) def produce_config_removal(destination_region_slug: str, subscription_id: str): _produce_to_kafka(destination_region_slug, UUID(subscription_id), None) + _send_to_redis(destination_region_slug, UUID(subscription_id), None) def _produce_to_kafka( @@ -62,3 +73,53 @@ def _produce_to_kafka( ) result = _configs_producer.produce(ArroyoTopic(topic), payload) result.result() + + +def get_partition_from_subscription_id(subscription_id: UUID) -> int: + return int(subscription_id) % settings.UPTIME_CONFIG_PARTITIONS + + +def get_partition_keys(subscription_id: UUID) -> tuple[str, str]: + partition = get_partition_from_subscription_id(subscription_id) + return f"uptime:configs:{partition}", f"uptime:updates:{partition}" + + +def _send_to_redis( + destination_region_slug: str, subscription_id: UUID, value: bytes | None +) -> None: + region_config = get_region_config(destination_region_slug) + if region_config is None: + logger.error( + "Attempted to create uptime subscription with invalid region slug", + extra={"region_slug": destination_region_slug, "subscription_id": subscription_id}, + ) + return + + partition = get_partition_from_subscription_id(subscription_id) + key = subscription_id.hex + + config_key = f"uptime:configs:{partition}" + update_key = f"uptime:updates:{partition}" + + cluster: RedisCluster | StrictRedis = redis.redis_clusters.get_binary( + region_config.config_redis_cluster + ) + pipe = cluster.pipeline() + if value is None: + pipe.hdel(config_key, key) + action = "delete" + else: + pipe.hset(config_key, key, value) + action = "upsert" + + pipe.hset( + update_key, + subscription_id.hex, + msgpack.packb( + { + "action": action, + "subscription_id": subscription_id.hex, + } + ), + ) + pipe.execute() diff --git a/tests/sentry/uptime/consumers/test_results_consumer.py b/tests/sentry/uptime/consumers/test_results_consumer.py index d1297629894e83..1508dcf35ce716 100644 --- a/tests/sentry/uptime/consumers/test_results_consumer.py +++ b/tests/sentry/uptime/consumers/test_results_consumer.py @@ -9,6 +9,7 @@ from arroyo.backends.kafka import KafkaPayload from arroyo.processing.strategies import ProcessingStrategy from arroyo.types import BrokerValue, Partition, Topic +from django.conf import settings from django.test import override_settings from sentry_kafka_schemas.schema_types.uptime_results_v1 import ( CHECKSTATUS_FAILURE, @@ -41,10 +42,10 @@ UptimeSubscription, ) from sentry.utils import json -from tests.sentry.uptime.subscriptions.test_tasks import ProducerTestMixin +from tests.sentry.uptime.subscriptions.test_tasks import ConfigPusherTestMixin -class ProcessResultTest(ProducerTestMixin): +class ProcessResultTest(ConfigPusherTestMixin): def setUp(self): super().setUp() self.partition = Partition(Topic("test"), 0) @@ -434,7 +435,7 @@ def test_no_subscription(self): ) ] ) - self.assert_producer_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS)) def test_multiple_project_subscriptions_with_disabled(self): """ @@ -849,12 +850,14 @@ def test_check_and_update_regions(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=KafkaTopic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -873,7 +876,7 @@ def test_check_and_update_regions(self, mock_random): self.send_result(result) sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == {"region1", "region2"} - self.assert_producer_calls( + self.assert_config_calls( (sub, kafka_definition.Topic.UPTIME_CONFIGS), (sub, kafka_definition.Topic.UPTIME_RESULTS), ) @@ -890,12 +893,14 @@ def test_check_and_update_regions_removes_disabled(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=KafkaTopic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, ), ] @@ -910,9 +915,10 @@ def test_check_and_update_regions_removes_disabled(self, mock_random): sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == {"region1"} assert sub.subscription_id - self.assert_producer_calls( + self.assert_config_calls( (sub.subscription_id, kafka_definition.Topic.UPTIME_RESULTS), (sub, kafka_definition.Topic.UPTIME_CONFIGS), + check_redis=False, ) assert sub.status == UptimeSubscription.Status.ACTIVE.value @@ -926,6 +932,7 @@ def test_check_and_update_regions_random_skip(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -940,4 +947,4 @@ def test_check_and_update_regions_random_skip(self, mock_random): self.send_result(result) sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == set() - self.assert_producer_calls() + self.assert_config_calls() diff --git a/tests/sentry/uptime/subscriptions/test_regions.py b/tests/sentry/uptime/subscriptions/test_regions.py index e4581e5630e453..c847908e7176a6 100644 --- a/tests/sentry/uptime/subscriptions/test_regions.py +++ b/tests/sentry/uptime/subscriptions/test_regions.py @@ -1,3 +1,4 @@ +from django.conf import settings from django.test import TestCase, override_settings from sentry.conf.types.kafka_definition import Topic @@ -12,18 +13,21 @@ def setUp(self): slug="us", name="United States", config_topic=Topic("uptime-results"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="eu", name="Europe", config_topic=Topic("uptime-configs"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, ), UptimeRegionConfig( slug="ap", name="Asia Pacific", config_topic=Topic("monitors-clock-tasks"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] diff --git a/tests/sentry/uptime/subscriptions/test_subscriptions.py b/tests/sentry/uptime/subscriptions/test_subscriptions.py index 2fc74b14ae27eb..2b13eaf86ef8d8 100644 --- a/tests/sentry/uptime/subscriptions/test_subscriptions.py +++ b/tests/sentry/uptime/subscriptions/test_subscriptions.py @@ -1,6 +1,7 @@ from unittest import mock import pytest +from django.conf import settings from django.test import override_settings from pytest import raises @@ -242,18 +243,21 @@ def test_auto_associates_active_regions(self): slug="region1", name="Region 1", config_topic=Topic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=Topic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region3", name="Region 3", config_topic=Topic.MONITORS_CLOCK_TASKS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, # This one shouldn't be associated ), ] diff --git a/tests/sentry/uptime/subscriptions/test_tasks.py b/tests/sentry/uptime/subscriptions/test_tasks.py index c0e2a316733d8f..036172c7e2cfbc 100644 --- a/tests/sentry/uptime/subscriptions/test_tasks.py +++ b/tests/sentry/uptime/subscriptions/test_tasks.py @@ -4,19 +4,23 @@ from unittest.mock import patch from uuid import UUID, uuid4 +import msgpack import pytest from arroyo import Topic as ArroyoTopic +from django.conf import settings from django.test import override_settings from django.utils import timezone +from redis import StrictRedis +from rediscluster import RedisCluster from sentry.conf.types.kafka_definition import Topic from sentry.conf.types.uptime import UptimeRegionConfig from sentry.testutils.abstract import Abstract from sentry.testutils.cases import UptimeTestCase from sentry.testutils.skips import requires_kafka -from sentry.uptime.config_producer import UPTIME_CONFIGS_CODEC +from sentry.uptime.config_producer import UPTIME_CONFIGS_CODEC, get_partition_keys from sentry.uptime.models import UptimeSubscription -from sentry.uptime.subscriptions.regions import get_active_region_configs +from sentry.uptime.subscriptions.regions import get_active_region_configs, get_region_config from sentry.uptime.subscriptions.tasks import ( SUBSCRIPTION_STATUS_MAX_AGE, create_remote_uptime_subscription, @@ -26,12 +30,13 @@ update_remote_uptime_subscription, uptime_subscription_to_check_config, ) +from sentry.utils import redis from sentry.utils.kafka_config import get_topic_definition pytestmark = [requires_kafka] -class ProducerTestMixin(UptimeTestCase): +class ConfigPusherTestMixin(UptimeTestCase): __test__ = Abstract(__module__, __qualname__) @pytest.fixture(autouse=True) @@ -40,34 +45,71 @@ def _setup_producer(self): self.producer = producer yield - def assert_producer_calls(self, *args: tuple[UptimeSubscription | str, Topic]): + def assert_config_calls(self, *args: tuple[UptimeSubscription | str, Topic], check_redis=True): # Verify the number of calls matches what we expect assert len(self.producer.produce.call_args_list) == len(args) - for (arg, expected_topic), producer_call in zip(args, self.producer.produce.call_args_list): - # Check topic - assert producer_call[0][0] == ArroyoTopic( - get_topic_definition(expected_topic)["real_topic_name"] + for (sub, expected_topic), producer_call in zip(args, self.producer.produce.call_args_list): + self.assert_kafka_producer_call(sub, expected_topic, producer_call) + if check_redis: + self.assert_redis_config("default", sub) + + def assert_kafka_producer_call( + self, sub: UptimeSubscription | str, expected_topic: Topic, producer_call + ): + # Check topic + assert producer_call[0][0] == ArroyoTopic( + get_topic_definition(expected_topic)["real_topic_name"] + ) + + # Check message ID + expected_message_id = UUID( + sub.subscription_id if isinstance(sub, UptimeSubscription) else sub + ).bytes + assert producer_call[0][1].key == expected_message_id + + # Check payload + expected_payload = ( + UPTIME_CONFIGS_CODEC.encode( + uptime_subscription_to_check_config(sub, str(sub.subscription_id)) ) + if isinstance(sub, UptimeSubscription) + else None + ) + assert producer_call[0][1].value == expected_payload - # Check message ID - expected_message_id = UUID( - arg.subscription_id if isinstance(arg, UptimeSubscription) else arg - ).bytes - assert producer_call[0][1].key == expected_message_id - - # Check payload - expected_payload = ( - UPTIME_CONFIGS_CODEC.encode( - uptime_subscription_to_check_config(arg, str(arg.subscription_id)) - ) - if isinstance(arg, UptimeSubscription) - else None + def assert_redis_config(self, region: str, sub: UptimeSubscription | str): + region_config = get_region_config(region) + assert region_config is not None + cluster: RedisCluster | StrictRedis = redis.redis_clusters.get_binary( + region_config.config_redis_cluster + ) + if isinstance(sub, UptimeSubscription): + action = "upsert" + subscription_id = sub.subscription_id + else: + action = "delete" + subscription_id = sub + assert subscription_id is not None + config_key, update_key = get_partition_keys(UUID(subscription_id)) + if isinstance(sub, UptimeSubscription): + config_bytes = cluster.hget(config_key, subscription_id) + assert config_bytes is not None + assert msgpack.unpackb(config_bytes) == uptime_subscription_to_check_config( + sub, subscription_id ) - assert producer_call[0][1].value == expected_payload + else: + assert not cluster.hexists(config_key, subscription_id) + + update_bytes = cluster.hget(update_key, subscription_id) + assert update_bytes is not None + assert msgpack.unpackb(update_bytes) == { + "action": action, + "subscription_id": subscription_id, + } -class BaseUptimeSubscriptionTaskTest(ProducerTestMixin, metaclass=abc.ABCMeta): +class BaseUptimeSubscriptionTaskTest(ConfigPusherTestMixin, metaclass=abc.ABCMeta): __test__ = Abstract(__module__, __qualname__) status_translations = { @@ -109,7 +151,7 @@ def test_no_subscription(self): ), sample_rate=1.0, ) - self.assert_producer_calls() + self.assert_config_calls() def test_invalid_status(self): sub = self.create_subscription(UptimeSubscription.Status.ACTIVE) @@ -120,7 +162,7 @@ def test_invalid_status(self): ), sample_rate=1.0, ) - self.assert_producer_calls() + self.assert_config_calls() class CreateUptimeSubscriptionTaskTest(BaseUptimeSubscriptionTaskTest): @@ -133,7 +175,7 @@ def test(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_with_regions(self): sub = self.create_uptime_subscription( @@ -143,7 +185,7 @@ def test_with_regions(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_without_regions_uses_default(self): sub = self.create_uptime_subscription(status=UptimeSubscription.Status.CREATING) @@ -151,7 +193,7 @@ def test_without_regions_uses_default(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, get_active_region_configs()[0].config_topic)) + self.assert_config_calls((sub, get_active_region_configs()[0].config_topic)) def test_multi_overlapping_regions(self): regions = [ @@ -159,18 +201,21 @@ def test_multi_overlapping_regions(self): slug="region1", name="Region 1", config_topic=Topic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=Topic.UPTIME_RESULTS, # Using a different topic + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region3", name="Region 3", config_topic=Topic.MONITORS_CLOCK_TASKS, # Another different topic + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -190,7 +235,7 @@ def test_multi_overlapping_regions(self): sub2.refresh_from_db() # Verify that each subscription was sent to the correct topics for its regions - self.assert_producer_calls( + self.assert_config_calls( (sub1, Topic.UPTIME_CONFIGS), (sub1, Topic.UPTIME_RESULTS), (sub2, Topic.UPTIME_RESULTS), @@ -209,14 +254,14 @@ def test(self): ) delete_remote_uptime_subscription(sub.id) assert not UptimeSubscription.objects.filter(id=sub.id).exists() - self.assert_producer_calls((subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, Topic.UPTIME_CONFIGS)) def test_no_subscription_id(self): sub = self.create_subscription(UptimeSubscription.Status.DELETING) assert sub.subscription_id is None delete_remote_uptime_subscription(sub.id) assert not UptimeSubscription.objects.filter(id=sub.id).exists() - self.assert_producer_calls() + self.assert_config_calls() def test_delete_with_regions(self): sub = self.create_uptime_subscription( @@ -226,7 +271,7 @@ def test_delete_with_regions(self): ) delete_remote_uptime_subscription(sub.id) assert sub.subscription_id is not None - self.assert_producer_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) with pytest.raises(UptimeSubscription.DoesNotExist): sub.refresh_from_db() @@ -236,7 +281,7 @@ def test_delete_without_regions_uses_default(self): ) delete_remote_uptime_subscription(sub.id) assert sub.subscription_id is not None - self.assert_producer_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) with pytest.raises(UptimeSubscription.DoesNotExist): sub.refresh_from_db() @@ -319,12 +364,12 @@ def test_no_regions(self): } -class SendUptimeConfigDeletionTest(ProducerTestMixin): +class SendUptimeConfigDeletionTest(ConfigPusherTestMixin): def test_with_region(self): subscription_id = uuid4().hex region_slug = "default" send_uptime_config_deletion(region_slug, subscription_id) - self.assert_producer_calls((subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, Topic.UPTIME_CONFIGS)) class SubscriptionCheckerTest(UptimeTestCase): @@ -373,7 +418,7 @@ def test_update(self): assert sub.status == UptimeSubscription.Status.ACTIVE.value # Verify config was sent to the region - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_without_regions_uses_default(self): sub = self.create_uptime_subscription( @@ -387,4 +432,4 @@ def test_without_regions_uses_default(self): assert sub.status == UptimeSubscription.Status.ACTIVE.value # Verify config was sent to default region - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS))