diff --git a/CHANGELOG.md b/CHANGELOG.md index f5a158bfb6..9c7656f79e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ **Features**: +- Separate the logic for producing UserReportV2 events (user feedback) and handle attachments in the same envelope as feedback. ([#3403](https://github.com/getsentry/relay/pull/3403)) - Use same keys for OTel span attributes and Sentry span data. ([#3457](https://github.com/getsentry/relay/pull/3457)) - Support passing owner when upserting Monitors. ([#3468](https://github.com/getsentry/relay/pull/3468)) - Extract `frames.slow`, `frames.frozen`, and `frames.total` metrics from mobile spans. ([#3473](https://github.com/getsentry/relay/pull/3473)) diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index da0d840f06..c1fc054e22 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -167,6 +167,18 @@ pub struct Options { )] pub feedback_ingest_topic_rollout_rate: f32, + /// Flag for handling feedback and attachments in the same envelope. Temporary FF for fast-revert + /// (will remove after user feedback GA release). + /// + /// Enabling this will also separate the logic for producing feedback, to its own match case in + /// StoreService::store_envelope + #[serde( + rename = "feedback.ingest-inline-attachments", + deserialize_with = "default_on_error", + skip_serializing_if = "is_default" + )] + pub feedback_ingest_same_envelope_attachments: bool, + /// Overall sampling of span extraction. /// /// This number represents the fraction of transactions for which diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 4fab6f3f26..54d35f41ee 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -189,13 +189,19 @@ impl StoreService { let retention = envelope.retention(); let event_id = envelope.event_id(); + let feedback_ingest_same_envelope_attachments = self + .global_config + .current() + .options + .feedback_ingest_same_envelope_attachments; + let event_item = envelope.as_mut().take_item_by(|item| { matches!( - item.ty(), - ItemType::Event - | ItemType::Transaction - | ItemType::Security - | ItemType::UserReportV2 + (item.ty(), feedback_ingest_same_envelope_attachments), + (ItemType::Event, _) + | (ItemType::Transaction, _) + | (ItemType::Security, _) + | (ItemType::UserReportV2, false) ) }); let client = envelope.meta().client(); @@ -244,6 +250,17 @@ impl StoreService { item, )?; } + ItemType::UserReportV2 if feedback_ingest_same_envelope_attachments => { + let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string()); + self.produce_user_report_v2( + event_id.ok_or(StoreError::NoEventId)?, + scoping.project_id, + scoping.organization_id, + start_time, + item, + remote_addr, + )?; + } ItemType::Profile => self.produce_profile( scoping.organization_id, scoping.project_id, @@ -673,6 +690,36 @@ impl StoreService { self.produce(KafkaTopic::Attachments, message) } + fn produce_user_report_v2( + &self, + event_id: EventId, + project_id: ProjectId, + organization_id: u64, + start_time: Instant, + item: &Item, + remote_addr: Option, + ) -> Result<(), StoreError> { + // check rollout rate option (effectively a FF) to determine whether to produce to new infra + let global_config = self.global_config.current(); + let feedback_ingest_topic_rollout_rate = + global_config.options.feedback_ingest_topic_rollout_rate; + let topic = if is_rolled_out(organization_id, feedback_ingest_topic_rollout_rate) { + KafkaTopic::Feedback + } else { + KafkaTopic::Events + }; + + let message = KafkaMessage::Event(EventKafkaMessage { + project_id, + event_id, + payload: item.payload(), + start_time: UnixTimestamp::from_instant(start_time).as_secs(), + remote_addr, + attachments: vec![], + }); + self.produce(topic, message) + } + fn send_metric_message( &self, namespace: MetricNamespace, diff --git a/tests/.idea/workspace.xml b/tests/.idea/workspace.xml new file mode 100644 index 0000000000..3b77dea1ba --- /dev/null +++ b/tests/.idea/workspace.xml @@ -0,0 +1,69 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1713824101510 + + + + + + \ No newline at end of file diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index ef45e4dfc2..2d16b1a180 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -277,7 +277,9 @@ def transactions_consumer(kafka_consumer): @pytest.fixture def attachments_consumer(kafka_consumer): - return lambda: AttachmentsConsumer(*kafka_consumer("attachments")) + return lambda timeout=None: AttachmentsConsumer( + timeout=timeout, *kafka_consumer("attachments") + ) @pytest.fixture diff --git a/tests/integration/test_feedback.py b/tests/integration/test_feedback.py index 478dc01bdd..5e17b6715c 100644 --- a/tests/integration/test_feedback.py +++ b/tests/integration/test_feedback.py @@ -1,5 +1,6 @@ import pytest import json +from sentry_sdk.envelope import Envelope, Item, PayloadRef def generate_feedback_sdk_event(): @@ -43,6 +44,51 @@ def generate_feedback_sdk_event(): } +def assert_expected_feedback(parsed_feedback, sent_feedback): + """Assert required fields were returned.""" + assert parsed_feedback["event_id"] + assert parsed_feedback["type"] == sent_feedback["type"] + assert parsed_feedback["dist"] == sent_feedback["dist"] + assert parsed_feedback["platform"] == sent_feedback["platform"] + assert parsed_feedback["environment"] == sent_feedback["environment"] + assert parsed_feedback["release"] == str(sent_feedback["release"]) + assert parsed_feedback["sdk"]["name"] == sent_feedback["sdk"]["name"] + assert parsed_feedback["sdk"]["version"] == sent_feedback["sdk"]["version"] + assert parsed_feedback["user"]["id"] == sent_feedback["user"]["id"] + assert parsed_feedback["user"]["username"] == sent_feedback["user"]["username"] + assert parsed_feedback["user"]["ip_address"] == sent_feedback["user"]["ip_address"] + + assert parsed_feedback["user"]["email"] == "[email]" + assert parsed_feedback["timestamp"] + + # Assert the tags and requests objects were normalized to lists of doubles. + assert parsed_feedback["tags"] == [ + ["transaction", sent_feedback["tags"]["transaction"]] + ] + assert parsed_feedback["request"] == { + "headers": [["User-Agent", sent_feedback["request"]["headers"]["user-Agent"]]] + } + + # Assert contexts object was pulled out. + assert parsed_feedback["contexts"] == { + "browser": {"name": "Safari", "version": "15.5", "type": "browser"}, + "device": {"brand": "Apple", "family": "Mac", "model": "Mac", "type": "device"}, + "os": {"name": "Mac OS X", "version": ">=10.15.7", "type": "os"}, + "replay": { + "replay_id": sent_feedback["contexts"]["replay"]["replay_id"].lower(), + "type": "replay", + }, + "trace": { + "status": "unknown", + "trace_id": sent_feedback["contexts"]["trace"]["trace_id"].lower(), + "span_id": sent_feedback["contexts"]["trace"]["span_id"].lower(), + "type": "trace", + }, + "feedback": sent_feedback["contexts"]["feedback"], + } + + +@pytest.mark.parametrize("use_feedback_ingest_v2", (False, True)) @pytest.mark.parametrize("use_feedback_topic", (False, True)) def test_feedback_event_with_processing( mini_sentry, @@ -50,10 +96,14 @@ def test_feedback_event_with_processing( events_consumer, feedback_consumer, use_feedback_topic, + use_feedback_ingest_v2, ): mini_sentry.add_basic_project_config( 42, extra={"config": {"features": ["organizations:user-feedback-ingest"]}} ) + mini_sentry.set_global_config_option( + "feedback.ingest-inline-attachments", use_feedback_ingest_v2 + ) if use_feedback_topic: mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 1.0) @@ -72,60 +122,26 @@ def test_feedback_event_with_processing( assert event["type"] == "feedback" parsed_feedback = json.loads(message["payload"]) - # Assert required fields were returned. - assert parsed_feedback["event_id"] - assert parsed_feedback["type"] == feedback["type"] - assert parsed_feedback["dist"] == feedback["dist"] - assert parsed_feedback["platform"] == feedback["platform"] - assert parsed_feedback["environment"] == feedback["environment"] - assert parsed_feedback["release"] == str(feedback["release"]) - assert parsed_feedback["sdk"]["name"] == feedback["sdk"]["name"] - assert parsed_feedback["sdk"]["version"] == feedback["sdk"]["version"] - assert parsed_feedback["user"]["id"] == feedback["user"]["id"] - assert parsed_feedback["user"]["username"] == feedback["user"]["username"] - assert parsed_feedback["user"]["ip_address"] == feedback["user"]["ip_address"] - - assert parsed_feedback["user"]["email"] == "[email]" - assert parsed_feedback["timestamp"] - - # Assert the tags and requests objects were normalized to lists of doubles. - assert parsed_feedback["tags"] == [["transaction", feedback["tags"]["transaction"]]] - assert parsed_feedback["request"] == { - "headers": [["User-Agent", feedback["request"]["headers"]["user-Agent"]]] - } - - # Assert contexts object was pulled out. - assert parsed_feedback["contexts"] == { - "browser": {"name": "Safari", "version": "15.5", "type": "browser"}, - "device": {"brand": "Apple", "family": "Mac", "model": "Mac", "type": "device"}, - "os": {"name": "Mac OS X", "version": ">=10.15.7", "type": "os"}, - "replay": {"replay_id": "e2d42047b1c5431c8cba85ee2a8ab25d", "type": "replay"}, - "trace": { - "status": "unknown", - "trace_id": "4c79f60c11214eb38604f4ae0781bfb2", - "span_id": "fa90fdead5f74052", - "type": "trace", - }, - "feedback": { - "message": "test message", - "contact_email": "test@example.com", - "type": "feedback", - }, - } + # Assert required fields were returned + assert_expected_feedback(parsed_feedback, feedback) # test message wasn't dup'd to the wrong topic other_consumer.assert_empty() +@pytest.mark.parametrize("use_feedback_ingest_v2", (False, True)) @pytest.mark.parametrize("use_feedback_topic", (False, True)) def test_feedback_events_without_processing( - mini_sentry, relay_chain, use_feedback_topic + mini_sentry, relay_chain, use_feedback_topic, use_feedback_ingest_v2 ): project_id = 42 mini_sentry.add_basic_project_config( project_id, extra={"config": {"features": ["organizations:user-feedback-ingest"]}}, ) + mini_sentry.set_global_config_option( + "feedback.ingest-inline-attachments", use_feedback_ingest_v2 + ) mini_sentry.set_global_config_option( "feedback.ingest-topic.rollout-rate", 1.0 if use_feedback_topic else 0.0 ) @@ -139,3 +155,89 @@ def test_feedback_events_without_processing( userfeedback = envelope.items[0] assert userfeedback.type == "feedback" + + +@pytest.mark.parametrize("use_feedback_topic", (False, True)) +def test_feedback_with_attachment_in_same_envelope( + mini_sentry, + relay_with_processing, + feedback_consumer, + events_consumer, + attachments_consumer, + use_feedback_topic, +): + mini_sentry.add_basic_project_config( + 42, extra={"config": {"features": ["organizations:user-feedback-ingest"]}} + ) + mini_sentry.set_global_config_option( + "feedback.ingest-topic.rollout-rate", 1.0 if use_feedback_topic else 0.0 + ) + # Test will only pass with this option set + mini_sentry.set_global_config_option("feedback.ingest-inline-attachments", True) + + if use_feedback_topic: + mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 1.0) + feedback_consumer_ = feedback_consumer(timeout=20) + other_consumer = events_consumer(timeout=20) + else: + mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 0.0) + feedback_consumer_ = events_consumer(timeout=20) + other_consumer = feedback_consumer(timeout=20) + attachments_consumer_ = attachments_consumer(timeout=20) + + feedback = generate_feedback_sdk_event() + event_id = feedback["event_id"] + project_id = 42 + + attachment_contents = b"Fake PNG bytes!" + attachment_headers = { + "length": len(attachment_contents), + "filename": "screenshot.png", + "content_type": "application/png", + } + + envelope = Envelope(headers=[["event_id", event_id]]) + envelope.add_item( + Item(PayloadRef(json=feedback), type="feedback"), + ) + envelope.add_item( + Item( + PayloadRef(bytes=attachment_contents), + type="attachment", + headers=attachment_headers, + ) + ) + relay = relay_with_processing() + relay.send_envelope(project_id, envelope) + + # attachment data (relaxed version of test_attachments.py) + received_contents = {} # attachment id -> bytes + while set(received_contents.values()) != {attachment_contents}: + chunk, v = attachments_consumer_.get_attachment_chunk() + received_contents[v["id"]] = received_contents.get(v["id"], b"") + chunk + assert v["event_id"] == event_id + assert v["project_id"] == project_id + assert len(received_contents) == 1 + + # attachment headers + attachment_event = attachments_consumer_.get_individual_attachment() + assert attachment_event["event_id"] == event_id + assert attachment_event["project_id"] == project_id + attachment = attachment_event["attachment"] + assert attachment["name"] == attachment_headers["filename"] + assert attachment["content_type"] == attachment_headers["content_type"] + assert attachment["size"] == attachment_headers["length"] + + # feedback event sent to correct topic + event, message = feedback_consumer_.get_event() + assert event["type"] == "feedback" + + parsed_feedback = json.loads(message["payload"]) + # Assert required fields were returned + assert_expected_feedback(parsed_feedback, feedback) + + # test message wasn't dup'd to the wrong topic + other_consumer.assert_empty() + + # test message wasn't sent to attachments topic + attachments_consumer_.assert_empty()