Skip to content

Commit

Permalink
bbr half window
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Nov 2, 2023
1 parent 34042f3 commit d10a934
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
10 changes: 8 additions & 2 deletions pkg/ratelimit/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,12 @@ func (l *bbr) timespan(lastTime time.Time) int {
return l.cfg.Bucket
}

func (l *bbr) getBDP() float64 {
return float64(l.getMaxPASS()*l.getMinRT()*l.bucketPerSecond) / 1e6
}

func (l *bbr) getMaxInFlight() int64 {
return int64(math.Floor(float64(l.getMaxPASS()*l.getMinRT()*l.bucketPerSecond)/1e6) + 0.5)
return int64(math.Floor(l.getBDP()) + 0.5)
}

func (l *bbr) getMaxPASS() int64 {
Expand Down Expand Up @@ -280,7 +284,9 @@ func (l *bbr) checkFullStatus() {

l.inCheck.Store(0)

if raises > 0 && positive > negative && l.bbrStatus.getMaxInFlight() == inf {
check1 := raises > 0 && positive > negative
check2 := l.getBDP() > 1.0
if check1 && check2 && l.bbrStatus.getMaxInFlight() == inf {
maxInFlight := l.getMaxInFlight()
l.bbrStatus.storeMaxInFlight(maxInFlight)
l.bbrStatus.storeMinRT(l.getMinRT())
Expand Down
41 changes: 30 additions & 11 deletions tests/integrations/client/limit_and_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ func (suite *limitTestSuite) getHeader() *pdpb.RequestHeader {

func (suite *limitTestSuite) TestLimitStoreHeartbeart() {
re := suite.Require()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/slowHeartbeat", `return()`))
input := map[string]interface{}{
"enable-grpc-rate-limit": "true",
}
Expand Down Expand Up @@ -175,9 +173,30 @@ func (suite *limitTestSuite) TestLimitStoreHeartbeart() {
res, err := suite.rawClient.StoreHeartbeat(suite.ctx, in)
re.NoError(err)
re.Nil(res.Header.Error)
var breakFlag atomic.Bool

var wg sync.WaitGroup
for i := 0; i < 100 && !breakFlag.Load(); i++ {
success := int32(0)
fail := int32(0)
for i := 0; i < 50; i++ {
time.Sleep(250 * time.Millisecond)
wg.Add(1)
go func() {
res, err = suite.rawClient.StoreHeartbeat(suite.ctx, in)
if err == nil && res.Header.Error != nil {
atomic.AddInt32(&fail, 1)
} else {
atomic.AddInt32(&success, 1)
}
wg.Done()
}()
}
wg.Wait()
re.Equal(success, int32(50))

re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/slowHeartbeat", `return()`))
var breakFlag atomic.Bool

for i := 0; i < 50 && !breakFlag.Load(); i++ {
time.Sleep(250 * time.Millisecond)
wg.Add(1)
go func() {
Expand All @@ -190,9 +209,9 @@ func (suite *limitTestSuite) TestLimitStoreHeartbeart() {
}
wg.Wait()
re.True(breakFlag.Load())
success := int32(0)
fail := int32(0)
for i := 0; i < 50; i++ {
success = int32(0)
fail = int32(0)
for i := 0; i < 20; i++ {
time.Sleep(250 * time.Millisecond)
wg.Add(1)
go func() {
Expand All @@ -206,9 +225,9 @@ func (suite *limitTestSuite) TestLimitStoreHeartbeart() {
}()
}
wg.Wait()
re.Less(success, int32(30))
re.Greater(success, int32(20))
re.Less(fail, int32(30))
re.Greater(fail, int32(20))
re.Less(success, int32(15))
re.Greater(success, int32(5))
re.Less(fail, int32(15))
re.Greater(fail, int32(5))
suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/slowHeartbeat"))
}

0 comments on commit d10a934

Please sign in to comment.