
ref(metrics): Rework metrics aggregator to keep internal partitions #4378

Merged
7 commits merged into master from dav1d/agg-multi on Dec 17, 2024

Conversation

@Dav1dde Dav1dde (Member) commented Dec 12, 2024

Replaces the current metrics aggregator, which is based on a priority queue with fixed priorities and regular flush intervals, with a ring-buffer-based aggregator.

Overview:

  • The aggregator is now ring-buffer based instead of priority-queue based, which minimizes the work needed for merging and flushing buckets.
  • The aggregator no longer guarantees a minimum delay for backdated buckets but on average still delays them enough (real-time buckets still get an accurate delay).
  • The aggregator is now driven by flushes, which is how it tracks and advances time. This means all operations (merges and flushes) can be done entirely without accessing the system time.
  • Cost tracking is much more efficient now and is tracked in total and per slot; on flush, the slot's cost is subtracted from the total, which requires no additional iterations or cost recalculations.
  • Per-project cost limits are only tracked per slot instead of overall, greatly reducing the necessary bookkeeping.
  • On shutdown the aggregator is replaced with one that has much more aggressive flush behaviour. This massively simplifies the code, still flushes in good time, and keeps partitions consistent for overall much better flush behaviour.
  • Metric name/tag validation is now a concern of the service instead of the aggregator.
  • Uses ahash with a fixed seed instead of fnv (it's faster).
  • Lots of unused metrics have been reworked or modified (e.g. from a histogram that was only used for sum+count to two counters).

For implementation details see the exhaustive code documentation, especially in the inner aggregator.
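
To make the design above concrete, here is a minimal, hypothetical sketch of the core idea. The names (RingAggregator, Slot, a unit cost per merge) are illustrative stand-ins and not Relay's actual types or API; the sketch only shows how slot indexing, flush-driven time, and incremental cost tracking fit together.

```rust
use std::collections::{HashMap, VecDeque};

struct Slot {
    buckets: HashMap<u64, f64>, // bucket key -> aggregated value
    cost: u64,                  // cost accounted to this slot only
}

struct RingAggregator {
    slots: VecDeque<Slot>,
    head: u64,       // absolute slot number currently at the front of the buffer
    total_cost: u64, // updated incrementally, no extra iteration on flush
}

impl RingAggregator {
    fn new(len: usize) -> Self {
        let slots = (0..len)
            .map(|_| Slot { buckets: HashMap::new(), cost: 0 })
            .collect();
        Self { slots, head: 0, total_cost: 0 }
    }

    /// Merges a value into the slot derived from its absolute slot number
    /// (time slot * partitions + partition, as in the PR).
    fn merge(&mut self, slot: u64, key: u64, value: f64) {
        let slots_len = self.slots.len() as u64;
        // Same indexing trick as in the PR: stay entirely in unsigned space.
        let index = ((slot + slots_len - self.head % slots_len) % slots_len) as usize;
        *self.slots[index].buckets.entry(key).or_default() += value;
        self.slots[index].cost += 1; // unit cost per merge, purely illustrative
        self.total_cost += 1;
    }

    /// Flushes the front slot and appends a fresh one, advancing time by
    /// exactly one slot; the flushed slot's cost is subtracted from the total.
    fn flush(&mut self) -> HashMap<u64, f64> {
        let slot = self.slots.pop_front().expect("ring buffer is never empty");
        self.slots.push_back(Slot { buckets: HashMap::new(), cost: 0 });
        self.head += 1;
        self.total_cost -= slot.cost;
        slot.buckets
    }
}
```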

Fixes: https://github.com/getsentry/team-ingest/issues/606

@Dav1dde Dav1dde force-pushed the dav1d/agg-multi branch 6 times, most recently from 53da55e to cc3619e on December 16, 2024 16:50
@Dav1dde Dav1dde self-assigned this Dec 16, 2024
@Dav1dde Dav1dde marked this pull request as ready for review December 16, 2024 16:53
@Dav1dde Dav1dde requested a review from a team as a code owner December 16, 2024 16:53
@Dav1dde Dav1dde force-pushed the dav1d/agg-multi branch 8 times, most recently from 34b71b3 to d5498e3 on December 16, 2024 19:01
@jjbayer jjbayer (Member) left a comment:
Looks very nice!

(Resolved review threads on relay-server/src/services/metrics/aggregator.rs, relay-metrics/src/aggregator/mod.rs, and relay-metrics/src/aggregator/inner.rs.)
let slot = time_slot * u64::from(self.num_partitions) + assigned_partition;

let slots_len = self.slots.len() as u64;
let index = (slot + slots_len).wrapping_sub(self.head % slots_len) % slots_len;
Member:

Naively I would expect this to be (slot - self.head) % slots_len.

Is the + slots_len here to make the wrapping_sub work in case slot < self.head? This operation is complex enough to warrant some documentation and/or a helper function.

@Dav1dde Dav1dde (Member Author) replied on Dec 17, 2024:
What we need here is rem_euclid in case slot - self.head is negative, but for that we would need to go from u64 to i64, which halves the usable range of the u64. Adding slots_len is a way to shift the operation entirely into positive space.

Will add some docs and/or a helper function. This is doubly confusing because the wrapping_sub isn't actually necessary (it can't wrap) but is still there.
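
For illustration only (not the PR's code), the two formulations compute the same index; the unsigned version just avoids the lossy u64 -> i64 cast that rem_euclid would require:

```rust
// Illustrative stand-ins for `slot`, `self.head`, and `self.slots.len()`.
fn index_unsigned(slot: u64, head: u64, slots_len: u64) -> u64 {
    // Cannot underflow: `slot + slots_len` is always >= `head % slots_len`.
    (slot + slots_len - head % slots_len) % slots_len
}

fn index_signed(slot: u64, head: u64, slots_len: u64) -> u64 {
    // Equivalent, but requires casting into i64, which halves the usable range.
    (slot as i64 - head as i64).rem_euclid(slots_len as i64) as u64
}

fn main() {
    // Backdated case: slot < head.
    assert_eq!(index_unsigned(3, 5, 8), 6);
    assert_eq!(index_signed(3, 5, 8), 6);
    // Regular case: slot >= head.
    assert_eq!(index_unsigned(12, 5, 8), 7);
    assert_eq!(index_signed(12, 5, 8), 7);
}
```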

let slot = time_slot * u64::from(self.num_partitions) + assigned_partition;

let slots_len = self.slots.len() as u64;
let index = (slot + slots_len).wrapping_sub(self.head % slots_len) % slots_len;
Member:

What happens with different timestamps that map to the same index? Is that case prevented by how slots.len() is chosen in the beginning?

Member Author (@Dav1dde):
This is fine and will happen for backdated and future buckets; there are tests that cover this case. Since the timestamp is part of the key, they won't be aggregated together and stay independent.
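
A small, hypothetical illustration of this point (BucketKey and its fields here are stand-ins, not Relay's actual struct): two buckets whose timestamps map to the same slot remain separate entries because the timestamp is part of the key.

```rust
use std::collections::HashMap;

#[derive(Hash, PartialEq, Eq)]
struct BucketKey {
    timestamp: u64, // part of the key, so buckets colliding on a slot stay independent
    metric_name: &'static str,
}

fn main() {
    let mut slot: HashMap<BucketKey, f64> = HashMap::new();
    slot.insert(BucketKey { timestamp: 100, metric_name: "some_counter" }, 1.0);
    slot.insert(BucketKey { timestamp: 200, metric_name: "some_counter" }, 1.0);
    assert_eq!(slot.len(), 2); // same slot, two independent buckets
}
```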

slot.buckets.hasher().clone(),
),
..slot
});
Member:
I wonder if this would be simpler with an actual statically sized ring buffer. Then you could reset the slot at head and move the head, without pop / push.

Contributor:
I also get the impression that this implementation has features of a queue and a static ring buffer simultaneously.

Member Author (@Dav1dde):

You can rotate the VecDeque; initially that's what I had, but it turns out to be a bit nicer this way. We need ownership of parts of the slot (e.g. all buckets), and with a rotation we'd have to std::mem::replace the parts we need ownership of. We also don't get around a fallible access/unwrap like the one here, since we still need a bounds check.

Removing the item (which just shifts indices internally and returns the value) and adding a new one back turned out to be nicer.
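
A hedged sketch of the two variants being discussed, using a hypothetical Slot type and ignoring the other fields the real slot carries via ..slot:

```rust
use std::collections::{HashMap, VecDeque};

struct Slot {
    buckets: HashMap<u64, f64>,
}

// Variant the PR settled on: remove the front slot to take ownership of its
// buckets, then push a fresh slot back so the buffer length stays constant.
fn flush_pop_push(slots: &mut VecDeque<Slot>) -> HashMap<u64, f64> {
    let slot = slots.pop_front().expect("buffer is never empty");
    slots.push_back(Slot { buckets: HashMap::new() });
    slot.buckets
}

// Rotation variant: the slot stays in place, so ownership of its buckets has
// to be carved out with std::mem::replace before rotating the head forward.
fn flush_rotate(slots: &mut VecDeque<Slot>) -> HashMap<u64, f64> {
    let slot = slots.front_mut().expect("buffer is never empty");
    let buckets = std::mem::replace(&mut slot.buckets, HashMap::new());
    slots.rotate_left(1);
    buckets
}
```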

(Further resolved review threads on relay-metrics/src/aggregator/inner.rs.)
@jjbayer jjbayer (Member) left a comment:

The PR is pretty big so I probably missed some details, but overall the design makes sense & I'm looking forward to seeing how this performs in production.

The rollout is probably gonna be merge -> test -> revert?

// threaded runtime.
self.do_try_flush()
} else {
tokio::task::block_in_place(|| self.do_try_flush())
Member:
What does this do? I'm reading the docs of block_in_place but I don't understand its purpose.

Member Author (@Dav1dde):

Might not need this anymore, but the previous aggregator's flush could take up to 200ms at the maximum. Since we run in a tokio task here, we're blocking all other tasks queued on this tokio worker for the duration of the call. With block_in_place we can tell the runtime to clear its local queue before running the closure. In theory this should bring down p99+ latencies.
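
A standalone example of the pattern (assuming tokio's multi-threaded runtime; do_try_flush here is just a stand-in that sleeps):

```rust
use std::time::Duration;

// Hypothetical stand-in for the aggregator flush; assume it can block for ~200ms.
fn do_try_flush() {
    std::thread::sleep(Duration::from_millis(200));
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // block_in_place asks the runtime to move tasks queued on this worker to
    // other workers before running the blocking closure, so they are not stuck
    // behind the flush. It panics on the current-thread runtime, hence the
    // if/else in the snippet above.
    tokio::task::block_in_place(do_try_flush);
}
```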

@Dav1dde Dav1dde (Member Author) commented Dec 17, 2024:

The rollout is probably gonna be merge -> test -> revert?

Merge, test, and if it works, ideally roll it out to prod. Let's talk about it in tomorrow's sync?

@Dav1dde Dav1dde merged commit faaaa55 into master Dec 17, 2024
23 checks passed
@Dav1dde Dav1dde deleted the dav1d/agg-multi branch December 17, 2024 15:50