Skip to content

Commit

Permalink
chore: send negative value rateNotAvailable (#1966)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Aug 19, 2024
1 parent 4267113 commit d2fc8d7
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 54 deletions.
10 changes: 7 additions & 3 deletions pkg/daemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package rater

import (
"math"
"time"

sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
)

const (
indexNotFound = -1
// rateNotAvailable is returned when the processing rate cannot be derived from the currently
// available pod data, a negative min is returned to indicate this.
rateNotAvailable = float64(math.MinInt)
)

// UpdateCount updates the count of processed messages for a pod at a given time
Expand All @@ -48,22 +52,22 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
return rateNotAvailable
}
startIndex := findStartIndex(lookbackSeconds, counts)
// we consider the last but one element as the end index because the last element might be incomplete
// we can be sure that the last but one element in the queue is complete.
endIndex := len(counts) - 2
if startIndex == indexNotFound {
return 0
return rateNotAvailable
}

// time diff in seconds.
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
return rateNotAvailable
}

delta := float64(0)
Expand Down
60 changes: 30 additions & 30 deletions pkg/daemon/server/service/rater/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ func TestUpdateCount(t *testing.T) {
}

func TestCalculateRate(t *testing.T) {
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnZero", func(t *testing.T) {
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnRateNotAvailable", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
// no data
assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 10, "partition1"))

// only one data
now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", map[string]float64{"partition1": 5.0}})
q.Append(tc1)
assert.Equal(t, 0.0, CalculateRate(q, 10, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 10, "partition1"))
})

t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand All @@ -147,9 +147,9 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 0.5, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
Expand All @@ -174,9 +174,9 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc4)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
// tc1, 2 and 3 are used to calculate the rate
Expand All @@ -203,11 +203,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 15.0, CalculateRate(q, 35, "partition1"))
})
Expand All @@ -230,11 +230,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 30.0, CalculateRate(q, 35, "partition1"))
})
Expand All @@ -257,11 +257,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25, "partition1"))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 25.0, CalculateRate(q, 35, "partition1"))
})
Expand Down Expand Up @@ -292,9 +292,9 @@ func TestCalculateRate(t *testing.T) {

// partition1 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 5.0, CalculateRate(q, 25, "partition1"))
// tc1, 2 and 3 are used to calculate the rate
Expand All @@ -303,29 +303,29 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 7.5, CalculateRate(q, 100, "partition1"))

// partition2 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition2"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition2"))
assert.Equal(t, 10.0, CalculateRate(q, 25, "partition2"))
assert.Equal(t, 10.5, CalculateRate(q, 35, "partition2"))
assert.Equal(t, 10.5, CalculateRate(q, 100, "partition2"))

// partition3 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition3"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition3"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition3"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition3"))
assert.Equal(t, 20.0, CalculateRate(q, 25, "partition3"))
assert.Equal(t, 10.0, CalculateRate(q, 35, "partition3"))
assert.Equal(t, 10.0, CalculateRate(q, 100, "partition3"))

// partition4 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition4"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition4"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition4"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition4"))
assert.Equal(t, 10.0, CalculateRate(q, 25, "partition4"))
assert.Equal(t, 5.0, CalculateRate(q, 35, "partition4"))
assert.Equal(t, 5.0, CalculateRate(q, 100, "partition4"))

// partition100 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition100"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition100"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 25, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 35, "partition100"))
assert.Equal(t, 0.0, CalculateRate(q, 100, "partition100"))
Expand Down Expand Up @@ -359,9 +359,9 @@ func TestCalculateRate(t *testing.T) {

// partition1 rate
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition1"))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition1"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition1"))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 111.0, CalculateRate(q, 25, "partition1"))
// tc1, 2 and 3 are used to calculate the rate
Expand All @@ -370,8 +370,8 @@ func TestCalculateRate(t *testing.T) {
assert.Equal(t, 111.0, CalculateRate(q, 100, "partition1"))

// partition2 rate
assert.Equal(t, 0.0, CalculateRate(q, 5, "partition2"))
assert.Equal(t, 0.0, CalculateRate(q, 15, "partition2"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5, "partition2"))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 25, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 35, "partition2"))
assert.Equal(t, 111.0, CalculateRate(q, 100, "partition2"))
Expand Down
10 changes: 7 additions & 3 deletions pkg/mvtxdaemon/server/service/rater/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package rater

