diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 849684fcbd..4456d36645 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::HistogramDataPoint; @@ -37,6 +36,14 @@ where buckets: Mutex::new(Buckets::::new(*count)), } } + + fn clone_and_reset(&self, count: &usize) -> Self { + let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner()); + let cloned = replace(current.deref_mut(), Buckets::new(*count)); + Self { + buckets: Mutex::new(cloned), + } + } } #[derive(Default)] @@ -73,16 +80,6 @@ impl Buckets { self.max = value } } - - fn reset(&mut self) { - for item in &mut self.counts { - *item = 0; - } - self.count = Default::default(); - self.total = Default::default(); - self.min = T::max(); - self.max = T::min(); - } } /// Summarizes a set of measurements as a histogram with explicitly defined @@ -139,11 +136,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -155,24 +147,22 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + + self.value_map + .collect_and_reset(&mut h.data_points, |attributes, aggr| { + let b = aggr + .buckets + .into_inner() + .unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -193,54 +183,8 @@ impl Histogram { None }, exemplars: vec![], - }); - - b.reset(); - } - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } - - // The delta collection cycle resets. 
- if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } @@ -250,11 +194,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -266,24 +205,19 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - h.data_points.clear(); - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } + let prev_start = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + self.value_map + .collect_readonly(&mut h.data_points, |attributes, aggr| { + let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -304,50 +238,8 @@ impl Histogram { None }, exemplars: vec![], - }); - } - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. 
Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index aa6aef0bd3..05fedc1489 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; @@ -33,6 +29,12 @@ where fn update(&self, value: T) { self.value.store(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements as the last one made. @@ -56,102 +58,31 @@ impl LastValue { pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - dest.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); } pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec>) { let t = SystemTime::now(); let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - dest.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } } } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index d10ceab34f..8b6136d7ce 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -6,8 +6,9 @@ mod precomputed_sum; mod sum; use core::fmt; -use std::collections::HashMap; -use std::ops::{Add, AddAssign, Sub}; +use std::collections::{HashMap, HashSet}; +use std::mem::take; +use std::ops::{Add, AddAssign, DerefMut, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -37,6 +38,9 @@ pub(crate) trait Aggregator { /// Called for each measurement. fn update(&self, value: Self::PreComputedValue); + + /// Return current value and reset this instance + fn clone_and_reset(&self, init: &Self::InitConfig) -> Self; } /// The storage for sums. @@ -130,6 +134,68 @@ where ); } } + + /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. + /// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared. 
+ pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn) + where + MapFn: FnMut(Vec<KeyValue>, &A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.load(Ordering::Acquire) { + dest.push(map_fn(vec![], &self.no_attribute_tracker)); + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.iter() { + if seen.insert(Arc::as_ptr(tracker)) { + dest.push(map_fn(attrs.clone(), tracker)); + } + } + } + + /// Iterate through all attribute sets, populate `DataPoints` and reset. + /// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection. + pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn) + where + MapFn: FnMut(Vec<KeyValue>, A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.swap(false, Ordering::AcqRel) { + dest.push(map_fn( + vec![], + self.no_attribute_tracker.clone_and_reset(&self.config), + )); + } + + let trackers = match self.trackers.write() { + Ok(mut trackers) => { + self.count.store(0, Ordering::SeqCst); + take(trackers.deref_mut()) + } + Err(err) => { self.count.store(0, Ordering::SeqCst); take(err.into_inner().deref_mut()) } // recover from poison instead of panicking, consistent with `unwrap_or_else(|err| err.into_inner())` elsewhere + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.into_iter() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + } + } + } +} + +/// Clear and allocate exactly required amount of space for all attribute-sets +fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) { + data.clear(); + let total_len = list_len + 2; // to account for no_attributes case + overflow state + if total_len > data.capacity() { + data.reserve_exact(total_len - data.capacity()); + } +} /// Marks a type that can have a value added and retrieved atomically. 
Required since diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 977c7d4a88..7bd843f4cd 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,11 +3,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; -use std::{ - collections::{HashMap, HashSet}, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{collections::HashMap, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { @@ -37,7 +33,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -50,68 +45,34 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let mut new_reported = HashMap::with_capacity(n); + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + let mut reported = match self.reported.lock() { Ok(r) => r, Err(_) => return (0, None), }; - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - let value = self.value_map.no_attribute_tracker.value.get_value(); - let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); - new_reported.insert(vec![], value); - - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.value.get_value(); - let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), + let mut new_reported = HashMap::with_capacity(reported.len()); + + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| { + let value = aggr.value.get_value(); + new_reported.insert(attributes.clone(), value); + let delta = value - *reported.get(&attributes).unwrap_or(&T::default()); + DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), value: delta, exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. 
- if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + } + }); *reported = new_reported; drop(reported); // drop before values guard is dropped @@ -127,7 +88,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -140,50 +100,19 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(), 
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 34044020b8..40b95a5e60 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::vec; use std::{sync::Mutex, time::SystemTime}; @@ -33,6 +32,12 @@ where fn update(&self, value: T) { self.value.add(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements made as their arithmetic sum. @@ -80,59 +85,20 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); ( s_data.data_points.len(), @@ -159,54 +125,17 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(),