From 7068fd9362ef1f7f63398d2ec009f622ece9ad3b Mon Sep 17 00:00:00 2001 From: William Dumont Date: Tue, 13 Aug 2024 16:23:44 +0200 Subject: [PATCH] [processor/metricstransform]: Add scaling exponential histogram support (#34039) **Description:** This PR adds support for the exponential histograms for the `experimental_scale_value` in the metricstransformprocessor. The scaling works by scaling the middle value of the first bucket (the one that corresponds to the offset) and finding the offset corresponding to this new value (the method used is described here: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function). The buckets are actually unchanged because they are "shifted" by the new offset. I initially remapped all the values but I ended up always having the same buckets so I left the buckets untouched to make the code simpler and save on performance. **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29803 **Testing:** unit test + e2e local test --- ...icstransform-processor-scale-exp-hist.yaml | 27 ++++ processor/metricstransformprocessor/README.md | 2 +- .../metrics_testcase_builder_test.go | 60 ++++++++ ...rics_transform_processor_testcases_test.go | 145 ++++++++++++++++++ .../operation_scale_value.go | 78 +++++++++- 5 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 .chloggen/metricstransform-processor-scale-exp-hist.yaml diff --git a/.chloggen/metricstransform-processor-scale-exp-hist.yaml b/.chloggen/metricstransform-processor-scale-exp-hist.yaml new file mode 100644 index 000000000000..ba3d39eb08b9 --- /dev/null +++ b/.chloggen/metricstransform-processor-scale-exp-hist.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: metricstransformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add scaling exponential histogram support + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29803] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/metricstransformprocessor/README.md b/processor/metricstransformprocessor/README.md index 2cbb3da3237f..bac2242119d5 100644 --- a/processor/metricstransformprocessor/README.md +++ b/processor/metricstransformprocessor/README.md @@ -107,7 +107,7 @@ processors: label_set: [labels...] # aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required aggregation_type: {sum, mean, min, max, count, median} - # experimental_scale specifies the scalar to apply to values + # experimental_scale specifies the scalar to apply to values. Scaling exponential histograms inherently involves some loss of accuracy. 
experimental_scale: # value_actions contain a list of operations that will be performed on the selected label value_actions: diff --git a/processor/metricstransformprocessor/metrics_testcase_builder_test.go b/processor/metricstransformprocessor/metrics_testcase_builder_test.go index d402c69b5820..dc2142b2ca76 100644 --- a/processor/metricstransformprocessor/metrics_testcase_builder_test.go +++ b/processor/metricstransformprocessor/metrics_testcase_builder_test.go @@ -116,6 +116,66 @@ func (b builder) addHistogramDatapointWithMinMaxAndExemplars(start, ts pcommon.T return b } +type expHistogramConfig struct { + count uint64 + sum float64 + min float64 + max float64 + zeroThreshold float64 + zeroCount uint64 + scale int32 + positiveOffset int32 + positiveCount []uint64 + negativeOffset int32 + negativeCount []uint64 + exemplarValues []float64 +} + +func (b builder) addExpHistogramDatapoint(config expHistogramConfig) builder { + if b.metric.Type() != pmetric.MetricTypeExponentialHistogram { + panic(b.metric.Type().String()) + } + dp := b.metric.ExponentialHistogram().DataPoints().AppendEmpty() + dp.SetCount(config.count) + dp.SetSum(config.sum) + dp.SetMin(config.min) + dp.SetMax(config.max) + dp.SetZeroThreshold(config.zeroThreshold) + dp.SetZeroCount(config.zeroCount) + dp.SetScale(config.scale) + dp.Positive().SetOffset(config.positiveOffset) + dp.Positive().BucketCounts().FromRaw(config.positiveCount) + dp.Negative().SetOffset(config.negativeOffset) + dp.Negative().BucketCounts().FromRaw(config.negativeCount) + for ei := 0; ei < len(config.exemplarValues); ei++ { + exemplar := dp.Exemplars().AppendEmpty() + exemplar.SetTimestamp(1) + exemplar.SetDoubleValue(config.exemplarValues[ei]) + } + dp.SetStartTimestamp(1) + dp.SetTimestamp(1) + return b +} + +func buildExpHistogramBucket(m map[int]uint64) []uint64 { + if len(m) == 0 { + return []uint64{} + } + maxIndex := 0 + for index := range m { + if index > maxIndex { + maxIndex = index + } + } + + result := 
make([]uint64, maxIndex+1) + for index, count := range m { + result[index] = count + } + + return result +} + // setUnit sets the unit of this metric func (b builder) setUnit(unit string) builder { b.metric.SetUnit(unit) diff --git a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go index 85337e5d842b..cc6e5ff742d7 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go @@ -1634,6 +1634,151 @@ var ( addHistogramDatapointWithMinMaxAndExemplars(2, 2, 2, 40, 10, 30, []float64{20}, []uint64{1, 2}, []float64{10, 30}).build(), }, }, + { + name: "metric_experimental_scale_value_exp_histogram", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 1000, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric2"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: .1, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric3"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 100000, + }, + }, + }, + }, + { + MetricIncludeFilter: internalFilterStrict{include: "metric4"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: scaleValue, + Scale: 42.123, + }, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric1"). 
+ addExpHistogramDatapoint(expHistogramConfig{ + count: 5, + sum: 1359, + scale: 4, + min: 10, + max: 500, + zeroThreshold: 5, + zeroCount: 1, + positiveOffset: 53, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 53: 1, 74: 1, 90: 2}), // 10, 100, 250, 499, 500 + exemplarValues: []float64{100, 300}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric2"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 10100.000123, + scale: 2, + min: 0.000123, + max: 10000, + positiveOffset: -52, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 78: 1, 105: 1}), // 0.000123, 100, 10000 + exemplarValues: []float64{100, 300}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric3"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 4.3678, + scale: 7, + min: 1.123, + max: 1.789, + positiveOffset: 21, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 48: 1, 86: 1}), // 1.123, 1.456, 1.789 + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric4"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 6.00003, + scale: 20, + min: 2, + max: 2.00002, + negativeOffset: 1048575, + negativeCount: buildExpHistogramBucket(map[int]uint64{0: 1, 8: 1, 16: 1}), // 2, 2.00001, 2.00002 + }).build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric1"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 5, + sum: 1359000, + scale: 4, + min: 10000, + max: 500000, + zeroThreshold: 5000, + zeroCount: 1, + positiveOffset: 212, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 53: 1, 74: 1, 90: 2}), + exemplarValues: []float64{100000, 300000}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric2"). 
+ addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 1010.0000123, + scale: 2, + min: 0.0000123, + max: 1000, + positiveOffset: -65, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 78: 1, 105: 1}), + exemplarValues: []float64{10, 30}, + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric3"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 436780, + scale: 7, + min: 112300, + max: 178900, + positiveOffset: 2147, + positiveCount: buildExpHistogramBucket(map[int]uint64{0: 1, 48: 1, 86: 1}), + }).build(), + metricBuilder(pmetric.MetricTypeExponentialHistogram, "metric4"). + addExpHistogramDatapoint(expHistogramConfig{ + count: 3, + sum: 252.73926368999997, + scale: 20, + min: 84.246, + max: 84.24684246, + negativeOffset: 6707253, + negativeCount: buildExpHistogramBucket(map[int]uint64{0: 1, 8: 1, 16: 1}), + }).build(), + }, + }, { name: "metric_experimental_scale_with_attr_filtering", transforms: []internalTransform{ diff --git a/processor/metricstransformprocessor/operation_scale_value.go b/processor/metricstransformprocessor/operation_scale_value.go index 0200b3eb36a4..c119d30c81ea 100644 --- a/processor/metricstransformprocessor/operation_scale_value.go +++ b/processor/metricstransformprocessor/operation_scale_value.go @@ -4,6 +4,8 @@ package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor" import ( + "math" + "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -19,6 +21,9 @@ func scaleValueOp(metric pmetric.Metric, op internalOperation, f internalFilter) case pmetric.MetricTypeHistogram: scaleHistogramOp(metric, op, f) return + case pmetric.MetricTypeExponentialHistogram: + scaleExpHistogramOp(metric, op, f) + return default: return } @@ -60,14 +65,71 @@ func scaleHistogramOp(metric pmetric.Metric, op internalOperation, f internalFil bounds.SetAt(bi, bounds.At(bi)*op.configOperation.Scale) } - for exemplars, ei := 
dp.Exemplars(), 0; ei < exemplars.Len(); ei++ { - exemplar := exemplars.At(ei) - switch exemplar.ValueType() { - case pmetric.ExemplarValueTypeInt: - exemplar.SetIntValue(int64(float64(exemplar.IntValue()) * op.configOperation.Scale)) - case pmetric.ExemplarValueTypeDouble: - exemplar.SetDoubleValue(exemplar.DoubleValue() * op.configOperation.Scale) - } + scaleExemplars(dp.Exemplars(), &op) + } +} + +func scaleExpHistogramOp(metric pmetric.Metric, op internalOperation, f internalFilter) { + var dps = metric.ExponentialHistogram().DataPoints() + for i := 0; i < dps.Len(); i++ { + dp := dps.At(i) + if !f.matchAttrs(dp.Attributes()) { + continue + } + + if dp.HasSum() { + dp.SetSum(dp.Sum() * op.configOperation.Scale) + } + if dp.HasMin() { + dp.SetMin(dp.Min() * op.configOperation.Scale) + } + if dp.HasMax() { + dp.SetMax(dp.Max() * op.configOperation.Scale) + } + + dp.SetZeroThreshold(dp.ZeroThreshold() * op.configOperation.Scale) + + // For the buckets, we only need to change the offset. + // The bucket counts and the scale remain the same. + if dp.Positive().BucketCounts().Len() != 0 { + dp.Positive().SetOffset(updateOffset(dp.Scale(), dp.Positive().Offset(), &op)) + } + + if dp.Negative().BucketCounts().Len() != 0 { + dp.Negative().SetOffset(updateOffset(dp.Scale(), dp.Negative().Offset(), &op)) + } + + scaleExemplars(dp.Exemplars(), &op) + } +} + +func updateOffset(scale int32, offset int32, op *internalOperation) int32 { + // Take the middle of the first bucket. + base := math.Pow(2, math.Pow(2, float64(-scale))) + value := (math.Pow(base, float64(offset)) + math.Pow(base, float64(offset+1))) / 2 + + // Scale it according to the config. + scaledValue := value * op.configOperation.Scale + + // Find the new offset by mapping the scaled value. + return mapToIndex(scaledValue, int(scale)) +} + +// mapToIndex returns the index that corresponds to the given value on the scale. 
+// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function. +func mapToIndex(value float64, scale int) int32 { + scaleFactor := math.Ldexp(math.Log2E, scale) + return int32(math.Ceil(math.Log(value)*scaleFactor) - 1) +} + +func scaleExemplars(exemplars pmetric.ExemplarSlice, op *internalOperation) { + for e, ei := exemplars, 0; ei < e.Len(); ei++ { + exemplar := e.At(ei) + switch exemplar.ValueType() { + case pmetric.ExemplarValueTypeInt: + exemplar.SetIntValue(int64(float64(exemplar.IntValue()) * op.configOperation.Scale)) + case pmetric.ExemplarValueTypeDouble: + exemplar.SetDoubleValue(exemplar.DoubleValue() * op.configOperation.Scale) } } }