diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index d4b9bc3d97..981bba7c58 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -2054,6 +2054,13 @@ func TestAnnotations(t *testing.T) { metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}} ` + nativeHistogramsWithResetHintsMix := ` + metric{reset_hint="unknown"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3 + metric{reset_hint="gauge"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3 + metric{reset_hint="gauge-unknown"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3 + metric{reset_hint="unknown-gauge"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3 + ` + testCases := map[string]annotationTestCase{ "sum() with float and native histogram at same step": { data: mixedFloatHistogramData, @@ -2073,6 +2080,26 @@ func TestAnnotations(t *testing.T) { expr: `sum(metric{type="histogram"})`, }, + "delta() over a native histogram with unknown CounterResetHint": { + data: nativeHistogramsWithResetHintsMix, + expr: `delta(metric{reset_hint="unknown"}[3m])`, + expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`}, + }, + "delta() over a native histogram with gauge CounterResetHint": { + data: nativeHistogramsWithResetHintsMix, + expr: `delta(metric{reset_hint="gauge"}[3m])`, + }, + "delta() with first point having gauge CounterResetHint and last point having unknown CounterResetHint": { + data: nativeHistogramsWithResetHintsMix, + expr: `delta(metric{reset_hint="gauge-unknown"}[3m])`, + expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`}, + }, + "delta() with first point having unknown CounterResetHint and last point having gauge CounterResetHint": { + data: nativeHistogramsWithResetHintsMix, + expr: `delta(metric{reset_hint="unknown-gauge"}[3m])`, + expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`}, + }, + "stdvar() with only floats": { data: mixedFloatHistogramData, expr: `stdvar(metric{type="float"})`, @@ -2340,6 +2367,31 @@ func TestRateIncreaseAnnotations(t *testing.T) { runAnnotationTests(t, testCases) } +func TestDeltaAnnotations(t *testing.T) { + nativeHistogramsWithGaugeResetHints := ` + metric{series="mix-float-nh"} 10 {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} + metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} + ` + + testCases := map[string]annotationTestCase{ + "delta() over series with mixed floats and native histograms": { + data: nativeHistogramsWithGaugeResetHints, + expr: `delta(metric{series="mix-float-nh"}[1m1s])`, + expectedWarningAnnotations: []string{ + `PromQL warning: encountered a mix of histograms and floats for metric name "metric" (1:7)`, + }, + }, + "delta() over metric with incompatible schema": { + data: nativeHistogramsWithGaugeResetHints, + expr: `delta(metric{series="mixed-exponential-custom-buckets"}[1m1s])`, + expectedWarningAnnotations: []string{ + `PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:7)`, + }, + }, + } + runAnnotationTests(t, testCases) +} + func TestBinaryOperationAnnotations(t *testing.T) { mixedFloatHistogramData := ` metric{type="float", series="1"} 0+1x3 @@ -2801,7 +2853,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) { for _, labels := range labelCombinations { labelRegex := strings.Join(labels, "|") - for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} { + for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta"} { expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex)) expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex)) diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 99415ccce7..7255b7f987 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -364,6 +364,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), + "delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta), "deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv), "exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp), "floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor), diff --git a/pkg/streamingpromql/operators/functions/rate_increase.go b/pkg/streamingpromql/operators/functions/rate_increase.go index fa5e040424..f662ffd09b 100644 --- a/pkg/streamingpromql/operators/functions/rate_increase.go +++ b/pkg/streamingpromql/operators/functions/rate_increase.go @@ -29,6 +29,12 @@ var Increase = FunctionOverRangeVectorDefinition{ NeedsSeriesNamesForAnnotations: true, } +var Delta = FunctionOverRangeVectorDefinition{ + StepFunc: delta, + SeriesMetadataFunction: DropSeriesName, + NeedsSeriesNamesForAnnotations: true, +} + // isRate is true for `rate` function, or false for `instant` function func rate(isRate bool) RangeVectorStepFunction { return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { @@ -169,7 +175,7 @@ func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FP accumulate(fHead) accumulate(fTail) - val := calculateFloatRate(isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) + val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) return val } @@ -208,7 +214,7 @@ func calculateHistogramRate(isRate bool, rangeStart, rangeEnd int64, rangeSecond // This is based on extrapolatedRate from promql/functions.go. // https://github.com/prometheus/prometheus/pull/13725 has a good explanation of the intended behaviour here. -func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 { +func calculateFloatRate(isCounter, isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 { durationToStart := float64(firstPoint.T-rangeStart) / 1000 durationToEnd := float64(rangeEnd-lastPoint.T) / 1000 @@ -222,7 +228,7 @@ func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds fl durationToStart = averageDurationBetweenSamples / 2 } - if delta > 0 && firstPoint.F >= 0 { + if isCounter && delta > 0 && firstPoint.F >= 0 { // Counters cannot be negative. If we have any slope at all // (i.e. delta went up), we can extrapolate the zero point // of the counter. If the duration to the zero point is shorter @@ -272,3 +278,74 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction { lastCheckedMetricName = metricName } } + +func delta(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := step.Floats.UnsafePoints() + fCount := len(fHead) + len(fTail) + + hHead, hTail := step.Histograms.UnsafePoints() + hCount := len(hHead) + len(hTail) + + if fCount > 0 && hCount > 0 { + // We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a delta. + // Otherwise, emit a warning and drop this sample. + emitAnnotation(annotations.NewMixedFloatsHistogramsWarning) + return 0, false, nil, nil + } + + if fCount >= 2 { + val := floatDelta(fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds) + return val, true, nil, nil + } + + if hCount >= 2 { + val, err := histogramDelta(hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation) + if err != nil { + err = NativeHistogramErrorToAnnotation(err, emitAnnotation) + return 0, false, nil, err + } + return 0, false, val, nil + } + + return 0, false, nil, nil +} + +func floatDelta(fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 { + firstPoint := fHead[0] + + var lastPoint promql.FPoint + if len(fTail) > 0 { + lastPoint = fTail[len(fTail)-1] + } else { + lastPoint = fHead[len(fHead)-1] + } + + delta := lastPoint.F - firstPoint.F + return calculateFloatRate(false, false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount) +} + +func histogramDelta(hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) { + firstPoint := hHead[0] + + var lastPoint promql.HPoint + if len(hTail) > 0 { + lastPoint = hTail[len(hTail)-1] + } else { + lastPoint = hHead[len(hHead)-1] + } + + if firstPoint.H.UsesCustomBuckets() != lastPoint.H.UsesCustomBuckets() { + return nil, histogram.ErrHistogramsIncompatibleSchema + } + + delta, err := lastPoint.H.Copy().Sub(firstPoint.H) + if err != nil { + return nil, err + } + if firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType { + emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning) + } + + val := calculateHistogramRate(false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount) + return val, nil +} diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 9105bc0184..6a8e796441 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -25,6 +25,13 @@ eval range from 0 to 4m step 1m increase(some_metric_count[1m1s]) {env="test", cluster="eu"} _ 180 183 183 183 {env="test", cluster="us"} _ 240 244 244 244 +# Range query with delta. +eval range from 0 to 4m step 1m delta(some_metric_count[1m1s]) + {env="prod", cluster="eu"} _ 61 61 61 61 + {env="prod", cluster="us"} _ 122 122 122 122 + {env="test", cluster="eu"} _ 183 183 183 183 + {env="test", cluster="us"} _ 244 244 244 244 + # If no series are matched, we shouldn't return any results. eval range from 0 to 4m step 1m rate(some_nonexistent_metric[1m]) # Should return no results. @@ -32,6 +39,9 @@ eval range from 0 to 4m step 1m rate(some_nonexistent_metric[1m]) eval range from 0 to 4m step 1m increase(some_nonexistent_metric[1m]) # Should return no results. +eval range from 0 to 4m step 1m delta(some_nonexistent_metric[1m]) + # Should return no results. + # Ensure we don't include points outside the range of each individual step. # # When evaluating a range selector, if there is no point with timestamp equal to the end of the range, @@ -622,6 +632,46 @@ eval range from 0 to 8m step 1m irate(metric[3m1s]) clear +# Testing irate and idelta +# nh stands for native histogram +# nhcb stands for native histogram custom bucket +load 1m + metric{case="1 float"} 9 + metric{case="2 floats"} 1 5 + metric{case="all floats with reset"} 1 7 1 7 1 7 1 7 + metric{case="2 floats with missing middle sample"} 1 _ 5 + metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5 + metric{case="2 floats with missing last sample"} 1 5 _ + metric{case="2 floats with NaN middle sample"} 1 NaN 5 + metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5 + metric{case="2 floats with NaN last sample"} 1 5 NaN + metric{case="2 floats with Inf middle sample"} 1 Inf 5 + metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5 + metric{case="2 floats with Inf last sample"} 1 5 Inf + metric{case="all NaN"} NaN NaN NaN NaN + metric{case="all Inf"} Inf Inf Inf Inf + metric{case="all nh up down"} {{schema:3 sum:0 count:0 buckets:[1 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[4 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[2 1 1] counter_reset_hint:gauge}} + metric{case="all nhcb up down"} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[4 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}} + +eval range from 0 to 8m step 1m delta(metric[3m1s]) + {case="2 floats"} _ 6 12.066666666666666 6.066666666666666 + {case="2 floats with Inf 2 middle samples"} _ +Inf +Inf 4.022222222222222 -Inf -Inf + {case="2 floats with Inf last sample"} _ 6 +Inf +Inf +Inf + {case="2 floats with Inf middle sample"} _ +Inf 6.033333333333333 6.033333333333333 -Inf + {case="2 floats with NaN 2 middle samples"} _ NaN NaN 4.022222222222222 NaN NaN + {case="2 floats with NaN last sample"} _ 6 NaN NaN NaN + {case="2 floats with NaN middle sample"} _ NaN 6.033333333333333 6.033333333333333 NaN + {case="2 floats with missing 2 middle samples"} _ _ _ 4.022222222222222 + {case="2 floats with missing last sample"} _ 6 12.066666666666666 6.066666666666666 + {case="2 floats with missing middle sample"} _ _ 6.033333333333333 6.033333333333333 + {case="all Inf"} _ NaN NaN NaN NaN NaN + {case="all NaN"} _ NaN NaN NaN NaN NaN + {case="all floats with reset"} _ 9 0 6.033333333333333 -6.033333333333333 6.033333333333333 -6.033333333333333 6.033333333333333 0 + {case="all nh up down"} _ {{schema:3 counter_reset_hint:gauge buckets:[4.5]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[-3.033333333333333]}} + {case="all nhcb up down"} _ {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[4.5]}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[-4.55]}} + +clear + load 1m some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4 some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4 diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 004ef8d199..4e716c1185 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -243,24 +243,20 @@ load 5m http_requests_counter{path="/foo"} {{schema:0 sum:0 count:0 buckets:[0 0 0]}}+{{schema:0 sum:1 count:2 buckets:[1 1 1]}}x5 http_requests_mix{path="/foo"} 0 50 100 {{schema:0 sum:0 count:0 buckets:[0 0 0] counter_reset_hint:gauge}} {{schema:0 sum:1 count:2 buckets:[1 1 1] counter_reset_hint:gauge}} -# Unsupported by streaming engine. -# eval instant at 20m delta(http_requests[20m]) -# {path="/foo"} 200 -# {path="/bar"} -200 +eval instant at 20m delta(http_requests[20m]) + {path="/foo"} 200 + {path="/bar"} -200 -# Unsupported by streaming engine. -# eval instant at 20m delta(http_requests_gauge[20m]) -# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}} +eval instant at 20m delta(http_requests_gauge[20m]) + {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}} -# Unsupported by streaming engine. -# # delta emits warn annotation for non-gauge histogram types. -# eval_warn instant at 20m delta(http_requests_counter[20m]) -# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}} +# delta emits warn annotation for non-gauge histogram types. +eval_warn instant at 20m delta(http_requests_counter[20m]) + {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}} -# Unsupported by streaming engine. -# # delta emits warn annotation for mix of histogram and floats. -# eval_warn instant at 20m delta(http_requests_mix[20m]) -# #empty +# delta emits warn annotation for mix of histogram and floats. +eval_warn instant at 20m delta(http_requests_mix[20m]) + #empty clear