ref(metrics): Rework metrics aggregator to keep internal partitions (#4378)

Replaces the current metrics aggregator, which is based on a priority queue
with fixed priorities polled at regular intervals, with a ring buffer based
aggregator.

Overview:
- The aggregator is now ring buffer based instead of priority queue based,
  which minimizes the work needed for merging and flushing buckets (see the
  sketch after this list).
- The aggregator no longer guarantees a minimum delay for backdated buckets,
  but on average it still delays them long enough (real-time buckets still
  get an accurate delay).
- The aggregator is now driven by flushes, which is also how it tracks and
  advances time. All operations (merges and flushes) can therefore be done
  entirely without accessing system time.
- Cost tracking is much more efficient: cost is tracked in total and per
  slot, and on flush the slot's cost is subtracted from the total, which
  requires no additional iterations or cost calculations.
- Per-project cost limits are only tracked per slot instead of overall,
  greatly reducing the necessary bookkeeping.
- On shutdown, the aggregator is replaced with one that has much more
  aggressive flush behaviour. This massively simplifies the code, still
  achieves a good time to flush, and yields much better flush behaviour
  overall by keeping partitions consistent.
- Metric name/tag validation is now a concern of the service instead of the
  aggregator.
- Uses `ahash` with a fixed seed instead of `fnv` ([it's
  faster](https://github.com/tkaitchuck/aHash?tab=readme-ov-file#comparison-with-other-hashers));
  a short example follows below.
- Many unused metrics have been reworked or modified (e.g. a histogram that
  was only used for its sum and count became two counters).
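
To make the ring-buffer idea concrete, here is a minimal, self-contained
sketch. It is *not* the actual `relay-metrics` implementation — the
`RingAggregator`/`Slot` names, the slot count, and the byte-cost model are
simplified placeholders — but it shows how merges and flushes can work
entirely without reading the system clock, and how subtracting a flushed
slot's cost avoids re-counting:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Seconds covered by one slot (illustrative; the real interval is configurable).
const BUCKET_INTERVAL: u64 = 10;
/// Number of slots in the ring (illustrative).
const SLOTS: usize = 8;

#[derive(Default)]
struct Slot {
    /// Metric name -> aggregated value (real buckets are much richer).
    buckets: HashMap<String, f64>,
    /// Cost of this slot only, in bytes.
    cost: usize,
}

struct RingAggregator {
    slots: [Slot; SLOTS],
    /// Index, in bucket intervals, of the next slot to flush.
    /// Advancing it is the *only* way time moves forward.
    head: u64,
    /// Sum of all slot costs, maintained incrementally.
    total_cost: usize,
}

impl RingAggregator {
    fn new(now: u64) -> Self {
        Self {
            slots: std::array::from_fn(|_| Slot::default()),
            head: now / BUCKET_INTERVAL,
            total_cost: 0,
        }
    }

    /// Merges a value into the slot its timestamp maps to, without reading the clock.
    fn merge(&mut self, timestamp: u64, name: &str, value: f64) {
        // Backdated buckets land in the next slot to flush (no minimum delay);
        // far-future ones are clamped so intervals map injectively onto the ring.
        let interval =
            (timestamp / BUCKET_INTERVAL).clamp(self.head, self.head + SLOTS as u64 - 1);
        let slot = &mut self.slots[interval as usize % SLOTS];
        match slot.buckets.entry(name.to_owned()) {
            Entry::Occupied(mut e) => *e.get_mut() += value,
            Entry::Vacant(e) => {
                e.insert(value);
                // Toy cost model: key length plus value size.
                let cost = name.len() + std::mem::size_of::<f64>();
                slot.cost += cost;
                self.total_cost += cost;
            }
        }
    }

    /// Flushes the next slot and advances time by one bucket interval.
    fn flush_next(&mut self) -> HashMap<String, f64> {
        let slot = std::mem::take(&mut self.slots[self.head as usize % SLOTS]);
        // Subtracting the slot's cost replaces a full re-count of all buckets.
        self.total_cost -= slot.cost;
        self.head += 1;
        slot.buckets
    }
}

fn main() {
    let mut agg = RingAggregator::new(1000);
    agg.merge(1003, "endpoint.hits", 1.0);
    agg.merge(1007, "endpoint.hits", 1.0); // merges into the same bucket
    agg.merge(990, "endpoint.hits", 1.0); // backdated: joins the next flush
    let flushed = agg.flush_next();
    assert_eq!(flushed["endpoint.hits"], 3.0);
    assert_eq!(agg.total_cost, 0);
}
```

The real aggregator layers partitioning, per-project limits, and
serialization costs on top of this, but the time and cost bookkeeping
follows the same shape.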

For implementation details see the exhaustive code documentation,
especially in the inner aggregator.
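
On the hasher swap: `ahash` is randomly seeded by default, so a fixed seed
is what keeps hashes stable across runs and instances. A minimal sketch of
the pattern — the seed constants and the map contents here are arbitrary,
not taken from the code:

```rust
use std::collections::HashMap;

use ahash::RandomState;

fn main() {
    // ahash seeds its hashers randomly by default; fixed seeds make hashing
    // deterministic across processes while keeping ahash's speed advantage.
    let fixed = RandomState::with_seeds(1, 2, 3, 4);
    let mut buckets: HashMap<String, u64, RandomState> = HashMap::with_hasher(fixed);

    buckets.insert("endpoint.hits".to_owned(), 1);
    assert_eq!(buckets["endpoint.hits"], 1);
}
```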

Fixes: getsentry/team-ingest#606
Dav1dde authored Dec 17, 2024
1 parent 63c8e11 commit faaaa55
Showing 31 changed files with 2,267 additions and 1,767 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
 
 **Internal**:
 
+- Rework metrics aggregator to keep internal partitions. ([#4378](https://github.com/getsentry/relay/pull/4378))
 - Remove support for metrics with profile namespace. ([#4391](https://github.com/getsentry/relay/pull/4391))
 
 ## 24.11.2
2 changes: 2 additions & 0 deletions Cargo.lock


39 changes: 32 additions & 7 deletions relay-config/src/aggregator.rs
@@ -1,7 +1,7 @@
 //! Metrics aggregator configuration.
 use relay_metrics::aggregator::AggregatorConfig;
-use relay_metrics::MetricNamespace;
+use relay_metrics::{MetricNamespace, UnixTimestamp};
 use serde::{Deserialize, Serialize};
 
 /// Parameters used for metric aggregation.
@@ -12,27 +12,52 @@ pub struct AggregatorServiceConfig {
     #[serde(flatten)]
     pub aggregator: AggregatorConfig,
 
+    /// The length the name of a metric is allowed to be.
+    ///
+    /// Defaults to `200` bytes.
+    pub max_name_length: usize,
+
+    /// The length the tag key is allowed to be.
+    ///
+    /// Defaults to `200` bytes.
+    pub max_tag_key_length: usize,
+
+    /// The length the tag value is allowed to be.
+    ///
+    /// Defaults to `200` chars.
+    pub max_tag_value_length: usize,
+
     /// The approximate maximum number of bytes submitted within one flush cycle.
     ///
     /// This controls how big flushed batches of buckets get, depending on the number of buckets,
     /// the cumulative length of their keys, and the number of raw values. Since final serialization
-    /// adds some additional overhead, this number is approxmate and some safety margin should be
+    /// adds some additional overhead, this number is approximate and some safety margin should be
     /// left to hard limits.
     pub max_flush_bytes: usize,
+}
 
-    /// The flushing interval in milliseconds that determines how often the aggregator is polled for
-    /// flushing new buckets.
+impl AggregatorServiceConfig {
+    /// Returns the valid range for metrics timestamps.
     ///
-    /// Defaults to `100` milliseconds.
-    pub flush_interval_ms: u64,
+    /// Metrics or buckets outside of this range should be discarded.
+    pub fn timestamp_range(&self) -> std::ops::Range<UnixTimestamp> {
+        let now = UnixTimestamp::now().as_secs();
+        let min_timestamp =
+            UnixTimestamp::from_secs(now.saturating_sub(self.aggregator.max_secs_in_past));
+        let max_timestamp =
+            UnixTimestamp::from_secs(now.saturating_add(self.aggregator.max_secs_in_future));
+        min_timestamp..max_timestamp
+    }
 }
 
 impl Default for AggregatorServiceConfig {
     fn default() -> Self {
         Self {
             aggregator: AggregatorConfig::default(),
+            max_name_length: 200,
+            max_tag_key_length: 200,
+            max_tag_value_length: 200,
             max_flush_bytes: 5_000_000, // 5 MB
-            flush_interval_ms: 100, // 100 milliseconds
         }
     }
 }
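
The new `timestamp_range` helper gives the service a single place to
validate bucket timestamps. A hypothetical caller might look like this —
`config` and the `buckets` vector with its `timestamp` field are assumptions
for illustration, not part of the diff:

```rust
// Drop buckets whose timestamps fall outside the configured window.
let valid = config.timestamp_range();
buckets.retain(|bucket| valid.contains(&bucket.timestamp));
```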
58 changes: 0 additions & 58 deletions relay-config/src/config.rs
@@ -15,7 +15,6 @@ use relay_kafka::{
     ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
     TopicAssignments,
 };
-use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
 use relay_metrics::MetricNamespace;
 use serde::de::{DeserializeOwned, Unexpected, Visitor};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -2461,63 +2460,6 @@ impl Config {
         &self.values.cogs.relay_resource_id
     }
 
-    /// Creates an [`AggregatorConfig`] that is compatible with every other aggregator.
-    ///
-    /// A lossless aggregator can be put in front of any of the configured aggregators without losing data that the configured aggregator would keep.
-    /// This is useful for pre-aggregating metrics together in a single aggregator instance.
-    pub fn permissive_aggregator_config(&self) -> AggregatorConfig {
-        let AggregatorConfig {
-            mut bucket_interval,
-            mut max_secs_in_past,
-            mut max_secs_in_future,
-            mut max_name_length,
-            mut max_tag_key_length,
-            mut max_tag_value_length,
-            mut max_project_key_bucket_bytes,
-            mut max_total_bucket_bytes,
-            ..
-        } = self.default_aggregator_config().aggregator;
-
-        for secondary_config in self.secondary_aggregator_configs() {
-            let agg = &secondary_config.config.aggregator;
-
-            bucket_interval = bucket_interval.min(agg.bucket_interval);
-            max_secs_in_past = max_secs_in_past.max(agg.max_secs_in_past);
-            max_secs_in_future = max_secs_in_future.max(agg.max_secs_in_future);
-            max_name_length = max_name_length.max(agg.max_name_length);
-            max_tag_key_length = max_tag_key_length.max(agg.max_tag_key_length);
-            max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length);
-            max_project_key_bucket_bytes =
-                max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes);
-            max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes);
-        }
-
-        for agg in self
-            .secondary_aggregator_configs()
-            .iter()
-            .map(|sc| &sc.config)
-            .chain(std::iter::once(self.default_aggregator_config()))
-        {
-            if agg.aggregator.bucket_interval % bucket_interval != 0 {
-                relay_log::error!("buckets don't align");
-            }
-        }
-
-        AggregatorConfig {
-            bucket_interval,
-            max_secs_in_past,
-            max_secs_in_future,
-            max_name_length,
-            max_tag_key_length,
-            max_tag_value_length,
-            max_project_key_bucket_bytes,
-            max_total_bucket_bytes,
-            initial_delay: 30,
-            flush_partitions: None,
-            flush_batching: FlushBatching::Project,
-        }
-    }
-
     /// Returns configuration for the default metrics aggregator.
     pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
         &self.values.aggregator
1 change: 1 addition & 0 deletions relay-metrics/Cargo.toml
@@ -16,6 +16,7 @@ workspace = true
 redis = ["relay-redis/impl"]
 
 [dependencies]
+ahash = { workspace = true }
 bytecount = { workspace = true }
 chrono = { workspace = true }
 fnv = { workspace = true }
11 changes: 5 additions & 6 deletions relay-metrics/benches/benches.rs
@@ -6,13 +6,14 @@ use rand::SeedableRng;
 use relay_base_schema::project::ProjectKey;
 use relay_common::time::UnixTimestamp;
 use relay_metrics::{
-    aggregator::{Aggregator, AggregatorConfig, FlushDecision},
+    aggregator::{Aggregator, AggregatorConfig},
     Bucket, BucketValue, DistributionValue, FiniteF64,
 };
 use std::cell::RefCell;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::ops::Range;
+use std::time::SystemTime;
 
 struct NumbersGenerator {
     min: usize,
@@ -188,7 +189,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
             b.iter_batched(
                 || {
                     let timestamp = UnixTimestamp::now();
-                    let aggregator: Aggregator = Aggregator::new(config.clone());
+                    let aggregator = Aggregator::named("default".to_owned(), &config);
                     (aggregator, input.get_buckets(timestamp))
                 },
                 |(mut aggregator, buckets)| {
@@ -215,7 +216,7 @@
             b.iter_batched(
                 || {
                     let timestamp = UnixTimestamp::now();
-                    let mut aggregator: Aggregator = Aggregator::new(config.clone());
+                    let mut aggregator = Aggregator::named("default".to_owned(), &config);
                     for (project_key, bucket) in input.get_buckets(timestamp) {
                         aggregator.merge(project_key, bucket).unwrap();
                     }
@@ -224,9 +225,7 @@
                 |mut aggregator| {
                     // XXX: Ideally we'd want to test the entire try_flush here, but spawning
                     // a service is too much work here.
-                    black_box(aggregator.pop_flush_buckets(black_box(false), |_| {
-                        FlushDecision::Flush(Vec::new())
-                    }));
+                    let _ = black_box(aggregator.try_flush_next(SystemTime::UNIX_EPOCH));
                 },
                 BatchSize::SmallInput,
             )
