Skip to content

Commit

Permalink
ref: Remove sample_rates from envelope item headers and processor (#4066
Browse files Browse the repository at this point in the history
)

The `sample_rates` field was originally added to Envelope item headers
to track which sampling rules and rates were applied by SDKs and the
server. This was however never fully implemented, and today Relay relies
on the DSC and trace header to obtain the client sample rate.

This removes the `sample_rates` field without replacement.
  • Loading branch information
jan-auer authored Sep 24, 2024
1 parent b08f077 commit 0643a13
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 126 deletions.
28 changes: 1 addition & 27 deletions relay-event-schema/src/protocol/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,7 @@
use relay_protocol::{Annotated, Array, Empty, FromValue, IntoValue};
use relay_protocol::{Annotated, Empty, FromValue, IntoValue};

use crate::processor::ProcessValue;

#[derive(Clone, Debug, Default, Empty, PartialEq, FromValue, IntoValue)]
pub struct SampleRate {
/// The unique identifier of the sampling rule or mechanism.
///
/// For client-side sampling, this identifies the sampling mechanism:
/// - `client_rate`: Default base sample rate configured in client options. Only reported in
/// the absence of the traces sampler callback.
/// - `client_sampler`: Return value from the traces sampler callback during runtime. Always
/// overrides the `client_rate`.
///
/// For server-side sampling, this identifies the dynamic sampling rule.
id: Annotated<String>,

/// The effective sample rate in the range `(0..1]`.
///
/// While allowed in the protocol, a value of `0` can never occur in practice since such events
/// would never be reported to Sentry and thus never generate this metric.
rate: Annotated<f64>,
}

/// Metrics captured during event ingestion and processing.
///
/// These values are collected in Relay and Sentry and finally persisted into the event payload. A
Expand Down Expand Up @@ -156,12 +136,6 @@ pub struct Metrics {
/// This metric is measured in Sentry and should be reported in all processing tasks.
#[metastructure(field = "flag.processing.fatal")]
pub flag_processing_fatal: Annotated<bool>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
/// effective sample rate is multiplied.
pub sample_rates: Annotated<Array<SampleRate>>,
}

// Do not process Metrics
Expand Down
20 changes: 0 additions & 20 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,6 @@ pub struct ItemHeaders {
#[serde(default, skip)]
source_quantities: Option<SourceQuantities>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
/// effective sample rate is multiplied.
#[serde(default, skip_serializing_if = "Option::is_none")]
sample_rates: Option<Value>,

/// Flag indicating if metrics have already been extracted from the item.
///
/// In order to only extract metrics once from an item while through a
Expand Down Expand Up @@ -645,7 +638,6 @@ impl Item {
rate_limited: false,
replay_combined_payload: false,
source_quantities: None,
sample_rates: None,
other: BTreeMap::new(),
metrics_extracted: false,
spans_extracted: false,
Expand Down Expand Up @@ -800,11 +792,6 @@ impl Item {
self.headers.rate_limited = rate_limited;
}

/// Removes sample rates from the headers, if any.
pub fn take_sample_rates(&mut self) -> Option<Value> {
self.headers.sample_rates.take()
}

/// Returns the contained source quantities.
pub fn source_quantities(&self) -> Option<SourceQuantities> {
self.headers.source_quantities
Expand All @@ -826,13 +813,6 @@ impl Item {
self.headers.replay_combined_payload = combined_payload;
}

/// Sets sample rates for this item.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) {
self.headers.sample_rates = Some(sample_rates);
}
}

/// Returns the metrics extracted flag.
pub fn metrics_extracted(&self) -> bool {
self.headers.metrics_extracted
Expand Down
9 changes: 1 addition & 8 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use relay_filter::FilterStatKey;
use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricMeta, MetricNamespace};
use relay_pii::PiiConfigError;
use relay_profiling::ProfileId;
use relay_protocol::{Annotated, Value};
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits, Scoping};
use relay_sampling::config::RuleId;
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision};
Expand Down Expand Up @@ -713,12 +713,6 @@ struct ProcessEnvelopeState<'a, Group> {
/// persisted into the Event. All modifications afterwards will have no effect.
metrics: Metrics,

/// A list of cumulative sample rates applied to this event.
///
/// This element is obtained from the event or transaction item and re-serialized into the
/// resulting item.
sample_rates: Option<Value>,

