Skip to content

Commit

Permalink
add api for bbr
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Oct 31, 2023
1 parent 0e1bc57 commit 6973bc8
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 96 deletions.
44 changes: 33 additions & 11 deletions pkg/ratelimit/bbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

const (
inf = int64(^uint64(0) >> 1)
inf = int64(^uint64(0) >> 1)
infRT = int64(time.Hour)

defaultWindowSize = time.Second * 10
defaultBucketSize = 100
Expand Down Expand Up @@ -193,7 +194,7 @@ func (l *bbr) getMinRT() int64 {
}
}
rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator window.Iterator) float64 {
var result = float64(time.Minute)
var result = float64(infRT)
for i := 1; iterator.Next() && i < l.cfg.Bucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
Expand All @@ -204,15 +205,17 @@ func (l *bbr) getMinRT() int64 {
total += p
}
avg := total / float64(bucket.Count)
avg = math.Max(1., avg)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT == int64(time.Minute) {
return rawMinRT
// if rtStat is empty, rawMinRT will be zero.
if rawMinRT < 1 {
rawMinRT = infRT
}
if rawMinRT <= 0 {
rawMinRT = 1
if rawMinRT == inf {
return rawMinRT
}
l.minRtCache.Store(&cache{
val: rawMinRT,
Expand All @@ -236,22 +239,41 @@ func (l *bbr) checkFullStatus() {
negative := 0
raises := math.Ceil(l.inFlightStat.Reduce(func(iterator window.Iterator) float64 {
var result = 0.
for i := 1; iterator.Next() && i < l.cfg.Bucket; i++ {
i := 1
for ; iterator.Next() && i < l.cfg.Bucket/2; i++ {
bucket := iterator.Bucket()
total := 0.0
if len(bucket.Points) == 0 {
for _, p := range bucket.Points {
total += p
}
result += total
if total > 1e-6 {
positive++
} else if total < -1e-6 {
negative++
continue
}
if positive < negative {
break
}
}
if positive <= 0 {
return result
}
for ; iterator.Next() && i < l.cfg.Bucket/2; i++ {
bucket := iterator.Bucket()
total := 0.0
for _, p := range bucket.Points {
total += p
}
result += total
if total > 0 {
if total > 1e-6 {
positive++
} else {
} else if total < -1e-6 {
negative++
}
if positive < negative {
break
}
}
return result
}))
Expand Down
8 changes: 4 additions & 4 deletions pkg/ratelimit/bbr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestBBRMinRt(t *testing.T) {
_, feedback := createConcurrencyFeedback()
bbr := newBBR(cfg, feedback)
// default max min rt is equal to maxFloat64.
re.Equal(int64(60000000000), bbr.getMinRT())
re.Equal(int64(3600000000000), bbr.getMinRT())

for i := 0; i < 10; i++ {
var wg sync.WaitGroup
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestBDP(t *testing.T) {
re := require.New(t)
_, feedback := createConcurrencyFeedback()
bbr := newBBR(cfg, feedback)
re.Equal(int64(600000), bbr.getMaxInFlight())
re.Equal(int64(36000000), bbr.getMaxInFlight())

for i := 0; i < 10; i++ {
for j := 0; j < 100; j++ {
Expand Down Expand Up @@ -184,8 +184,8 @@ func TestFullStatus(t *testing.T) {
time.Sleep(bucketDuration)
}
maxInFlight := bbr.bbrStatus.getMaxInFlight()
re.LessOrEqual(int64(7), maxInFlight)
re.GreaterOrEqual(int64(9), maxInFlight)
re.LessOrEqual(int64(6), maxInFlight)
re.GreaterOrEqual(int64(10), maxInFlight)
re.Equal(cl.limit, uint64(maxInFlight))
re.LessOrEqual(int64(200000), bbr.bbrStatus.getMinRT())
re.GreaterOrEqual(int64(220000), bbr.bbrStatus.getMinRT())
Expand Down
9 changes: 6 additions & 3 deletions pkg/ratelimit/concurrency_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package ratelimit

import "github.com/tikv/pd/pkg/utils/syncutil"
import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

type concurrencyLimiter struct {
mu syncutil.RWMutex
Expand Down Expand Up @@ -53,18 +55,19 @@ func (l *concurrencyLimiter) getLimit() uint64 {
return l.limit
}

func (l *concurrencyLimiter) tryToSetLimit(limit uint64) {
func (l *concurrencyLimiter) tryToSetLimit(limit uint64) bool {
l.mu.Lock()
defer l.mu.Unlock()
if limit < l.limit {
l.limit = limit
return true
}
return false
}

func (l *concurrencyLimiter) setLimit(limit uint64) {
l.mu.Lock()
defer l.mu.Unlock()

l.limit = limit
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/ratelimit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func (l *Controller) GetConcurrencyLimiterStatus(label string) (limit uint64, cu
return 0, 0
}

// GetBBRStatus returns the status of a given label's BBR.
func (l *Controller) GetBBRStatus(label string) (enable bool, limit int64) {
if limit, exist := l.limiters.Load(label); exist {
return limit.(*limiter).getBBRStatus()
}
return false, 0
}

// IsInAllowList returns whether this label is in allow list.
// If returns true, the given label won't be limited
func (l *Controller) IsInAllowList(label string) bool {
Expand Down
117 changes: 113 additions & 4 deletions pkg/ratelimit/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type changeAndResult struct {
release int
waitDuration time.Duration
checkStatusFunc func(string)
requestInterval time.Duration
}

type labelCase struct {
Expand All @@ -56,19 +57,26 @@ func runMulitLabelLimiter(t *testing.T, limiter *Controller, testCase []labelCas
time.Sleep(rd.waitDuration)
for i := 0; i < rd.totalRequest; i++ {
wg.Add(1)
if rd.requestInterval > 0 {
time.Sleep(rd.requestInterval)
}
go func() {
countRateLimiterHandleResult(limiter, cas.label, &successCount, &failedCount, &lock, &wg, r)
}()
}
wg.Wait()
re.Equal(rd.fail, failedCount)
re.Equal(rd.success, successCount)
if rd.fail >= 0 {
re.Equal(rd.fail, failedCount)
}
if rd.success >= 0 {
re.Equal(rd.success, successCount)
}
for i := 0; i < rd.release; i++ {
r.release()
}
rd.checkStatusFunc(cas.label)
failedCount -= rd.fail
successCount -= rd.success
failedCount = 0
successCount = 0
}
caseWG.Done()
}()
Expand Down Expand Up @@ -411,6 +419,107 @@ func TestControllerWithTwoLimiters(t *testing.T) {
runMulitLabelLimiter(t, limiter, testCase)
}

func TestControllerWithEnableBBR(t *testing.T) {
t.Parallel()
re := require.New(t)
limiter := NewController()
testCase := []labelCase{
{
label: "test1",
round: []changeAndResult{
{
opt: UpdateDimensionConfigForTest(&DimensionConfig{
EnableBBR: true,
}, optsForTest...),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.True(status&BBRChanged != 0)
},
totalRequest: 200,
fail: -1,
success: -1,
release: 0,
waitDuration: time.Second,
requestInterval: 10 * time.Millisecond,
checkStatusFunc: func(label string) {
time.Sleep(bucketDuration)
bbr, limit := limiter.GetBBRStatus(label)
re.Less(int64(1), limit)
re.Less(limit, inf)
re.True(bbr)
climit, current := limiter.GetConcurrencyLimiterStatus(label)
re.Less(current, uint64(200))
re.Less(uint64(1), current)
re.Equal(climit, current)
},
},
{
opt: UpdateDimensionConfigForTest(&DimensionConfig{
ConcurrencyLimit: 200,
EnableBBR: true,
}, optsForTest...),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.True(status&BBRNoChange != 0)
re.True(status&ConcurrencyNoChange != 0)
},
totalRequest: 0,
fail: 0,
success: 0,
release: 20,
waitDuration: time.Second,
checkStatusFunc: func(label string) {
climit, current := limiter.GetConcurrencyLimiterStatus(label)
re.Greater(climit, current)
},
},
{
opt: UpdateDimensionConfigForTest(&DimensionConfig{
ConcurrencyLimit: 200,
EnableBBR: true,
}, optsForTest...),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.True(status&BBRNoChange != 0)
re.True(status&ConcurrencyNoChange != 0)
},
totalRequest: 100,
fail: 80,
success: 20,
release: 0,
waitDuration: time.Second,
checkStatusFunc: func(label string) {
climit, current := limiter.GetConcurrencyLimiterStatus(label)
re.Equal(climit, current)
},
},
{
opt: UpdateDimensionConfigForTest(&DimensionConfig{
ConcurrencyLimit: 200,
EnableBBR: false,
}, optsForTest...),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.True(status&BBRDeleted != 0)
re.True(status&ConcurrencyNoChange != 0)
},
totalRequest: 300,
fail: -1,
success: -1,
release: 0,
waitDuration: time.Second,
checkStatusFunc: func(label string) {
climit, current := limiter.GetConcurrencyLimiterStatus(label)
re.Equal(uint64(200), current)
re.Equal(uint64(200), climit)
},
},
},
},
}
runMulitLabelLimiter(t, limiter, testCase)
}

func countRateLimiterHandleResult(limiter *Controller, label string, successCount *int,
failedCount *int, lock *syncutil.Mutex, wg *sync.WaitGroup, r *releaseUtil) {
doneFucn, err := limiter.Allow(label)
Expand Down
57 changes: 19 additions & 38 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (l *limiter) getBBRStatus() (enable bool, limit int64) {
return false, 0
}

func (l *limiter) updateBBRConfig(enable bool) UpdateStatus {
func (l *limiter) updateBBRConfig(enable bool, o ...bbrOption) UpdateStatus {
oldEnableBBR, _ := l.getBBRStatus()
if oldEnableBBR == enable {
return BBRNoChange
Expand All @@ -143,30 +143,12 @@ func (l *limiter) updateBBRConfig(enable bool) UpdateStatus {
l.concurrency = newConcurrencyLimiter(uint64(inf))
}
fb := func(s *bbrStatus) {
l.concurrency.tryToSetLimit(uint64(s.getMaxInFlight()))
}
l.bbr = newBBR(newConfig(), fb)
return BBRChanged
}

// only used in test.
func (l *limiter) updateBBRConfigForTest(enable bool, o ...bbrOption) UpdateStatus {
oldEnableBBR, _ := l.getBBRStatus()
if oldEnableBBR == enable {
return BBRNoChange
}
if !enable {
l.deleteBBR()
return BBRDeleted
}
l.mu.Lock()
defer l.mu.Unlock()
l.cfg.EnableBBR = enable
if l.concurrency == nil {
l.concurrency = newConcurrencyLimiter(uint64(inf))
}
fb := func(s *bbrStatus) {
l.concurrency.tryToSetLimit(uint64(s.getMaxInFlight()))
if s.getMinRT() == infRT {
current := l.concurrency.getCurrent()
l.concurrency.tryToSetLimit(current)
} else {
l.concurrency.tryToSetLimit(uint64(s.getMaxInFlight()))
}
}
l.bbr = newBBR(newConfig(o...), fb)
return BBRChanged
Expand All @@ -186,10 +168,17 @@ func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus {
defer l.mu.Unlock()
l.cfg.ConcurrencyLimit = limit
if l.concurrency != nil {
l.concurrency.setLimit(limit)
} else {
l.concurrency = newConcurrencyLimiter(limit)
if bbr := l.bbr; bbr != nil && bbr.bbrStatus.getMaxInFlight() != inf {
if l.concurrency.tryToSetLimit(limit) {
return ConcurrencyChanged
}
return ConcurrencyNoChange
} else {
l.concurrency.setLimit(limit)
return ConcurrencyChanged
}
}
l.concurrency = newConcurrencyLimiter(limit)
return ConcurrencyChanged
}

Expand All @@ -216,18 +205,10 @@ func (l *limiter) updateQPSConfig(limit float64, burst int) UpdateStatus {
return QPSChanged
}

func (l *limiter) updateDimensionConfig(cfg *DimensionConfig) UpdateStatus {
status := l.updateQPSConfig(cfg.QPS, cfg.QPSBurst)
status |= l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
status |= l.updateBBRConfig(cfg.EnableBBR)
return status
}

// only used in test.
func (l *limiter) updateDimensionConfigForTest(cfg *DimensionConfig, op ...bbrOption) UpdateStatus {
func (l *limiter) updateDimensionConfig(cfg *DimensionConfig, op ...bbrOption) UpdateStatus {
status := l.updateQPSConfig(cfg.QPS, cfg.QPSBurst)
status |= l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
status |= l.updateBBRConfigForTest(cfg.EnableBBR, op...)
status |= l.updateBBRConfig(cfg.EnableBBR, op...)
return status
}

Expand Down
Loading

0 comments on commit 6973bc8

Please sign in to comment.