Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Dec 23, 2024
1 parent f560eb9 commit 04c0631
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 73 deletions.
137 changes: 130 additions & 7 deletions relay-server/src/services/buffer/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,154 @@ use relay_base_schema::project::ProjectKey;
use crate::Envelope;

/// Struct that represents two project keys.
#[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)]
#[derive(Debug, Clone, Copy)]
pub struct ProjectKeyPair {
pub own_key: ProjectKey,
pub sampling_key: ProjectKey,
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
}

impl ProjectKeyPair {
pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
Self {
own_key,
sampling_key,
sampling_key: Some(sampling_key),
}
}

pub fn own_key(&self) -> ProjectKey {
self.own_key
}

pub fn sampling_key(&self) -> Option<ProjectKey> {
self.sampling_key
}

pub fn sampling_key_unwrap(&self) -> ProjectKey {
self.sampling_key.unwrap_or(self.own_key)
}

pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
let sampling_key = envelope.sampling_key();
Self {
own_key,
sampling_key,
}
}

pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
let Self {
own_key,
sampling_key,
} = self;
std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key))
std::iter::once(*own_key).chain(sampling_key.filter(|k| k != own_key))
}
}

impl PartialEq for ProjectKeyPair {
fn eq(&self, other: &Self) -> bool {
self.own_key() == other.own_key()
&& self.sampling_key_unwrap() == other.sampling_key_unwrap()
}
}

impl Eq for ProjectKeyPair {}

impl PartialOrd for ProjectKeyPair {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ProjectKeyPair {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
let own_comparison = self.own_key().cmp(&other.own_key());
if own_comparison != std::cmp::Ordering::Equal {
return own_comparison;
};
self.sampling_key_unwrap().cmp(&other.sampling_key_unwrap())
}
}

impl std::hash::Hash for ProjectKeyPair {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.own_key().hash(state);
self.sampling_key_unwrap().hash(state);
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;

#[test]
fn test_project_key_pair_new() {
let own = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair = ProjectKeyPair::new(own, sampling);
assert_eq!(pair.own_key(), own);
assert_eq!(pair.sampling_key_unwrap(), sampling);
}

#[test]
fn test_project_key_pair_equality() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

assert_eq!(pair1, pair2);
assert_ne!(pair1, pair3);
}

#[test]
fn test_project_key_pair_ordering() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key2, key1);
let pair3 = ProjectKeyPair {
own_key: key1,
sampling_key: None,
};

assert!(pair1 < pair2);
assert!(pair3 < pair2);
}

#[test]
fn test_project_key_pair_hash() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

let pair1 = ProjectKeyPair::new(key1, key2);
let pair2 = ProjectKeyPair::new(key1, key2);
let pair3 = ProjectKeyPair::new(key2, key1);

let mut set = HashSet::new();
set.insert(pair1);
assert!(set.contains(&pair2));
assert!(!set.contains(&pair3));
}

#[test]
fn test_project_key_pair_iter() {
let key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let key2 = ProjectKey::parse("b94ae32be2584e0bbd7a4cbb95971fee").unwrap();

// Test with different sampling key
let pair = ProjectKeyPair::new(key1, key2);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1, key2]);