/// Metrics extracted from items in the envelope.
///
/// Relay can extract metrics for sessions and transactions, which is controlled by
Expand Down Expand Up @@ -1295,7 +1289,6 @@ impl EnvelopeProcessorService {
event_metrics_extracted: false,
spans_extracted: false,
metrics: Metrics::default(),
sample_rates: None,
extracted_metrics,
project_state,
config,
Expand Down
2 changes: 0 additions & 2 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ mod tests {
ProcessEnvelopeState::<TransactionGroup> {
event: Annotated::from(event),
metrics: Default::default(),
sample_rates: None,
extracted_metrics: ProcessingExtractedMetrics::new(),
config: config.clone(),
project_state,
Expand Down Expand Up @@ -707,7 +706,6 @@ mod tests {
event_metrics_extracted: false,
spans_extracted: false,
metrics: Default::default(),
sample_rates: Default::default(),
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
project_state: project_info,
Expand Down
35 changes: 5 additions & 30 deletions relay-server/src/services/processor/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use relay_event_schema::protocol::{
OtelContext, RelayInfo, SecurityReportType, Values,
};
use relay_pii::PiiProcessor;
use relay_protocol::{Annotated, Array, Empty, FromValue, Object, Value};
use relay_protocol::{Annotated, Array, Empty, Object, Value};
use relay_quotas::DataCategory;
use relay_statsd::metric;
use serde_json::Value as SerdeValue;
Expand Down Expand Up @@ -70,10 +70,8 @@ pub fn extract<G: EventProcessing>(

let skip_normalization = config.processing_enabled() && event_fully_normalized;

let mut sample_rates = None;
let (event, event_len) = if let Some(mut item) = event_item.or(security_item) {
let (event, event_len) = if let Some(item) = event_item.or(security_item) {
relay_log::trace!("processing json event");
sample_rates = item.take_sample_rates();
metric!(timer(RelayTimers::EventProcessingDeserialize), {
let (mut annotated_event, len) = event_from_json_payload(item, None)?;
// Event items can never include transactions, so retain the event type and let
Expand All @@ -85,9 +83,8 @@ pub fn extract<G: EventProcessing>(
}
(annotated_event, len)
})
} else if let Some(mut item) = transaction_item {
} else if let Some(item) = transaction_item {
relay_log::trace!("processing json transaction");
sample_rates = item.take_sample_rates();
state.event_metrics_extracted = item.metrics_extracted();
state.spans_extracted = item.spans_extracted();
metric!(timer(RelayTimers::EventProcessingDeserialize), {
Expand All @@ -103,9 +100,8 @@ pub fn extract<G: EventProcessing>(
return Err(ProcessingError::NoEventPayload);
}
event_from_json_payload(item, Some(EventType::UserReportV2))?
} else if let Some(mut item) = raw_security_item {
} else if let Some(item) = raw_security_item {
relay_log::trace!("processing security report");
sample_rates = item.take_sample_rates();
event_from_security_report(item, envelope.meta()).map_err(|error| {
if !matches!(error, ProcessingError::UnsupportedSecurityType) {
relay_log::error!(
Expand Down Expand Up @@ -138,7 +134,6 @@ pub fn extract<G: EventProcessing>(
};

state.event = event;
state.sample_rates = sample_rates;
state.metrics.bytes_ingested_event = Annotated::new(event_len as u64);

Ok(())
Expand Down Expand Up @@ -196,18 +191,6 @@ pub fn finalize<G: EventProcessing>(
metrics.bytes_ingested_event_attachment = Annotated::new(attachment_size);
}

let sample_rates = state
.sample_rates
.take()
.and_then(|value| Array::from_value(Annotated::new(value)).into_value());

if let Some(rates) = sample_rates {
metrics
.sample_rates
.get_or_insert_with(Array::new)
.extend(rates)
}

event._metrics = Annotated::new(metrics);

if event.ty.value() == Some(&EventType::Transaction) {
Expand Down Expand Up @@ -386,18 +369,10 @@ pub fn serialize<G: EventProcessing>(
let mut event_item = Item::new(ItemType::from_event_type(event_type));
event_item.set_payload(ContentType::Json, data);

// TODO: The state should simply maintain & update an `ItemHeaders` object.
// If transaction metrics were extracted, set the corresponding item header
event_item.set_metrics_extracted(state.event_metrics_extracted);

// TODO: The state should simply maintain & update an `ItemHeaders` object.
event_item.set_spans_extracted(state.spans_extracted);

// If there are sample rates, write them back to the envelope. In processing mode, sample
// rates have been removed from the state and burnt into the event via `finalize_event`.
if let Some(sample_rates) = state.sample_rates.take() {
event_item.set_sample_rates(sample_rates);
}

event_item.set_fully_normalized(state.event_fully_normalized);

state.envelope_mut().add_item(event_item);
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,6 @@ mod tests {
ProcessEnvelopeState {
event: Annotated::from(event),
metrics: Default::default(),
sample_rates: None,
extracted_metrics: ProcessingExtractedMetrics::new(),
config: Arc::new(Config::default()),
project_state,
Expand Down
38 changes: 0 additions & 38 deletions tests/integration/test_envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,44 +430,6 @@ def test_span_exclusive_time(mini_sentry, relay_with_processing, transactions_co
]


def test_sample_rates(mini_sentry, relay_chain):
relay = relay_chain(min_relay_version="21.1.0")
mini_sentry.add_basic_project_config(42)

sample_rates = [
{"id": "client_sampler", "rate": 0.01},
{"id": "dynamic_user", "rate": 0.5},
]

envelope = Envelope()
envelope.add_event({"message": "hello, world!"})
envelope.items[0].headers["sample_rates"] = sample_rates
relay.send_envelope(42, envelope)

envelope = mini_sentry.captured_events.get(timeout=1)
assert envelope.items[0].headers["sample_rates"] == sample_rates


def test_sample_rates_metrics(mini_sentry, relay_with_processing, events_consumer):
events_consumer = events_consumer()

relay = relay_with_processing()
mini_sentry.add_basic_project_config(42)

sample_rates = [
{"id": "client_sampler", "rate": 0.01},
{"id": "dynamic_user", "rate": 0.5},
]

envelope = Envelope()
envelope.add_event({"message": "hello, world!"})
envelope.items[0].headers["sample_rates"] = sample_rates
relay.send_envelope(42, envelope)

event, _ = events_consumer.get_event()
assert event["_metrics"]["sample_rates"] == sample_rates


def test_buffer_envelopes_without_global_config(
mini_sentry, relay_with_processing, events_consumer
):
Expand Down

0 comments on commit 0643a13

Please sign in to comment.