Skip to content

Commit

Permalink
more review comments, change some values to u64
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Dec 17, 2024
1 parent 3f4c91c commit 15c6dfc
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 52 deletions.
91 changes: 49 additions & 42 deletions relay-metrics/src/aggregator/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct Config {
pub start: UnixTimestamp,
/// Size of each individual bucket, inputs are truncated to this value.
pub bucket_interval: u32,
/// The amount of time slots to keep track off in the aggregator.
/// The amount of time slots to keep track of in the aggregator.
///
/// The size of a time slot is defined by [`Self::bucket_interval`].
pub num_time_slots: u32,
Expand Down Expand Up @@ -196,23 +196,24 @@ pub struct Inner {
///
/// The first item in the buffer is tracked by [`Self::head`] which is at any time the
/// current partition since the beginning "zero". The beginning in the aggregator refers to the
/// beginning of the epoch. The [`Self::head`] is calculated with `f(x) = x / bucket_interval * num_partitions`.
/// beginning of the epoch. The [`Self::head`] at time `t` is calculated with
/// `f(t) = t / bucket_interval * num_partitions`.
///
/// Flushing a partition advances the [`Self::head`] by a single value `+1`. Meaning
/// effectively time is advanced by `bucket_interval / num_partitions`.
slots: VecDeque<Slot>,
/// The amount of partitions per time slot.
num_partitions: u32,
num_partitions: u64,

/// Position of the first element in [`Self::slots`].
head: u64,

/// Size of each individual bucket, inputs are truncated modulo to this value.
bucket_interval: u32,
bucket_interval: u64,
/// Amount of slots which is added to a bucket as a delay.
///
/// This is a fixed delay which is added to to the time returned by [`Self::next_flush`].
delay: u32,
delay: u64,

/// Total stats of the aggregator.
stats: stats::Total,
Expand Down Expand Up @@ -241,7 +242,7 @@ impl Inner {
let num_time_slots = config.num_time_slots.max(1) + config.delay.div_ceil(bucket_interval);
let num_partitions = config.num_partitions.max(1);

let mut slots = Vec::new();
let mut slots = Vec::with_capacity((num_time_slots * num_partitions) as usize);
for _ in 0..num_time_slots {
for partition_key in 0..num_partitions {
slots.push(Slot {
Expand All @@ -251,22 +252,25 @@ impl Inner {
}
}

let bucket_interval = u64::from(bucket_interval);
let num_partitions = u64::from(num_partitions);

let slot_diff = RelativeRange {
max_in_past: config
.max_secs_in_past
.map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
.map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
max_in_future: config
.max_secs_in_future
.map_or(u64::MAX, |v| v.div_ceil(u64::from(bucket_interval))),
.map_or(u64::MAX, |v| v.div_ceil(bucket_interval)),
};

let total_slots = slots.len();
Self {
slots: VecDeque::from(slots),
num_partitions,
head: config.start.as_secs() / u64::from(bucket_interval) * u64::from(num_partitions),
head: config.start.as_secs() / bucket_interval * num_partitions,
bucket_interval,
delay: config.delay,
delay: u64::from(config.delay),
stats: stats::Total::default(),
limits: stats::Limits {
max_total: config.max_total_bucket_bytes.unwrap_or(u64::MAX),
Expand All @@ -283,7 +287,7 @@ impl Inner {
}

/// Returns the configured bucket interval.
pub fn bucket_interval(&self) -> u32 {
pub fn bucket_interval(&self) -> u64 {
self.bucket_interval
}

Expand All @@ -303,16 +307,17 @@ impl Inner {
}

/// Returns the time as a duration since epoch when the next flush is supposed to happen.
pub fn next_flush(&self) -> Duration {
// `head + 1` to get the end time of the slot not the start.
(Duration::from_secs(self.head + 1) / self.num_partitions * self.bucket_interval)
+ Duration::from_secs(u64::from(self.delay))
pub fn next_flush_at(&self) -> Duration {
// `head + 1` to get the end time of the slot not the start, convert `head` to a duration
// first, to have enough precision for the division.
//
// Casts do not wrap, configuration requires them to be `u32`.
let offset = Duration::from_secs(self.head + 1) / self.num_partitions as u32
* self.bucket_interval as u32;
offset + Duration::from_secs(self.delay)
}

/// Merges a metric bucket.
///
/// Returns `true` if the bucket already existed in the aggregator
/// and `false` if the bucket did not exist yet.
pub fn merge(
&mut self,
mut key: BucketKey,
Expand All @@ -321,11 +326,11 @@ impl Inner {
let project_key = key.project_key;
let namespace = key.metric_name.namespace();

let time_slot = key.timestamp.as_secs() / u64::from(self.bucket_interval);
let time_slot = key.timestamp.as_secs() / self.bucket_interval;
// Make sure the timestamp is normalized to the correct interval as well.
key.timestamp = UnixTimestamp::from_secs(time_slot * u64::from(self.bucket_interval));
key.timestamp = UnixTimestamp::from_secs(time_slot * self.bucket_interval);

let now_slot = self.head / u64::from(self.num_partitions);
let now_slot = self.head / self.num_partitions;
if !self.slot_range.contains(now_slot, time_slot) {
return Err(AggregateMetricsError::InvalidTimestamp(key.timestamp));
}
Expand All @@ -338,10 +343,10 @@ impl Inner {
self.hasher
.hash_one((key.project_key, &key.metric_name, &key.tags))
}
} % u64::from(self.num_partitions);
} % self.num_partitions;

// Calculate the slot of the bucket based on it's time and shift it by its assigned partition.
let slot = time_slot * u64::from(self.num_partitions) + assigned_partition;
let slot = time_slot * self.num_partitions + assigned_partition;
// Transform the slot to an offset/index into the ring-buffer, by calculating:
// `(slot - self.head).rem_euclid(self.slots.len())`.
let index = sub_rem_euclid(slot, self.head, self.slots.len() as u64);
Expand Down Expand Up @@ -458,10 +463,12 @@ impl fmt::Debug for Inner {
let mut list = f.debug_list();
list.entry(&self.stats);
for (i, v) in self.slots.iter().enumerate() {
let head_partitions = self.head % u64::from(self.num_partitions);
let time_offset = (head_partitions + i as u64) / u64::from(self.num_partitions);
let head_time = self.head / u64::from(self.num_partitions);
let time = (head_time + time_offset) * u64::from(self.bucket_interval);
let head_partitions = self.head % self.num_partitions;
let head_time = self.head / self.num_partitions;

let time_offset = (head_partitions + i as u64) / self.num_partitions;
let time = (head_time + time_offset) * self.bucket_interval;

match v.is_empty() {
// Make the output shorter with a string until `entry_with` is stable.
true => list.entry(&format!("({time}, {v:?})")),
Expand Down Expand Up @@ -795,7 +802,7 @@ mod tests {
buckets.merge(bucket_key(120, "b"), counter(2.0)).unwrap();

// First flush is in 20 seconds + 10 seconds for the end of the partition.
assert_eq!(buckets.next_flush(), Duration::from_secs(90));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
Partition {
partition_key: 0,
Expand All @@ -817,17 +824,17 @@ mod tests {
assert!(buckets.flush_next().buckets.is_empty());

// We're now at second 100s.
assert_eq!(buckets.next_flush(), Duration::from_secs(110));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(110));
assert!(buckets.flush_next().buckets.is_empty());
assert!(buckets.flush_next().buckets.is_empty());

// We're now at second 120s.
assert_eq!(buckets.next_flush(), Duration::from_secs(130));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(130));
assert!(buckets.flush_next().buckets.is_empty());
assert!(buckets.flush_next().buckets.is_empty());

// We're now at second 140s -> our second bucket is ready (120s + 20s delay).
assert_eq!(buckets.next_flush(), Duration::from_secs(150));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(150));
insta::assert_debug_snapshot!(buckets.flush_next(), @r###"
Partition {
partition_key: 0,
Expand All @@ -849,7 +856,7 @@ mod tests {
assert!(buckets.flush_next().buckets.is_empty());

// We're now at 160s.
assert_eq!(buckets.next_flush(), Duration::from_secs(170));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(170));
}

#[test]
Expand All @@ -866,14 +873,14 @@ mod tests {
start: UnixTimestamp::from_secs(70),
});

assert_eq!(buckets.next_flush(), Duration::from_secs(75));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(75));
assert_eq!(buckets.flush_next().partition_key, 0);
assert_eq!(buckets.next_flush(), Duration::from_secs(80));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(80));
assert_eq!(buckets.flush_next().partition_key, 1);
assert_eq!(buckets.next_flush(), Duration::from_secs(85));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(85));
assert_eq!(buckets.flush_next().partition_key, 0);
assert_eq!(buckets.next_flush(), Duration::from_secs(90));
assert_eq!(buckets.next_flush(), Duration::from_secs(90));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(90));
}

#[test]
Expand All @@ -890,14 +897,14 @@ mod tests {
start: UnixTimestamp::from_secs(70),
});

assert_eq!(buckets.next_flush(), Duration::from_secs(78));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(78));
assert_eq!(buckets.flush_next().partition_key, 0);
assert_eq!(buckets.next_flush(), Duration::from_secs(83));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(83));
assert_eq!(buckets.flush_next().partition_key, 1);
assert_eq!(buckets.next_flush(), Duration::from_secs(88));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(88));
assert_eq!(buckets.flush_next().partition_key, 0);
assert_eq!(buckets.next_flush(), Duration::from_secs(93));
assert_eq!(buckets.next_flush(), Duration::from_secs(93));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
assert_eq!(buckets.next_flush_at(), Duration::from_secs(93));
}

#[test]
Expand Down
12 changes: 6 additions & 6 deletions relay-metrics/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Aggregator {
/// After a successful flush, retry immediately until an error is returned with the next flush
/// time, this makes sure time is eventually synchronized.
pub fn try_flush_next(&mut self, now: SystemTime) -> Result<Partition, Duration> {
let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush();
let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

if let Err(err) = now.duration_since(next_flush) {
// The flush time is in the future, return the amount of time to wait before the next flush.
Expand All @@ -148,8 +148,8 @@ impl Aggregator {
}

/// Returns when the next partition is ready to be flushed using [`Self::try_flush_next`].
pub fn next_flush(&mut self, now: SystemTime) -> Duration {
let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush();
pub fn next_flush_at(&mut self, now: SystemTime) -> Duration {
let next_flush = SystemTime::UNIX_EPOCH + self.inner.next_flush_at();

match now.duration_since(next_flush) {
Ok(_) => Duration::ZERO,
Expand Down Expand Up @@ -178,7 +178,7 @@ pub struct Partition {
/// The partition key.
pub partition_key: u32,
buckets: HashMap<BucketKey, BucketData>,
bucket_interval: u32,
bucket_interval: u64,
}

impl IntoIterator for Partition {
Expand All @@ -196,7 +196,7 @@ impl IntoIterator for Partition {
/// Iterator yielded from [`Partition::into_iter`].
pub struct PartitionIter {
inner: hashbrown::hash_map::IntoIter<BucketKey, BucketData>,
bucket_interval: u32,
bucket_interval: u64,
}

impl Iterator for PartitionIter {
Expand All @@ -209,7 +209,7 @@ impl Iterator for PartitionIter {
key.project_key,
Bucket {
timestamp: key.timestamp,
width: u64::from(self.bucket_interval),
width: self.bucket_interval,
name: key.metric_name,
tags: key.tags,
value: data.value,
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use relay_metrics::aggregator::{self, AggregateMetricsError, AggregatorConfig, P
use relay_metrics::Bucket;
use relay_quotas::{RateLimits, Scoping};
use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service};
use tokio::time::Sleep;
use tokio::time::{Instant, Sleep};

use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::{ProjectInfo, ProjectState};
Expand Down Expand Up @@ -157,7 +157,7 @@ impl AggregatorService {

self.flush_partition(partition);

self.aggregator.next_flush(SystemTime::now())
self.aggregator.next_flush_at(SystemTime::now())
}

fn flush_partition(&mut self, partition: Partition) {
Expand Down Expand Up @@ -310,7 +310,7 @@ impl AggregatorService {
// Reset the next flush time, to the time of the new aggregator.
self.next_flush
.as_mut()
.reset(tokio::time::Instant::now() + self.aggregator.next_flush(SystemTime::now()));
.reset(Instant::now() + self.aggregator.next_flush_at(SystemTime::now()));
}
}

Expand Down Expand Up @@ -339,7 +339,7 @@ impl Service for AggregatorService {
_ = &mut self.next_flush => timed!(
"try_flush", {
let next = self.try_flush();
self.next_flush.as_mut().reset(tokio::time::Instant::now() + next);
self.next_flush.as_mut().reset(Instant::now() + next);
}
),
Some(message) = rx.recv() => timed!(message.variant(), self.handle_message(message)),
Expand Down

0 comments on commit 15c6dfc

Please sign in to comment.