ref(metrics): Rework metrics aggregator to keep internal partitions #4378

Merged
merged 7 commits into from Dec 17, 2024

Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@

**Internal**:

- Rework metrics aggregator to keep internal partitions. ([#4378](https://github.com/getsentry/relay/pull/4378))
- Remove support for metrics with profile namespace. ([#4391](https://github.com/getsentry/relay/pull/4391))

## 24.11.2
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

39 changes: 32 additions & 7 deletions relay-config/src/aggregator.rs
@@ -1,7 +1,7 @@
//! Metrics aggregator configuration.

use relay_metrics::aggregator::AggregatorConfig;
use relay_metrics::MetricNamespace;
use relay_metrics::{MetricNamespace, UnixTimestamp};
use serde::{Deserialize, Serialize};

/// Parameters used for metric aggregation.
@@ -12,27 +12,52 @@ pub struct AggregatorServiceConfig {
    #[serde(flatten)]
    pub aggregator: AggregatorConfig,

    /// The length the name of a metric is allowed to be.
    ///
    /// Defaults to `200` bytes.
    pub max_name_length: usize,

    /// The length the tag key is allowed to be.
    ///
    /// Defaults to `200` bytes.
    pub max_tag_key_length: usize,

    /// The length the tag value is allowed to be.
    ///
    /// Defaults to `200` chars.
    pub max_tag_value_length: usize,

    /// The approximate maximum number of bytes submitted within one flush cycle.
    ///
    /// This controls how big flushed batches of buckets get, depending on the number of buckets,
    /// the cumulative length of their keys, and the number of raw values. Since final serialization
    /// adds some additional overhead, this number is approxmate and some safety margin should be
    /// adds some additional overhead, this number is approximate and some safety margin should be
    /// left to hard limits.
    pub max_flush_bytes: usize,

    /// The flushing interval in milliseconds that determines how often the aggregator is polled for
    /// flushing new buckets.
    ///
    /// Defaults to `100` milliseconds.
    pub flush_interval_ms: u64,
}

impl AggregatorServiceConfig {
    /// Returns the valid range for metrics timestamps.
    ///
    /// Metrics or buckets outside of this range should be discarded.
    pub fn timestamp_range(&self) -> std::ops::Range<UnixTimestamp> {
        let now = UnixTimestamp::now().as_secs();
        let min_timestamp =
            UnixTimestamp::from_secs(now.saturating_sub(self.aggregator.max_secs_in_past));
        let max_timestamp =
            UnixTimestamp::from_secs(now.saturating_add(self.aggregator.max_secs_in_future));
        min_timestamp..max_timestamp
    }
}

impl Default for AggregatorServiceConfig {
    fn default() -> Self {
        Self {
            aggregator: AggregatorConfig::default(),
            max_name_length: 200,
            max_tag_key_length: 200,
            max_tag_value_length: 200,
            max_flush_bytes: 5_000_000, // 5 MB
            flush_interval_ms: 100, // 100 milliseconds
        }
    }
}
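For illustration, here is a minimal, hypothetical sketch of how a caller might use the new `timestamp_range` helper to validate an incoming bucket timestamp. The function, its name, and the import paths are assumptions for the sketch, not part of this PR:

```rust
use relay_common::time::UnixTimestamp;
use relay_config::AggregatorServiceConfig;

// Illustrative only: accept a timestamp if it falls inside the window
// `now - max_secs_in_past .. now + max_secs_in_future` returned by
// `timestamp_range`. Import paths are assumptions for this sketch.
fn is_valid_timestamp(config: &AggregatorServiceConfig, ts: UnixTimestamp) -> bool {
    config.timestamp_range().contains(&ts)
}
```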
58 changes: 0 additions & 58 deletions relay-config/src/config.rs
@@ -15,7 +15,6 @@ use relay_kafka::{
    ConfigError as KafkaConfigError, KafkaConfigParam, KafkaParams, KafkaTopic, TopicAssignment,
    TopicAssignments,
};
use relay_metrics::aggregator::{AggregatorConfig, FlushBatching};
use relay_metrics::MetricNamespace;
use serde::de::{DeserializeOwned, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -2461,63 +2460,6 @@ impl Config {
        &self.values.cogs.relay_resource_id
    }

    /// Creates an [`AggregatorConfig`] that is compatible with every other aggregator.
    ///
    /// A lossless aggregator can be put in front of any of the configured aggregators without
    /// losing data that the configured aggregator would keep. This is useful for pre-aggregating
    /// metrics together in a single aggregator instance.
    pub fn permissive_aggregator_config(&self) -> AggregatorConfig {
        let AggregatorConfig {
            mut bucket_interval,
            mut max_secs_in_past,
            mut max_secs_in_future,
            mut max_name_length,
            mut max_tag_key_length,
            mut max_tag_value_length,
            mut max_project_key_bucket_bytes,
            mut max_total_bucket_bytes,
            ..
        } = self.default_aggregator_config().aggregator;

        for secondary_config in self.secondary_aggregator_configs() {
            let agg = &secondary_config.config.aggregator;

            bucket_interval = bucket_interval.min(agg.bucket_interval);
            max_secs_in_past = max_secs_in_past.max(agg.max_secs_in_past);
            max_secs_in_future = max_secs_in_future.max(agg.max_secs_in_future);
            max_name_length = max_name_length.max(agg.max_name_length);
            max_tag_key_length = max_tag_key_length.max(agg.max_tag_key_length);
            max_tag_value_length = max_tag_value_length.max(agg.max_tag_value_length);
            max_project_key_bucket_bytes =
                max_project_key_bucket_bytes.max(agg.max_project_key_bucket_bytes);
            max_total_bucket_bytes = max_total_bucket_bytes.max(agg.max_total_bucket_bytes);
        }

        for agg in self
            .secondary_aggregator_configs()
            .iter()
            .map(|sc| &sc.config)
            .chain(std::iter::once(self.default_aggregator_config()))
        {
            if agg.aggregator.bucket_interval % bucket_interval != 0 {
                relay_log::error!("buckets don't align");
            }
        }

        AggregatorConfig {
            bucket_interval,
            max_secs_in_past,
            max_secs_in_future,
            max_name_length,
            max_tag_key_length,
            max_tag_value_length,
            max_project_key_bucket_bytes,
            max_total_bucket_bytes,
            initial_delay: 30,
            flush_partitions: None,
            flush_batching: FlushBatching::Project,
        }
    }

    /// Returns configuration for the default metrics aggregator.
    pub fn default_aggregator_config(&self) -> &AggregatorServiceConfig {
        &self.values.aggregator
1 change: 1 addition & 0 deletions relay-metrics/Cargo.toml
@@ -16,6 +16,7 @@ workspace = true
redis = ["relay-redis/impl"]

[dependencies]
ahash = { workspace = true }
bytecount = { workspace = true }
chrono = { workspace = true }
fnv = { workspace = true }
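The new `ahash` dependency hints at hash-based assignment of bucket keys to the internal partitions named in the PR title. A rough sketch of that pattern follows; the function name, the seeding scheme, and the modulo mapping are purely illustrative and not taken from this PR:

```rust
use std::hash::{BuildHasher, Hash};

// Illustrative only: map a bucket key to one of `partitions` slots.
// Fixed seeds keep the key-to-partition assignment stable across calls.
fn partition_index<K: Hash>(key: &K, partitions: u64) -> u64 {
    let build_hasher = ahash::RandomState::with_seeds(0, 0, 0, 0);
    build_hasher.hash_one(key) % partitions.max(1)
}
```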
11 changes: 5 additions & 6 deletions relay-metrics/benches/benches.rs
@@ -6,13 +6,14 @@ use rand::SeedableRng;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use relay_metrics::{
    aggregator::{Aggregator, AggregatorConfig, FlushDecision},
    aggregator::{Aggregator, AggregatorConfig},
    Bucket, BucketValue, DistributionValue, FiniteF64,
};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Range;
use std::time::SystemTime;

struct NumbersGenerator {
    min: usize,
@@ -188,7 +189,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
        b.iter_batched(
            || {
                let timestamp = UnixTimestamp::now();
                let aggregator: Aggregator = Aggregator::new(config.clone());
                let aggregator = Aggregator::named("default".to_owned(), &config);
                (aggregator, input.get_buckets(timestamp))
            },
            |(mut aggregator, buckets)| {
@@ -215,7 +216,7 @@
        b.iter_batched(
            || {
                let timestamp = UnixTimestamp::now();
                let mut aggregator: Aggregator = Aggregator::new(config.clone());
                let mut aggregator = Aggregator::named("default".to_owned(), &config);
                for (project_key, bucket) in input.get_buckets(timestamp) {
                    aggregator.merge(project_key, bucket).unwrap();
                }
@@ -224,9 +225,7 @@
            |mut aggregator| {
                // XXX: Ideally we'd want to test the entire try_flush here, but spawning
                // a service is too much work here.
                black_box(aggregator.pop_flush_buckets(black_box(false), |_| {
                    FlushDecision::Flush(Vec::new())
                }));
                let _ = black_box(aggregator.try_flush_next(SystemTime::UNIX_EPOCH));
            },
            BatchSize::SmallInput,
        )
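Taken together, the benchmark changes show the reworked API surface: an aggregator is now constructed by name from a config reference, and flushing is driven by `try_flush_next` against an explicit point in time. A condensed sketch, assuming only the signatures visible in this diff; the function name and the `buckets` parameter are illustrative:

```rust
use std::time::SystemTime;

use relay_base_schema::project::ProjectKey;
use relay_metrics::aggregator::{Aggregator, AggregatorConfig};
use relay_metrics::Bucket;

// Condensed from the benchmark above; `buckets` stands in for the
// generated test input.
fn merge_and_flush(config: &AggregatorConfig, buckets: Vec<(ProjectKey, Bucket)>) {
    let mut aggregator = Aggregator::named("default".to_owned(), config);
    for (project_key, bucket) in buckets {
        aggregator.merge(project_key, bucket).unwrap();
    }
    // Flush whatever is due relative to the given instant.
    let _ = aggregator.try_flush_next(SystemTime::UNIX_EPOCH);
}
```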