From 49ac1315d7a96b372265c901a65634623b3421d9 Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Tue, 17 Dec 2024 13:13:17 +0100
Subject: [PATCH] more review comments, change some values to u64

---
 relay-metrics/src/aggregator/inner.rs          | 95 ++++++++++---------
 relay-metrics/src/aggregator/mod.rs            | 12 +--
 .../src/services/metrics/aggregator.rs         |  8 +-
 3 files changed, 61 insertions(+), 54 deletions(-)

diff --git a/relay-metrics/src/aggregator/inner.rs b/relay-metrics/src/aggregator/inner.rs
index 65094ff979..04b2d246db 100644
--- a/relay-metrics/src/aggregator/inner.rs
+++ b/relay-metrics/src/aggregator/inner.rs
@@ -148,7 +148,7 @@ pub struct Config {
     pub start: UnixTimestamp,
     /// Size of each individual bucket, inputs are truncated to this value.
     pub bucket_interval: u32,
-    /// The amount of time slots to keep track off in the aggregator.
+    /// The amount of time slots to keep track of in the aggregator.
     ///
     /// The size of a time slot is defined by [`Self::bucket_interval`].
     pub num_time_slots: u32,
@@ -176,7 +176,7 @@ pub struct Config {
 ///
 /// The internal time is set on construction to [`Config::start`].
 ///
-/// Use [`Self::next_flush`] to determine the time when to call [`Self::flush_next`].
+/// Use [`Self::next_flush_at`] to determine when to call [`Self::flush_next`].
 pub struct Inner {
     /// Ring-buffer of aggregation slots.
     ///
@@ -196,23 +196,24 @@ pub struct Inner {
     ///
     /// The first item in the buffer is tracked by [`Self::head`] which is at any time the
     /// current partition since the beginning "zero". The beginning in the aggregator refers to the
-    /// beginning of the epoch. The [`Self::head`] is calculated with `f(x) = x / bucket_interval * num_partitions`.
+    /// beginning of the epoch. The [`Self::head`] at time `t` is calculated with
+    /// `f(t) = t / bucket_interval * num_partitions`.
     ///
     /// Flushing a partition advances the [`Self::head`] by a single value `+1`, meaning
     /// time is effectively advanced by `bucket_interval / num_partitions`.
     slots: VecDeque<Slot>,
     /// The amount of partitions per time slot.
-    num_partitions: u32,
+    num_partitions: u64,
     /// Position of the first element in [`Self::slots`].
     head: u64,
     /// Size of each individual bucket, inputs are truncated modulo to this value.
-    bucket_interval: u32,
+    bucket_interval: u64,
     /// Amount of slots which is added to a bucket as a delay.
     ///
-    /// This is a fixed delay which is added to the time returned by [`Self::next_flush`].
-    delay: u32,
+    /// This is a fixed delay which is added to the time returned by [`Self::next_flush_at`].
+    delay: u64,
     /// Total stats of the aggregator.
     stats: stats::Total,
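A quick standalone illustration of the head formula from the doc comment above (plain Rust with made-up numbers, not code from this patch):

```rust
// Standalone illustration of `f(t) = t / bucket_interval * num_partitions`;
// the names mirror the doc comment above, nothing here is Relay code.
fn head_at(t_secs: u64, bucket_interval: u64, num_partitions: u64) -> u64 {
    // Integer division truncates `t` to its time slot first, then the
    // result is scaled to partition granularity.
    t_secs / bucket_interval * num_partitions
}

fn main() {
    // With 10s buckets and 2 partitions, t = 75s lies in time slot 7 and
    // the head points at absolute partition 14.
    assert_eq!(head_at(75, 10, 2), 14);
    // Flushing advances the head by 1, i.e. by 10 / 2 = 5 seconds of time:
    // five seconds later the head has moved one position.
    assert_eq!(head_at(80, 10, 2), head_at(75, 10, 2) + 2);
}
```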
@@ -241,7 +242,7 @@ impl Inner {
         let num_time_slots = config.num_time_slots.max(1) + config.delay.div_ceil(bucket_interval);
         let num_partitions = config.num_partitions.max(1);

-        let mut slots = Vec::new();
+        let mut slots = Vec::with_capacity((num_time_slots * num_partitions) as usize);
         for _ in 0..num_time_slots {
             for partition_key in 0..num_partitions {
                 slots.push(Slot {
@@ -251,22 +252,25 @@
             }
         }

+        let bucket_interval = u64::from(bucket_interval);
+        let num_partitions = u64::from(num_partitions);
+
         let slot_diff = RelativeRange {
             max_in_past: config
                 .max_secs_in_past
-                .map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
+                .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
             max_in_future: config
                 .max_secs_in_future
-                .map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
+                .map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
         };

         let total_slots = slots.len();
         Self {
             slots: VecDeque::from(slots),
             num_partitions,
-            head: config.start.as_secs() / u64::from(bucket_interval) * u64::from(num_partitions),
+            head: config.start.as_secs() / bucket_interval * num_partitions,
             bucket_interval,
-            delay: config.delay,
+            delay: u64::from(config.delay),
             stats: stats::Total::default(),
             limits: stats::Limits {
                 max_total: config.max_total_bucket_bytes.unwrap_or(u64::MAX),
@@ -283,7 +287,7 @@
     }

     /// Returns the configured bucket interval.
-    pub fn bucket_interval(&self) -> u32 {
+    pub fn bucket_interval(&self) -> u64 {
         self.bucket_interval
     }

@@ -303,16 +307,17 @@
     }

     /// Returns the time as a duration since epoch when the next flush is supposed to happen.
-    pub fn next_flush(&self) -> Duration {
-        // `head + 1` to get the end time of the slot not the start.
-        (Duration::from_secs(self.head + 1) / self.num_partitions * self.bucket_interval)
-            + Duration::from_secs(u64::from(self.delay))
+    pub fn next_flush_at(&self) -> Duration {
+        // `head + 1` to get the end time of the slot, not the start. Convert `head` to a
+        // duration first, to have enough precision for the division.
+        //
+        // Casts do not wrap, the configuration requires these values to fit in a `u32`.
+        let offset = Duration::from_secs(self.head + 1) / self.num_partitions as u32
+            * self.bucket_interval as u32;
+        offset + Duration::from_secs(self.delay)
     }

     /// Merges a metric bucket.
-    ///
-    /// Returns `true` if the bucket already existed in the aggregator
-    /// and `false` if the bucket did not exist yet.
     pub fn merge(
         &mut self,
         mut key: BucketKey,
@@ -321,11 +326,11 @@
         let project_key = key.project_key;
         let namespace = key.metric_name.namespace();

-        let time_slot = key.timestamp.as_secs() / u64::from(self.bucket_interval);
+        let time_slot = key.timestamp.as_secs() / self.bucket_interval;
         // Make sure the timestamp is normalized to the correct interval as well.
-        key.timestamp = UnixTimestamp::from_secs(time_slot * u64::from(self.bucket_interval));
+        key.timestamp = UnixTimestamp::from_secs(time_slot * self.bucket_interval);

-        let now_slot = self.head / u64::from(self.num_partitions);
+        let now_slot = self.head / self.num_partitions;
         if !self.slot_range.contains(now_slot, time_slot) {
             return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp));
         }
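The duration-first division is the subtle part of the new `next_flush_at`. A minimal sketch of the same formula as a free function, with parameters standing in for the `Inner` fields of the same names, shows what plain integer math would get wrong:

```rust
use std::time::Duration;

// Mirror of the patched formula as a free function; this is a sketch, not
// the actual method.
fn next_flush_at(head: u64, num_partitions: u32, bucket_interval: u32, delay: u64) -> Duration {
    // `Duration` keeps nanosecond precision through the division, while
    // `(head + 1) / num_partitions` in integer math would drop the
    // sub-slot fraction for every odd head.
    let offset = Duration::from_secs(head + 1) / num_partitions * bucket_interval;
    offset + Duration::from_secs(delay)
}

fn main() {
    // start = 70s, 10s buckets, 2 partitions => head = 70 / 10 * 2 = 14.
    // Duration::from_secs(15) / 2 = 7.5s, * 10 = 75s, matching the
    // flush cadence tests further down.
    assert_eq!(next_flush_at(14, 2, 10, 0), Duration::from_secs(75));
    // Plain integer math: 15 / 2 * 10 = 70, half a partition too early.
    assert_eq!((14 + 1) / 2 * 10, 70);
}
```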
@@ -338,10 +343,10 @@
                 self.hasher
                     .hash_one((key.project_key, &key.metric_name, &key.tags))
             }
-        } % u64::from(self.num_partitions);
+        } % self.num_partitions;

         // Calculate the slot of the bucket based on its time and shift it by its assigned partition.
-        let slot = time_slot * u64::from(self.num_partitions) + assigned_partition;
+        let slot = time_slot * self.num_partitions + assigned_partition;
         // Transform the slot to an offset/index into the ring-buffer, by calculating:
         // `(slot - self.head).rem_euclid(self.slots.len())`.
         let index = sub_rem_euclid(slot, self.head, self.slots.len() as u64);
@@ -458,10 +463,12 @@ impl fmt::Debug for Inner {
         let mut list = f.debug_list();
         list.entry(&self.stats);
         for (i, v) in self.slots.iter().enumerate() {
-            let head_partitions = self.head % u64::from(self.num_partitions);
-            let time_offset = (head_partitions + i as u64) / u64::from(self.num_partitions);
-            let head_time = self.head / u64::from(self.num_partitions);
-            let time = (head_time + time_offset) * u64::from(self.bucket_interval);
+            let head_partitions = self.head % self.num_partitions;
+            let head_time = self.head / self.num_partitions;
+
+            let time_offset = (head_partitions + i as u64) / self.num_partitions;
+            let time = (head_time + time_offset) * self.bucket_interval;
+
             match v.is_empty() {
                 // Make the output shorter with a string until `entry_with` is stable.
                 true => list.entry(&format!("({time}, {v:?})")),
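`sub_rem_euclid` is only called, not defined, in the hunk above. A sketch of its likely semantics, reconstructed from the inline comment (compute `(slot - self.head).rem_euclid(len)` without the unsigned subtraction underflowing); the body is an assumption, only the name comes from the patch:

```rust
// Assumed semantics: `(a - b).rem_euclid(n)` over `u64`, tolerating `a < b`.
// Assumes `a + n` does not overflow, which holds for ring-buffer sizes.
fn sub_rem_euclid(a: u64, b: u64, n: u64) -> u64 {
    match a.checked_sub(b) {
        Some(diff) => diff % n,
        // `a < b`: bring `b` into range modulo `n` first, then subtract.
        None => (a + (n - b % n)) % n,
    }
}

fn main() {
    // A slot exactly at the head maps to ring-buffer index 0 ...
    assert_eq!(sub_rem_euclid(14, 14, 8), 0);
    // ... and slots behind the head wrap around modulo the buffer length,
    // like `(-1i64).rem_euclid(8) == 7`.
    assert_eq!(sub_rem_euclid(13, 14, 8), 7);
}
```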
@@ -795,7 +802,7 @@ mod tests {
         buckets.merge(bucket_key(120, "b"), counter(2.0)).unwrap();

         // First flush is in 20 seconds + 10 seconds for the end of the partition.
-        assert_eq!(buckets.next_flush(), Duration::from_secs(90));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
         insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
         Partition {
             partition_key: 0,
         assert!(buckets.flush_next().buckets.is_empty());

         // We're now at second 100s.
-        assert_eq!(buckets.next_flush(), Duration::from_secs(110));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(110));
         assert!(buckets.flush_next().buckets.is_empty());
         assert!(buckets.flush_next().buckets.is_empty());

         // We're now at second 120s.
-        assert_eq!(buckets.next_flush(), Duration::from_secs(130));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(130));
         assert!(buckets.flush_next().buckets.is_empty());
         assert!(buckets.flush_next().buckets.is_empty());

         // We're now at second 140s -> our second bucket is ready (120s + 20s delay).
-        assert_eq!(buckets.next_flush(), Duration::from_secs(150));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(150));
         insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
         Partition {
             partition_key: 0,
         assert!(buckets.flush_next().buckets.is_empty());

         // We're now at 160s.
-        assert_eq!(buckets.next_flush(), Duration::from_secs(170));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(170));
     }

     #[test]
             start: UnixTimestamp::from_secs(70),
         });

-        assert_eq!(buckets.next_flush(), Duration::from_secs(75));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(75));
         assert_eq!(buckets.flush_next().partition_key, 0);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(80));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(80));
         assert_eq!(buckets.flush_next().partition_key, 1);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(85));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(85));
         assert_eq!(buckets.flush_next().partition_key, 0);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(90));
-        assert_eq!(buckets.next_flush(), Duration::from_secs(90));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
     }

     #[test]
             start: UnixTimestamp::from_secs(70),
         });

-        assert_eq!(buckets.next_flush(), Duration::from_secs(78));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(78));
         assert_eq!(buckets.flush_next().partition_key, 0);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(83));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(83));
         assert_eq!(buckets.flush_next().partition_key, 1);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(88));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(88));
         assert_eq!(buckets.flush_next().partition_key, 0);
-        assert_eq!(buckets.next_flush(), Duration::from_secs(93));
-        assert_eq!(buckets.next_flush(), Duration::from_secs(93));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
+        assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
     }

     #[test]
diff --git a/relay-metrics/src/aggregator/mod.rs b/relay-metrics/src/aggregator/mod.rs
index 801e750c93..fce6fab1ea 100644
--- a/relay-metrics/src/aggregator/mod.rs
+++ b/relay-metrics/src/aggregator/mod.rs
@@ -128,7 +128,7 @@ impl Aggregator {
     /// After a successful flush, retry immediately until an error is returned with the next flush
     /// time; this makes sure time is eventually synchronized.
     pub fn try_flush_next(&mut self, now: SystemTime) -> Result {
-        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush();
+        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

         if let Err(err) = now.duration_since(next_flush) {
             // The flush time is in the future, return the amount of time to wait before the next flush.
@@ -148,8 +148,8 @@
     }

     /// Returns when the next partition is ready to be flushed using [`Self::try_flush_next`].
-    pub fn next_flush(&mut self, now: SystemTime) -> Duration {
-        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush();
+    pub fn next_flush_at(&mut self, now: SystemTime) -> Duration {
+        let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

         match now.duration_since(next_flush) {
             Ok(_) => Duration::ZERO,
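The `Result`-based contract of `try_flush_next` (its full signature is elided above) lends itself to a small driver loop. A hedged sketch with a hypothetical `Flusher` trait, assuming the error carries the wait time as the doc comment describes:

```rust
use std::time::Duration;

// Hypothetical trait; the real method lives on `Aggregator` and also
// takes `now: SystemTime` and returns the flushed partition on success.
trait Flusher {
    // Assumed shape: `Ok` when a partition was flushed, `Err(wait)` with
    // the time remaining until the next partition becomes ready.
    fn try_flush_next(&mut self) -> Result<(), Duration>;
}

// After a successful flush, retry immediately; the first error tells the
// caller how long to park, which keeps wall-clock time and the
// aggregator's internal time synchronized.
fn drain<F: Flusher>(f: &mut F) -> Duration {
    loop {
        if let Err(wait) = f.try_flush_next() {
            return wait;
        }
    }
}

struct Toy(u32);

impl Flusher for Toy {
    fn try_flush_next(&mut self) -> Result<(), Duration> {
        if self.0 > 0 {
            self.0 -= 1;
            Ok(())
        } else {
            Err(Duration::from_secs(5))
        }
    }
}

fn main() {
    // Flushes three ready partitions, then reports a 5s wait.
    assert_eq!(drain(&mut Toy(3)), Duration::from_secs(5));
}
```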
@@ -178,7 +178,7 @@ pub struct Partition {
     /// The partition key.
     pub partition_key: u32,
     buckets: HashMap,
-    bucket_interval: u32,
+    bucket_interval: u64,
 }

 impl IntoIterator for Partition {
@@ -196,7 +196,7 @@
 /// Iterator yielded from [`Partition::into_iter`].
 pub struct PartitionIter {
     inner: hashbrown::hash_map::IntoIter,
-    bucket_interval: u32,
+    bucket_interval: u64,
 }

 impl Iterator for PartitionIter {
@@ -209,7 +209,7 @@
                 key.project_key,
                 Bucket {
                     timestamp: key.timestamp,
-                    width: u64::from(self.bucket_interval),
+                    width: self.bucket_interval,
                     name: key.metric_name,
                     tags: key.tags,
                     value: data.value,
diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs
index 961f28c4fc..b81b804f91 100644
--- a/relay-server/src/services/metrics/aggregator.rs
+++ b/relay-server/src/services/metrics/aggregator.rs
@@ -11,7 +11,7 @@ use relay_metrics::aggregator::{self, AggregateMetricsError, AggregatorConfig, P
 use relay_metrics::Bucket;
 use relay_quotas::{RateLimits, Scoping};
 use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service};
-use tokio::time::Sleep;
+use tokio::time::{Instant, Sleep};

 use crate::services::projects::cache::ProjectCacheHandle;
 use crate::services::projects::project::{ProjectInfo, ProjectState};
@@ -157,7 +157,7 @@
         self.flush_partition(partition);

-        self.aggregator.next_flush(SystemTime::now())
+        self.aggregator.next_flush_at(SystemTime::now())
     }

     fn flush_partition(&mut self, partition: Partition) {
@@ -310,7 +310,7 @@
         // Reset the next flush time to that of the new aggregator.
         self.next_flush
             .as_mut()
-            .reset(tokio::time::Instant::now() + self.aggregator.next_flush(SystemTime::now()));
+            .reset(Instant::now() + self.aggregator.next_flush_at(SystemTime::now()));
     }
 }

@@ -339,7 +339,7 @@ impl Service for AggregatorService {
             _ = &mut self.next_flush => timed!(
                 "try_flush", {
                     let next = self.try_flush();
-                    self.next_flush.as_mut().reset(tokio::time::Instant::now() + next);
+                    self.next_flush.as_mut().reset(Instant::now() + next);
                 }
             ),
             Some(message) = rx.recv() => timed!(message.variant(), self.handle_message(message)),
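The timer pattern the service uses, a pinned `Sleep` that is re-armed after every flush tick, reduces to a few lines. A self-contained sketch, with `next_flush_in` as an illustrative stand-in for `aggregator.next_flush_at(SystemTime::now())`; this is not the `AggregatorService` itself:

```rust
use std::time::Duration;

use tokio::time::{sleep, Instant};

// Stand-in for the aggregator's "time until the next partition is ready".
fn next_flush_in(tick: u32) -> Duration {
    Duration::from_millis(10 * u64::from(tick))
}

#[tokio::main]
async fn main() {
    // `Sleep` is `!Unpin`, so pin it once and reuse it across iterations,
    // like the `self.next_flush` field in the service above.
    let mut next_flush = Box::pin(sleep(next_flush_in(1)));

    for tick in 2..5 {
        tokio::select! {
            _ = &mut next_flush => {
                // The flush would happen here; afterwards re-arm the timer
                // with the duration reported for the next partition.
                next_flush.as_mut().reset(Instant::now() + next_flush_in(tick));
            }
        }
    }
}
```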