Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Add support for delta function #9795

Merged
merged 26 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
708b4dc
First step delta function
lamida Nov 1, 2024
e0ffc19
Enable delta functions.test
lamida Nov 1, 2024
4c02e0c
Add delta to engine_test.go
lamida Nov 1, 2024
a445c86
Clear up the comment
lamida Nov 1, 2024
405bf9c
Flip floatRate isCounter check condition
lamida Nov 1, 2024
1770b95
Fix delta and implement delta annotation
lamida Nov 10, 2024
4a39a6f
Clear up comment for isCounter and isRate parameter
lamida Nov 10, 2024
d7f26fd
Remove unneeded comment
lamida Nov 10, 2024
3d01143
Fix delta should not consider reset
lamida Dec 16, 2024
da1dbc0
Add more tests
lamida Jan 5, 2025
6dce90e
Extract delta from rate function (#10353)
lamida Jan 9, 2025
d2f1f9f
Add various our tests to delta
lamida Jan 9, 2025
ab08439
Add reset_hint annotation test
lamida Jan 9, 2025
4215a5a
Revert comment change
lamida Jan 9, 2025
9a34769
Revert whitespace removal
lamida Jan 9, 2025
27fce29
Add mix reset hint sample delta annotation
lamida Jan 10, 2025
4a5b1eb
Remove too much whitespace
lamida Jan 10, 2025
b9f89fb
Add more delta tests
lamida Jan 10, 2025
350a0e1
Remove unnecessary sprintf
lamida Jan 10, 2025
470e16a
Update pkg/streamingpromql/engine_test.go
lamida Jan 10, 2025
421a94a
Update pkg/streamingpromql/engine_test.go
lamida Jan 10, 2025
9806ae6
Update pkg/streamingpromql/engine_test.go
lamida Jan 10, 2025
14a0749
Update pkg/streamingpromql/engine_test.go
lamida Jan 10, 2025
32e6f55
Update pkg/streamingpromql/engine_test.go
lamida Jan 10, 2025
617cfeb
Simplify inline map
lamida Jan 10, 2025
e72c504
Really enable delta upstream tests
lamida Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,13 @@ func TestAnnotations(t *testing.T) {
metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}
`

nativeHistogramsWithResetHintsMix := `
metric{reset_hint="unknown"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="gauge"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
metric{reset_hint="gauge-unknown"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="unknown-gauge"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
`

testCases := map[string]annotationTestCase{
"sum() with float and native histogram at same step": {
data: mixedFloatHistogramData,
Expand All @@ -2073,6 +2080,26 @@ func TestAnnotations(t *testing.T) {
expr: `sum(metric{type="histogram"})`,
},

"delta() over a native histogram with unknown CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="unknown"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},
"delta() over a native histogram with gauge CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="gauge"}[3m])`,
},
"delta() with first point having gauge CounterResetHint and last point having unknown CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="gauge-unknown"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},
"delta() with first point having unknown CounterResetHint and last point having gauge CounterResetHint": {
data: nativeHistogramsWithResetHintsMix,
expr: `delta(metric{reset_hint="unknown-gauge"}[3m])`,
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a gauge: "metric" (1:7)`},
},

"stdvar() with only floats": {
data: mixedFloatHistogramData,
expr: `stdvar(metric{type="float"})`,
Expand Down Expand Up @@ -2340,6 +2367,31 @@ func TestRateIncreaseAnnotations(t *testing.T) {
runAnnotationTests(t, testCases)
}

func TestDeltaAnnotations(t *testing.T) {
nativeHistogramsWithGaugeResetHints := `
metric{series="mix-float-nh"} 10 {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1] counter_reset_hint:gauge}}
metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}
`

testCases := map[string]annotationTestCase{
"delta() over series with mixed floats and native histograms": {
data: nativeHistogramsWithGaugeResetHints,
expr: `delta(metric{series="mix-float-nh"}[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: encountered a mix of histograms and floats for metric name "metric" (1:7)`,
},
},
"delta() over metric with incompatible schema": {
data: nativeHistogramsWithGaugeResetHints,
expr: `delta(metric{series="mixed-exponential-custom-buckets"}[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:7)`,
},
},
}
runAnnotationTests(t, testCases)
}

func TestBinaryOperationAnnotations(t *testing.T) {
mixedFloatHistogramData := `
metric{type="float", series="1"} 0+1x3
Expand Down Expand Up @@ -2801,7 +2853,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {

for _, labels := range labelCombinations {
labelRegex := strings.Join(labels, "|")
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta"} {
for _, function := range []string{"rate", "increase", "changes", "resets", "deriv", "irate", "idelta", "delta"} {
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[45s])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s(series{label=~"(%s)"}[1m])`, function, labelRegex))
expressions = append(expressions, fmt.Sprintf(`sum(%s(series{label=~"(%s)"}[2m15s]))`, function, labelRegex))
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
"count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime),
"deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg),
"delta": FunctionOverRangeVectorOperatorFactory("delta", functions.Delta),
"deriv": FunctionOverRangeVectorOperatorFactory("deriv", functions.Deriv),
"exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp),
"floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
Expand Down
83 changes: 80 additions & 3 deletions pkg/streamingpromql/operators/functions/rate_increase.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ var Increase = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

var Delta = FunctionOverRangeVectorDefinition{
StepFunc: delta,
SeriesMetadataFunction: DropSeriesName,
NeedsSeriesNamesForAnnotations: true,
}

// isRate is true for `rate` function, or false for `instant` function
func rate(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
Expand Down Expand Up @@ -169,7 +175,7 @@ func floatRate(isRate bool, fCount int, fHead []promql.FPoint, fTail []promql.FP
accumulate(fHead)
accumulate(fTail)

val := calculateFloatRate(isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
val := calculateFloatRate(true, isRate, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
return val
}

Expand Down Expand Up @@ -208,7 +214,7 @@ func calculateHistogramRate(isRate bool, rangeStart, rangeEnd int64, rangeSecond

// This is based on extrapolatedRate from promql/functions.go.
// https://github.com/prometheus/prometheus/pull/13725 has a good explanation of the intended behaviour here.
func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 {
func calculateFloatRate(isCounter, isRate bool, rangeStart, rangeEnd int64, rangeSeconds float64, firstPoint, lastPoint promql.FPoint, delta float64, count int) float64 {
durationToStart := float64(firstPoint.T-rangeStart) / 1000
durationToEnd := float64(rangeEnd-lastPoint.T) / 1000

Expand All @@ -222,7 +228,7 @@ func calculateFloatRate(isRate bool, rangeStart, rangeEnd int64, rangeSeconds fl
durationToStart = averageDurationBetweenSamples / 2
}

if delta > 0 && firstPoint.F >= 0 {
if isCounter && delta > 0 && firstPoint.F >= 0 {
// Counters cannot be negative. If we have any slope at all
// (i.e. delta went up), we can extrapolate the zero point
// of the counter. If the duration to the zero point is shorter
Expand Down Expand Up @@ -272,3 +278,74 @@ func rateSeriesValidator() RangeVectorSeriesValidationFunction {
lastCheckedMetricName = metricName
}
}

func delta(step *types.RangeVectorStepData, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
fCount := len(fHead) + len(fTail)

hHead, hTail := step.Histograms.UnsafePoints()
hCount := len(hHead) + len(hTail)

if fCount > 0 && hCount > 0 {
// We need either at least two histograms and no floats, or at least two floats and no histograms to calculate a delta.
// Otherwise, emit a warning and drop this sample.
emitAnnotation(annotations.NewMixedFloatsHistogramsWarning)
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
return 0, false, nil, nil
}

if fCount >= 2 {
val := floatDelta(fCount, fHead, fTail, step.RangeStart, step.RangeEnd, rangeSeconds)
return val, true, nil, nil
}

if hCount >= 2 {
val, err := histogramDelta(hCount, hHead, hTail, step.RangeStart, step.RangeEnd, rangeSeconds, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
}
return 0, false, val, nil
}

return 0, false, nil, nil
}

func floatDelta(fCount int, fHead []promql.FPoint, fTail []promql.FPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64) float64 {
firstPoint := fHead[0]

var lastPoint promql.FPoint
if len(fTail) > 0 {
lastPoint = fTail[len(fTail)-1]
} else {
lastPoint = fHead[len(fHead)-1]
}

delta := lastPoint.F - firstPoint.F
return calculateFloatRate(false, false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, fCount)
}

func histogramDelta(hCount int, hHead []promql.HPoint, hTail []promql.HPoint, rangeStart int64, rangeEnd int64, rangeSeconds float64, emitAnnotation types.EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := hHead[0]

var lastPoint promql.HPoint
if len(hTail) > 0 {
lastPoint = hTail[len(hTail)-1]
} else {
lastPoint = hHead[len(hHead)-1]
}

if firstPoint.H.UsesCustomBuckets() != lastPoint.H.UsesCustomBuckets() {
return nil, histogram.ErrHistogramsIncompatibleSchema
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
}

delta, err := lastPoint.H.Copy().Sub(firstPoint.H)
if err != nil {
return nil, err
}
if firstPoint.H.CounterResetHint != histogram.GaugeType || lastPoint.H.CounterResetHint != histogram.GaugeType {
emitAnnotation(annotations.NewNativeHistogramNotGaugeWarning)
}

val := calculateHistogramRate(false, rangeStart, rangeEnd, rangeSeconds, firstPoint, lastPoint, delta, hCount)
return val, nil
}
50 changes: 50 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@ eval range from 0 to 4m step 1m increase(some_metric_count[1m1s])
{env="test", cluster="eu"} _ 180 183 183 183
{env="test", cluster="us"} _ 240 244 244 244

# Range query with delta.
eval range from 0 to 4m step 1m delta(some_metric_count[1m1s])
{env="prod", cluster="eu"} _ 61 61 61 61
{env="prod", cluster="us"} _ 122 122 122 122
{env="test", cluster="eu"} _ 183 183 183 183
{env="test", cluster="us"} _ 244 244 244 244

# If no series are matched, we shouldn't return any results.
eval range from 0 to 4m step 1m rate(some_nonexistent_metric[1m])
# Should return no results.

eval range from 0 to 4m step 1m increase(some_nonexistent_metric[1m])
# Should return no results.

eval range from 0 to 4m step 1m delta(some_nonexistent_metric[1m])
# Should return no results.

# Ensure we don't include points outside the range of each individual step.
#
# When evaluating a range selector, if there is no point with timestamp equal to the end of the range,
Expand Down Expand Up @@ -622,6 +632,46 @@ eval range from 0 to 8m step 1m irate(metric[3m1s])

clear

# Testing irate and idelta
# nh stands for native histogram
# nhcb stands for native histogram custom bucket
load 1m
metric{case="1 float"} 9
metric{case="2 floats"} 1 5
metric{case="all floats with reset"} 1 7 1 7 1 7 1 7
metric{case="2 floats with missing middle sample"} 1 _ 5
metric{case="2 floats with missing 2 middle samples"} 1 _ _ 5
metric{case="2 floats with missing last sample"} 1 5 _
metric{case="2 floats with NaN middle sample"} 1 NaN 5
metric{case="2 floats with NaN 2 middle samples"} 1 NaN NaN 5
metric{case="2 floats with NaN last sample"} 1 5 NaN
metric{case="2 floats with Inf middle sample"} 1 Inf 5
metric{case="2 floats with Inf 2 middle samples"} 1 Inf Inf 5
metric{case="2 floats with Inf last sample"} 1 5 Inf
metric{case="all NaN"} NaN NaN NaN NaN
metric{case="all Inf"} Inf Inf Inf Inf
metric{case="all nh up down"} {{schema:3 sum:0 count:0 buckets:[1 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[4 1 1] counter_reset_hint:gauge}} {{schema:3 sum:0 count:0 buckets:[2 1 1] counter_reset_hint:gauge}}
metric{case="all nhcb up down"} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[4 1 1] custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 sum:0 count:0 buckets:[1 1 1] custom_values:[5 10] counter_reset_hint:gauge}}

eval range from 0 to 8m step 1m delta(metric[3m1s])
{case="2 floats"} _ 6 12.066666666666666 6.066666666666666
{case="2 floats with Inf 2 middle samples"} _ +Inf +Inf 4.022222222222222 -Inf -Inf
{case="2 floats with Inf last sample"} _ 6 +Inf +Inf +Inf
{case="2 floats with Inf middle sample"} _ +Inf 6.033333333333333 6.033333333333333 -Inf
{case="2 floats with NaN 2 middle samples"} _ NaN NaN 4.022222222222222 NaN NaN
{case="2 floats with NaN last sample"} _ 6 NaN NaN NaN
{case="2 floats with NaN middle sample"} _ NaN 6.033333333333333 6.033333333333333 NaN
{case="2 floats with missing 2 middle samples"} _ _ _ 4.022222222222222
{case="2 floats with missing last sample"} _ 6 12.066666666666666 6.066666666666666
{case="2 floats with missing middle sample"} _ _ 6.033333333333333 6.033333333333333
{case="all Inf"} _ NaN NaN NaN NaN NaN
{case="all NaN"} _ NaN NaN NaN NaN NaN
{case="all floats with reset"} _ 9 0 6.033333333333333 -6.033333333333333 6.033333333333333 -6.033333333333333 6.033333333333333 0
{case="all nh up down"} _ {{schema:3 counter_reset_hint:gauge buckets:[4.5]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[1.5083333333333333]}} {{schema:3 counter_reset_hint:gauge buckets:[-3.033333333333333]}}
{case="all nhcb up down"} _ {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[4.5]}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge}} {{schema:-53 custom_values:[5 10] counter_reset_hint:gauge buckets:[-4.55]}}

clear

load 1m
some_metric_count{env="prod", cluster="eu"} _ _ _ 0+1x4
some_metric_count{env="prod", cluster="us"} _ _ _ 0+2x4
Expand Down
26 changes: 11 additions & 15 deletions pkg/streamingpromql/testdata/upstream/functions.test
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
charleskorn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,20 @@ load 5m
http_requests_counter{path="/foo"} {{schema:0 sum:0 count:0 buckets:[0 0 0]}}+{{schema:0 sum:1 count:2 buckets:[1 1 1]}}x5
http_requests_mix{path="/foo"} 0 50 100 {{schema:0 sum:0 count:0 buckets:[0 0 0] counter_reset_hint:gauge}} {{schema:0 sum:1 count:2 buckets:[1 1 1] counter_reset_hint:gauge}}

# Unsupported by streaming engine.
# eval instant at 20m delta(http_requests[20m])
# {path="/foo"} 200
# {path="/bar"} -200
eval instant at 20m delta(http_requests[20m])
{path="/foo"} 200
{path="/bar"} -200

# Unsupported by streaming engine.
# eval instant at 20m delta(http_requests_gauge[20m])
# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}
eval instant at 20m delta(http_requests_gauge[20m])
{path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}

# Unsupported by streaming engine.
# # delta emits warn annotation for non-gauge histogram types.
# eval_warn instant at 20m delta(http_requests_counter[20m])
# {path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}
# delta emits warn annotation for non-gauge histogram types.
eval_warn instant at 20m delta(http_requests_counter[20m])
{path="/foo"} {{schema:0 sum:4 count:8 buckets:[4 4 4]}}

# Unsupported by streaming engine.
# # delta emits warn annotation for mix of histogram and floats.
# eval_warn instant at 20m delta(http_requests_mix[20m])
# #empty
# delta emits warn annotation for mix of histogram and floats.
eval_warn instant at 20m delta(http_requests_mix[20m])
#empty

clear

Expand Down
Loading