Skip to content

Commit

Permalink
feat(replay): separate feedback from attachments in the same envelope (
Browse files Browse the repository at this point in the history
  • Loading branch information
aliu39 authored Apr 25, 2024
1 parent 17e0518 commit 2f3c8ff
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

**Features**:

- Separate the logic for producing UserReportV2 events (user feedback) and handle attachments in the same envelope as feedback. ([#3403](https://github.com/getsentry/relay/pull/3403))
- Use same keys for OTel span attributes and Sentry span data. ([#3457](https://github.com/getsentry/relay/pull/3457))
- Support passing owner when upserting Monitors. ([#3468](https://github.com/getsentry/relay/pull/3468))
- Extract `frames.slow`, `frames.frozen`, and `frames.total` metrics from mobile spans. ([#3473](https://github.com/getsentry/relay/pull/3473))
Expand Down
12 changes: 12 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ pub struct Options {
)]
pub feedback_ingest_topic_rollout_rate: f32,

/// Flag for handling feedback and attachments in the same envelope. Temporary FF for fast-revert
/// (will remove after user feedback GA release).
///
/// Enabling this will also separate the logic for producing feedback, to its own match case in
/// StoreService::store_envelope
#[serde(
rename = "feedback.ingest-inline-attachments",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub feedback_ingest_same_envelope_attachments: bool,

/// Overall sampling of span extraction.
///
/// This number represents the fraction of transactions for which
Expand Down
57 changes: 52 additions & 5 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,19 @@ impl StoreService {
let retention = envelope.retention();
let event_id = envelope.event_id();

let feedback_ingest_same_envelope_attachments = self
.global_config
.current()
.options
.feedback_ingest_same_envelope_attachments;

let event_item = envelope.as_mut().take_item_by(|item| {
matches!(
item.ty(),
ItemType::Event
| ItemType::Transaction
| ItemType::Security
| ItemType::UserReportV2
(item.ty(), feedback_ingest_same_envelope_attachments),
(ItemType::Event, _)
| (ItemType::Transaction, _)
| (ItemType::Security, _)
| (ItemType::UserReportV2, false)
)
});
let client = envelope.meta().client();
Expand Down Expand Up @@ -244,6 +250,17 @@ impl StoreService {
item,
)?;
}
ItemType::UserReportV2 if feedback_ingest_same_envelope_attachments => {
let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
self.produce_user_report_v2(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
scoping.organization_id,
start_time,
item,
remote_addr,
)?;
}
ItemType::Profile => self.produce_profile(
scoping.organization_id,
scoping.project_id,
Expand Down Expand Up @@ -673,6 +690,36 @@ impl StoreService {
self.produce(KafkaTopic::Attachments, message)
}

fn produce_user_report_v2(
&self,
event_id: EventId,
project_id: ProjectId,
organization_id: u64,
start_time: Instant,
item: &Item,
remote_addr: Option<String>,
) -> Result<(), StoreError> {
// check rollout rate option (effectively a FF) to determine whether to produce to new infra
let global_config = self.global_config.current();
let feedback_ingest_topic_rollout_rate =
global_config.options.feedback_ingest_topic_rollout_rate;
let topic = if is_rolled_out(organization_id, feedback_ingest_topic_rollout_rate) {
KafkaTopic::Feedback
} else {
KafkaTopic::Events
};

let message = KafkaMessage::Event(EventKafkaMessage {
project_id,
event_id,
payload: item.payload(),
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
remote_addr,
attachments: vec![],
});
self.produce(topic, message)
}

fn send_metric_message(
&self,
namespace: MetricNamespace,
Expand Down
69 changes: 69 additions & 0 deletions tests/.idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ def transactions_consumer(kafka_consumer):

@pytest.fixture
def attachments_consumer(kafka_consumer):
return lambda: AttachmentsConsumer(*kafka_consumer("attachments"))
return lambda timeout=None: AttachmentsConsumer(
timeout=timeout, *kafka_consumer("attachments")
)


@pytest.fixture
Expand Down
Loading

0 comments on commit 2f3c8ff

Please sign in to comment.