diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go index 929ac99d50..73683c06af 100644 --- a/pkg/daemon/server/service/rater/helper.go +++ b/pkg/daemon/server/service/rater/helper.go @@ -17,6 +17,7 @@ limitations under the License. package rater import ( + "math" "time" sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" @@ -24,6 +25,9 @@ import ( const ( indexNotFound = -1 + // rateNotAvailable is returned when the processing rate cannot be derived from the currently + // available pod data, a negative min is returned to indicate this. + rateNotAvailable = float64(math.MinInt) ) // UpdateCount updates the count of processed messages for a pod at a given time @@ -48,14 +52,14 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 { counts := q.Items() if len(counts) <= 1 { - return 0 + return rateNotAvailable } startIndex := findStartIndex(lookbackSeconds, counts) // we consider the last but one element as the end index because the last element might be incomplete // we can be sure that the last but one element in the queue is complete. endIndex := len(counts) - 2 if startIndex == indexNotFound { - return 0 + return rateNotAvailable } // time diff in seconds. @@ -63,7 +67,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec if timeDiff == 0 { // if the time difference is 0, we return 0 to avoid division by 0 // this should not happen in practice because we are using a 10s interval - return 0 + return rateNotAvailable } delta := float64(0) diff --git a/pkg/daemon/server/service/rater/helper_test.go b/pkg/daemon/server/service/rater/helper_test.go index e420bcc0dc..8873f99802 100644 --- a/pkg/daemon/server/service/rater/helper_test.go +++ b/pkg/daemon/server/service/rater/helper_test.go @@ -119,17 +119,17 @@ func TestUpdateCount(t *testing.T) { } func TestCalculateRate(t *testing.T) { - t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnZero", func(t *testing.T) { + t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnRateNotAvailable", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) // no data - assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 10, "partition1")) // only one data now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}}) q.Append(tc1) - assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 10, "partition1")) }) t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -147,9 +147,9 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1")) // tc1 and tc2 are used to calculate the rate @@ -174,9 +174,9 @@ func TestCalculateRate(t *testing.T) { q.Append(tc4) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // tc2 and tc3 are used to calculate the rate assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) // tc1, 2 and 3 are used to calculate the rate @@ -203,11 +203,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1")) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 15.0, CalculateRate(q, 35, "partition1")) }) @@ -230,11 +230,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1")) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 30.0, CalculateRate(q, 35, "partition1")) }) @@ -257,11 +257,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1")) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 25.0, CalculateRate(q, 35, "partition1")) }) @@ -292,9 +292,9 @@ func TestCalculateRate(t *testing.T) { // partition1 rate // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // tc2 and tc3 are used to calculate the rate assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1")) // tc1, 2 and 3 are used to calculate the rate @@ -303,29 +303,29 @@ func TestCalculateRate(t *testing.T) { assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1")) // partition2 rate - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2")) - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition2")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition2")) assert.Equal(t, 10.0, CalculateRate(q, 25, "partition2")) assert.Equal(t, 10.5, CalculateRate(q, 35, "partition2")) assert.Equal(t, 10.5, CalculateRate(q, 100, "partition2")) // partition3 rate - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3")) - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition3")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition3")) assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3")) assert.Equal(t, 10.0, CalculateRate(q, 35, "partition3")) assert.Equal(t, 10.0, CalculateRate(q, 100, "partition3")) // partition4 rate - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4")) - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition4")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition4")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition4")) assert.Equal(t, 10.0, CalculateRate(q, 25, "partition4")) assert.Equal(t, 5.0, CalculateRate(q, 35, "partition4")) assert.Equal(t, 5.0, CalculateRate(q, 100, "partition4")) // partition100 rate - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition100")) - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition100")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition100")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition100")) assert.Equal(t, 0.0, CalculateRate(q, 25, "partition100")) assert.Equal(t, 0.0, CalculateRate(q, 35, "partition100")) assert.Equal(t, 0.0, CalculateRate(q, 100, "partition100")) @@ -359,9 +359,9 @@ func TestCalculateRate(t *testing.T) { // partition1 rate // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1")) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1")) // tc2 and tc3 are used to calculate the rate assert.Equal(t, 111.0, CalculateRate(q, 25, "partition1")) // tc1, 2 and 3 are used to calculate the rate @@ -370,8 +370,8 @@ func TestCalculateRate(t *testing.T) { assert.Equal(t, 111.0, CalculateRate(q, 100, "partition1")) // partition2 rate - assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2")) - assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition2")) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition2")) assert.Equal(t, 111.0, CalculateRate(q, 25, "partition2")) assert.Equal(t, 111.0, CalculateRate(q, 35, "partition2")) assert.Equal(t, 111.0, CalculateRate(q, 100, "partition2")) diff --git a/pkg/mvtxdaemon/server/service/rater/helper.go b/pkg/mvtxdaemon/server/service/rater/helper.go index 0973b3c5cb..ce9aac1ed4 100644 --- a/pkg/mvtxdaemon/server/service/rater/helper.go +++ b/pkg/mvtxdaemon/server/service/rater/helper.go @@ -17,6 +17,7 @@ limitations under the License. package rater import ( + "math" "time" sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" @@ -25,6 +26,9 @@ import ( const ( // indexNotFound is returned when the start index cannot be found in the queue. indexNotFound = -1 + // rateNotAvailable is returned when the processing rate cannot be derived from the currently + // available pod data, a negative min is returned to indicate this. + rateNotAvailable = float64(math.MinInt) ) // UpdateCount updates the count for a given timestamp in the queue. @@ -49,14 +53,14 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 { counts := q.Items() if len(counts) <= 1 { - return 0 + return rateNotAvailable } startIndex := findStartIndex(lookbackSeconds, counts) // we consider the last but one element as the end index because the last element might be incomplete // we can be sure that the last but one element in the queue is complete. endIndex := len(counts) - 2 if startIndex == indexNotFound { - return 0 + return rateNotAvailable } // time diff in seconds. @@ -64,7 +68,7 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec if timeDiff == 0 { // if the time difference is 0, we return 0 to avoid division by 0 // this should not happen in practice because we are using a 10s interval - return 0 + return rateNotAvailable } delta := float64(0) diff --git a/pkg/mvtxdaemon/server/service/rater/helper_test.go b/pkg/mvtxdaemon/server/service/rater/helper_test.go index 6ac878244c..f3e61e3390 100644 --- a/pkg/mvtxdaemon/server/service/rater/helper_test.go +++ b/pkg/mvtxdaemon/server/service/rater/helper_test.go @@ -106,17 +106,17 @@ func TestUpdateCount(t *testing.T) { } func TestCalculateRate(t *testing.T) { - t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnZero", func(t *testing.T) { + t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnRateNotAvailable", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) // no data - assert.Equal(t, 0.0, CalculateRate(q, 10)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 10)) // only one data now := time.Now() tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc1.Update(&PodReadCount{"pod1", 5.0}) q.Append(tc1) - assert.Equal(t, 0.0, CalculateRate(q, 10)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 10)) }) t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { @@ -134,9 +134,9 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 0.5, CalculateRate(q, 25)) // tc1 and tc2 are used to calculate the rate @@ -161,9 +161,9 @@ func TestCalculateRate(t *testing.T) { q.Append(tc4) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) // tc2 and tc3 are used to calculate the rate assert.Equal(t, 5.0, CalculateRate(q, 25)) // tc1, 2 and 3 are used to calculate the rate @@ -190,11 +190,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25)) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 15.0, CalculateRate(q, 35)) }) @@ -217,11 +217,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25)) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 30.0, CalculateRate(q, 35)) }) @@ -244,11 +244,11 @@ func TestCalculateRate(t *testing.T) { q.Append(tc3) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) // no enough data collected within lookback seconds, expect rate 0 - assert.Equal(t, 0.0, CalculateRate(q, 25)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 25)) // tc1 and tc2 are used to calculate the rate assert.Equal(t, 25.0, CalculateRate(q, 35)) }) @@ -279,8 +279,8 @@ func TestCalculateRate(t *testing.T) { q.Append(tc4) // vertex rate - assert.Equal(t, 0.0, CalculateRate(q, 5)) - assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 5)) + assert.Equal(t, rateNotAvailable, CalculateRate(q, 15)) assert.Equal(t, 25.0, CalculateRate(q, 25)) assert.Equal(t, 23.0, CalculateRate(q, 35)) assert.Equal(t, 23.0, CalculateRate(q, 100))