Skip to content

Commit

Permalink
ref(server): Performance improvements to metric router service (#3836)
Browse files Browse the repository at this point in the history
- Instruments the service better
- Optimizes the `handle_merge_buckets` function by splitting buckets into one
message per aggregator instead of one message per namespace
- Optimizes the `AcceptsMetrics` message to race the messages instead of
checking sequentially
  • Loading branch information
Dav1dde authored Jul 19, 2024
1 parent 82401da commit 28a0b5f
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 78 deletions.
40 changes: 40 additions & 0 deletions relay-config/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,43 @@ impl FieldCondition {
}
}
}

// Unit tests covering JSON deserialization of `Condition` and its matching
// behavior against metric namespaces.
#[cfg(test)]
mod tests {
use insta::assert_debug_snapshot;
use relay_metrics::MetricNamespace;
use serde_json::json;

use super::*;

/// Deserializes a single `eq` condition on the `namespace` field from JSON and
/// compares its `Debug` output against an inline snapshot.
#[test]
fn condition_roundtrip() {
let json = json!({"op": "eq", "field": "namespace", "value": "spans"});
assert_debug_snapshot!(
serde_json::from_value::<Condition>(json).unwrap(),
@r###"
Eq(
Namespace(
Spans,
),
)
"###
);
}

/// Deserializes an `or` condition wrapping two `eq` namespace conditions and
/// checks which namespaces it matches.
#[test]
fn condition_multiple_namespaces() {
let json = json!({
"op": "or",
"inner": [
{"op": "eq", "field": "namespace", "value": "spans"},
{"op": "eq", "field": "namespace", "value": "custom"}
]
});

let condition = serde_json::from_value::<Condition>(json).unwrap();
// Both namespaces listed in the `or` must match.
assert!(condition.matches(Some(MetricNamespace::Spans)));
assert!(condition.matches(Some(MetricNamespace::Custom)));
// A namespace that is not listed must not match.
assert!(!condition.matches(Some(MetricNamespace::Transactions)));
}
}
2 changes: 1 addition & 1 deletion relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub enum Aggregator {

impl Aggregator {
/// Returns the name of the message variant.
fn variant(&self) -> &'static str {
pub fn variant(&self) -> &'static str {
match self {
Aggregator::AcceptsMetrics(_, _) => "AcceptsMetrics",
Aggregator::MergeBuckets(_) => "MergeBuckets",
Expand Down
145 changes: 68 additions & 77 deletions relay-server/src/services/metrics/router.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
//! Routing logic for metrics. Metrics from different namespaces may be routed to different aggregators,
//! with their own limits, bucket intervals, etc.
use itertools::Itertools;
use relay_config::{aggregator::Condition, AggregatorServiceConfig, ScopedAggregatorConfig};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
use relay_metrics::MetricNamespace;
use relay_system::{Addr, NoResponse, Recipient, Service};

use crate::services::metrics::{
AcceptsMetrics, Aggregator, AggregatorService, FlushBuckets, MergeBuckets,
};
use crate::statsd::RelayTimers;
use crate::utils;

/// Service that routes metrics & metric buckets to the appropriate aggregator.
///
/// Each aggregator gets its own configuration.
/// Metrics are routed to the first aggregator which matches the configuration's [`Condition`].
/// Metrics are routed to the first aggregator which matches the configuration's
/// [`Condition`](relay_config::aggregator::Condition).
/// If no condition matches, the metric/bucket is routed to the `default_aggregator`.
pub struct RouterService {
default_config: AggregatorServiceConfig,
Expand Down Expand Up @@ -49,7 +54,9 @@ impl Service for RouterService {
tokio::select! {
biased;

Some(message) = rx.recv() => router.handle_message(message),
Some(message) = rx.recv() => {
router.handle_message(message)
},

else => break,
}
Expand All @@ -62,7 +69,7 @@ impl Service for RouterService {
/// Helper struct that holds the [`Addr`]s of started aggregators.
struct StartedRouter {
default: Addr<Aggregator>,
secondary: Vec<(Condition, Addr<Aggregator>)>,
secondary: Vec<(Addr<Aggregator>, Vec<MetricNamespace>)>,
}

impl StartedRouter {
Expand All @@ -79,6 +86,14 @@ impl StartedRouter {
let addr = AggregatorService::named(c.name, c.config, receiver.clone()).start();
(c.condition, addr)
})
.map(|(cond, agg)| {
let namespaces: Vec<_> = MetricNamespace::all()
.into_iter()
.filter(|&namespace| cond.matches(Some(namespace)))
.collect();

(agg, namespaces)
})
.collect();

Self {
Expand All @@ -87,84 +102,60 @@ impl StartedRouter {
}
}

fn handle_message(&mut self, msg: Aggregator) {
match msg {
Aggregator::AcceptsMetrics(_, sender) => {
let requests = self
.secondary
.iter()
.map(|(_, agg)| agg.send(AcceptsMetrics))
.chain(Some(self.default.send(AcceptsMetrics)))
.collect::<Vec<_>>();

tokio::spawn(async {
let mut accepts = true;
for req in requests {
accepts &= req.await.unwrap_or_default();
fn handle_message(&mut self, message: Aggregator) {
let ty = message.variant();
relay_statsd::metric!(
timer(RelayTimers::MetricRouterServiceDuration),
message = ty,
{
match message {
Aggregator::AcceptsMetrics(_, sender) => {
let mut requests = self
.secondary
.iter()
.map(|(agg, _)| agg.send(AcceptsMetrics))
.chain(Some(self.default.send(AcceptsMetrics)))
.collect::<FuturesUnordered<_>>();

tokio::spawn(async move {
let mut accepts = true;
while let Some(req) = requests.next().await {
accepts &= req.unwrap_or_default();
}
sender.send(accepts);
});
}
sender.send(accepts);
});
Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
#[cfg(test)]
Aggregator::BucketCountInquiry(_, _sender) => (), // not supported
}
}
Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg),
#[cfg(test)]
Aggregator::BucketCountInquiry(_, _sender) => (), // not supported
}
)
}

fn handle_merge_buckets(&mut self, message: MergeBuckets) {
let metrics_by_namespace = message
.buckets
.into_iter()
.group_by(|bucket| bucket.name.try_namespace());

for (namespace, group) in metrics_by_namespace.into_iter() {
let aggregator = self
.secondary
.iter()
.find_map(|(cond, addr)| cond.matches(namespace).then_some(addr))
.unwrap_or(&self.default);

aggregator.send(MergeBuckets::new(message.project_key, group.collect()));
let MergeBuckets {
project_key,
mut buckets,
} = message;

for (aggregator, namespaces) in &self.secondary {
let matching;
(buckets, matching) = utils::split_off(buckets, |bucket| {
bucket
.name
.try_namespace()
.map(|namespace| namespaces.contains(&namespace))
.unwrap_or(false)
});

if !matching.is_empty() {
aggregator.send(MergeBuckets::new(project_key, matching));
}
}
}
}

#[cfg(test)]
mod tests {
use insta::assert_debug_snapshot;
use relay_metrics::MetricNamespace;
use serde_json::json;

use super::*;

#[test]
fn condition_roundtrip() {
let json = json!({"op": "eq", "field": "namespace", "value": "spans"});
assert_debug_snapshot!(
serde_json::from_value::<Condition>(json).unwrap(),
@r###"
Eq(
Namespace(
Spans,
),
)
"###
);
}

#[test]
fn condition_multiple_namespaces() {
let json = json!({
"op": "or",
"inner": [
{"op": "eq", "field": "namespace", "value": "spans"},
{"op": "eq", "field": "namespace", "value": "custom"}
]
});

let condition = serde_json::from_value::<Condition>(json).unwrap();
assert!(condition.matches(Some(MetricNamespace::Spans)));
assert!(condition.matches(Some(MetricNamespace::Custom)));
assert!(!condition.matches(Some(MetricNamespace::Transactions)));
if !buckets.is_empty() {
self.default.send(MergeBuckets::new(project_key, buckets));
}
}
}
6 changes: 6 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,11 @@ pub enum RelayTimers {
/// This metric is tagged with:
/// - `message`: The type of message that was processed.
AggregatorServiceDuration,
/// Timing in milliseconds for processing a message in the metric router service.
///
/// This metric is tagged with:
/// - `message`: The type of message that was processed.
MetricRouterServiceDuration,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -560,6 +565,7 @@ impl TimerMetric for RelayTimers {
#[cfg(feature = "processing")]
RelayTimers::RateLimitBucketsDuration => "processor.rate_limit_buckets",
RelayTimers::AggregatorServiceDuration => "metrics.aggregator.message.duration",
RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration",
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions relay-server/src/utils/split_off.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use itertools::Either;

/// Splits off items from a vector matching a predicate.
///
/// Matching elements are returned in the second vector.
pub fn split_off<T>(data: Vec<T>, mut f: impl FnMut(&T) -> bool) -> (Vec<T>, Vec<T>) {
split_off_map(data, |item| {
if f(&item) {
Expand Down

0 comments on commit 28a0b5f

Please sign in to comment.