Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cardinality): Track exceeded CardinalityLimit for each bucket #3825

Merged
merged 13 commits into from
Jul 18, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Do not drop envelopes for unparsable project configs. ([#3770](https://github.com/getsentry/relay/pull/3770))

**Features**:

- "Cardinality limit" outcomes now report which limit was exceeded. ([#3825](https://github.com/getsentry/relay/pull/3825))

## 24.7.0

**Bug Fixes**:
Expand Down
121 changes: 84 additions & 37 deletions relay-cardinality/src/limiter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Relay Cardinality Limiter

use std::cmp::Reverse;
use std::collections::BTreeMap;

use hashbrown::HashSet;
use hashbrown::{HashMap, HashSet};
use relay_base_schema::metrics::{MetricName, MetricNamespace, MetricType};
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
Expand Down Expand Up @@ -198,8 +199,13 @@ impl<T: Limiter> CardinalityLimiter<T> {
/// The result can be used directly by [`CardinalityLimits`].
#[derive(Debug, Default)]
struct DefaultReporter<'a> {
/// All limits that have been exceeded.
exceeded_limits: HashSet<&'a CardinalityLimit>,
entries: HashSet<usize>,
/// A map from entries that have been rejected to the most
/// specific non-passive limit that they exceeded.
///
/// "Specificity" is determined by scope and limit, in that order.
entries: HashMap<usize, &'a CardinalityLimit>,
reports: BTreeMap<&'a CardinalityLimit, Vec<CardinalityReport>>,
}

Expand All @@ -208,7 +214,19 @@ impl<'a> Reporter<'a> for DefaultReporter<'a> {
fn reject(&mut self, limit: &'a CardinalityLimit, entry_id: EntryId) {
self.exceeded_limits.insert(limit);
if !limit.passive {
self.entries.insert(entry_id.0);
// Write `limit` into the entry if it's more specific than the existing limit
// (or if there isn't one)
self.entries
.entry(entry_id.0)
.and_modify(|existing_limit| {
// Scopes are ordered by reverse specificity (org is the smallest), so we use `Reverse` here
if (Reverse(limit.scope), limit.limit)
< (Reverse(existing_limit.scope), existing_limit.limit)
{
*existing_limit = limit;
}
})
.or_insert(limit);
}
}

Expand All @@ -223,20 +241,18 @@ impl<'a> Reporter<'a> for DefaultReporter<'a> {

/// Split of the original source containing accepted and rejected source elements.
#[derive(Debug)]
pub struct CardinalityLimitsSplit<T> {
pub struct CardinalityLimitsSplit<'a, T> {
/// The list of accepted elements of the source.
pub accepted: Vec<T>,
/// The list of rejected elements of the source.
pub rejected: Vec<T>,
/// The list of rejected elements of the source, together
/// with the most specific limit they exceeded.
pub rejected: Vec<(T, &'a CardinalityLimit)>,
}

impl<T> CardinalityLimitsSplit<T> {
impl<'a, T> CardinalityLimitsSplit<'a, T> {
/// Creates a new cardinality limits split with a given capacity for `accepted` and `rejected`
/// elements.
fn with_capacity(
accepted_capacity: usize,
rejected_capacity: usize,
) -> CardinalityLimitsSplit<T> {
fn with_capacity(accepted_capacity: usize, rejected_capacity: usize) -> Self {
CardinalityLimitsSplit {
accepted: Vec::with_capacity(accepted_capacity),
rejected: Vec::with_capacity(rejected_capacity),
Expand All @@ -250,7 +266,7 @@ pub struct CardinalityLimits<'a, T> {
/// The source.
source: Vec<T>,
/// Rejected item indices pointing into `source`, each mapped to the most specific limit it exceeded.
rejections: HashSet<usize>,
rejections: HashMap<usize, &'a CardinalityLimit>,
/// All non-passive exceeded limits.
exceeded_limits: HashSet<&'a CardinalityLimit>,
/// Generated cardinality reports.
Expand Down Expand Up @@ -294,32 +310,26 @@ impl<'a, T> CardinalityLimits<'a, T> {

/// Returns an iterator yielding only rejected items.
pub fn rejected(&self) -> impl Iterator<Item = &T> {
self.rejections.iter().filter_map(|&i| self.source.get(i))
self.rejections.keys().filter_map(|&i| self.source.get(i))
}

/// Consumes the result and returns [`CardinalityLimitsSplit`] containing all accepted and rejected items.
pub fn into_split(self) -> CardinalityLimitsSplit<T> {
pub fn into_split(mut self) -> CardinalityLimitsSplit<'a, T> {
if self.rejections.is_empty() {
return CardinalityLimitsSplit {
accepted: self.source,
rejected: Vec::new(),
};
} else if self.source.len() == self.rejections.len() {
return CardinalityLimitsSplit {
accepted: Vec::new(),
rejected: self.source,
};
}

// TODO: we might want to optimize this method later, by reusing one of the arrays and
// swap removing elements from it.
let source_len = self.source.len();
let rejections_len = self.rejections.len();
self.source.into_iter().enumerate().fold(
CardinalityLimitsSplit::with_capacity(source_len - rejections_len, rejections_len),
|mut split, (i, item)| {
if self.rejections.contains(&i) {
split.rejected.push(item);
if let Some(exceeded) = self.rejections.remove(&i) {
split.rejected.push((item, exceeded));
} else {
split.accepted.push(item);
};
Expand Down Expand Up @@ -396,37 +406,68 @@ mod tests {
assert_eq!(value, expected_value)
}

let limit = CardinalityLimit {
id: "dummy_limit".to_owned(),
passive: false,
report: false,
window: SlidingWindow {
window_seconds: 0,
granularity_seconds: 0,
},
limit: 0,
scope: CardinalityScope::Organization,
namespace: None,
};

let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([0, 1, 3]),
rejections: HashMap::from([(0, &limit), (1, &limit), (3, &limit)]),
exceeded_limits: HashSet::new(),
reports: BTreeMap::new(),
};
assert!(limits.has_rejections());
let split = limits.into_split();
assert_eq!(split.rejected, vec!['a', 'b', 'd']);
assert_eq!(
split.rejected,
vec![('a', &limit), ('b', &limit), ('d', &limit)]
);
assert_eq!(split.accepted, vec!['c', 'e']);

let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([]),
rejections: HashMap::from([]),
exceeded_limits: HashSet::new(),
reports: BTreeMap::new(),
};
assert!(!limits.has_rejections());
let split = limits.into_split();
assert_eq(split.rejected, vec![]);
assert!(split.rejected.is_empty());
assert_eq!(split.accepted, vec!['a', 'b', 'c', 'd', 'e']);

let limits = CardinalityLimits {
source: vec!['a', 'b', 'c', 'd', 'e'],
rejections: HashSet::from([0, 1, 2, 3, 4]),
rejections: HashMap::from([
(0, &limit),
(1, &limit),
(2, &limit),
(3, &limit),
(4, &limit),
]),
exceeded_limits: HashSet::new(),
reports: BTreeMap::new(),
};
assert!(limits.has_rejections());
let split = limits.into_split();
assert_eq!(split.rejected, vec!['a', 'b', 'c', 'd', 'e']);
assert_eq!(
split.rejected,
vec![
('a', &limit),
('b', &limit),
('c', &limit),
('d', &limit),
('e', &limit)
]
);
assert_eq(split.accepted, vec![]);
}

Expand Down Expand Up @@ -465,9 +506,14 @@ mod tests {
.check_cardinality_limits(build_scoping(), &limits, items.clone())
.unwrap();

let expected_items = items
.into_iter()
.zip(std::iter::repeat(&limits[0]))
.collect::<Vec<_>>();

assert_eq!(result.exceeded_limits(), &HashSet::from([&limits[0]]));
let split = result.into_split();
assert_eq!(split.rejected, items);
assert_eq!(split.rejected, expected_items);
assert!(split.accepted.is_empty());
}

Expand Down Expand Up @@ -547,18 +593,19 @@ mod tests {
Item::new(5, MetricNamespace::Transactions),
Item::new(6, MetricNamespace::Spans),
];
let limits = build_limits();
let split = limiter
.check_cardinality_limits(build_scoping(), &build_limits(), items)
.check_cardinality_limits(build_scoping(), &limits, items)
.unwrap()
.into_split();

assert_eq!(
split.rejected,
vec![
Item::new(0, MetricNamespace::Sessions),
Item::new(2, MetricNamespace::Spans),
Item::new(4, MetricNamespace::Custom),
Item::new(6, MetricNamespace::Spans),
(Item::new(0, MetricNamespace::Sessions), &limits[0]),
(Item::new(2, MetricNamespace::Spans), &limits[0]),
(Item::new(4, MetricNamespace::Custom), &limits[0]),
(Item::new(6, MetricNamespace::Spans), &limits[0]),
]
);
assert_eq!(
Expand Down Expand Up @@ -641,9 +688,9 @@ mod tests {
assert_eq!(
split.rejected,
vec![
Item::new(0, MetricNamespace::Custom),
Item::new(2, MetricNamespace::Custom),
Item::new(4, MetricNamespace::Custom),
(Item::new(0, MetricNamespace::Custom), &limits[0]),
(Item::new(2, MetricNamespace::Custom), &limits[0]),
(Item::new(4, MetricNamespace::Custom), &limits[0]),
]
);
assert_eq!(
Expand Down
11 changes: 7 additions & 4 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,11 @@ pub enum Outcome {
RateLimited(Option<ReasonCode>),

/// The event/metric has been cardinality limited.
///
/// Contains the [ID](relay_cardinality::CardinalityLimit::id)
/// of the most specific [CardinalityLimit](relay_cardinality::CardinalityLimit) that was exceeded.
#[cfg(feature = "processing")]
CardinalityLimited,
CardinalityLimited(String),

/// The event has been discarded because of invalid data.
Invalid(DiscardReason),
Expand All @@ -192,7 +195,7 @@ impl Outcome {
Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
#[cfg(feature = "processing")]
Outcome::CardinalityLimited => OutcomeId::CARDINALITY_LIMITED,
Outcome::CardinalityLimited(_) => OutcomeId::CARDINALITY_LIMITED,
Outcome::Invalid(_) => OutcomeId::INVALID,
Outcome::Abuse => OutcomeId::ABUSE,
Outcome::ClientDiscard(_) => OutcomeId::CLIENT_DISCARD,
Expand All @@ -210,7 +213,7 @@ impl Outcome {
code_opt.as_ref().map(|code| Cow::Borrowed(code.as_str()))
}
#[cfg(feature = "processing")]
Outcome::CardinalityLimited => None,
Outcome::CardinalityLimited(id) => Some(Cow::Borrowed(id)),
Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)),
Outcome::Abuse => None,
Outcome::Accepted => None,
Expand Down Expand Up @@ -242,7 +245,7 @@ impl fmt::Display for Outcome {
Outcome::RateLimited(None) => write!(f, "rate limited"),
Outcome::RateLimited(Some(reason)) => write!(f, "rate limited with reason {reason}"),
#[cfg(feature = "processing")]
Outcome::CardinalityLimited => write!(f, "cardinality limited"),
Outcome::CardinalityLimited(id) => write!(f, "cardinality limited ({})", id),
Outcome::Invalid(DiscardReason::Internal) => write!(f, "internal error"),
Outcome::Invalid(reason) => write!(f, "invalid data ({reason})"),
Outcome::Abuse => write!(f, "abuse limit reached"),
Expand Down
11 changes: 6 additions & 5 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2501,12 +2501,13 @@ impl EnvelopeProcessorService {

let CardinalityLimitsSplit { accepted, rejected } = limits.into_split();

if !rejected.is_empty() {
self.inner
.metric_outcomes
.track(scoping, &rejected, Outcome::CardinalityLimited);
for (bucket, exceeded) in rejected {
loewenheim marked this conversation as resolved.
Show resolved Hide resolved
self.inner.metric_outcomes.track(
scoping,
&[bucket],
Outcome::CardinalityLimited(exceeded.id.clone()),
);
}

accepted
}

Expand Down
15 changes: 13 additions & 2 deletions tests/integration/test_metric_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,21 @@ def test_metric_stats_with_limit_surpassed(
project_config["config"]["metrics"] = {
"cardinalityLimits": [
{
"id": "custom-limit",
"id": "org-limit",
"window": {"windowSeconds": 1, "granularitySeconds": 1},
"report": True,
"limit": 0,
"scope": "org",
"namespace": "custom",
},
{
"id": "name-limit",
"window": {"windowSeconds": 1, "granularitySeconds": 1},
"report": True,
"limit": 0,
"scope": "name",
"namespace": "custom",
}
},
]
}

Expand All @@ -196,6 +204,7 @@ def test_metric_stats_with_limit_surpassed(
"mri.type": "d",
"mri.namespace": "custom",
"outcome.id": "6",
"outcome.reason": "name-limit",
}
assert metrics.volume["d:custom/baz@none"]["org_id"] == 0
assert metrics.volume["d:custom/baz@none"]["project_id"] == project_id
Expand All @@ -205,6 +214,7 @@ def test_metric_stats_with_limit_surpassed(
"mri.type": "d",
"mri.namespace": "custom",
"outcome.id": "6",
"outcome.reason": "name-limit",
}
assert metrics.volume["s:custom/bar@none"]["org_id"] == 0
assert metrics.volume["s:custom/bar@none"]["project_id"] == project_id
Expand All @@ -214,6 +224,7 @@ def test_metric_stats_with_limit_surpassed(
"mri.type": "s",
"mri.namespace": "custom",
"outcome.id": "6",
"outcome.reason": "name-limit",
}
assert len(metrics.volume) == 3
assert len(metrics.cardinality) == 0
Expand Down
Loading