diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 924f8aa1c8..2c661039b0 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -3,24 +3,39 @@ use relay_base_schema::project::ProjectKey; use crate::Envelope; /// Struct that represents two project keys. -#[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)] +#[derive(Debug, Clone, Copy)] pub struct ProjectKeyPair { - pub own_key: ProjectKey, - pub sampling_key: ProjectKey, + own_key: ProjectKey, + sampling_key: Option, } impl ProjectKeyPair { pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { Self { own_key, - sampling_key, + sampling_key: Some(sampling_key), } } + pub fn own_key(&self) -> ProjectKey { + self.own_key + } + + pub fn sampling_key(&self) -> Option { + self.sampling_key + } + + pub fn sampling_key_unwrap(&self) -> ProjectKey { + self.sampling_key.unwrap_or(self.own_key) + } + pub fn from_envelope(envelope: &Envelope) -> Self { let own_key = envelope.meta().public_key(); - let sampling_key = envelope.sampling_key().unwrap_or(own_key); - Self::new(own_key, sampling_key) + let sampling_key = envelope.sampling_key(); + Self { + own_key, + sampling_key, + } } pub fn iter(&self) -> impl Iterator { @@ -28,6 +43,114 @@ impl ProjectKeyPair { own_key, sampling_key, } = self; - std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) + std::iter::once(*own_key).chain(sampling_key.filter(|k| k != own_key)) + } +} + +impl PartialEq for ProjectKeyPair { + fn eq(&self, other: &Self) -> bool { + self.own_key() == other.own_key() + && self.sampling_key_unwrap() == other.sampling_key_unwrap() + } +} + +impl Eq for ProjectKeyPair {} + +impl PartialOrd for ProjectKeyPair { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ProjectKeyPair { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let own_comparison = self.own_key().cmp(&other.own_key()); + if own_comparison != std::cmp::Ordering::Equal { + return own_comparison; + }; + self.sampling_key_unwrap().cmp(&other.sampling_key_unwrap()) + } +} + +impl std::hash::Hash for ProjectKeyPair { + fn hash(&self, state: &mut H) { + self.own_key().hash(state); + self.sampling_key_unwrap().hash(state); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + + #[test] + fn test_project_key_pair_new() { + let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair = ProjectKeyPair::new(own, sampling); + assert_eq!(pair.own_key(), own); + assert_eq!(pair.sampling_key_unwrap(), sampling); + } + + #[test] + fn test_project_key_pair_equality() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key1, key2); + let pair3 = ProjectKeyPair::new(key2, key1); + + assert_eq!(pair1, pair2); + assert_ne!(pair1, pair3); + } + + #[test] + fn test_project_key_pair_ordering() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key2, key1); + let pair3 = ProjectKeyPair { + own_key: key1, + sampling_key: None, + }; + + assert!(pair1 < pair2); + assert!(pair3 < pair2); + } + + #[test] + fn test_project_key_pair_hash() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + let pair1 = ProjectKeyPair::new(key1, key2); + let pair2 = ProjectKeyPair::new(key1, key2); + let pair3 = ProjectKeyPair::new(key2, key1); + + let mut set = HashSet::new(); + set.insert(pair1); + assert!(set.contains(&pair2)); + assert!(!set.contains(&pair3)); + } + + #[test] + fn test_project_key_pair_iter() { + let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + + // Test with different sampling key + let pair = ProjectKeyPair::new(key1, key2); + let keys: Vec<_> = pair.iter().collect(); + assert_eq!(keys, vec![key1, key2]); + + // Test with same key (should only yield one key) + let pair = ProjectKeyPair::new(key1, key1); + let keys: Vec<_> = pair.iter().collect(); + assert_eq!(keys, vec![key1]); } } diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index b7591f743b..97cad7c536 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -403,11 +403,11 @@ where let mut found = false; for (subkey, readiness) in [ ( - project_key_pair.own_key, + project_key_pair.own_key(), &mut stack.readiness.own_project_ready, ), ( - project_key_pair.sampling_key, + project_key_pair.sampling_key_unwrap(), &mut stack.readiness.sampling_project_ready, ), ] { diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index f5c2f37d40..87ade07c3d 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -336,10 +336,8 @@ impl EnvelopeBufferService { if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); - let ProjectKeyPair { - own_key, - sampling_key, - } = project_key_pair; + let own_key = project_key_pair.own_key(); + let sampling_key = project_key_pair.sampling_key_unwrap(); services.project_cache_handle.fetch(own_key); if sampling_key != own_key { @@ -417,32 +415,33 @@ impl EnvelopeBufferService { buffer: &mut PolymorphicEnvelopeBuffer, project_key_pair: ProjectKeyPair, ) -> Result<(), EnvelopeBufferError> { - let own_project = services.project_cache_handle.get(project_key_pair.own_key); - let sampling_project = services - .project_cache_handle - .get(project_key_pair.sampling_key); - let has_different_keys = project_key_pair.own_key != project_key_pair.sampling_key; - - // If at least one project is pending, we can't forward the envelope, so we will - // mark the respective project as not ready and schedule a project fetch. let mut at_least_one_pending = false; + + let own_key = project_key_pair.own_key(); + let own_project = services.project_cache_handle.get(own_key); if let ProjectState::Pending = own_project.state() { - buffer.mark_ready(&project_key_pair.own_key, false); - services - .project_cache_handle - .fetch(project_key_pair.own_key); + buffer.mark_ready(&own_key, false); + services.project_cache_handle.fetch(own_key); at_least_one_pending = true; } - if let ProjectState::Pending = sampling_project.state() { - if has_different_keys { - buffer.mark_ready(&project_key_pair.sampling_key, false); - services - .project_cache_handle - .fetch(project_key_pair.sampling_key); + + let mut sampling_project = None; + if let Some(sampling_key) = project_key_pair.sampling_key() { + let inner_sampling_project = services.project_cache_handle.get(sampling_key); + if let ProjectState::Pending = inner_sampling_project.state() { + // If the sampling keys are identical, no need to perform duplicate work. + if own_key != sampling_key { + buffer.mark_ready(&sampling_key, false); + services.project_cache_handle.fetch(sampling_key); + } + at_least_one_pending = true; } - at_least_one_pending = true; + + sampling_project = Some(inner_sampling_project); } + // If we have at least one project which was pending, we don't want to pop the envelope and + // early return. if at_least_one_pending { return Ok(()); } @@ -472,25 +471,26 @@ impl EnvelopeBufferService { }; // If we have different project keys, we want to extract the sampling project info. - let sampling_project_info = if has_different_keys { - match sampling_project.state() { - ProjectState::Enabled(sampling_project_info) => { - // Only set if it matches the organization id. Otherwise, treat as if there is - // no sampling project. - (sampling_project_info.organization_id == own_project_info.organization_id) - .then_some(sampling_project_info) - } - ProjectState::Pending => { - unreachable!("The sampling project should not be pending after pop"); - } - _ => { - // In any other case, we treat it as if there is no sampling project state. - None + let sampling_project_info = sampling_project + .as_ref() + .map(|project| project.state()) + .and_then(|state| { + match state { + ProjectState::Enabled(sampling_project_info) => { + // Only set if it matches the organization id. Otherwise, treat as if there is + // no sampling project. + (sampling_project_info.organization_id == own_project_info.organization_id) + .then_some(sampling_project_info) + } + ProjectState::Pending => { + unreachable!("The sampling project should not be pending after pop"); + } + _ => { + // In any other case, we treat it as if there is no sampling project state. + None + } } - } - } else { - None - }; + }); for (group, envelope) in ProcessingGroup::split_envelope(*envelope) { let managed_envelope = ManagedEnvelope::new( @@ -607,6 +607,7 @@ impl Service for EnvelopeBufferService { }, _ => {} }; + relay_statsd::metric!(counter(RelayCounters::BufferProjectChangedEvent) += 1); sleep = Duration::ZERO; }); } diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 51a548d6de..25abb1454c 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -65,8 +65,8 @@ impl StackProvider for SqliteStackProvider { self.partition_id, self.envelope_store.clone(), self.batch_size_bytes, - project_key_pair.own_key, - project_key_pair.sampling_key, + project_key_pair.own_key(), + project_key_pair.sampling_key_unwrap(), // We want to check the disk by default if we are creating the stack for the first time, // since we might have some data on disk. // On the other hand, if we are recreating a stack, it means that we popped it because diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 4d622edc44..77fab27564 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -182,9 +182,6 @@ pub enum RelayHistograms { /// This metric is tagged with: /// - `storage_type`: The type of storage used in the envelope buffer. BufferEnvelopesCount, - /// Number of envelopes in the backpressure buffer between the envelope buffer - /// and the project cache. - BufferBackpressureEnvelopesCount, /// The amount of bytes in the item payloads of an envelope pushed to the envelope buffer. /// /// This is not quite the same as the actual size of a serialized envelope, because it ignores @@ -297,9 +294,6 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::BatchesPerPartition => "metrics.buckets.batches_per_partition", RelayHistograms::BucketsPerBatch => "metrics.buckets.per_batch", RelayHistograms::BufferEnvelopesCount => "buffer.envelopes_count", - RelayHistograms::BufferBackpressureEnvelopesCount => { - "buffer.backpressure_envelopes_count" - } RelayHistograms::BufferEnvelopeBodySize => "buffer.envelope_body_size", RelayHistograms::BufferEnvelopeSize => "buffer.envelope_size", RelayHistograms::BufferEnvelopeSizeCompressed => "buffer.envelope_size.compressed", @@ -469,14 +463,6 @@ pub enum RelayTimers { /// This metric is tagged with: /// - `task`: The type of the task the project cache does. ProjectCacheTaskDuration, - /// Timing in milliseconds for processing a task in the legacy project cache service. - /// - /// A task is a unit of work the service does. Each branch of the - /// `tokio::select` is a different task type. - /// - /// This metric is tagged with: - /// - `task`: The type of the task the project cache does. - LegacyProjectCacheTaskDuration, /// Timing in milliseconds for handling and responding to a health check request. /// /// This metric is tagged with: @@ -577,7 +563,6 @@ impl TimerMetric for RelayTimers { RelayTimers::GlobalConfigRequestDuration => "global_config.requests.duration", RelayTimers::ProcessMessageDuration => "processor.message.duration", RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration", - RelayTimers::LegacyProjectCacheTaskDuration => "legacy_project_cache.task.duration", RelayTimers::HealthCheckDuration => "health.message.duration", #[cfg(feature = "processing")] RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets", @@ -637,17 +622,14 @@ pub enum RelayCounters { EnvelopeItemBytes, /// Number of transactions with attachments seen in the request handler. TransactionsWithAttachments, - /// Number of envelopes that were returned to the envelope buffer by the project cache. - /// - /// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded - /// in the cache and sends the envelope onward, even though the project cache cannot handle it. - BufferEnvelopesReturned, /// Number of times an envelope from the buffer is trying to be popped. BufferTryPop, /// Number of envelopes spool to disk. BufferSpooledEnvelopes, /// Number of envelopes unspooled from disk. BufferUnspooledEnvelopes, + /// Number of project changed updates received by the buffer. + BufferProjectChangedEvent, /// /// Number of outcomes and reasons for rejected Envelopes. /// @@ -851,10 +833,10 @@ impl CounterMetric for RelayCounters { RelayCounters::EnvelopeItems => "event.items", RelayCounters::TransactionsWithAttachments => "transactions_with_attachments", RelayCounters::EnvelopeItemBytes => "event.item_bytes", - RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned", RelayCounters::BufferTryPop => "buffer.try_pop", RelayCounters::BufferSpooledEnvelopes => "buffer.spooled_envelopes", RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes", + RelayCounters::BufferProjectChangedEvent => "buffer_project_changed_event", RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateRequest => "project_state.request", #[cfg(feature = "processing")]