// Test with same key (should only yield one key)
let pair = ProjectKeyPair::new(key1, key1);
let keys: Vec<_> = pair.iter().collect();
assert_eq!(keys, vec![key1]);
}
}
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,11 @@ where
let mut found = false;
for (subkey, readiness) in [
(
project_key_pair.own_key,
project_key_pair.own_key(),
&mut stack.readiness.own_project_ready,
),
(
project_key_pair.sampling_key,
project_key_pair.sampling_key_unwrap(),
&mut stack.readiness.sampling_project_ready,
),
] {
Expand Down
83 changes: 42 additions & 41 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,8 @@ impl EnvelopeBufferService {
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");

let ProjectKeyPair {
own_key,
sampling_key,
} = project_key_pair;
let own_key = project_key_pair.own_key();
let sampling_key = project_key_pair.sampling_key_unwrap();

services.project_cache_handle.fetch(own_key);
if sampling_key != own_key {
Expand Down Expand Up @@ -417,32 +415,33 @@ impl EnvelopeBufferService {
buffer: &mut PolymorphicEnvelopeBuffer,
project_key_pair: ProjectKeyPair,
) -> Result<(), EnvelopeBufferError> {
let own_project = services.project_cache_handle.get(project_key_pair.own_key);
let sampling_project = services
.project_cache_handle
.get(project_key_pair.sampling_key);
let has_different_keys = project_key_pair.own_key != project_key_pair.sampling_key;

// If at least one project is pending, we can't forward the envelope, so we will
// mark the respective project as not ready and schedule a project fetch.
let mut at_least_one_pending = false;

let own_key = project_key_pair.own_key();
let own_project = services.project_cache_handle.get(own_key);
if let ProjectState::Pending = own_project.state() {
buffer.mark_ready(&project_key_pair.own_key, false);
services
.project_cache_handle
.fetch(project_key_pair.own_key);
buffer.mark_ready(&own_key, false);
services.project_cache_handle.fetch(own_key);
at_least_one_pending = true;
}
if let ProjectState::Pending = sampling_project.state() {
if has_different_keys {
buffer.mark_ready(&project_key_pair.sampling_key, false);
services
.project_cache_handle
.fetch(project_key_pair.sampling_key);

let mut sampling_project = None;
if let Some(sampling_key) = project_key_pair.sampling_key() {
let inner_sampling_project = services.project_cache_handle.get(sampling_key);
if let ProjectState::Pending = inner_sampling_project.state() {
// If the sampling keys are identical, no need to perform duplicate work.
if own_key != sampling_key {
buffer.mark_ready(&sampling_key, false);
services.project_cache_handle.fetch(sampling_key);
}
at_least_one_pending = true;
}
at_least_one_pending = true;

sampling_project = Some(inner_sampling_project);
}

// If we have at least one project which was pending, we don't want to pop the envelope and
// early return.
if at_least_one_pending {
return Ok(());
}
Expand Down Expand Up @@ -472,25 +471,26 @@ impl EnvelopeBufferService {
};

// If we have different project keys, we want to extract the sampling project info.
let sampling_project_info = if has_different_keys {
match sampling_project.state() {
ProjectState::Enabled(sampling_project_info) => {
// Only set if it matches the organization id. Otherwise, treat as if there is
// no sampling project.
(sampling_project_info.organization_id == own_project_info.organization_id)
.then_some(sampling_project_info)
}
ProjectState::Pending => {
unreachable!("The sampling project should not be pending after pop");
}
_ => {
// In any other case, we treat it as if there is no sampling project state.
None
let sampling_project_info = sampling_project
.as_ref()
.map(|project| project.state())
.and_then(|state| {
match state {
ProjectState::Enabled(sampling_project_info) => {
// Only set if it matches the organization id. Otherwise, treat as if there is
// no sampling project.
(sampling_project_info.organization_id == own_project_info.organization_id)
.then_some(sampling_project_info)
}
ProjectState::Pending => {
unreachable!("The sampling project should not be pending after pop");
}
_ => {
// In any other case, we treat it as if there is no sampling project state.
None
}
}
}
} else {
None
};
});

for (group, envelope) in ProcessingGroup::split_envelope(*envelope) {
let managed_envelope = ManagedEnvelope::new(
Expand Down Expand Up @@ -607,6 +607,7 @@ impl Service for EnvelopeBufferService {
},
_ => {}
};
relay_statsd::metric!(counter(RelayCounters::BufferProjectChangedEvent) += 1);
sleep = Duration::ZERO;
});
}
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ impl StackProvider for SqliteStackProvider {
self.partition_id,
self.envelope_store.clone(),
self.batch_size_bytes,
project_key_pair.own_key,
project_key_pair.sampling_key,
project_key_pair.own_key(),
project_key_pair.sampling_key_unwrap(),
// We want to check the disk by default if we are creating the stack for the first time,
// since we might have some data on disk.
// On the other hand, if we are recreating a stack, it means that we popped it because
Expand Down
24 changes: 3 additions & 21 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,6 @@ pub enum RelayHistograms {
/// This metric is tagged with:
/// - `storage_type`: The type of storage used in the envelope buffer.
BufferEnvelopesCount,
/// Number of envelopes in the backpressure buffer between the envelope buffer
/// and the project cache.
BufferBackpressureEnvelopesCount,
/// The amount of bytes in the item payloads of an envelope pushed to the envelope buffer.
///
/// This is not quite the same as the actual size of a serialized envelope, because it ignores
Expand Down Expand Up @@ -297,9 +294,6 @@ impl HistogramMetric for RelayHistograms {
RelayHistograms::BatchesPerPartition => "metrics.buckets.batches_per_partition",
RelayHistograms::BucketsPerBatch => "metrics.buckets.per_batch",
RelayHistograms::BufferEnvelopesCount => "buffer.envelopes_count",
RelayHistograms::BufferBackpressureEnvelopesCount => {
"buffer.backpressure_envelopes_count"
}
RelayHistograms::BufferEnvelopeBodySize => "buffer.envelope_body_size",
RelayHistograms::BufferEnvelopeSize => "buffer.envelope_size",
RelayHistograms::BufferEnvelopeSizeCompressed => "buffer.envelope_size.compressed",
Expand Down Expand Up @@ -469,14 +463,6 @@ pub enum RelayTimers {
/// This metric is tagged with:
/// - `task`: The type of the task the project cache does.
ProjectCacheTaskDuration,
/// Timing in milliseconds for processing a task in the legacy project cache service.
///
/// A task is a unit of work the service does. Each branch of the
/// `tokio::select` is a different task type.
///
/// This metric is tagged with:
/// - `task`: The type of the task the project cache does.
LegacyProjectCacheTaskDuration,
/// Timing in milliseconds for handling and responding to a health check request.
///
/// This metric is tagged with:
Expand Down Expand Up @@ -577,7 +563,6 @@ impl TimerMetric for RelayTimers {
RelayTimers::GlobalConfigRequestDuration => "global_config.requests.duration",
RelayTimers::ProcessMessageDuration => "processor.message.duration",
RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration",
RelayTimers::LegacyProjectCacheTaskDuration => "legacy_project_cache.task.duration",
RelayTimers::HealthCheckDuration => "health.message.duration",
#[cfg(feature = "processing")]
RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets",
Expand Down Expand Up @@ -637,17 +622,14 @@ pub enum RelayCounters {
EnvelopeItemBytes,
/// Number of transactions with attachments seen in the request handler.
TransactionsWithAttachments,
/// Number of envelopes that were returned to the envelope buffer by the project cache.
///
/// This happens when the envelope buffer falsely assumes that the envelope's projects are loaded
/// in the cache and sends the envelope onward, even though the project cache cannot handle it.
BufferEnvelopesReturned,
/// Number of times an envelope from the buffer is trying to be popped.
BufferTryPop,
/// Number of envelopes spool to disk.
BufferSpooledEnvelopes,
/// Number of envelopes unspooled from disk.
BufferUnspooledEnvelopes,
/// Number of project changed updates received by the buffer.
BufferProjectChangedEvent,
///
/// Number of outcomes and reasons for rejected Envelopes.
///
Expand Down Expand Up @@ -851,10 +833,10 @@ impl CounterMetric for RelayCounters {
RelayCounters::EnvelopeItems => "event.items",
RelayCounters::TransactionsWithAttachments => "transactions_with_attachments",
RelayCounters::EnvelopeItemBytes => "event.item_bytes",
RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned",
RelayCounters::BufferTryPop => "buffer.try_pop",
RelayCounters::BufferSpooledEnvelopes => "buffer.spooled_envelopes",
RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes",
RelayCounters::BufferProjectChangedEvent => "buffer_project_changed_event",
RelayCounters::Outcomes => "events.outcomes",
RelayCounters::ProjectStateRequest => "project_state.request",
#[cfg(feature = "processing")]
Expand Down

0 comments on commit 04c0631

Please sign in to comment.