diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 2e9f2b88253a63..1a517b860089d9 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3473,6 +3473,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: enabled=True, ), ] +UPTIME_CONFIG_PARTITIONS = 128 MARKETO: Mapping[str, Any] = { "base-url": os.getenv("MARKETO_BASE_URL"), diff --git a/src/sentry/uptime/config_producer.py b/src/sentry/uptime/config_producer.py index dbf486fa188cf1..60bc54248361a4 100644 --- a/src/sentry/uptime/config_producer.py +++ b/src/sentry/uptime/config_producer.py @@ -3,13 +3,18 @@ import logging from uuid import UUID +import msgpack from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration +from django.conf import settings +from redis import StrictRedis +from rediscluster import RedisCluster from sentry_kafka_schemas.codecs import Codec from sentry_kafka_schemas.schema_types.uptime_configs_v1 import CheckConfig from sentry.conf.types.kafka_definition import Topic, get_topic_codec from sentry.uptime.subscriptions.regions import get_region_config +from sentry.utils import redis from sentry.utils.arroyo_producer import SingletonProducer from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -35,10 +40,16 @@ def produce_config(destination_region_slug: str, config: CheckConfig): UUID(config["subscription_id"]), UPTIME_CONFIGS_CODEC.encode(config), ) + _send_to_redis( + destination_region_slug, + UUID(config["subscription_id"]), + UPTIME_CONFIGS_CODEC.encode(config), + ) def produce_config_removal(destination_region_slug: str, subscription_id: str): _produce_to_kafka(destination_region_slug, UUID(subscription_id), None) + _send_to_redis(destination_region_slug, UUID(subscription_id), None) def _produce_to_kafka( @@ -62,3 +73,53 @@ def _produce_to_kafka( ) result = _configs_producer.produce(ArroyoTopic(topic), payload) result.result() + + +def get_partition_from_subscription_id(subscription_id: UUID) -> int: + return int(subscription_id) % settings.UPTIME_CONFIG_PARTITIONS + + +def get_partition_keys(subscription_id: UUID) -> tuple[str, str]: + partition = get_partition_from_subscription_id(subscription_id) + return f"uptime:configs:{partition}", f"uptime:updates:{partition}" + + +def _send_to_redis( + destination_region_slug: str, subscription_id: UUID, value: bytes | None +) -> None: + region_config = get_region_config(destination_region_slug) + if region_config is None: + logger.error( + "Attempted to create uptime subscription with invalid region slug", + extra={"region_slug": destination_region_slug, "subscription_id": subscription_id}, + ) + return + + partition = get_partition_from_subscription_id(subscription_id) + key = subscription_id.hex + + config_key = f"uptime:configs:{partition}" + update_key = f"uptime:updates:{partition}" + + cluster: RedisCluster | StrictRedis = redis.redis_clusters.get_binary( + region_config.config_redis_cluster + ) + pipe = cluster.pipeline() + if value is None: + pipe.hdel(config_key, key) + action = "delete" + else: + pipe.hset(config_key, key, value) + action = "upsert" + + pipe.hset( + update_key, + subscription_id.hex, + msgpack.packb( + { + "action": action, + "subscription_id": subscription_id.hex, + } + ), + ) + pipe.execute() diff --git a/tests/sentry/uptime/consumers/test_results_consumer.py b/tests/sentry/uptime/consumers/test_results_consumer.py index d1297629894e83..1508dcf35ce716 100644 --- a/tests/sentry/uptime/consumers/test_results_consumer.py +++ b/tests/sentry/uptime/consumers/test_results_consumer.py @@ -9,6 +9,7 @@ from arroyo.backends.kafka import KafkaPayload from arroyo.processing.strategies import ProcessingStrategy from arroyo.types import BrokerValue, Partition, Topic +from django.conf import settings from django.test import override_settings from sentry_kafka_schemas.schema_types.uptime_results_v1 import ( CHECKSTATUS_FAILURE, @@ -41,10 +42,10 @@ UptimeSubscription, ) from sentry.utils import json -from tests.sentry.uptime.subscriptions.test_tasks import ProducerTestMixin +from tests.sentry.uptime.subscriptions.test_tasks import ConfigPusherTestMixin -class ProcessResultTest(ProducerTestMixin): +class ProcessResultTest(ConfigPusherTestMixin): def setUp(self): super().setUp() self.partition = Partition(Topic("test"), 0) @@ -434,7 +435,7 @@ def test_no_subscription(self): ) ] ) - self.assert_producer_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, kafka_definition.Topic.UPTIME_CONFIGS)) def test_multiple_project_subscriptions_with_disabled(self): """ @@ -849,12 +850,14 @@ def test_check_and_update_regions(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=KafkaTopic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -873,7 +876,7 @@ def test_check_and_update_regions(self, mock_random): self.send_result(result) sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == {"region1", "region2"} - self.assert_producer_calls( + self.assert_config_calls( (sub, kafka_definition.Topic.UPTIME_CONFIGS), (sub, kafka_definition.Topic.UPTIME_RESULTS), ) @@ -890,12 +893,14 @@ def test_check_and_update_regions_removes_disabled(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=KafkaTopic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, ), ] @@ -910,9 +915,10 @@ def test_check_and_update_regions_removes_disabled(self, mock_random): sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == {"region1"} assert sub.subscription_id - self.assert_producer_calls( + self.assert_config_calls( (sub.subscription_id, kafka_definition.Topic.UPTIME_RESULTS), (sub, kafka_definition.Topic.UPTIME_CONFIGS), + check_redis=False, ) assert sub.status == UptimeSubscription.Status.ACTIVE.value @@ -926,6 +932,7 @@ def test_check_and_update_regions_random_skip(self, mock_random): slug="region1", name="Region 1", config_topic=KafkaTopic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -940,4 +947,4 @@ def test_check_and_update_regions_random_skip(self, mock_random): self.send_result(result) sub.refresh_from_db() assert {r.region_slug for r in sub.regions.all()} == set() - self.assert_producer_calls() + self.assert_config_calls() diff --git a/tests/sentry/uptime/subscriptions/test_regions.py b/tests/sentry/uptime/subscriptions/test_regions.py index e4581e5630e453..c847908e7176a6 100644 --- a/tests/sentry/uptime/subscriptions/test_regions.py +++ b/tests/sentry/uptime/subscriptions/test_regions.py @@ -1,3 +1,4 @@ +from django.conf import settings from django.test import TestCase, override_settings from sentry.conf.types.kafka_definition import Topic @@ -12,18 +13,21 @@ def setUp(self): slug="us", name="United States", config_topic=Topic("uptime-results"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="eu", name="Europe", config_topic=Topic("uptime-configs"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, ), UptimeRegionConfig( slug="ap", name="Asia Pacific", config_topic=Topic("monitors-clock-tasks"), + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] diff --git a/tests/sentry/uptime/subscriptions/test_subscriptions.py b/tests/sentry/uptime/subscriptions/test_subscriptions.py index 2fc74b14ae27eb..2b13eaf86ef8d8 100644 --- a/tests/sentry/uptime/subscriptions/test_subscriptions.py +++ b/tests/sentry/uptime/subscriptions/test_subscriptions.py @@ -1,6 +1,7 @@ from unittest import mock import pytest +from django.conf import settings from django.test import override_settings from pytest import raises @@ -242,18 +243,21 @@ def test_auto_associates_active_regions(self): slug="region1", name="Region 1", config_topic=Topic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=Topic.UPTIME_RESULTS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region3", name="Region 3", config_topic=Topic.MONITORS_CLOCK_TASKS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=False, # This one shouldn't be associated ), ] diff --git a/tests/sentry/uptime/subscriptions/test_tasks.py b/tests/sentry/uptime/subscriptions/test_tasks.py index c0e2a316733d8f..036172c7e2cfbc 100644 --- a/tests/sentry/uptime/subscriptions/test_tasks.py +++ b/tests/sentry/uptime/subscriptions/test_tasks.py @@ -4,19 +4,23 @@ from unittest.mock import patch from uuid import UUID, uuid4 +import msgpack import pytest from arroyo import Topic as ArroyoTopic +from django.conf import settings from django.test import override_settings from django.utils import timezone +from redis import StrictRedis +from rediscluster import RedisCluster from sentry.conf.types.kafka_definition import Topic from sentry.conf.types.uptime import UptimeRegionConfig from sentry.testutils.abstract import Abstract from sentry.testutils.cases import UptimeTestCase from sentry.testutils.skips import requires_kafka -from sentry.uptime.config_producer import UPTIME_CONFIGS_CODEC +from sentry.uptime.config_producer import UPTIME_CONFIGS_CODEC, get_partition_keys from sentry.uptime.models import UptimeSubscription -from sentry.uptime.subscriptions.regions import get_active_region_configs +from sentry.uptime.subscriptions.regions import get_active_region_configs, get_region_config from sentry.uptime.subscriptions.tasks import ( SUBSCRIPTION_STATUS_MAX_AGE, create_remote_uptime_subscription, @@ -26,12 +30,13 @@ update_remote_uptime_subscription, uptime_subscription_to_check_config, ) +from sentry.utils import redis from sentry.utils.kafka_config import get_topic_definition pytestmark = [requires_kafka] -class ProducerTestMixin(UptimeTestCase): +class ConfigPusherTestMixin(UptimeTestCase): __test__ = Abstract(__module__, __qualname__) @pytest.fixture(autouse=True) @@ -40,34 +45,71 @@ def _setup_producer(self): self.producer = producer yield - def assert_producer_calls(self, *args: tuple[UptimeSubscription | str, Topic]): + def assert_config_calls(self, *args: tuple[UptimeSubscription | str, Topic], check_redis=True): # Verify the number of calls matches what we expect assert len(self.producer.produce.call_args_list) == len(args) - for (arg, expected_topic), producer_call in zip(args, self.producer.produce.call_args_list): - # Check topic - assert producer_call[0][0] == ArroyoTopic( - get_topic_definition(expected_topic)["real_topic_name"] + for (sub, expected_topic), producer_call in zip(args, self.producer.produce.call_args_list): + self.assert_kafka_producer_call(sub, expected_topic, producer_call) + if check_redis: + self.assert_redis_config("default", sub) + + def assert_kafka_producer_call( + self, sub: UptimeSubscription | str, expected_topic: Topic, producer_call + ): + # Check topic + assert producer_call[0][0] == ArroyoTopic( + get_topic_definition(expected_topic)["real_topic_name"] + ) + + # Check message ID + expected_message_id = UUID( + sub.subscription_id if isinstance(sub, UptimeSubscription) else sub + ).bytes + assert producer_call[0][1].key == expected_message_id + + # Check payload + expected_payload = ( + UPTIME_CONFIGS_CODEC.encode( + uptime_subscription_to_check_config(sub, str(sub.subscription_id)) ) + if isinstance(sub, UptimeSubscription) + else None + ) + assert producer_call[0][1].value == expected_payload - # Check message ID - expected_message_id = UUID( - arg.subscription_id if isinstance(arg, UptimeSubscription) else arg - ).bytes - assert producer_call[0][1].key == expected_message_id - - # Check payload - expected_payload = ( - UPTIME_CONFIGS_CODEC.encode( - uptime_subscription_to_check_config(arg, str(arg.subscription_id)) - ) - if isinstance(arg, UptimeSubscription) - else None + def assert_redis_config(self, region: str, sub: UptimeSubscription | str): + region_config = get_region_config(region) + assert region_config is not None + cluster: RedisCluster | StrictRedis = redis.redis_clusters.get_binary( + region_config.config_redis_cluster + ) + if isinstance(sub, UptimeSubscription): + action = "upsert" + subscription_id = sub.subscription_id + else: + action = "delete" + subscription_id = sub + assert subscription_id is not None + config_key, update_key = get_partition_keys(UUID(subscription_id)) + if isinstance(sub, UptimeSubscription): + config_bytes = cluster.hget(config_key, subscription_id) + assert config_bytes is not None + assert msgpack.unpackb(config_bytes) == uptime_subscription_to_check_config( + sub, subscription_id ) - assert producer_call[0][1].value == expected_payload + else: + assert not cluster.hexists(config_key, subscription_id) + + update_bytes = cluster.hget(update_key, subscription_id) + assert update_bytes is not None + assert msgpack.unpackb(update_bytes) == { + "action": action, + "subscription_id": subscription_id, + } -class BaseUptimeSubscriptionTaskTest(ProducerTestMixin, metaclass=abc.ABCMeta): +class BaseUptimeSubscriptionTaskTest(ConfigPusherTestMixin, metaclass=abc.ABCMeta): __test__ = Abstract(__module__, __qualname__) status_translations = { @@ -109,7 +151,7 @@ def test_no_subscription(self): ), sample_rate=1.0, ) - self.assert_producer_calls() + self.assert_config_calls() def test_invalid_status(self): sub = self.create_subscription(UptimeSubscription.Status.ACTIVE) @@ -120,7 +162,7 @@ def test_invalid_status(self): ), sample_rate=1.0, ) - self.assert_producer_calls() + self.assert_config_calls() class CreateUptimeSubscriptionTaskTest(BaseUptimeSubscriptionTaskTest): @@ -133,7 +175,7 @@ def test(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_with_regions(self): sub = self.create_uptime_subscription( @@ -143,7 +185,7 @@ def test_with_regions(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_without_regions_uses_default(self): sub = self.create_uptime_subscription(status=UptimeSubscription.Status.CREATING) @@ -151,7 +193,7 @@ def test_without_regions_uses_default(self): sub.refresh_from_db() assert sub.status == UptimeSubscription.Status.ACTIVE.value assert sub.subscription_id is not None - self.assert_producer_calls((sub, get_active_region_configs()[0].config_topic)) + self.assert_config_calls((sub, get_active_region_configs()[0].config_topic)) def test_multi_overlapping_regions(self): regions = [ @@ -159,18 +201,21 @@ def test_multi_overlapping_regions(self): slug="region1", name="Region 1", config_topic=Topic.UPTIME_CONFIGS, + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region2", name="Region 2", config_topic=Topic.UPTIME_RESULTS, # Using a different topic + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), UptimeRegionConfig( slug="region3", name="Region 3", config_topic=Topic.MONITORS_CLOCK_TASKS, # Another different topic + config_redis_cluster=settings.SENTRY_UPTIME_DETECTOR_CLUSTER, enabled=True, ), ] @@ -190,7 +235,7 @@ def test_multi_overlapping_regions(self): sub2.refresh_from_db() # Verify that each subscription was sent to the correct topics for its regions - self.assert_producer_calls( + self.assert_config_calls( (sub1, Topic.UPTIME_CONFIGS), (sub1, Topic.UPTIME_RESULTS), (sub2, Topic.UPTIME_RESULTS), @@ -209,14 +254,14 @@ def test(self): ) delete_remote_uptime_subscription(sub.id) assert not UptimeSubscription.objects.filter(id=sub.id).exists() - self.assert_producer_calls((subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, Topic.UPTIME_CONFIGS)) def test_no_subscription_id(self): sub = self.create_subscription(UptimeSubscription.Status.DELETING) assert sub.subscription_id is None delete_remote_uptime_subscription(sub.id) assert not UptimeSubscription.objects.filter(id=sub.id).exists() - self.assert_producer_calls() + self.assert_config_calls() def test_delete_with_regions(self): sub = self.create_uptime_subscription( @@ -226,7 +271,7 @@ def test_delete_with_regions(self): ) delete_remote_uptime_subscription(sub.id) assert sub.subscription_id is not None - self.assert_producer_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) with pytest.raises(UptimeSubscription.DoesNotExist): sub.refresh_from_db() @@ -236,7 +281,7 @@ def test_delete_without_regions_uses_default(self): ) delete_remote_uptime_subscription(sub.id) assert sub.subscription_id is not None - self.assert_producer_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub.subscription_id, Topic.UPTIME_CONFIGS)) with pytest.raises(UptimeSubscription.DoesNotExist): sub.refresh_from_db() @@ -319,12 +364,12 @@ def test_no_regions(self): } -class SendUptimeConfigDeletionTest(ProducerTestMixin): +class SendUptimeConfigDeletionTest(ConfigPusherTestMixin): def test_with_region(self): subscription_id = uuid4().hex region_slug = "default" send_uptime_config_deletion(region_slug, subscription_id) - self.assert_producer_calls((subscription_id, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((subscription_id, Topic.UPTIME_CONFIGS)) class SubscriptionCheckerTest(UptimeTestCase): @@ -373,7 +418,7 @@ def test_update(self): assert sub.status == UptimeSubscription.Status.ACTIVE.value # Verify config was sent to the region - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS)) def test_without_regions_uses_default(self): sub = self.create_uptime_subscription( @@ -387,4 +432,4 @@ def test_without_regions_uses_default(self): assert sub.status == UptimeSubscription.Status.ACTIVE.value # Verify config was sent to default region - self.assert_producer_calls((sub, Topic.UPTIME_CONFIGS)) + self.assert_config_calls((sub, Topic.UPTIME_CONFIGS))