Skip to content

Commit

Permalink
MQE: Add support for delta function (#9795)
Browse files Browse the repository at this point in the history
* First step delta function

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Enable delta functions.test

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add delta to engine_test.go

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Clear up the comment

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Flip floatRate isCounter check condition

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix delta and implement delta annotation

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Clear up comment for isCounter and isRate parameter

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove unneeded comment

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Fix delta should not consider reset

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add more tests

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Extract delta from rate function (#10353)

* Extract delta from rate function

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Update pkg/streamingpromql/operators/functions/rate_increase.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/operators/functions/rate_increase.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/operators/functions/rate_increase.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/operators/functions/rate_increase.go

Co-authored-by: Charles Korn <[email protected]>

* Tidy up after applying PR suggestion

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove unnecessary head subslice

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove wrong placed annotation

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove extra copySchema

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Simplify native histogram sub schema

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove TODO

Signed-off-by: Jon Kartago Lamida <[email protected]>

* The lastPoint should be last index of the head

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add delta counterResetHint test

Signed-off-by: Jon Kartago Lamida <[email protected]>

---------

Signed-off-by: Jon Kartago Lamida <[email protected]>
Co-authored-by: Charles Korn <[email protected]>

* Add various our tests to delta

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add reset_hint annotation test

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Revert comment change

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Revert whitespace removal

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add mix reset hint sample delta annotation

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove too much whitespace

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Add more delta tests

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Remove unnecessary sprintf

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Update pkg/streamingpromql/engine_test.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/engine_test.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/engine_test.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/engine_test.go

Co-authored-by: Charles Korn <[email protected]>

* Update pkg/streamingpromql/engine_test.go

Co-authored-by: Charles Korn <[email protected]>

* Simplify inline map

Signed-off-by: Jon Kartago Lamida <[email protected]>

* Really enable delta upstream tests

Signed-off-by: Jon Kartago Lamida <[email protected]>

---------

Signed-off-by: Jon Kartago Lamida <[email protected]>
Co-authored-by: Charles Korn <[email protected]>
  • Loading branch information
lamida and charleskorn authored Jan 10, 2025
1 parent 56c9edd commit 6511332
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 19 deletions.
54 changes: 53 additions & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,13 @@ func TestAnnotations(t *testing.T) {
metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}
`

nativeHistogramsWithResetHintsMix := `
metric{reset_hint="unknown"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="gauge"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
metric{reset_hint="gauge-unknown"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="unknown-gauge"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
`

testCases := map[string]annotationTestCase{
"sum() with float and native histogram at same step": {
data: mixedFloatHistogramData,
Expand All @@ -2073,6 +2080,26 @@ func TestAnnotations(t *testing.T) {
expr: `sum(metric{type="histogram"})`,
},

"delta() over a native histogram with unknown CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="unknown"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},
"delta() over a native histogram with gauge CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="gauge"}[3m])`,
},
"delta() with first point having gauge CounterResetHint and last point having unknown CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="gauge-unknown"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},
"delta() with first point having unknown CounterResetHint and last point having gauge CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="unknown-gauge"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},

"stdvar() with only floats": {
data: mixedFloatHistogramData,
expr: `stdvar(metric{type="float"})`,
Expand Down Expand Up @@ -2340,6 +2367,31 @@ func TestRateIncreaseAnnotations(t *testing.T) {
runAnnotationTests(t, testCases)
}

func TestDeltaAnnotations(t *testing.T) {
nativeHistogramsWithGaugeResetHints := `
metric{series="mix-float-nh"} 10 {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}}
metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}
`

testCases := map[string]annotationTestCase{
"delta() over series with mixed floats and native histograms": {
data: nativeHistogramsWithGaugeResetHints,
expr: `delta(metric{series="mix-float-nh"}[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: encountered a mix of histograms and floats for metric name "metric" (1:7)`,
},
},
"delta() over metric with incompatible schema": {
data: nativeHistogramsWithGaugeResetHints,
expr: `delta(metric{series="mixed-exponential-custom-buckets"}[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:7)`,
},
},
}
runAnnotationTests(t, testCases)
}

func TestBinaryOperationAnnotations(t *testing.T) {
mixedFloatHistogramData := `
metric{type="float", series="1"} 0+1x3
Expand Down Expand Up @@ -2801,7 +2853,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
"count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime),
"deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg),
"delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta),
"deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv),
"exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp),
"floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
Expand Down
83 changes: 80 additions & 3 deletions pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ var Increase = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

var Delta = FunctionOverRangeVectorDefinition{
StepFunc: delta,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
}

// isRate is true for `rate` function, or false for `instant` function
func rate(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -169,7 +175,7 @@ func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FP
accumulate(fHead)
accumulate(fTail)

val := calculateFloatRate(isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
return val
}

Expand Down Expand Up @@ -208,7 +214,7 @@ func calculateHistogramRate(isRate bool, rangeStart, rangeEnd int64, rangeSecond

// This is based on extrapolatedRate from promql/functions.go.
// https://github.com/prometheus/prometheus/pull/13725 has a good explanation of the intended behaviour here.
func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 {
func calculateFloatRate(isCounter, isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 {
durationToStart := float64(firstPoint.T-rangeStart) / 1000
durationToEnd := float64(rangeEnd-lastPoint.T) / 1000

Expand All @@ -222,7 +228,7 @@ func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds fl
durationToStart = averageDurationBetweenSamples / 2
}

if delta > 0 && firstPoint.F >= 0 {
if isCounter && delta > 0 && firstPoint.F >= 0 {
// Counters cannot be negative. If we have any slope at all
// (i.e. delta went up), we can extrapolate the zero point
// of the counter. If the duration to the zero point is shorter
Expand Down Expand Up @@ -272,3 +278,74 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction {
lastCheckedMetricName = metricName
}
}

func delta(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

hHead, hTail := step.Histograms.UnsafePoints()
hCount := len(hHead) + len(hTail)

if fCount > 0 && hCount > 0 {
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a delta.
// Otherwise, emit a warning and drop this sample.
emitAnnotation(annotations.NewMixedFloatsHistogramsWarning)
return 0, false, nil, nil
}

if fCount >= 2 {
val := floatDelta(fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
return val, true, nil, nil
}

if hCount >= 2 {
val, err := histogramDelta(hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
}
return 0, false, val, nil
}

return 0, false, nil, nil
}

func floatDelta(fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
firstPoint := fHead[0]

var lastPoint promql.FPoint
if len(fTail) > 0 {
lastPoint = fTail[len(fTail)-1]
} else {
lastPoint = fHead[len(fHead)-1]
}

delta := lastPoint.F - firstPoint.F
return calculateFloatRate(false, false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
}

func histogramDelta(hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := hHead[0]

var lastPoint promql.HPoint
if len(hTail) > 0 {
lastPoint = hTail[len(hTail)-1]
} else {
lastPoint = hHead[len(hHead)-1]
}

if firstPoint.H.UsesCustomBuckets() != lastPoint.H.UsesCustomBuckets() {
return nil, histogram.ErrHistogramsIncompatibleSchema
}

delta, err := lastPoint.H.Copy().Sub(firstPoint.H)
if err != nil {
return nil, err
}
if firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType {
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning)
}

val := calculateHistogramRate(false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount)
return val, nil
}
50 changes: 50 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@ eval range from 0 to 4m step 1m increase(some_metric_count[1m1s])
{env="test", cluster="eu"} _ 180 183 183 183
{env="test", cluster="us"} _ 240 244 244 244

# Range query with delta.
eval range from 0 to 4m step 1m delta(some_metric_count[1m1s])
{env="prod", cluster="eu"} _ 61 61 61 61
{env="prod", cluster="us"} _ 122 122 122 122
{env="test", cluster="eu"} _ 183 183 183 183
{env="test", cluster="us"} _ 244 244 244 244

# If no series are matched, we shouldn't return any results.
eval range from 0 to 4m step 1m rate(some_nonexistent_metric[1m])
# Should return no results.

eval range from 0 to 4m step 1m increase(some_nonexistent_metric[1m])
# Should return no results.

eval range from 0 to 4m step 1m delta(some_nonexistent_metric[1m])
# Should return no results.

# Ensure we don't include points outside the range of each individual step.
#
# When evaluating a range selector, if there is no point with timestamp equal to the end of the range,
Expand Down Expand Up @@ -622,6 +632,46 @@ eval range from 0 to 8m step 1m irate(metric[3m1s])

clear

# Testing irate and idelta
# nh stands for native histogram
# nhcb stands for native histogram custom bucket
load 1m
metric{case="1 float"} 9
metric{case="2 floats"} 1 5
metric{case="all floats with reset"} 1 7 1 7 1 7 1 7
metric{case="2 floats with missing middle sample"} 1 _ 5
metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5
metric{case="2 floats with missing last sample"} 1 5 _
metric{case="2 floats with NaN middle sample"} 1 NaN 5
metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5
metric{case="2 floats with NaN last sample"} 1 5 NaN
metric{case="2 floats with Inf middle sample"} 1 Inf 5
metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5
metric{case="2 floats with Inf last sample"} 1 5 Inf
metric{case="all NaN"} NaN NaN NaN NaN
metric{case="all Inf"} Inf Inf Inf Inf
metric{case="all nh up down"} {{schema:3 sum:0 count:0 buckets:[1 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[4 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[2 1 1] counter_reset_hint:gauge}}
metric{case="all nhcb up down"} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[4 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}}

eval range from 0 to 8m step 1m delta(metric[3m1s])
{case="2 floats"} _ 6 12.066666666666666 6.066666666666666
{case="2 floats with Inf 2 middle samples"} _ +Inf +Inf 4.022222222222222 -Inf -Inf
{case="2 floats with Inf last sample"} _ 6 +Inf +Inf +Inf
{case="2 floats with Inf middle sample"} _ +Inf 6.033333333333333 6.033333333333333 -Inf
{case="2 floats with NaN 2 middle samples"} _ NaN NaN 4.022222222222222 NaN NaN
{case="2 floats with NaN last sample"} _ 6 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN 6.033333333333333 6.033333333333333 NaN
{case="2 floats with missing 2 middle samples"} _ _ _ 4.022222222222222
{case="2 floats with missing last sample"} _ 6 12.066666666666666 6.066666666666666
{case="2 floats with missing middle sample"} _ _ 6.033333333333333 6.033333333333333
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 9 0 6.033333333333333 -6.033333333333333 6.033333333333333 -6.033333333333333 6.033333333333333 0
{case="all nh up down"} _ {{schema:3 counter_reset_hint:gauge buckets:[4.5]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[-3.033333333333333]}}
{case="all nhcb up down"} _ {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[4.5]}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[-4.55]}}

clear

load 1m
some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4
some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4
Expand Down
26 changes: 11 additions & 15 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,20 @@ load 5m
http_requests_counter{path="/foo"} {{schema:0 sum:0 count:0 buckets:[0 0 0]}}+{{schema:0 sum:1 count:2 buckets:[1 1 1]}}x5
http_requests_mix{path="/foo"} 0 50 100 {{schema:0 sum:0 count:0 buckets:[0 0 0] counter_reset_hint:gauge}} {{schema:0 sum:1 count:2 buckets:[1 1 1] counter_reset_hint:gauge}}

# Unsupported by streaming engine.
# eval instant at 20m delta(http_requests[20m])
# {path="/foo"} 200
# {path="/bar"} -200
eval instant at 20m delta(http_requests[20m])
{path="/foo"} 200
{path="/bar"} -200

# Unsupported by streaming engine.
# eval instant at 20m delta(http_requests_gauge[20m])
# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}
eval instant at 20m delta(http_requests_gauge[20m])
{path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}

# Unsupported by streaming engine.
# # delta emits warn annotation for non-gauge histogram types.
# eval_warn instant at 20m delta(http_requests_counter[20m])
# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}
# delta emits warn annotation for non-gauge histogram types.
eval_warn instant at 20m delta(http_requests_counter[20m])
{path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}

# Unsupported by streaming engine.
# # delta emits warn annotation for mix of histogram and floats.
# eval_warn instant at 20m delta(http_requests_mix[20m])
# #empty
# delta emits warn annotation for mix of histogram and floats.
eval_warn instant at 20m delta(http_requests_mix[20m])
#empty

clear

Expand Down

0 comments on commit 6511332

Please sign in to comment.