diff --git a/engine/engine_test.go b/engine/engine_test.go index 6cfd42ff..68005959 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -637,6 +637,15 @@ or end: time.Unix(3000, 0), step: 2 * time.Second, }, + { + name: "rate with counter reset and step larger than window", + load: `load 30s + http_requests_total{pod="nginx-1", series="1"} 0+1x3 0+1x2 0+1x3`, + query: `rate(http_requests_total[1m])`, + start: time.Unix(0, 0), + end: time.Unix(200, 0), + step: 90 * time.Second, + }, { name: "sum rate", load: `load 30s diff --git a/ringbuffer/rate.go b/ringbuffer/rate.go index 8df86953..caeb3343 100644 --- a/ringbuffer/rate.go +++ b/ringbuffer/rate.go @@ -36,6 +36,7 @@ type RateBuffer struct { // last is the last sample in the current evaluation step. last Sample + currentMint int64 selectRange int64 step int64 offset int64 @@ -83,6 +84,7 @@ func NewRateBuffer(opts query.Options, isCounter, isRate bool, selectRange, offs stepRanges: stepRanges, firstSamples: firstSamples, last: Sample{T: math.MinInt64}, + currentMint: math.MaxInt64, } } @@ -93,7 +95,7 @@ func (r *RateBuffer) MaxT() int64 { return r.last.T } func (r *RateBuffer) Push(t int64, v Value) { // Detect resets and store the current and previous sample so that // the rate is properly adjusted. - if v.H != nil && r.last.V.H != nil { + if r.last.T >= r.currentMint && v.H != nil && r.last.V.H != nil { if v.H.DetectReset(r.last.V.H) { r.resets = append(r.resets, Sample{ T: r.last.T, @@ -104,7 +106,7 @@ func (r *RateBuffer) Push(t int64, v Value) { V: Value{H: v.H.Copy()}, }) } - } else if r.last.V.F > v.F { + } else if r.last.T >= r.currentMint && r.last.V.F > v.F { r.resets = append(r.resets, Sample{T: r.last.T, V: Value{F: r.last.V.F}}) r.resets = append(r.resets, Sample{T: t, V: Value{F: v.F}}) } @@ -142,7 +144,7 @@ func (r *RateBuffer) Push(t int64, v Value) { } func (r *RateBuffer) Reset(mint int64, evalt int64) { - r.evalTs = evalt + r.currentMint, r.evalTs = mint, evalt if r.stepRanges[0].mint == mint { return }