From 163635848e5d2dda451f5c5f9d483da9da48da4d Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 18 Dec 2024 20:44:28 +0100 Subject: [PATCH] instr(metrics): Instrument metric delays (#4409) --- relay-metrics/src/lib.rs | 1 + relay-metrics/src/utils.rs | 35 ++++++++++++++++++++++++++++++ relay-server/src/services/store.rs | 32 ++++++++++++++++++++++++--- relay-server/src/statsd.rs | 32 +++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 3 deletions(-) diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index a6891087ac..14990934b7 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -81,4 +81,5 @@ mod view; pub use bucket::*; pub use finite::*; pub use protocol::*; +pub use utils::ByNamespace; pub use view::*; diff --git a/relay-metrics/src/utils.rs b/relay-metrics/src/utils.rs index a167c03bd9..f5cebd8499 100644 --- a/relay-metrics/src/utils.rs +++ b/relay-metrics/src/utils.rs @@ -12,17 +12,25 @@ pub fn tags_cost(tags: &BTreeMap) -> usize { tags.iter().map(|(k, v)| k.len() + v.len()).sum() } +/// Utility to store information for each [`MetricNamespace`]. #[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ByNamespace { + /// Value for the [`MetricNamespace::Sessions`] namespace. pub sessions: T, + /// Value for the [`MetricNamespace::Transactions`] namespace. pub transactions: T, + /// Value for the [`MetricNamespace::Spans`] namespace. pub spans: T, + /// Value for the [`MetricNamespace::Custom`] namespace. pub custom: T, + /// Value for the [`MetricNamespace::Stats`] namespace. pub stats: T, + /// Value for the [`MetricNamespace::Unsupported`] namespace. pub unsupported: T, } impl ByNamespace { + /// Returns a reference for the value stored for `namespace`. pub fn get(&self, namespace: MetricNamespace) -> &T { match namespace { MetricNamespace::Sessions => &self.sessions, @@ -34,6 +42,7 @@ impl ByNamespace { } } + /// Returns a mutable reference for the value stored for `namespace`. pub fn get_mut(&mut self, namespace: MetricNamespace) -> &mut T { match namespace { MetricNamespace::Sessions => &mut self.sessions, @@ -46,6 +55,32 @@ impl ByNamespace { } } +impl IntoIterator for ByNamespace { + type Item = (MetricNamespace, T); + type IntoIter = std::array::IntoIter<(MetricNamespace, T), 6>; + + fn into_iter(self) -> Self::IntoIter { + let Self { + sessions, + transactions, + spans, + custom, + stats, + unsupported, + } = self; + + [ + (MetricNamespace::Sessions, sessions), + (MetricNamespace::Transactions, transactions), + (MetricNamespace::Spans, spans), + (MetricNamespace::Custom, custom), + (MetricNamespace::Stats, stats), + (MetricNamespace::Unsupported, unsupported), + ] + .into_iter() + } +} + impl fmt::Debug for ByNamespace { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // A more compact representation. Mainly for snapshot testing. diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 48ec176750..4400d6500e 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -19,8 +19,8 @@ use relay_event_schema::protocol::{EventId, VALID_PLATFORMS}; use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; use relay_metrics::{ - Bucket, BucketView, BucketViewValue, BucketsView, FiniteF64, GaugeValue, MetricName, - MetricNamespace, SetView, + Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue, + MetricName, MetricNamespace, SetView, }; use relay_quotas::Scoping; use relay_statsd::metric; @@ -37,7 +37,7 @@ use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::Processed; -use crate::statsd::{RelayCounters, RelayTimers}; +use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup}; /// Fallback name used for attachment items without a `filename` header. @@ -390,9 +390,20 @@ impl StoreService { let global_config = self.global_config.current(); let mut encoder = BucketEncoder::new(&global_config); + let now = UnixTimestamp::now(); + let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default(); + for mut bucket in buckets { let namespace = encoder.prepare(&mut bucket); + if let Some(received_at) = bucket.metadata.received_at { + let delay = now.as_secs().saturating_sub(received_at.as_secs()); + let (total, count, max) = delay_stats.get_mut(namespace); + *total += delay; + *count += 1; + *max = (*max).max(delay); + } + // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce // each bucket separately, we only need to split buckets that exceed the size, but not // batches. @@ -430,6 +441,21 @@ impl StoreService { "failed to produce metric buckets: {error}" ); } + + for (namespace, (total, count, max)) in delay_stats { + metric!( + counter(RelayCounters::MetricDelaySum) += total, + namespace = namespace.as_str() + ); + metric!( + counter(RelayCounters::MetricDelayCount) += count, + namespace = namespace.as_str() + ); + metric!( + gauge(RelayGauges::MetricDelayMax) = max, + namespace = namespace.as_str() + ); + } } fn create_metric_message<'a>( diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 6bf5f7a9d1..ec850de076 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -33,6 +33,15 @@ pub enum RelayGauges { ProjectCacheScheduledFetches, /// Exposes the amount of currently open and handled connections by the server. ServerActiveConnections, + /// Maximum delay of a metric bucket in seconds. + /// + /// The maximum is measured from initial creation of the bucket in an internal Relay + /// until it is produced to Kafka. + /// + /// This metric is tagged with: + /// - `namespace`: the metric namespace. + #[cfg(feature = "processing")] + MetricDelayMax, } impl GaugeMetric for RelayGauges { @@ -52,6 +61,8 @@ impl GaugeMetric for RelayGauges { } RelayGauges::ProjectCacheScheduledFetches => "project_cache.fetches.size", RelayGauges::ServerActiveConnections => "server.http.connections", + #[cfg(feature = "processing")] + RelayGauges::MetricDelayMax => "metrics.buckets.delay.max", } } } @@ -812,6 +823,23 @@ pub enum RelayCounters { ServerSocketAccept, /// Incremented every time the server aborts a connection because of an idle timeout. ServerConnectionIdleTimeout, + /// The total delay of metric buckets in seconds. + /// + /// The delay is measured from initial creation of the bucket in an internal Relay + /// until it is produced to Kafka. + /// + /// Use [`Self::MetricDelayCount`] to calculate the average delay. + /// + /// This metric is tagged with: + /// - `namespace`: the metric namespace. + #[cfg(feature = "processing")] + MetricDelaySum, + /// The amount of buckets counted for the [`Self::MetricDelaySum`] metric. + /// + /// This metric is tagged with: + /// - `namespace`: the metric namespace. + #[cfg(feature = "processing")] + MetricDelayCount, } impl CounterMetric for RelayCounters { @@ -854,6 +882,10 @@ impl CounterMetric for RelayCounters { RelayCounters::ReplayExceededSegmentLimit => "replay.segment_limit_exceeded", RelayCounters::ServerSocketAccept => "server.http.accepted", RelayCounters::ServerConnectionIdleTimeout => "server.http.idle_timeout", + #[cfg(feature = "processing")] + RelayCounters::MetricDelaySum => "metrics.delay.sum", + #[cfg(feature = "processing")] + RelayCounters::MetricDelayCount => "metrics.delay.count", } } }