Skip to content

Commit

Permalink
fix(metrics): Use configured flush batching/shift key (#4403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde authored Dec 18, 2024
1 parent 04027dd commit 5144771
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
10 changes: 9 additions & 1 deletion relay-metrics/src/aggregator/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ pub struct Config {
pub max_secs_in_past: Option<u64>,
/// The time in seconds that a timestamp may be in the future.
pub max_secs_in_future: Option<u64>,
/// Determines how partitions are assigned based on the input bucket.
pub partition_by: FlushBatching,
}

/// A metrics aggregator.
Expand Down Expand Up @@ -281,7 +283,7 @@ impl Inner {
.unwrap_or(u64::MAX),
},
slot_range: slot_diff,
partition_by: FlushBatching::Partition,
partition_by: config.partition_by,
hasher: build_hasher(),
}
}
Expand Down Expand Up @@ -590,6 +592,7 @@ mod tests {
max_total_bucket_bytes: None,
max_project_key_bucket_bytes: None,
start: UnixTimestamp::from_secs(70),
partition_by: FlushBatching::Partition,
});

// Within the time range.
Expand Down Expand Up @@ -679,6 +682,7 @@ mod tests {
// Enough for one bucket per partition.
max_project_key_bucket_bytes: Some(ONE_BUCKET_COST * 3),
start: UnixTimestamp::from_secs(70),
partition_by: FlushBatching::Partition,
});

buckets.merge(bucket_key(70, "a"), counter(1.0))?;
Expand Down Expand Up @@ -794,6 +798,7 @@ mod tests {
max_secs_in_future: None,
// Truncated to 60 seconds.
start: UnixTimestamp::from_secs(63),
partition_by: FlushBatching::Partition,
});

// Add a bucket now -> should be flushed 30 seconds in the future.
Expand Down Expand Up @@ -871,6 +876,7 @@ mod tests {
max_total_bucket_bytes: None,
max_project_key_bucket_bytes: None,
start: UnixTimestamp::from_secs(70),
partition_by: FlushBatching::Partition,
});

assert_eq!(buckets.next_flush_at(), Duration::from_secs(75));
Expand All @@ -895,6 +901,7 @@ mod tests {
max_total_bucket_bytes: None,
max_project_key_bucket_bytes: None,
start: UnixTimestamp::from_secs(70),
partition_by: FlushBatching::Partition,
});

assert_eq!(buckets.next_flush_at(), Duration::from_secs(78));
Expand All @@ -919,6 +926,7 @@ mod tests {
max_total_bucket_bytes: None,
max_project_key_bucket_bytes: None,
start: UnixTimestamp::from_secs(70),
partition_by: FlushBatching::Partition,
});

buckets.merge(bucket_key(70, "a"), counter(1.0))?;
Expand Down
1 change: 1 addition & 0 deletions relay-metrics/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl Aggregator {
max_project_key_bucket_bytes: config.max_project_key_bucket_bytes,
max_secs_in_past: Some(config.max_secs_in_past),
max_secs_in_future: Some(config.max_secs_in_future),
partition_by: config.flush_batching,
}),
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ def test_enforce_bucket_rate_limits(
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
"shift_key": "bucket",
},
}
)
Expand Down

0 comments on commit 5144771

Please sign in to comment.