Add collect methods on ValueMap #2267

Open · wants to merge 5 commits into base: main

Changes from 4 commits
184 changes: 38 additions & 146 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::mem::replace;
use std::ops::DerefMut;
use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::HistogramDataPoint;
@@ -37,6 +36,14 @@ where
buckets: Mutex::new(Buckets::<T>::new(*count)),
}
}

fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Self {
Contributor

This would lead to an unnecessary allocation. The replacement with Buckets::new is only needed for the no-attribute case, as there could be newer updates trying to update the same tracker. For trackers with attributes, we could simply mem::take the buckets instead of creating a new set of buckets for each tracker.

Contributor (Author)

Yes, you're absolutely right that we should be able to simply 'take' it from trackers with attributes, but at the moment we can't (or rather, it would be super inefficient), because the hashmap can contain several instances of the same tracker (one under the sorted attribute set, and N under unsorted ones).
I actually plan to create a revision for that as well (I was afraid to do it as part of this revision, to reduce scope). :)
The idea is to split this hashmap into two: one keyed by sorted attributes, and another containing all attribute orderings for faster lookup. That change would greatly improve collection performance and, as a happy accident, would solve #2093 as well. :)

In summary, this is a temporary minor inefficiency that will be solved. :)
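
To make the duplicate-tracker situation concrete, here is a toy sketch (simplified types, not the SDK's actual ValueMap): the same Arc-wrapped tracker sits in the map under both the caller-supplied attribute order and the sorted order, which is why the collect loops deduplicate by pointer identity.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

fn main() {
    // One logical tracker, reachable under two attribute orderings.
    let tracker = Arc::new(0u64);
    let mut trackers: HashMap<Vec<&str>, Arc<u64>> = HashMap::new();
    trackers.insert(vec!["b=2", "a=1"], Arc::clone(&tracker)); // as provided
    trackers.insert(vec!["a=1", "b=2"], Arc::clone(&tracker)); // sorted key

    // Deduplicate by Arc pointer identity, as the collection paths do.
    let mut seen = HashSet::new();
    let mut unique = 0;
    for tracker in trackers.values() {
        if seen.insert(Arc::as_ptr(tracker)) {
            unique += 1;
        }
    }
    assert_eq!(unique, 1); // two map entries, one logical tracker
}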

Contributor @utpilla · Nov 5, 2024

but at the moment we can't (or rather, it would be super inefficient), because the hashmap can contain several instances of the same tracker

I didn't get that. Fixing this extra allocation here does not depend on the HashMap containing two sets of the same attributes. There is a relatively simple fix:

1. Add a new method to the Aggregator trait and update collect_and_reset to call this new method for trackers with attributes:
pub(crate) trait Aggregator {
    ...

    /// Return the current value
    fn get_value(&self, init: &Self::InitConfig) -> Self;
}

pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
    MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
    prepare_data(dest, self.count.load(Ordering::SeqCst));

    ...

    let mut seen = HashSet::new();
    for (attrs, tracker) in trackers.drain() {
        if seen.insert(Arc::as_ptr(&tracker)) {
            dest.push(map_fn(attrs, tracker.get_value(&self.config))); // Update the method called here
        }
    }
}
2. Implement the new method in the HistogramTracker without calling Buckets::new. Notice that we call mem::take here, not mem::replace:
fn get_value(&self, _init: &Self::InitConfig) -> Self {
    let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
    Self {
        // simply take the current buckets without the allocation that
        // `Buckets::new` would require
        buckets: Mutex::new(take(current.deref_mut())),
    }
}

This should work.
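
Worth spelling out why mem::take is the cheap option here: take(dest) is just replace(dest, Default::default()), and Buckets derives Default, so the value left behind holds an empty Vec that performs no allocation, unlike Buckets::new(count), which allocates the counts storage up front. A minimal sketch, with a toy Buckets standing in for the SDK type:

use std::mem::{replace, take};

#[derive(Default)]
struct Buckets {
    counts: Vec<u64>, // Vec::default() is empty and allocates nothing
}

impl Buckets {
    fn new(n: usize) -> Self {
        Buckets { counts: vec![0; n] } // allocates n slots up front
    }
}

fn main() {
    let mut b = Buckets::new(4);

    // `take` leaves a Default (empty, allocation-free) value behind.
    let snapshot = take(&mut b);
    assert_eq!(snapshot.counts.len(), 4);
    assert_eq!(b.counts.capacity(), 0);

    // `replace` with `Buckets::new` would pay for a fresh allocation on
    // every collect cycle, which is the overhead flagged in this thread.
    let _old = replace(&mut b, Buckets::new(4));
}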

buckets: Mutex::new(cloned),
}
}
}

#[derive(Default)]
@@ -73,16 +80,6 @@ impl<T: Number> Buckets<T> {
self.max = value
}
}

fn reset(&mut self) {
for item in &mut self.counts {
*item = 0;
}
self.count = Default::default();
self.total = Default::default();
self.min = T::max();
self.max = T::min();
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
@@ -139,11 +136,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
@@ -155,24 +147,22 @@ impl<T: Number> Histogram<T> {
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr
.buckets
.into_inner()
.unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
@@ -193,54 +183,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});

b.reset();
}
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
@@ -250,11 +194,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
@@ -266,24 +205,19 @@ impl<T: Number> Histogram<T> {
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}
let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
@@ -304,50 +238,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});
}
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

// TODO: This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
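Taken together, the histogram changes funnel both temporalities through the two new ValueMap methods. The following condensed sketch shows the shape of that API; the names follow this PR, but the types are simplified stand-ins: the real trait threads an InitConfig argument through clone_and_reset, and the real ValueMap also handles the no-attribute fast path, the overflow attribute set, and deduplication of trackers stored under multiple attribute orderings.

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

type Attrs = Vec<(String, String)>;

trait Aggregator {
    /// Snapshot the current value and reset the tracker in place.
    fn clone_and_reset(&self) -> Self;
}

struct ValueMap<A> {
    trackers: RwLock<HashMap<Attrs, Arc<A>>>,
}

impl<A: Aggregator> ValueMap<A> {
    /// Delta temporality: drain the trackers, resetting them as we go.
    fn collect_and_reset<Res>(
        &self,
        dest: &mut Vec<Res>,
        mut map_fn: impl FnMut(Attrs, A) -> Res,
    ) {
        let mut trackers = self.trackers.write().unwrap();
        for (attrs, tracker) in trackers.drain() {
            dest.push(map_fn(attrs, tracker.clone_and_reset()));
        }
    }

    /// Cumulative temporality: read every tracker without mutating it.
    fn collect_readonly<Res>(
        &self,
        dest: &mut Vec<Res>,
        mut map_fn: impl FnMut(Attrs, &A) -> Res,
    ) {
        let trackers = self.trackers.read().unwrap();
        for (attrs, tracker) in trackers.iter() {
            dest.push(map_fn(attrs.clone(), &**tracker));
        }
    }
}

// A toy sum aggregator to show the contract.
struct Sum(AtomicU64);

impl Aggregator for Sum {
    fn clone_and_reset(&self) -> Self {
        Sum(AtomicU64::new(self.0.swap(0, Ordering::Relaxed)))
    }
}

fn main() {
    let map: ValueMap<Sum> = ValueMap { trackers: RwLock::new(HashMap::new()) };
    map.trackers
        .write()
        .unwrap()
        .insert(vec![("k".into(), "v".into())], Arc::new(Sum(AtomicU64::new(7))));

    // Cumulative collect leaves the tracker intact...
    let mut cumulative = Vec::new();
    map.collect_readonly(&mut cumulative, |_a, s| s.0.load(Ordering::Relaxed));
    assert_eq!(cumulative, vec![7]);

    // ...while delta collect drains and resets it.
    let mut delta = Vec::new();
    map.collect_and_reset(&mut delta, |_a, s| s.0.load(Ordering::Relaxed));
    assert_eq!(delta, vec![7]);
    assert!(map.trackers.read().unwrap().is_empty());
}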
109 changes: 20 additions & 89 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -1,8 +1,4 @@
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc, Mutex},
time::SystemTime,
};
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

use crate::metrics::data::DataPoint;
use opentelemetry::KeyValue;
@@ -33,6 +29,12 @@
fn update(&self, value: T) {
self.value.store(value)
}

fn clone_and_reset(&self, _: &()) -> Self {
Self {
value: T::new_atomic_tracker(self.value.get_and_reset_value()),
}
}
}

/// Summarizes a set of measurements as the last one made.
@@ -56,102 +58,31 @@ impl<T: Number> LastValue<T> {

pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
dest.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > dest.capacity() {
dest.reserve_exact(n - dest.capacity());
}

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
dest.push(DataPoint {
attributes: vec![],
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
self.value_map
.collect_and_reset(dest, |attributes, aggr| DataPoint {
attributes,
start_time: Some(prev_start),
time: Some(t),
value: self
.value_map
.no_attribute_tracker
.value
.get_and_reset_value(),
value: aggr.value.get_value(),
exemplars: vec![],
});
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
_ => return,
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);
}

pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

dest.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > dest.capacity() {
dest.reserve_exact(n - dest.capacity());
}

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
dest.push(DataPoint {
attributes: vec![],
self.value_map
.collect_readonly(dest, |attributes, aggr| DataPoint {
attributes,
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_tracker.value.get_value(),
value: aggr.value.get_value(),
exemplars: vec![],
});
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
_ => return,
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
}
}
}
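
One more idiom shared by both files: the delta paths now read and reset the start timestamp in a single lock acquisition, by calling replace on the MutexGuard, instead of locking once to read start and again at the end of the cycle to reset it. A minimal sketch of that pattern:

use std::mem::replace;
use std::ops::DerefMut;
use std::sync::Mutex;
use std::time::SystemTime;

// Swap the stored start time for `t` and return the previous value, all
// under one lock acquisition; on a poisoned lock, fall back to `t`.
fn swap_start(start: &Mutex<SystemTime>, t: SystemTime) -> SystemTime {
    start
        .lock()
        .map(|mut s| replace(s.deref_mut(), t))
        .unwrap_or(t)
}

fn main() {
    let start = Mutex::new(SystemTime::UNIX_EPOCH);
    let now = SystemTime::now();
    let prev = swap_start(&start, now);
    assert_eq!(prev, SystemTime::UNIX_EPOCH);
    assert_eq!(*start.lock().unwrap(), now);
}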