From 125c3fe66cc4e2b1e3f9975fc403e572e6610148 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Sat, 14 Sep 2024 17:24:58 +0300 Subject: [PATCH] ValueMap interface change --- .../src/metrics/internal/histogram.rs | 198 ++++++++---------- .../src/metrics/internal/last_value.rs | 45 +++- opentelemetry-sdk/src/metrics/internal/mod.rs | 153 ++++++-------- .../src/metrics/internal/precomputed_sum.rs | 16 +- opentelemetry-sdk/src/metrics/internal/sum.rs | 46 +++- opentelemetry-sdk/src/metrics/pipeline.rs | 2 - 6 files changed, 239 insertions(+), 221 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 089415ba7c..9b32b0a2df 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -7,46 +7,17 @@ use crate::metrics::data::HistogramDataPoint; use crate::metrics::data::{self, Aggregation, Temporality}; use opentelemetry::KeyValue; -use super::Number; -use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap}; +use super::ValueMap; +use super::{Aggregator, Number}; -struct HistogramUpdate; - -impl Operation for HistogramUpdate { - fn update_tracker>(tracker: &AT, value: T, index: usize) { - tracker.update_histogram(index, value); - } -} - -struct HistogramTracker { - buckets: Mutex>, -} - -impl AtomicTracker for HistogramTracker { - fn update_histogram(&self, index: usize, value: T) { - let mut buckets = match self.buckets.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - - buckets.bin(index, value); - buckets.sum(value); - } -} - -impl AtomicallyUpdate for HistogramTracker { - type AtomicTracker = HistogramTracker; - - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker { - let count = buckets_count.unwrap(); - HistogramTracker { - buckets: Mutex::new(Buckets::::new(count)), - } - } +struct BucketsConfig { + bounds: Vec, + record_min_max: bool, + record_sum: bool, } -#[derive(Default)] -struct Buckets { +#[derive(Default, Debug, Clone)] +struct BucketsData { counts: Vec, count: u64, total: T, @@ -54,32 +25,23 @@ struct Buckets { max: T, } -impl Buckets { - /// returns buckets with `n` bins. - fn new(n: usize) -> Buckets { - Buckets { - counts: vec![0; n], +struct Buckets { + data: Mutex>, +} + +impl BucketsData +where + T: Number, +{ + fn new(size: usize) -> Self { + Self { + counts: vec![0; size], min: T::max(), max: T::min(), ..Default::default() } } - fn sum(&mut self, value: T) { - self.total += value; - } - - fn bin(&mut self, idx: usize, value: T) { - self.counts[idx] += 1; - self.count += 1; - if value < self.min { - self.min = value; - } - if value > self.max { - self.max = value - } - } - fn reset(&mut self) { for item in &mut self.counts { *item = 0; @@ -91,45 +53,71 @@ impl Buckets { } } +impl Aggregator for Buckets +where + T: Number, +{ + type Config = BucketsConfig; + + fn create(config: &BucketsConfig) -> Self { + let size = config.bounds.len() + 1; + Buckets { + data: Mutex::new(BucketsData::new(size)), + } + } + + fn update(&self, config: &BucketsConfig, measurement: T) { + let f_value = measurement.into_float(); + // Ignore NaN and infinity. + if f_value.is_infinite() || f_value.is_nan() { + return; + } + // This search will return an index in the range `[0, bounds.len()]`, where + // it will return `bounds.len()` if value is greater than the last element + // of `bounds`. This aligns with the buckets in that the length of buckets + // is `bounds.len()+1`, with the last bucket representing: + // `(bounds[bounds.len()-1], +∞)`. + let idx = config.bounds.partition_point(|&x| x < f_value); + if let Ok(mut data) = self.data.lock() { + data.counts[idx] += 1; + data.count += 1; + if config.record_min_max { + if measurement < data.min { + data.min = measurement; + } + if measurement > data.max { + data.max = measurement + } + } + // it's very cheap to update it, even if it is not configured to record_sum + data.total += measurement; + } + } +} + /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. pub(crate) struct Histogram { - value_map: ValueMap, T, HistogramUpdate>, - bounds: Vec, - record_min_max: bool, - record_sum: bool, + value_map: ValueMap>, start: Mutex, } impl Histogram { - pub(crate) fn new(boundaries: Vec, record_min_max: bool, record_sum: bool) -> Self { - let buckets_count = boundaries.len() + 1; - let mut histogram = Histogram { - value_map: ValueMap::new_with_buckets_count(buckets_count), - bounds: boundaries, - record_min_max, - record_sum, + pub(crate) fn new(mut bounds: Vec, record_min_max: bool, record_sum: bool) -> Self { + bounds.retain(|v| !v.is_nan()); + bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); + Self { + value_map: ValueMap::new(BucketsConfig { + record_min_max, + record_sum, + bounds, + }), start: Mutex::new(SystemTime::now()), - }; - - histogram.bounds.retain(|v| !v.is_nan()); - histogram - .bounds - .sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); - - histogram + } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - let f = measurement.into_float(); - - // This search will return an index in the range `[0, bounds.len()]`, where - // it will return `bounds.len()` if value is greater than the last element - // of `bounds`. This aligns with the buckets in that the length of buckets - // is `bounds.len()+1`, with the last bucket representing: - // `(bounds[bounds.len()-1], +∞)`. - let index = self.bounds.partition_point(|&x| x < f); - self.value_map.measure(measurement, attrs, index); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -167,25 +155,25 @@ impl Histogram { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() { + if let Ok(ref mut b) = self.value_map.no_attribute_tracker.data.lock() { h.data_points.push(HistogramDataPoint { attributes: vec![], start_time: start, time: t, count: b.count, - bounds: self.bounds.clone(), + bounds: self.value_map.config.bounds.clone(), bucket_counts: b.counts.clone(), - sum: if self.record_sum { + sum: if self.value_map.config.record_sum { b.total } else { T::default() }, - min: if self.record_min_max { + min: if self.value_map.config.record_min_max { Some(b.min) } else { None }, - max: if self.record_min_max { + max: if self.value_map.config.record_min_max { Some(b.max) } else { None @@ -205,25 +193,25 @@ impl Histogram { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - if let Ok(b) = tracker.buckets.lock() { + if let Ok(b) = tracker.data.lock() { h.data_points.push(HistogramDataPoint { attributes: attrs.clone(), start_time: start, time: t, count: b.count, - bounds: self.bounds.clone(), + bounds: self.value_map.config.bounds.clone(), bucket_counts: b.counts.clone(), - sum: if self.record_sum { + sum: if self.value_map.config.record_sum { b.total } else { T::default() }, - min: if self.record_min_max { + min: if self.value_map.config.record_min_max { Some(b.min) } else { None }, - max: if self.record_min_max { + max: if self.value_map.config.record_min_max { Some(b.max) } else { None @@ -278,25 +266,25 @@ impl Histogram { .has_no_attribute_value .load(Ordering::Acquire) { - if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() { + if let Ok(b) = &self.value_map.no_attribute_tracker.data.lock() { h.data_points.push(HistogramDataPoint { attributes: vec![], start_time: start, time: t, count: b.count, - bounds: self.bounds.clone(), + bounds: self.value_map.config.bounds.clone(), bucket_counts: b.counts.clone(), - sum: if self.record_sum { + sum: if self.value_map.config.record_sum { b.total } else { T::default() }, - min: if self.record_min_max { + min: if self.value_map.config.record_min_max { Some(b.min) } else { None }, - max: if self.record_min_max { + max: if self.value_map.config.record_min_max { Some(b.max) } else { None @@ -318,25 +306,25 @@ impl Histogram { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { if seen.insert(Arc::as_ptr(tracker)) { - if let Ok(b) = tracker.buckets.lock() { + if let Ok(b) = tracker.data.lock() { h.data_points.push(HistogramDataPoint { attributes: attrs.clone(), start_time: start, time: t, count: b.count, - bounds: self.bounds.clone(), + bounds: self.value_map.config.bounds.clone(), bucket_counts: b.counts.clone(), - sum: if self.record_sum { + sum: if self.value_map.config.record_sum { b.total } else { T::default() }, - min: if self.record_min_max { + min: if self.value_map.config.record_min_max { Some(b.min) } else { None }, - max: if self.record_min_max { + max: if self.value_map.config.record_min_max { Some(b.max) } else { None diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index d1eab4fada..898a06a6b8 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -7,25 +7,50 @@ use std::{ use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; + +/// this is reused by PrecomputedSum +pub(crate) struct Assign +where + T: AtomicallyUpdate, +{ + pub(crate) value: T::AtomicTracker, +} + +impl Aggregator for Assign +where + T: Number, +{ + type Config = (); + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, _init: &(), measurement: T) { + self.value.store(measurement) + } +} /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue { - value_map: ValueMap, + value_map: ValueMap>, start: Mutex, } impl LastValue { pub(crate) fn new() -> Self { LastValue { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), start: Mutex::new(SystemTime::now()), } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to LastValue. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { @@ -49,7 +74,11 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -66,7 +95,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -101,7 +130,7 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -118,7 +147,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index abc691b2fc..05a626395b 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,7 +7,6 @@ mod sum; use core::fmt; use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -24,79 +23,60 @@ use crate::metrics::AttributeSet; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); -/// Abstracts the update operation for a measurement. -pub(crate) trait Operation { - fn update_tracker>(tracker: &AT, value: T, index: usize); -} - -struct Increment; - -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.add(value); - } -} +pub(crate) trait Aggregator +where + T: Number, +{ + /// A static configuration that is needed by configurators. + /// E.g. bucket_size at creation time and buckets list at aggregator update. + type Config; -struct Assign; + /// Called everytime a new attribute-set is stored. + fn create(init: &Self::Config) -> Self; -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.store(value); - } + /// Called for each measurement. + fn update(&self, config: &Self::Config, measurement: T); } /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how /// updates to the underlying value trackers should be performed. -pub(crate) struct ValueMap, T: Number, O> { +pub(crate) struct ValueMap +where + T: Number, + A: Aggregator, +{ /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. has_no_attribute_value: AtomicBool, /// Tracker for values with no attributes attached. - no_attribute_tracker: AU::AtomicTracker, - /// Buckets Count is only used by Histogram. - buckets_count: Option, - phantom: PhantomData, -} - -impl, T: Number, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } + no_attribute_tracker: A, + /// Configuration for an Aggregator + config: A::Config, } -impl, T: Number, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(None), - count: AtomicUsize::new(0), - buckets_count: None, - phantom: PhantomData, - } - } - - fn new_with_buckets_count(buckets_count: usize) -> Self { +impl ValueMap +where + T: Number, + A: Aggregator, +{ + fn new(config: A::Config) -> Self { ValueMap { trackers: RwLock::new(HashMap::new()), has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)), + no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), - buckets_count: Some(buckets_count), - phantom: PhantomData, + config, } } -} -impl, T: Number, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue], index: usize) { + fn measure(&self, measurement: T, attributes: &[KeyValue]) { if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement, index); + self.no_attribute_tracker.update(&self.config, measurement); self.has_no_attribute_value.store(true, Ordering::Release); return; } @@ -107,14 +87,14 @@ impl, T: Number, O: Operation> ValueMap { // Try to retrieve and update the tracker with the attributes in the provided order first if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(&self.config, measurement); return; } // Try to retrieve and update the tracker with the attributes sorted. let sorted_attrs = AttributeSet::from(attributes).into_vec(); if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(&self.config, measurement); return; } @@ -128,12 +108,12 @@ impl, T: Number, O: Operation> ValueMap { // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(&self.config, measurement); } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(&self.config, measurement); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(AU::new_atomic_tracker(self.buckets_count)); - O::update_tracker(&*new_tracker, measurement, index); + let new_tracker = Arc::new(A::create(&self.config)); + new_tracker.update(&self.config, measurement); // Insert tracker with the attributes in the provided and sorted orders trackers.insert(attributes.to_vec(), new_tracker.clone()); @@ -141,10 +121,10 @@ impl, T: Number, O: Operation> ValueMap { self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement, index); + overflow_value.update(&self.config, measurement); } else { - let new_tracker = AU::new_atomic_tracker(self.buckets_count); - O::update_tracker(&new_tracker, measurement, index); + let new_tracker = A::create(&self.config); + new_tracker.update(&self.config, measurement); trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); } @@ -153,22 +133,17 @@ impl, T: Number, O: Operation> ValueMap { /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { - fn store(&self, _value: T) {} - fn add(&self, _value: T) {} - fn get_value(&self) -> T { - T::default() - } - fn get_and_reset_value(&self) -> T { - T::default() - } - fn update_histogram(&self, _index: usize, _value: T) {} +pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, _value: T); + fn add(&self, _value: T); + fn get_value(&self) -> T; + fn get_and_reset_value(&self) -> T; } /// Marks a type that can have an atomic tracker generated for it -pub(crate) trait AtomicallyUpdate { +pub(crate) trait AtomicallyUpdate { type AtomicTracker: AtomicTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker; + fn new_atomic_tracker(init: T) -> Self::AtomicTracker; } pub(crate) trait Number: @@ -255,8 +230,8 @@ impl AtomicTracker for AtomicU64 { impl AtomicallyUpdate for u64 { type AtomicTracker = AtomicU64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker(init: u64) -> Self::AtomicTracker { + AtomicU64::new(init) } } @@ -281,8 +256,8 @@ impl AtomicTracker for AtomicI64 { impl AtomicallyUpdate for i64 { type AtomicTracker = AtomicI64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker(init: i64) -> Self::AtomicTracker { + AtomicI64::new(init) } } @@ -291,10 +266,10 @@ pub(crate) struct F64AtomicTracker { } impl F64AtomicTracker { - fn new() -> Self { - let zero_as_u64 = 0.0_f64.to_bits(); + fn new(init: f64) -> Self { + let value_as_u64 = init.to_bits(); F64AtomicTracker { - inner: AtomicU64::new(zero_as_u64), + inner: AtomicU64::new(value_as_u64), } } } @@ -343,8 +318,8 @@ impl AtomicTracker for F64AtomicTracker { impl AtomicallyUpdate for f64 { type AtomicTracker = F64AtomicTracker; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - F64AtomicTracker::new() + fn new_atomic_tracker(init: f64) -> Self::AtomicTracker { + F64AtomicTracker::new(init) } } @@ -354,7 +329,7 @@ mod tests { #[test] fn can_store_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -367,7 +342,7 @@ mod tests { #[test] fn can_add_and_get_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); atomic.add(10); @@ -377,7 +352,7 @@ mod tests { #[test] fn can_reset_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -389,7 +364,7 @@ mod tests { #[test] fn can_store_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -406,7 +381,7 @@ mod tests { #[test] fn can_add_and_get_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); atomic.add(-10); @@ -416,7 +391,7 @@ mod tests { #[test] fn can_reset_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -428,7 +403,7 @@ mod tests { #[test] fn can_store_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -445,7 +420,7 @@ mod tests { #[test] fn can_add_and_get_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.3); atomic.add(10.4); @@ -456,7 +431,7 @@ mod tests { #[test] fn can_reset_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.5); let value = atomic.get_and_reset_value(); diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 060c7baaa6..f08f70b73e 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -2,7 +2,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use std::{ collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc, Mutex}, @@ -11,7 +11,7 @@ use std::{ /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -20,7 +20,7 @@ pub(crate) struct PrecomputedSum { impl PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -29,7 +29,7 @@ impl PrecomputedSum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -73,7 +73,7 @@ impl PrecomputedSum { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let value = self.value_map.no_attribute_tracker.get_value(); + let value = self.value_map.no_attribute_tracker.value.get_value(); let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); new_reported.insert(vec![], value); @@ -94,7 +94,7 @@ impl PrecomputedSum { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); + let value = tracker.value.get_value(); let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); new_reported.insert(attrs.clone(), value); s_data.data_points.push(DataPoint { @@ -162,7 +162,7 @@ impl PrecomputedSum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -179,7 +179,7 @@ impl PrecomputedSum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 66af75734d..1ec61356ad 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -7,12 +7,36 @@ use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{AtomicTracker, Number}; -use super::{Increment, ValueMap}; +use super::{Aggregator, AtomicTracker, Number}; +use super::{AtomicallyUpdate, ValueMap}; + +struct Increment +where + T: AtomicallyUpdate, +{ + value: T::AtomicTracker, +} + +impl Aggregator for Increment +where + T: Number, +{ + type Config = (); + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, _init: &(), measurement: T) { + self.value.add(measurement) + } +} /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, } @@ -25,7 +49,7 @@ impl Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), } @@ -33,7 +57,7 @@ impl Sum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to Sum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -76,7 +100,11 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -93,7 +121,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -152,7 +180,7 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -173,7 +201,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 6e0f139809..c4638eb28c 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -470,7 +470,6 @@ fn aggregate_fn( agg: &aggregation::Aggregation, kind: InstrumentKind, ) -> Result>> { - use aggregation::Aggregation; fn box_val( (m, ca): (impl internal::Measure, impl internal::ComputeAggregation), ) -> ( @@ -544,7 +543,6 @@ fn aggregate_fn( /// | Gauge | ✓ | ✓ | | ✓ | ✓ | /// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ | fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> { - use aggregation::Aggregation; match agg { Aggregation::Default => Ok(()), Aggregation::ExplicitBucketHistogram { .. }