
Add collect methods on ValueMap #2267

Open
wants to merge 5 commits into base: main
Conversation

Contributor

@fraillt fraillt commented Nov 2, 2024

Design discussion issue #2145

Changes

Mostly, this extracts the DataPoint collection logic from the individual metric implementations into ValueMap, under new collect_readonly and collect_and_reset functions.
I tried to preserve the existing logic, with a few exceptions:

  • for cumulative temporality, acquire a read lock on the hashmap instead of a write lock.
  • for delta temporality, since the hashmap is cleared anyway, reduce the time it stays locked: instead of "lock->iterate(consume)->unlock", the sequence is now "lock->consume->unlock->iterate".
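The delta-temporality lock pattern can be sketched roughly like this (simplified standalone types; the real SDK stores `Arc<A: Aggregator>` keyed by attribute sets, not plain integers keyed by strings):

```rust
use std::collections::HashMap;
use std::mem::take;
use std::sync::RwLock;

// Placeholder for an aggregated value; stands in for the SDK's tracker type.
type Tracker = u64;

fn collect_and_reset(trackers: &RwLock<HashMap<String, Tracker>>) -> Vec<(String, Tracker)> {
    // lock -> consume: take the whole map while holding the write lock,
    // leaving an empty map behind for concurrent updates...
    let drained = {
        let mut guard = trackers.write().unwrap();
        take(&mut *guard)
    }; // ...unlock: the write guard is dropped here...

    // ...iterate: data points are built outside the lock, so updating
    // threads are blocked only for the duration of the take.
    drained.into_iter().collect()
}

fn main() {
    let trackers = RwLock::new(HashMap::from([("route=/api".to_string(), 7u64)]));
    let points = collect_and_reset(&trackers);
    assert_eq!(points, vec![("route=/api".to_string(), 7)]);
    assert!(trackers.read().unwrap().is_empty());
}
```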

There is plenty more that could be improved, but this change makes it more convenient to experiment with and implement further improvements in the future.

Merge requirement checklist

  • CONTRIBUTING guidelines followed
  • Unit tests added/updated (if applicable)
  • Appropriate CHANGELOG.md files updated for non-trivial, user-facing changes
  • Changes in public API reviewed (if applicable)


codecov bot commented Nov 2, 2024

Codecov Report

Attention: Patch coverage is 98.52941% with 2 lines in your changes missing coverage. Please review.

Project coverage is 79.3%. Comparing base (eca53b2) to head (edbbeaf).

Files with missing lines Patch % Lines
opentelemetry-sdk/src/metrics/internal/mod.rs 95.4% 2 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##            main   #2267     +/-   ##
=======================================
- Coverage   79.4%   79.3%   -0.2%     
=======================================
  Files        121     121             
  Lines      20981   20790    -191     
=======================================
- Hits       16673   16500    -173     
+ Misses      4308    4290     -18     


@fraillt fraillt changed the title add collect methods on ValueMap Add collect methods on ValueMap Nov 3, 2024

linux-foundation-easycla bot commented Nov 3, 2024

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: fraillt / name: Mindaugas Vinkelis (8d56bd2, 66cb25a)
  • ✅ login: cijothomas / name: Cijo Thomas (e99bb97, e52baeb)
  • ✅ login: lalitb / name: Lalit Kumar Bhasin (edbbeaf)


fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Contributor

This would lead to unnecessary allocation. The replacement with Buckets::new is only needed for the no attribute case as there could be newer updates trying to update the same tracker. For trackers with attributes, we could simply mem::take the buckets instead of creating a new set of buckets for each tracker.
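The distinction the comment draws can be shown in isolation (a toy `Buckets`, not the SDK's type): `mem::replace` must construct a fresh value up front, while `mem::take` leaves the allocation-free `Default` behind.

```rust
use std::mem::{replace, take};
use std::sync::Mutex;

// Toy stand-in for the SDK's Buckets type (just a Vec of counts here).
#[derive(Default, Debug, PartialEq)]
struct Buckets {
    counts: Vec<u64>,
}

impl Buckets {
    fn new(count: usize) -> Self {
        Buckets { counts: vec![0; count] } // allocates a fresh counts Vec
    }
}

// replace: needed for the no-attribute tracker, which may still receive
// updates and therefore needs usable buckets left behind.
fn reset_with_replace(b: &Mutex<Buckets>, count: usize) -> Buckets {
    let mut cur = b.lock().unwrap();
    replace(&mut *cur, Buckets::new(count))
}

// take: enough for per-attribute trackers being discarded anyway; leaves
// Buckets::default() (an empty Vec, no allocation) behind.
fn reset_with_take(b: &Mutex<Buckets>) -> Buckets {
    let mut cur = b.lock().unwrap();
    take(&mut *cur)
}

fn main() {
    let tracker = Mutex::new(Buckets::new(4));
    let collected = reset_with_take(&tracker);
    assert_eq!(collected.counts.len(), 4);
    // The leftover value is the allocation-free default.
    assert_eq!(*tracker.lock().unwrap(), Buckets::default());
    let _ = reset_with_replace(&tracker, 4);
    assert_eq!(tracker.lock().unwrap().counts.len(), 4);
}
```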

Contributor Author

Yes, you're absolutely right that we should be able to simply take it from trackers with attributes, but at the moment we can't (or rather, it would be very inefficient), because the hashmap can contain several instances of the same tracker (one keyed by the sorted attribute set, and N keyed by unsorted ones).
I actually plan to create a revision for that as well (I was afraid to do it as part of this revision, to keep the scope small :)
The idea is to split this hashmap in two: one keyed by sorted attributes, and another keyed by all attribute orderings for faster lookup. That change would greatly improve collection performance, and would incidentally solve #2093 as well :)

In summary, this is a temporary minor inefficiency that will be resolved :)

Contributor
@utpilla utpilla Nov 5, 2024

but at the moment we cant (or rather it would be super inefficient), because hashmap can contain several instances of same tracker

I didn't get that. Fixing this extra allocation here does not depend on the HashMap containing two sets of the same attributes. There is a relatively simple fix:

  1. Add a new method to the Aggregator trait and update collect_and_reset to call this new method for trackers with attributes
pub(crate) trait Aggregator {
    ...

    /// Return the current value
    fn get_value(&self, init: &Self::InitConfig) -> Self;
}

pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
    MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
    prepare_data(dest, self.count.load(Ordering::SeqCst));

    ...

    let mut seen = HashSet::new();
    for (attrs, tracker) in trackers.drain() {
        if seen.insert(Arc::as_ptr(&tracker)) {
            dest.push(map_fn(attrs, tracker.get_value(&self.config))); // Update the method called here
        }
    }
}
  2. Implement the new method in the HistogramTracker by not calling Buckets::new. Notice that we call mem::take here and not mem::replace.
fn get_value(&self, _init: &Self::InitConfig) -> Self {
    let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
    Self {
        // simply take the current tracker without replacing it by an allocation requiring `Buckets::new`
        buckets: Mutex::new(take(&mut current)),
    }
}

This should work.

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
Contributor
@utpilla utpilla Nov 4, 2024

This would lead to additional allocation, as we would be replacing trackers with a new HashMap (with capacity = 0) on each collect. So after every collect, we would be allocating again just to add entries to the HashMap. This was avoided earlier when we used drain on the HashMap, which retained its capacity.

One approach to solve this would be to add another RwLock&lt;HashMap&gt; field to the ValueMap struct and, instead of calling mem::take here, mem::swap the current trackers with this newly added HashMap.

Contributor Author

I tried to minimize how long the write lock is held, as much as possible (I cannot release the lock if I want to use drain).
In #2247 I observed an almost 4x improvement (while the collect phase is running) with this change.
Regarding the extra allocation, I don't think it makes a huge difference (given that allocations happen constantly anyway, and we don't really notice them that much :)).
Maybe I could improve this a bit by doing something like replace(trackers, HashMap::with_capacity(n)), and leave other potential improvements for the future.
Would that work for you?

Contributor
@utpilla utpilla Nov 5, 2024

I'm totally in favor of reducing the time the write lock is held, so I'm not asking you to go back to calling drain under the write lock. However, since we are optimizing collect anyway, why not avoid whatever allocation we can?

Maybe I could improve this just a bit by doing something like replace(trackers, Hashmap::with_capacity(n))?

HashMap::with_capacity would also allocate.

One simple way to fix this would be:

  1. Add another field trackers_for_collect to the ValueMap struct
pub(crate) struct ValueMap<A>
where
    A: Aggregator,
{
    /// Trackers store the values associated with different attribute sets.
    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

    /// Trackers swapped out during collect; only the collect thread accesses this map.
    trackers_for_collect: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

    ...
}

impl<A> ValueMap<A>
where
    A: Aggregator,
{
    fn new(config: A::InitConfig) -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::new()),
            trackers_for_collect: RwLock::new(HashMap::new()),

            ...
        }
    }
}
  2. Update the collect_and_reset method to:
    • Use mem::swap to swap the trackers used for updates with the trackers used for collect.
    • trackers_for_collect would then have the aggregated data
    • Call drain instead of into_iter on trackers_for_collect to retain its capacity for subsequent collects
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
    prepare_data(dest, self.count.load(Ordering::SeqCst));

    ...
    
    let mut trackers = match self.trackers.write() {
        Ok(mut trackers_guard) => {
            self.count.store(0, Ordering::SeqCst);
            // take(trackers_guard.deref_mut())
            let mut trackers_export = self.trackers_for_collect.write().unwrap();
            swap(trackers_guard.deref_mut(), trackers_export.deref_mut()); // swap the tracker instances
            trackers_export
        }
        Err(_) => todo!(),
    };

    let mut seen = HashSet::new();
    for (attrs, tracker) in trackers.drain() { // calling `drain` here retains the capacity of `trackers_for_collect`
        if seen.insert(Arc::as_ptr(&tracker)) {
            ...
        }
    }
}

How does that look?
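The capacity point above is standard-library behavior and easy to check in isolation: `HashMap::drain` documents that it keeps the allocated memory for reuse, while `mem::take` swaps in a fresh `HashMap::default()` with zero capacity, forcing later inserts to allocate again.

```rust
use std::collections::HashMap;
use std::mem::take;

fn main() {
    let mut trackers: HashMap<u32, u32> = HashMap::with_capacity(64);
    trackers.insert(1, 10);
    let cap_before = trackers.capacity();

    // drain() empties the map but keeps its allocation for reuse.
    trackers.drain().for_each(drop);
    assert!(trackers.is_empty());
    assert!(trackers.capacity() >= cap_before);

    // mem::take swaps in HashMap::default(), which has zero capacity,
    // so subsequent inserts must allocate from scratch.
    trackers.insert(2, 20);
    let taken = take(&mut trackers);
    assert_eq!(taken.len(), 1);
    assert_eq!(trackers.capacity(), 0);
}
```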

Member

nit: if we go with the two-hashmap approach, leave a code comment like: "even though trackers_for_collect is RwLock-guarded, only the collect() thread ever accesses it, so taking the W lock on it has no impact on update() threads."

Contributor
@utpilla utpilla left a comment

Great work!

These changes introduce two new kinds of allocation, as mentioned in #2267 (comment) and #2267 (comment). Both can be avoided. I had a chat with @fraillt on Slack and he agreed to address them in his upcoming PRs.
