diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 92c8ee5566..e6a889921b 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -31,7 +31,7 @@ use relay_event_schema::protocol::{ use relay_filter::FilterStatKey; use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace}; use relay_pii::PiiConfigError; -use relay_protocol::Annotated; +use relay_protocol::{Annotated, Empty}; use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_statsd::metric; @@ -735,13 +735,6 @@ fn send_metrics(metrics: ExtractedMetrics, envelope: &Envelope, aggregator: &Add /// A state container for envelope processing. #[derive(Debug)] struct ProcessEnvelopeState { - /// The extracted event payload. - /// - /// For Envelopes without event payloads, this contains `Annotated::empty`. If a single item has - /// `creates_event`, the event is required and the pipeline errors if no payload can be - /// extracted. - event: Annotated, - /// Partial metrics of the Event during construction. /// /// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are @@ -755,37 +748,22 @@ struct ProcessEnvelopeState { extracted_metrics: ProcessingExtractedMetrics, } -impl ProcessEnvelopeState { - /// Returns true if there is an event in the processing state. - /// - /// The event was previously removed from the Envelope. This returns false if there was an - /// invalid event item. - fn has_event(&self) -> bool { - self.event.value().is_some() - } - - /// Returns the event type if there is an event. - /// - /// If the event does not have a type, `Some(EventType::Default)` is assumed. If, in contrast, there - /// is no event, `None` is returned. - fn event_type(&self) -> Option { - self.event - .value() - .map(|event| event.ty.value().copied().unwrap_or_default()) - } - - /// Returns the data category if there is an event. - /// - /// The data category is computed from the event type. Both `Default` and `Error` events map to - /// the `Error` data category. If there is no Event, `None` is returned. - fn event_category(&self) -> Option { - self.event_type().map(DataCategory::from) - } +/// Returns the data category if there is an event. +/// +/// The data category is computed from the event type. Both `Default` and `Error` events map to +/// the `Error` data category. If there is no Event, `None` is returned. +fn event_category(event: &Annotated) -> Option { + event_type(event).map(DataCategory::from) +} - /// Removes the event payload from this processing state. - fn remove_event(&mut self) { - self.event = Annotated::empty(); - } +/// Returns the event type if there is an event. +/// +/// If the event does not have a type, `Some(EventType::Default)` is assumed. If, in contrast, there +/// is no event, `None` is returned. +fn event_type(event: &Annotated) -> Option { + event + .value() + .map(|event| event.ty.value().copied().unwrap_or_default()) } /// Function for on-off switches that filter specific item types (profiles, spans) @@ -816,11 +794,11 @@ impl EventFullyNormalized { } /// New type representing whether metrics were extracted from transactions/spans. -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone)] struct EventMetricsExtracted(bool); /// New type representing whether spans were extracted. -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone)] struct SpansExtracted(bool); /// The view out of the [`ProcessEnvelopeState`] after processing. @@ -1229,44 +1207,47 @@ impl EnvelopeProcessorService { &self, state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: Annotated, project_info: Arc, rate_limits: Arc, - ) -> Result<(), ProcessingError> { + ) -> Result, ProcessingError> { let global_config = self.inner.global_config.current(); let rate_limiter = match self.inner.rate_limiter.as_ref() { Some(rate_limiter) => rate_limiter, - None => return Ok(()), + None => return Ok(event), }; // Cached quotas first, they are quick to evaluate and some quotas (indexed) are not // applied in the fast path, all cached quotas can be applied here. - let _ = RateLimiter::Cached.enforce( + let cached_result = RateLimiter::Cached.enforce( state, managed_envelope, + event, &global_config, project_info.clone(), rate_limits.clone(), )?; // Enforce all quotas consistently with Redis. - let limits = RateLimiter::Consistent(rate_limiter).enforce( + let consistent_result = RateLimiter::Consistent(rate_limiter).enforce( state, managed_envelope, + cached_result.event, &global_config, project_info, rate_limits, )?; // Update cached rate limits with the freshly computed ones. - if !limits.is_empty() { + if !consistent_result.rate_limits.is_empty() { self.inner .project_cache .get(managed_envelope.scoping().project_key) .rate_limits() - .merge(limits); + .merge(consistent_result.rate_limits); } - Ok(()) + Ok(consistent_result.event) } /// Extract transaction metrics. @@ -1275,6 +1256,7 @@ impl EnvelopeProcessorService { &self, state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, project_id: ProjectId, project_info: Arc, sampling_decision: SamplingDecision, @@ -1284,7 +1266,7 @@ impl EnvelopeProcessorService { if event_metrics_extracted.0 { return Ok(event_metrics_extracted); } - let Some(event) = state.event.value_mut() else { + let Some(event) = event.value_mut() else { return Ok(event_metrics_extracted); }; @@ -1389,13 +1371,13 @@ impl EnvelopeProcessorService { fn normalize_event( &self, - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, project_id: ProjectId, project_info: Arc, mut event_fully_normalized: EventFullyNormalized, ) -> Result { - if !state.has_event() { + if event.value().is_empty() { // NOTE(iker): only processing relays create events from // attachments, so these events won't be normalized in // non-processing relays even if the config is set to run full @@ -1432,7 +1414,7 @@ impl EnvelopeProcessorService { .unwrap_or(DEFAULT_EVENT_RETENTION) .into(); - utils::log_transaction_name_metrics(&mut state.event, |event| { + utils::log_transaction_name_metrics(event, |event| { let event_validation_config = EventValidationConfig { received_at: Some(managed_envelope.received_at()), max_secs_in_past: Some(retention_days * 24 * 3600), @@ -1541,60 +1523,66 @@ impl EnvelopeProcessorService { unreal::expand(managed_envelope, &self.inner.config)?; }); - // When extracting the event when processing an error, we expect that the result of this - // function is unused since we don't have error metrics and errors are not correlated to - // extracted spans. - event::extract( + let extraction_result = event::extract( state, managed_envelope, event_fully_normalized, &self.inner.config, )?; + let mut event = extraction_result.event; if_processing!(self.inner.config, { - if let Some(inner_event_fully_normalized) = unreal::process(state, managed_envelope)? { + if let Some(inner_event_fully_normalized) = + unreal::process(managed_envelope, &mut event)? + { event_fully_normalized = inner_event_fully_normalized; } if let Some(inner_event_fully_normalized) = - attachment::create_placeholders(state, managed_envelope) + attachment::create_placeholders(state, managed_envelope, &mut event) { event_fully_normalized = inner_event_fully_normalized; } }); - event::finalize(state, managed_envelope, &self.inner.config)?; + event::finalize(state, managed_envelope, &mut event, &self.inner.config)?; event_fully_normalized = self.normalize_event( - state, managed_envelope, + &mut event, project_id, project_info.clone(), event_fully_normalized, )?; let filter_run = event::filter( - state, managed_envelope, + &mut event, project_info.clone(), &self.inner.global_config.current(), )?; if self.inner.config.processing_enabled() || matches!(filter_run, FiltersStatus::Ok) { dynamic_sampling::tag_error_with_sampling_decision( - state, managed_envelope, + &mut event, sampling_project_info, &self.inner.config, ); } if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info.clone(), rate_limits)?; + event = self.enforce_quotas( + state, + managed_envelope, + event, + project_info.clone(), + rate_limits, + )?; }); - if state.has_event() { - event::scrub(state, project_info.clone())?; + if event.value().is_some() { + event::scrub(&mut event, project_info.clone())?; event::serialize( - state, managed_envelope, + &mut event, event_fully_normalized, EventMetricsExtracted(false), SpansExtracted(false), @@ -1607,7 +1595,7 @@ impl EnvelopeProcessorService { if self.inner.config.processing_enabled() && !event_fully_normalized.0 { relay_log::error!( tags.project = %project_id, - tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()), + tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()), "ingested event without normalizing" ); } @@ -1635,44 +1623,53 @@ impl EnvelopeProcessorService { let global_config = self.inner.global_config.current(); - if let Some((inner_event_metrics_extracted, inner_spans_extracted)) = event::extract( + // We extract the main event from the envelope. + let extraction_result = event::extract( state, managed_envelope, event_fully_normalized, &self.inner.config, - )? { + )?; + + // If metrics were extracted we mark that. + if let Some(inner_event_metrics_extracted) = extraction_result.event_metrics_extracted { event_metrics_extracted = inner_event_metrics_extracted; + } + if let Some(inner_spans_extracted) = extraction_result.spans_extracted { spans_extracted = inner_spans_extracted; }; + // We take the main event out of the result. + let mut event = extraction_result.event; + let profile_id = profile::filter( - state, managed_envelope, + &event, config.clone(), project_id, project_info.clone(), ); - profile::transfer_id(state, profile_id); + profile::transfer_id(&mut event, profile_id); - event::finalize(state, managed_envelope, &self.inner.config)?; + event::finalize(state, managed_envelope, &mut event, &self.inner.config)?; event_fully_normalized = self.normalize_event( - state, managed_envelope, + &mut event, project_id, project_info.clone(), event_fully_normalized, )?; sampling_project_info = dynamic_sampling::validate_and_set_dsc( - state, managed_envelope, + &mut event, project_info.clone(), sampling_project_info.clone(), ); let filter_run = event::filter( - state, managed_envelope, + &mut event, project_info.clone(), &self.inner.global_config.current(), )?; @@ -1689,8 +1686,8 @@ impl EnvelopeProcessorService { let sampling_result = match run_dynamic_sampling { true => dynamic_sampling::run( - state, managed_envelope, + &mut event, config.clone(), project_info.clone(), sampling_project_info, @@ -1709,8 +1706,8 @@ impl EnvelopeProcessorService { // Process profiles before dropping the transaction, if necessary. // Before metric extraction to make sure the profile count is reflected correctly. profile::process( - state, managed_envelope, + &mut event, &global_config, config.clone(), project_info.clone(), @@ -1719,6 +1716,7 @@ impl EnvelopeProcessorService { event_metrics_extracted = self.extract_transaction_metrics( state, managed_envelope, + &mut event, project_id, project_info.clone(), SamplingDecision::Drop, @@ -1726,14 +1724,20 @@ impl EnvelopeProcessorService { spans_extracted, )?; - dynamic_sampling::drop_unsampled_items(state, managed_envelope, outcome); + dynamic_sampling::drop_unsampled_items(managed_envelope, event, outcome); // At this point we have: // - An empty envelope. // - An envelope containing only processed profiles. // We need to make sure there are enough quotas for these profiles. if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info.clone(), rate_limits)?; + event = self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info.clone(), + rate_limits, + )?; }); return Ok(()); @@ -1742,24 +1746,25 @@ impl EnvelopeProcessorService { // Need to scrub the transaction before extracting spans. // // Unconditionally scrub to make sure PII is removed as early as possible. - event::scrub(state, project_info.clone())?; + event::scrub(&mut event, project_info.clone())?; attachment::scrub(managed_envelope, project_info.clone()); if_processing!(self.inner.config, { // Process profiles before extracting metrics, to make sure they are removed if they are invalid. let profile_id = profile::process( - state, managed_envelope, + &mut event, &global_config, config.clone(), project_info.clone(), ); - profile::transfer_id(state, profile_id); + profile::transfer_id(&mut event, profile_id); // Always extract metrics in processing Relays for sampled items. event_metrics_extracted = self.extract_transaction_metrics( state, managed_envelope, + &mut event, project_id, project_info.clone(), SamplingDecision::Keep, @@ -1769,8 +1774,8 @@ impl EnvelopeProcessorService { if project_info.has_feature(Feature::ExtractSpansFromEvent) { spans_extracted = span::extract_from_event( - state, managed_envelope, + &event, &global_config, config, project_info.clone(), @@ -1780,16 +1785,22 @@ impl EnvelopeProcessorService { ); } - self.enforce_quotas(state, managed_envelope, project_info.clone(), rate_limits)?; + event = self.enforce_quotas( + state, + managed_envelope, + event, + project_info.clone(), + rate_limits, + )?; - span::maybe_discard_transaction(state, managed_envelope, project_info); + event = span::maybe_discard_transaction(managed_envelope, event, project_info); }); // Event may have been dropped because of a quota and the envelope can be empty. - if state.has_event() { + if event.value().is_some() { event::serialize( - state, managed_envelope, + &mut event, event_fully_normalized, event_metrics_extracted, spans_extracted, @@ -1799,7 +1810,7 @@ impl EnvelopeProcessorService { if self.inner.config.processing_enabled() && !event_fully_normalized.0 { relay_log::error!( tags.project = %project_id, - tags.ty = state.event_type().map(|e| e.to_string()).unwrap_or("none".to_owned()), + tags.ty = event_type(&event).map(|e| e.to_string()).unwrap_or("none".to_owned()), "ingested event without normalizing" ); }; @@ -1828,7 +1839,7 @@ impl EnvelopeProcessorService { /// Processes standalone items that require an event ID, but do not have an event on the same envelope. fn process_standalone( &self, - state: &mut ProcessEnvelopeState, + #[allow(unused_variables)] state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, config: Arc, project_id: ProjectId, @@ -1836,15 +1847,21 @@ impl EnvelopeProcessorService { #[allow(unused_variables)] rate_limits: Arc, ) -> Result<(), ProcessingError> { profile::filter( - state, managed_envelope, + &Annotated::empty(), config, project_id, project_info.clone(), ); if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info.clone(), rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info.clone(), + rate_limits, + )?; }); report::process_user_reports(managed_envelope); @@ -1867,7 +1884,13 @@ impl EnvelopeProcessorService { &self.inner.config, ); if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info, rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info, + rate_limits, + )?; }); Ok(()) } @@ -1882,7 +1905,13 @@ impl EnvelopeProcessorService { #[allow(unused_variables)] rate_limits: Arc, ) -> Result<(), ProcessingError> { if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info.clone(), rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info.clone(), + rate_limits, + )?; }); report::process_client_reports( @@ -1912,7 +1941,13 @@ impl EnvelopeProcessorService { self.inner.geoip_lookup.as_ref(), )?; if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info, rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info, + rate_limits, + )?; }); Ok(()) } @@ -1927,7 +1962,13 @@ impl EnvelopeProcessorService { #[allow(unused_variables)] rate_limits: Arc, ) -> Result<(), ProcessingError> { if_processing!(self.inner.config, { - self.enforce_quotas(state, managed_envelope, project_info, rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info, + rate_limits, + )?; self.normalize_checkins(managed_envelope, project_id); }); Ok(()) @@ -1961,6 +2002,7 @@ impl EnvelopeProcessorService { span::process( state, managed_envelope, + &mut Annotated::empty(), &global_config, config, project_id, @@ -1970,7 +2012,13 @@ impl EnvelopeProcessorService { &reservoir, ); - self.enforce_quotas(state, managed_envelope, project_info, rate_limits)?; + self.enforce_quotas( + state, + managed_envelope, + Annotated::empty(), + project_info, + rate_limits, + )?; }); Ok(()) @@ -2017,7 +2065,6 @@ impl EnvelopeProcessorService { ($fn_name:ident $(, $args:expr)*) => {{ let mut managed_envelope = managed_envelope.try_into()?; let mut state = ProcessEnvelopeState { - event: Annotated::empty(), metrics: Metrics::default(), extracted_metrics: ProcessingExtractedMetrics::new(), }; @@ -3003,6 +3050,24 @@ impl Service for EnvelopeProcessorService { } } +/// Result of the enforcement of rate limiting. +/// +/// If the event is already `None` or it's rate limited, it will be `None` +/// within the [`Annotated`]. +#[cfg(feature = "processing")] +struct EnforcementResult { + event: Annotated, + rate_limits: RateLimits, +} + +#[cfg(feature = "processing")] +impl EnforcementResult { + /// Creates a new [`EnforcementResult`]. + pub fn new(event: Annotated, rate_limits: RateLimits) -> Self { + Self { event, rate_limits } + } +} + #[cfg(feature = "processing")] enum RateLimiter<'a> { Cached, @@ -3015,20 +3080,21 @@ impl RateLimiter<'_> { &self, state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: Annotated, global_config: &GlobalConfig, project_info: Arc, rate_limits: Arc, - ) -> Result { - if managed_envelope.envelope().is_empty() && !state.has_event() { - return Ok(RateLimits::default()); + ) -> Result { + if managed_envelope.envelope().is_empty() && event.value().is_none() { + return Ok(EnforcementResult::new(event, RateLimits::default())); } let quotas = CombinedQuotas::new(global_config, project_info.get_quotas()); if quotas.is_empty() { - return Ok(RateLimits::default()); + return Ok(EnforcementResult::new(event, RateLimits::default())); } - let event_category = state.event_category(); + let event_category = event_category(&event); // When invoking the rate limiter, capture if the event item has been rate limited to also // remove it from the processing state eventually. @@ -3047,9 +3113,10 @@ impl RateLimiter<'_> { } let scoping = managed_envelope.scoping(); - let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), { - envelope_limiter.compute(managed_envelope.envelope_mut(), &scoping)? - }); + let (enforcement, rate_limits) = + metric!(timer(RelayTimers::EventProcessingRateLimiting), { + envelope_limiter.compute(managed_envelope.envelope_mut(), &scoping)? + }); let event_active = enforcement.is_event_active(); // Use the same rate limits as used for the envelope on the metrics. @@ -3061,12 +3128,11 @@ impl RateLimiter<'_> { enforcement.apply_with_outcomes(managed_envelope); if event_active { - state.remove_event(); debug_assert!(managed_envelope.envelope().is_empty()); - debug_assert!(!state.has_event()); + return Ok(EnforcementResult::new(Annotated::empty(), rate_limits)); } - Ok(limits) + Ok(EnforcementResult::new(event, rate_limits)) } } diff --git a/relay-server/src/services/processor/attachment.rs b/relay-server/src/services/processor/attachment.rs index 8aab88a32c..7741ad6b2e 100644 --- a/relay-server/src/services/processor/attachment.rs +++ b/relay-server/src/services/processor/attachment.rs @@ -30,6 +30,7 @@ use { pub fn create_placeholders( state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, ) -> Option { let envelope = managed_envelope.envelope(); let minidump_attachment = @@ -38,12 +39,12 @@ pub fn create_placeholders( .get_item_by(|item| item.attachment_type() == Some(&AttachmentType::AppleCrashReport)); if let Some(item) = minidump_attachment { - let event = state.event.get_or_insert_with(Event::default); + let event = event.get_or_insert_with(Event::default); state.metrics.bytes_ingested_event_minidump = Annotated::new(item.len() as u64); utils::process_minidump(event, &item.payload()); return Some(EventFullyNormalized(false)); } else if let Some(item) = apple_crash_report_attachment { - let event = state.event.get_or_insert_with(Event::default); + let event = event.get_or_insert_with(Event::default); state.metrics.bytes_ingested_event_applecrashreport = Annotated::new(item.len() as u64); utils::process_apple_crash_report(event, &item.payload()); return Some(EventFullyNormalized(false)); diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index a42f421566..71bbbf1475 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -13,9 +13,7 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use crate::envelope::{CountFor, ItemType}; use crate::services::outcome::Outcome; -use crate::services::processor::{ - EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup, -}; +use crate::services::processor::{event_category, EventProcessing, Sampling, TransactionGroup}; use crate::services::projects::project::ProjectInfo; use crate::utils::{self, SamplingResult, TypedEnvelope}; @@ -44,8 +42,8 @@ use crate::utils::{self, SamplingResult, TypedEnvelope}; /// no sampling project information is specified, the project information of the event’s project /// will be returned. pub fn validate_and_set_dsc( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, project_info: Arc, sampling_project_info: Option>, ) -> Option> { @@ -55,7 +53,7 @@ pub fn validate_and_set_dsc( // The DSC can only be computed if there's a transaction event. Note that `dsc_from_event` // below already checks for the event type. - let Some(event) = state.event.value() else { + let Some(event) = event.value() else { return sampling_project_info; }; let Some(key_config) = project_info.get_public_key_config() else { @@ -72,8 +70,8 @@ pub fn validate_and_set_dsc( /// Computes the sampling decision on the incoming event pub fn run( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, config: Arc, project_info: Arc, sampling_project_info: Option>, @@ -103,7 +101,7 @@ where config.processing_enabled(), reservoir, sampling_config, - state.event.value(), + event.value(), root_config, managed_envelope.envelope().dsc(), ) @@ -111,8 +109,8 @@ where /// Apply the dynamic sampling decision from `compute_sampling_decision`. pub fn drop_unsampled_items( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: Annotated, outcome: Outcome, ) { // Remove all items from the envelope which need to be dropped due to dynamic sampling. @@ -139,11 +137,10 @@ pub fn drop_unsampled_items( } // All items have been dropped, now make sure the event is also handled and dropped. - if let Some(category) = state.event_category() { + if let Some(category) = event_category(&event) { let category = category.index_category().unwrap_or(category); managed_envelope.track_outcome(outcome, category, 1) } - state.remove_event(); } /// Computes the sampling decision on the incoming envelope. @@ -202,13 +199,12 @@ fn compute_sampling_decision( /// This execution of dynamic sampling is technically a "simulation" since we will use the result /// only for tagging errors and not for actually sampling incoming events. pub fn tag_error_with_sampling_decision( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, sampling_project_info: Option>, config: &Config, ) { - let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), state.event.value_mut()) - else { + let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), event.value_mut()) else { return; }; @@ -265,9 +261,7 @@ mod tests { use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; - use crate::services::processor::{ - ProcessEnvelope, ProcessingExtractedMetrics, ProcessingGroup, SpanGroup, - }; + use crate::services::processor::{ProcessEnvelope, ProcessingGroup, SpanGroup}; use crate::services::projects::project::ProjectInfo; use crate::testutils::{ self, create_test_processor, new_envelope, state_with_rule_and_condition, @@ -422,8 +416,7 @@ mod tests { .unwrap(), ); - // Gets a ProcessEnvelopeState, either with or without the metrics_exracted flag toggled. - let get_state = |version: Option| { + let get_test_params = |version: Option| { let event = Event { id: Annotated::new(EventId::new()), ty: Annotated::new(EventType::Transaction), @@ -446,15 +439,7 @@ mod tests { .into(); } - let project_info = Arc::new(project_info); let envelope = new_envelope(false, "foo"); - - let state = ProcessEnvelopeState { - event: Annotated::from(event), - metrics: Default::default(), - extracted_metrics: ProcessingExtractedMetrics::new(), - }; - let managed_envelope: TypedEnvelope = ManagedEnvelope::new( envelope, outcome_aggregator.clone(), @@ -464,16 +449,20 @@ mod tests { .try_into() .unwrap(); - (state, managed_envelope, project_info) + let event = Annotated::from(event); + + let project_info = Arc::new(project_info); + + (managed_envelope, event, project_info) }; let reservoir = dummy_reservoir(); // None represents no TransactionMetricsConfig, DS will not be run - let (mut state, mut managed_envelope, project_info) = get_state(None); + let (mut managed_envelope, mut event, project_info) = get_test_params(None); let sampling_result = run( - &mut state, &mut managed_envelope, + &mut event, config.clone(), project_info, None, @@ -482,10 +471,10 @@ mod tests { assert_eq!(sampling_result.decision(), SamplingDecision::Keep); // Current version is 3, so it won't run DS if it's outdated - let (mut state, mut managed_envelope, project_info) = get_state(Some(2)); + let (mut managed_envelope, mut event, project_info) = get_test_params(Some(2)); let sampling_result = run( - &mut state, &mut managed_envelope, + &mut event, config.clone(), project_info, None, @@ -494,10 +483,10 @@ mod tests { assert_eq!(sampling_result.decision(), SamplingDecision::Keep); // Dynamic sampling is run, as the transaction metrics version is up to date. - let (mut state, mut managed_envelope, project_info) = get_state(Some(3)); + let (mut managed_envelope, mut event, project_info) = get_test_params(Some(3)); let sampling_result = run( - &mut state, &mut managed_envelope, + &mut event, config.clone(), project_info, None, @@ -740,17 +729,14 @@ mod tests { ); let envelope = Envelope::parse_bytes(bytes).unwrap(); let config = Arc::new(Config::default()); - let mut state = ProcessEnvelopeState { - event: Annotated::new(Event::default()), - metrics: Default::default(), - extracted_metrics: ProcessingExtractedMetrics::new(), - }; let mut managed_envelope: TypedEnvelope = ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy(), processing_group) .try_into() .unwrap(); + let mut event = Annotated::new(Event::default()); + let sampling_project_info = { let mut state = ProjectInfo::default(); state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default()); @@ -783,8 +769,8 @@ mod tests { let reservoir = dummy_reservoir(); run::( - &mut state, &mut managed_envelope, + &mut event, config, project_info, sampling_project_info, diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 3631864a46..caf4ddb706 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -24,13 +24,21 @@ use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::services::outcome::Outcome; use crate::services::processor::{ - EventFullyNormalized, EventMetricsExtracted, EventProcessing, ExtractedEvent, - ProcessEnvelopeState, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, + event_category, event_type, EventFullyNormalized, EventMetricsExtracted, EventProcessing, + ExtractedEvent, ProcessEnvelopeState, ProcessingError, SpansExtracted, MINIMUM_CLOCK_DRIFT, }; use crate::services::projects::project::ProjectInfo; use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter, TypedEnvelope}; +/// Result of the extraction of the primary event payload from an envelope. +#[derive(Debug)] +pub struct ExtractionResult { + pub event: Annotated, + pub event_metrics_extracted: Option, + pub spans_extracted: Option, +} + /// Extracts the primary event payload from an envelope. /// /// The event is obtained from only one source in the following precedence: @@ -44,7 +52,7 @@ pub fn extract( managed_envelope: &mut TypedEnvelope, event_fully_normalized: EventFullyNormalized, config: &Config, -) -> Result, ProcessingError> { +) -> Result { let envelope = managed_envelope.envelope_mut(); // Remove all items first, and then process them. After this function returns, only @@ -73,8 +81,8 @@ pub fn extract( let skip_normalization = config.processing_enabled() && event_fully_normalized.0; - let mut result = None; - + let mut event_metrics_extracted = None; + let mut spans_extracted = None; let (event, event_len) = if let Some(item) = event_item.or(security_item) { relay_log::trace!("processing json event"); metric!(timer(RelayTimers::EventProcessingDeserialize), { @@ -90,10 +98,10 @@ pub fn extract( }) } else if let Some(item) = transaction_item { relay_log::trace!("processing json transaction"); - result = Some(( - EventMetricsExtracted(item.metrics_extracted()), - SpansExtracted(item.spans_extracted()), - )); + + event_metrics_extracted = Some(EventMetricsExtracted(item.metrics_extracted())); + spans_extracted = Some(SpansExtracted(item.spans_extracted())); + metric!(timer(RelayTimers::EventProcessingDeserialize), { // Transaction items can only contain transaction events. Force the event type to // hint to normalization that we're dealing with a transaction now. @@ -135,20 +143,24 @@ pub fn extract( (Annotated::empty(), 0) }; - state.event = event; state.metrics.bytes_ingested_event = Annotated::new(event_len as u64); - Ok(result) + Ok(ExtractionResult { + event, + event_metrics_extracted, + spans_extracted, + }) } pub fn finalize( state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, config: &Config, ) -> Result<(), ProcessingError> { let envelope = managed_envelope.envelope_mut(); - let event = match state.event.value_mut() { + let inner_event = match event.value_mut() { Some(event) => event, None if !config.processing_enabled() => return Ok(()), None => return Err(ProcessingError::NoEventPayload), @@ -158,7 +170,7 @@ pub fn finalize( static MY_VERSION_STRING: OnceLock = OnceLock::new(); let my_version = MY_VERSION_STRING.get_or_init(|| RelayVersion::current().to_string()); - event + inner_event .ingest_path .get_or_insert_with(Default::default) .push(Annotated::new(RelayInfo { @@ -177,7 +189,7 @@ pub fn finalize( // Ensure that the event id in the payload is consistent with the envelope. If an event // id was ingested, this will already be the case. Otherwise, this will insert a new // event id. To be defensive, we always overwrite to ensure consistency. - event.id = Annotated::new(event_id); + inner_event.id = Annotated::new(event_id); // In processing mode, also write metrics into the event. Most metrics have already been // collected at this state, except for the combined size of all attachments. @@ -194,28 +206,34 @@ pub fn finalize( metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size); } - event._metrics = Annotated::new(metrics); + inner_event._metrics = Annotated::new(metrics); - if event.ty.value() == Some(&EventType::Transaction) { + if inner_event.ty.value() == Some(&EventType::Transaction) { metric!( counter(RelayCounters::EventTransaction) += 1, - source = utils::transaction_source_tag(event), - platform = PlatformTag::from(event.platform.as_str().unwrap_or("other")).name(), - contains_slashes = if event.transaction.as_str().unwrap_or_default().contains('/') { + source = utils::transaction_source_tag(inner_event), + platform = + PlatformTag::from(inner_event.platform.as_str().unwrap_or("other")).name(), + contains_slashes = if inner_event + .transaction + .as_str() + .unwrap_or_default() + .contains('/') + { "true" } else { "false" } ); - let span_count = event.spans.value().map(Vec::len).unwrap_or(0) as u64; + let span_count = inner_event.spans.value().map(Vec::len).unwrap_or(0) as u64; metric!( histogram(RelayHistograms::EventSpans) = span_count, sdk = envelope.meta().client_name().name(), - platform = event.platform.as_str().unwrap_or("other"), + platform = inner_event.platform.as_str().unwrap_or("other"), ); - let has_otel = event + let has_otel = inner_event .contexts .value() .map_or(false, |contexts| contexts.contains::()); @@ -224,14 +242,14 @@ pub fn finalize( metric!( counter(RelayCounters::OpenTelemetryEvent) += 1, sdk = envelope.meta().client_name().name(), - platform = event.platform.as_str().unwrap_or("other"), + platform = inner_event.platform.as_str().unwrap_or("other"), ); } } if let Some(dsc) = envelope.dsc() { if let Ok(Some(value)) = relay_protocol::to_value(dsc) { - event._dsc = Annotated::new(value); + inner_event._dsc = Annotated::new(value); } } } @@ -239,16 +257,16 @@ pub fn finalize( let mut processor = ClockDriftProcessor::new(envelope.sent_at(), managed_envelope.received_at()) .at_least(MINIMUM_CLOCK_DRIFT); - processor::process_value(&mut state.event, &mut processor, ProcessingState::root()) + processor::process_value(event, &mut processor, ProcessingState::root()) .map_err(|_| ProcessingError::InvalidTransaction)?; // Log timestamp delays for all events after clock drift correction. This happens before // store processing, which could modify the timestamp if it exceeds a threshold. We are // interested in the actual delay before this correction. - if let Some(timestamp) = state.event.value().and_then(|e| e.timestamp.value()) { + if let Some(timestamp) = event.value().and_then(|e| e.timestamp.value()) { let event_delay = managed_envelope.received_at() - timestamp.into_inner(); if event_delay > SignedDuration::minutes(1) { - let category = state.event_category().unwrap_or(DataCategory::Unknown); + let category = event_category(event).unwrap_or(DataCategory::Unknown); metric!( timer(RelayTimers::TimestampDelay) = event_delay.to_std().unwrap(), category = category.name(), @@ -281,12 +299,12 @@ pub enum FiltersStatus { } pub fn filter( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, project_info: Arc, global_config: &GlobalConfig, ) -> Result { - let event = match state.event.value_mut() { + let event = match event.value_mut() { Some(event) => event, // Some events are created by processing relays (e.g. unreal), so they do not yet // exist at this point in non-processing relays. @@ -324,10 +342,9 @@ pub fn filter( /// /// This uses both the general `datascrubbing_settings`, as well as the the PII rules. pub fn scrub( - state: &mut ProcessEnvelopeState, + event: &mut Annotated, project_info: Arc, ) -> Result<(), ProcessingError> { - let event = &mut state.event; let config = &project_info.config; if config.datascrubbing_settings.scrub_data { @@ -355,25 +372,22 @@ pub fn scrub( } pub fn serialize( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, event_fully_normalized: EventFullyNormalized, event_metrics_extracted: EventMetricsExtracted, spans_extracted: SpansExtracted, ) -> Result<(), ProcessingError> { - if state.event.is_empty() { + if event.is_empty() { relay_log::error!("Cannot serialize empty event"); return Ok(()); } let data = metric!(timer(RelayTimers::EventProcessingSerialization), { - state - .event - .to_json() - .map_err(ProcessingError::SerializeFailed)? + event.to_json().map_err(ProcessingError::SerializeFailed)? }); - let event_type = state.event_type().unwrap_or_default(); + let event_type = event_type(event).unwrap_or_default(); let mut event_item = Item::new(ItemType::from_event_type(event_type)); event_item.set_payload(ContentType::Json, data); diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 9e07b7751a..978266f039 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -13,7 +13,7 @@ use relay_protocol::Annotated; use crate::envelope::{ContentType, Item, ItemType}; use crate::services::outcome::{DiscardReason, Outcome}; -use crate::services::processor::{should_filter, ProcessEnvelopeState, TransactionGroup}; +use crate::services::processor::{event_type, should_filter, TransactionGroup}; use crate::services::projects::project::ProjectInfo; use crate::utils::{ItemAction, TypedEnvelope}; @@ -21,14 +21,14 @@ use crate::utils::{ItemAction, TypedEnvelope}; /// /// Returns the profile id of the single remaining profile, if there is one. pub fn filter( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &Annotated, config: Arc, project_id: ProjectId, project_info: Arc, ) -> Option { let profiling_disabled = should_filter(&config, &project_info, Feature::Profiling); - let has_transaction = state.event_type() == Some(EventType::Transaction); + let has_transaction = event_type(event) == Some(EventType::Transaction); let keep_unsampled_profiles = true; let mut profile_id = None; @@ -71,8 +71,8 @@ pub fn filter( /// The profile id may be `None` when the envelope does not contain a profile, /// in that case the profile context is removed. /// Some SDKs send transactions with profile ids but omit the profile in the envelope. -pub fn transfer_id(state: &mut ProcessEnvelopeState, profile_id: Option) { - let Some(event) = state.event.value_mut() else { +pub fn transfer_id(event: &mut Annotated, profile_id: Option) { + let Some(event) = event.value_mut() else { return; }; @@ -96,8 +96,8 @@ pub fn transfer_id(state: &mut ProcessEnvelopeState, profile_id: Option, + event: &mut Annotated, global_config: &GlobalConfig, config: Arc, project_info: Arc, @@ -116,7 +116,7 @@ pub fn process( // There should always be an event/transaction available at this stage. // It is required to expand the profile. If it's missing, drop the item. - let Some(event) = state.event.value() else { + let Some(event) = event.value() else { return ItemAction::DropSilently; }; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 21c1255f5b..e511e9190f 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -8,8 +8,8 @@ use crate::metrics_extraction::{event, generic}; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ - dynamic_sampling, EventMetricsExtracted, ProcessEnvelopeState, ProcessingError, SpanGroup, - SpansExtracted, TransactionGroup, + dynamic_sampling, event_type, EventMetricsExtracted, ProcessEnvelopeState, ProcessingError, + SpanGroup, SpansExtracted, TransactionGroup, }; use crate::services::projects::project::ProjectInfo; use crate::utils::{sample, ItemAction, ManagedEnvelope, TypedEnvelope}; @@ -30,7 +30,7 @@ use relay_event_normalization::{ }; use relay_event_schema::processor::{process_value, ProcessingAction, ProcessingState}; use relay_event_schema::protocol::{ - BrowserContext, EventId, IpAddr, Measurement, Measurements, Span, SpanData, + BrowserContext, Event, EventId, IpAddr, Measurement, Measurements, Span, SpanData, }; use relay_log::protocol::{Attachment, AttachmentType}; use relay_metrics::{FractionUnit, MetricNamespace, MetricUnit, UnixTimestamp}; @@ -49,6 +49,7 @@ struct ValidationError(#[from] anyhow::Error); pub fn process( state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, global_config: &GlobalConfig, config: Arc, project_id: ProjectId, @@ -62,8 +63,8 @@ pub fn process( // We only implement trace-based sampling rules for now, which can be computed // once for all spans in the envelope. let sampling_result = dynamic_sampling::run( - state, managed_envelope, + event, config.clone(), project_info.clone(), sampling_project_info, @@ -273,8 +274,8 @@ fn add_sample_rate(measurements: &mut Annotated, name: &str, value #[allow(clippy::too_many_arguments)] pub fn extract_from_event( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &Annotated, global_config: &GlobalConfig, config: Arc, project_info: Arc, @@ -283,7 +284,7 @@ pub fn extract_from_event( spans_extracted: SpansExtracted, ) -> SpansExtracted { // Only extract spans from transactions (not errors). - if state.event_type() != Some(EventType::Transaction) { + if event_type(event) != Some(EventType::Transaction) { return spans_extracted; }; @@ -359,7 +360,7 @@ pub fn extract_from_event( managed_envelope.envelope_mut().add_item(item); }; - let Some(event) = state.event.value() else { + let Some(event) = event.value() else { return spans_extracted; }; @@ -402,16 +403,18 @@ pub fn extract_from_event( /// Removes the transaction in case the project has made the transition to spans-only. pub fn maybe_discard_transaction( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: Annotated, project_info: Arc, -) { - if state.event_type() == Some(EventType::Transaction) +) -> Annotated { + if event_type(&event) == Some(EventType::Transaction) && project_info.has_feature(Feature::DiscardTransaction) { - state.remove_event(); managed_envelope.update(); + return Annotated::empty(); } + + event } /// Config needed to normalize a standalone span. #[derive(Clone, Debug)] @@ -818,15 +821,15 @@ mod tests { use relay_system::Addr; use crate::envelope::Envelope; - use crate::services::processor::{ProcessingExtractedMetrics, ProcessingGroup}; + use crate::services::processor::ProcessingGroup; use crate::services::projects::project::ProjectInfo; use crate::utils::ManagedEnvelope; use super::*; - fn state() -> ( - ProcessEnvelopeState, + fn params() -> ( TypedEnvelope, + Annotated, Arc, ) { let bytes = Bytes::from( @@ -871,15 +874,11 @@ mod tests { ProcessingGroup::Transaction, ); - let state = ProcessEnvelopeState { - event: Annotated::from(event), - metrics: Default::default(), - extracted_metrics: ProcessingExtractedMetrics::new(), - }; - let managed_envelope = managed_envelope.try_into().unwrap(); - (state, managed_envelope, project_info) + let event = Annotated::from(event); + + (managed_envelope, event, project_info) } #[test] @@ -887,10 +886,10 @@ mod tests { let global_config = GlobalConfig::default(); let config = Arc::new(Config::default()); assert!(global_config.options.span_extraction_sample_rate.is_none()); - let (mut state, mut managed_envelope, project_info) = state(); + let (mut managed_envelope, event, project_info) = params(); extract_from_event( - &mut state, &mut managed_envelope, + &event, &global_config, config, project_info, @@ -913,10 +912,10 @@ mod tests { let mut global_config = GlobalConfig::default(); global_config.options.span_extraction_sample_rate = Some(1.0); let config = Arc::new(Config::default()); - let (mut state, mut managed_envelope, project_info) = state(); + let (mut managed_envelope, event, project_info) = params(); extract_from_event( - &mut state, &mut managed_envelope, + &event, &global_config, config, project_info, @@ -939,10 +938,10 @@ mod tests { let mut global_config = GlobalConfig::default(); global_config.options.span_extraction_sample_rate = Some(0.0); let config = Arc::new(Config::default()); - let (mut state, mut managed_envelope, project_info) = state(); + let (mut managed_envelope, event, project_info) = params(); extract_from_event( - &mut state, &mut managed_envelope, + &event, &global_config, config, project_info, @@ -965,10 +964,10 @@ mod tests { let mut global_config = GlobalConfig::default(); global_config.options.span_extraction_sample_rate = Some(1.0); // force enable let config = Arc::new(Config::default()); - let (mut state, mut managed_envelope, project_info) = state(); // client sample rate is 0.2 + let (mut managed_envelope, event, project_info) = params(); // client sample rate is 0.2 extract_from_event( - &mut state, &mut managed_envelope, + &event, &global_config, config, project_info, diff --git a/relay-server/src/services/processor/unreal.rs b/relay-server/src/services/processor/unreal.rs index e9e54e9fd3..ea97e251fa 100644 --- a/relay-server/src/services/processor/unreal.rs +++ b/relay-server/src/services/processor/unreal.rs @@ -2,14 +2,13 @@ //! //! These functions are included only in the processing mode. -use relay_config::Config; - use crate::envelope::ItemType; -use crate::services::processor::{ - ErrorGroup, EventFullyNormalized, ProcessEnvelopeState, ProcessingError, -}; +use crate::services::processor::{ErrorGroup, EventFullyNormalized, ProcessingError}; use crate::utils; use crate::utils::TypedEnvelope; +use relay_config::Config; +use relay_event_schema::protocol::Event; +use relay_protocol::Annotated; /// Expands Unreal 4 items inside an envelope. /// @@ -41,10 +40,10 @@ pub fn expand( /// If the event does not contain an unreal context, this function does not perform any action. /// If there was no event payload prior to this function, it is created. pub fn process( - state: &mut ProcessEnvelopeState, managed_envelope: &mut TypedEnvelope, + event: &mut Annotated, ) -> Result, ProcessingError> { - if utils::process_unreal_envelope(&mut state.event, managed_envelope.envelope_mut()) + if utils::process_unreal_envelope(event, managed_envelope.envelope_mut()) .map_err(ProcessingError::InvalidUnrealReport)? { return Ok(Some(EventFullyNormalized(false)));