Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(processor): Remove managed envelope from the state #4398

Merged
merged 17 commits into from
Dec 17, 2024
249 changes: 150 additions & 99 deletions relay-server/src/services/processor.rs

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions relay-server/src/services/processor/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use relay_pii::PiiAttachmentsProcessor;
use relay_statsd::metric;

use crate::envelope::{AttachmentType, ContentType};
use crate::services::processor::ProcessEnvelopeState;
use crate::statsd::RelayTimers;

use crate::services::projects::project::ProjectInfo;
use crate::utils::TypedEnvelope;
#[cfg(feature = "processing")]
use {
crate::services::processor::ErrorGroup, crate::services::processor::EventFullyNormalized,
crate::utils, relay_event_schema::protocol::Event, relay_protocol::Annotated,
crate::services::processor::{ErrorGroup, EventFullyNormalized, ProcessEnvelopeState},
crate::utils,
relay_event_schema::protocol::Event,
relay_protocol::Annotated,
};

/// Adds processing placeholders for special attachments.
Expand All @@ -26,9 +28,10 @@ use {
/// If the event payload was empty before, it is created.
#[cfg(feature = "processing")]
pub fn create_placeholders(
state: &mut ProcessEnvelopeState<ErrorGroup>,
state: &mut ProcessEnvelopeState,
managed_envelope: &mut TypedEnvelope<ErrorGroup>,
) -> Option<EventFullyNormalized> {
let envelope = state.managed_envelope.envelope();
let envelope = managed_envelope.envelope();
let minidump_attachment =
envelope.get_item_by(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
let apple_crash_report_attachment = envelope
Expand All @@ -54,8 +57,8 @@ pub fn create_placeholders(
/// This only applies the new PII rules that explicitly select `ValueType::Binary` or one of the
/// attachment types. When special attachments are detected, these are scrubbed with custom
/// logic; otherwise the entire attachment is treated as a single binary blob.
pub fn scrub<G>(state: &mut ProcessEnvelopeState<G>, project_info: Arc<ProjectInfo>) {
let envelope = state.managed_envelope.envelope_mut();
pub fn scrub<Group>(managed_envelope: &mut TypedEnvelope<Group>, project_info: Arc<ProjectInfo>) {
let envelope = managed_envelope.envelope_mut();
if let Some(ref config) = project_info.config.pii_config {
let minidump = envelope
.get_item_by_mut(|item| item.attachment_type() == Some(&AttachmentType::Minidump));
Expand Down
117 changes: 70 additions & 47 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::services::processor::{
EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup,
};
use crate::services::projects::project::ProjectInfo;
use crate::utils::{self, SamplingResult};
use crate::utils::{self, SamplingResult, TypedEnvelope};

/// Ensures there is a valid dynamic sampling context and corresponding project state.
///
Expand All @@ -44,11 +44,12 @@ use crate::utils::{self, SamplingResult};
/// no sampling project information is specified, the project information of the event’s project
/// will be returned.
pub fn validate_and_set_dsc(
state: &mut ProcessEnvelopeState<TransactionGroup>,
state: &mut ProcessEnvelopeState,
managed_envelope: &mut TypedEnvelope<TransactionGroup>,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
) -> Option<Arc<ProjectInfo>> {
if state.envelope().dsc().is_some() && sampling_project_info.is_some() {
if managed_envelope.envelope().dsc().is_some() && sampling_project_info.is_some() {
return sampling_project_info;
}

Expand All @@ -62,7 +63,7 @@ pub fn validate_and_set_dsc(
};

if let Some(dsc) = utils::dsc_from_event(key_config.public_key, event) {
state.envelope_mut().set_dsc(dsc);
managed_envelope.envelope_mut().set_dsc(dsc);
return Some(project_info.clone());
}

Expand All @@ -71,7 +72,8 @@ pub fn validate_and_set_dsc(

/// Computes the sampling decision on the incoming event
pub fn run<Group>(
state: &mut ProcessEnvelopeState<Group>,
state: &mut ProcessEnvelopeState,
managed_envelope: &mut TypedEnvelope<Group>,
config: Arc<Config>,
project_info: Arc<ProjectInfo>,
sampling_project_info: Option<Arc<ProjectInfo>>,
Expand Down Expand Up @@ -103,14 +105,18 @@ where
sampling_config,
state.event.value(),
root_config,
state.envelope().dsc(),
managed_envelope.envelope().dsc(),
)
}

/// Apply the dynamic sampling decision from `compute_sampling_decision`.
pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState<TransactionGroup>, outcome: Outcome) {
pub fn drop_unsampled_items(
state: &mut ProcessEnvelopeState,
managed_envelope: &mut TypedEnvelope<TransactionGroup>,
outcome: Outcome,
) {
// Remove all items from the envelope which need to be dropped due to dynamic sampling.
let dropped_items = state
let dropped_items = managed_envelope
.envelope_mut()
// Profiles are not dropped by dynamic sampling, they are all forwarded to storage and
// later processed in Sentry and potentially dropped there.
Expand All @@ -123,21 +129,19 @@ pub fn drop_unsampled_items(state: &mut ProcessEnvelopeState<TransactionGroup>,
// but attachments are still emitted as attachments.
let category = category.index_category().unwrap_or(category);

state
.managed_envelope
.track_outcome(outcome.clone(), category, quantity);
managed_envelope.track_outcome(outcome.clone(), category, quantity);
}
}

// Mark all remaining items in the envelope as un-sampled.
for item in state.envelope_mut().items_mut() {
for item in managed_envelope.envelope_mut().items_mut() {
item.set_sampled(false);
}

// All items have been dropped, now make sure the event is also handled and dropped.
if let Some(category) = state.event_category() {
let category = category.index_category().unwrap_or(category);
state.managed_envelope.track_outcome(outcome, category, 1)
managed_envelope.track_outcome(outcome, category, 1)
}
state.remove_event();
}
Expand Down Expand Up @@ -197,15 +201,14 @@ fn compute_sampling_decision(
///
/// This execution of dynamic sampling is technically a "simulation" since we will use the result
/// only for tagging errors and not for actually sampling incoming events.
pub fn tag_error_with_sampling_decision<G: EventProcessing>(
state: &mut ProcessEnvelopeState<G>,
pub fn tag_error_with_sampling_decision<Group: EventProcessing>(
state: &mut ProcessEnvelopeState,
managed_envelope: &mut TypedEnvelope<Group>,
sampling_project_info: Option<Arc<ProjectInfo>>,
config: &Config,
) {
let (Some(dsc), Some(event)) = (
state.managed_envelope.envelope().dsc(),
state.event.value_mut(),
) else {
let (Some(dsc), Some(event)) = (managed_envelope.envelope().dsc(), state.event.value_mut())
else {
return;
};

Expand Down Expand Up @@ -446,38 +449,60 @@ mod tests {
let project_info = Arc::new(project_info);
let envelope = new_envelope(false, "foo");

let state = ProcessEnvelopeState::<TransactionGroup> {
let state = ProcessEnvelopeState {
event: Annotated::from(event),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
managed_envelope: ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
test_store.clone(),
ProcessingGroup::Transaction,
)
.try_into()
.unwrap(),
};

(state, project_info)
let managed_envelope: TypedEnvelope<TransactionGroup> = ManagedEnvelope::new(
envelope,
outcome_aggregator.clone(),
test_store.clone(),
ProcessingGroup::Transaction,
)
.try_into()
.unwrap();

(state, managed_envelope, project_info)
};

let reservoir = dummy_reservoir();

// None represents no TransactionMetricsConfig, DS will not be run
let (mut state, project_info) = get_state(None);
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
let (mut state, mut managed_envelope, project_info) = get_state(None);
let sampling_result = run(
&mut state,
&mut managed_envelope,
config.clone(),
project_info,
None,
&reservoir,
);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Current version is 3, so it won't run DS if it's outdated
let (mut state, project_info) = get_state(Some(2));
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
let (mut state, mut managed_envelope, project_info) = get_state(Some(2));
let sampling_result = run(
&mut state,
&mut managed_envelope,
config.clone(),
project_info,
None,
&reservoir,
);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Dynamic sampling is run, as the transaction metrics version is up to date.
let (mut state, project_info) = get_state(Some(3));
let sampling_result = run(&mut state, config.clone(), project_info, None, &reservoir);
let (mut state, mut managed_envelope, project_info) = get_state(Some(3));
let sampling_result = run(
&mut state,
&mut managed_envelope,
config.clone(),
project_info,
None,
&reservoir,
);
assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
}

Expand Down Expand Up @@ -697,9 +722,9 @@ mod tests {
assert_eq!(get_sampling_match(res).sample_rate(), 0.2);
}

fn run_with_reservoir_rule<G>(processing_group: ProcessingGroup) -> SamplingResult
fn run_with_reservoir_rule<Group>(processing_group: ProcessingGroup) -> SamplingResult
where
G: Sampling + TryFrom<ProcessingGroup>,
Group: Sampling + TryFrom<ProcessingGroup>,
{
let project_info = {
let mut info = ProjectInfo::default();
Expand All @@ -715,20 +740,17 @@ mod tests {
);
let envelope = Envelope::parse_bytes(bytes).unwrap();
let config = Arc::new(Config::default());
let mut state = ProcessEnvelopeState::<G> {
let mut state = ProcessEnvelopeState {
event: Annotated::new(Event::default()),
metrics: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
managed_envelope: ManagedEnvelope::new(
envelope,
Addr::dummy(),
Addr::dummy(),
processing_group,
)
.try_into()
.unwrap(),
};

let mut managed_envelope: TypedEnvelope<Group> =
ManagedEnvelope::new(envelope, Addr::dummy(), Addr::dummy(), processing_group)
.try_into()
.unwrap();

let sampling_project_info = {
let mut state = ProjectInfo::default();
state.config.metric_extraction = ErrorBoundary::Ok(MetricExtractionConfig::default());
Expand Down Expand Up @@ -760,8 +782,9 @@ mod tests {
};

let reservoir = dummy_reservoir();
run(
run::<Group>(
&mut state,
&mut managed_envelope,
config,
project_info,
sampling_project_info,
Expand Down
Loading
Loading