import (
"math"
"time"

sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
Expand All @@ -25,6 +26,9 @@ import (
const (
// indexNotFound is returned when the start index cannot be found in the queue.
indexNotFound = -1
// rateNotAvailable is returned when the processing rate cannot be derived from the currently
// available pod data, a negative min is returned to indicate this.
rateNotAvailable = float64(math.MinInt)
)

// UpdateCount updates the count for a given timestamp in the queue.
Expand All @@ -49,22 +53,22 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 {
counts := q.Items()
if len(counts) <= 1 {
return 0
return rateNotAvailable
}
startIndex := findStartIndex(lookbackSeconds, counts)
// we consider the last but one element as the end index because the last element might be incomplete
// we can be sure that the last but one element in the queue is complete.
endIndex := len(counts) - 2
if startIndex == indexNotFound {
return 0
return rateNotAvailable
}

// time diff in seconds.
timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp
if timeDiff == 0 {
// if the time difference is 0, we return 0 to avoid division by 0
// this should not happen in practice because we are using a 10s interval
return 0
return rateNotAvailable
}

delta := float64(0)
Expand Down
36 changes: 18 additions & 18 deletions pkg/mvtxdaemon/server/service/rater/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ func TestUpdateCount(t *testing.T) {
}

func TestCalculateRate(t *testing.T) {
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnZero", func(t *testing.T) {
t.Run("givenCollectedTimeLessThanTwo_whenCalculateRate_thenReturnRateNotAvailable", func(t *testing.T) {
q := sharedqueue.New[*TimestampedCounts](1800)
// no data
assert.Equal(t, 0.0, CalculateRate(q, 10))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 10))

// only one data
now := time.Now()
tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20)
tc1.Update(&PodReadCount{"pod1", 5.0})
q.Append(tc1)
assert.Equal(t, 0.0, CalculateRate(q, 10))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 10))
})

t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate", func(t *testing.T) {
Expand All @@ -134,9 +134,9 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 0.5, CalculateRate(q, 25))
// tc1 and tc2 are used to calculate the rate
Expand All @@ -161,9 +161,9 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc4)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
// tc2 and tc3 are used to calculate the rate
assert.Equal(t, 5.0, CalculateRate(q, 25))
// tc1, 2 and 3 are used to calculate the rate
Expand All @@ -190,11 +190,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 15.0, CalculateRate(q, 35))
})
Expand All @@ -217,11 +217,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 30.0, CalculateRate(q, 35))
})
Expand All @@ -244,11 +244,11 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc3)

// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
// no enough data collected within lookback seconds, expect rate 0
assert.Equal(t, 0.0, CalculateRate(q, 25))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 25))
// tc1 and tc2 are used to calculate the rate
assert.Equal(t, 25.0, CalculateRate(q, 35))
})
Expand Down Expand Up @@ -279,8 +279,8 @@ func TestCalculateRate(t *testing.T) {
q.Append(tc4)

// vertex rate
assert.Equal(t, 0.0, CalculateRate(q, 5))
assert.Equal(t, 0.0, CalculateRate(q, 15))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 5))
assert.Equal(t, rateNotAvailable, CalculateRate(q, 15))
assert.Equal(t, 25.0, CalculateRate(q, 25))
assert.Equal(t, 23.0, CalculateRate(q, 35))
assert.Equal(t, 23.0, CalculateRate(q, 100))
Expand Down

0 comments on commit d2fc8d7

Please sign in to comment.