Skip to content

Commit

Permalink
Merge branch 'master' into rate_limit/bbr_v1
Browse files Browse the repository at this point in the history
  • Loading branch information
CabinfeverB committed Nov 8, 2023
2 parents defcd0e + 47ba96f commit 9993b38
Show file tree
Hide file tree
Showing 56 changed files with 1,990 additions and 837 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ concurrency:
jobs:
statics:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20
steps:
- uses: actions/setup-go@v3
with:
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
error = '''
cannot find scheduling address
'''

["PD:mcs:ErrSchedulingServer"]
error = '''
scheduling server meets %v
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9 h1:xIeaDUq2ItkYMIgpWXAYKC/N3hs8aurfFvvz79lhHYE=
github.com/pingcap/tidb-dashboard v0.0.0-20231102083420-865955cd15d9/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,8 +1339,8 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.st.RLock()
defer r.st.RUnlock()
r.t.RLock()
defer r.t.RUnlock()
return r.tree.notFromStorageRegionsCnt
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,9 @@ var (
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Micro service errors
var (
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ func NewService(srv *rmserver.Service) *Service {
c.Set(multiservicesapi.ServiceContextKey, manager.GetBasicServer())
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
pprof.Register(apiHandlerEngine)
endpoint := apiHandlerEngine.Group(APIPathPrefix)
endpoint.Use(multiservicesapi.ServiceRedirector())
s := &Service{
manager: manager,
apiHandlerEngine: apiHandlerEngine,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
// RequestRU requests the RU of the resource group.
func (rg *ResourceGroup) RequestRU(
now time.Time,
neededTokens float64,
requiredToken float64,
targetPeriodMs, clientUniqueID uint64,
) *rmpb.GrantedRUTokenBucket {
rg.Lock()
Expand All @@ -147,7 +147,7 @@ func (rg *ResourceGroup) RequestRU(
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID)
tb, trickleTimeMs := rg.RUSettings.RU.request(now, requiredToken, targetPeriodMs, clientUniqueID)
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

Expand Down
42 changes: 21 additions & 21 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
}

// updateTokens updates the tokens and settings.
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, consumptionToken float64) {
func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, requiredToken float64) {
var elapseTokens float64
if !gtb.Initialized {
gtb.init(now, clientUniqueID)
Expand All @@ -288,46 +288,46 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien
gtb.Tokens = burst
}
// Balance each slots.
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens)
gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, requiredToken, elapseTokens)
}

// request requests tokens from the corresponding slot.
func (gtb *GroupTokenBucket) request(now time.Time,
neededTokens float64,
requiredToken float64,
targetPeriodMs, clientUniqueID uint64,
) (*rmpb.TokenBucket, int64) {
burstLimit := gtb.Settings.GetBurstLimit()
gtb.updateTokens(now, burstLimit, clientUniqueID, neededTokens)
gtb.updateTokens(now, burstLimit, clientUniqueID, requiredToken)
slot, ok := gtb.tokenSlots[clientUniqueID]
if !ok {
return &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{BurstLimit: burstLimit}}, 0
}
res, trickleDuration := slot.assignSlotTokens(neededTokens, targetPeriodMs)
res, trickleDuration := slot.assignSlotTokens(requiredToken, targetPeriodMs)
// Update bucket to record all tokens.
gtb.Tokens -= slot.lastTokenCapacity - slot.tokenCapacity
slot.lastTokenCapacity = slot.tokenCapacity

return res, trickleDuration
}

func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) {
func (ts *TokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) {
var res rmpb.TokenBucket
burstLimit := ts.settings.GetBurstLimit()
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit}
// If BurstLimit < 0, just return.
if burstLimit < 0 {
res.Tokens = neededTokens
res.Tokens = requiredToken
return &res, 0
}
// FillRate is used for the token server unavailable in abnormal situation.
if neededTokens <= 0 {
if requiredToken <= 0 {
return &res, 0
}
// If the current tokens can directly meet the requirement, returns the need token.
if ts.tokenCapacity >= neededTokens {
ts.tokenCapacity -= neededTokens
if ts.tokenCapacity >= requiredToken {
ts.tokenCapacity -= requiredToken
// granted the total request tokens
res.Tokens = neededTokens
res.Tokens = requiredToken
return &res, 0
}

Expand All @@ -336,7 +336,7 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6
hasRemaining := false
if ts.tokenCapacity > 0 {
grantedTokens = ts.tokenCapacity
neededTokens -= grantedTokens
requiredToken -= grantedTokens
ts.tokenCapacity = 0
hasRemaining = true
}
Expand Down Expand Up @@ -373,36 +373,36 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6
for i := 1; i < loanCoefficient; i++ {
p[i] = float64(loanCoefficient-i)*float64(fillRate)*targetPeriodTimeSec + p[i-1]
}
for i := 0; i < loanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTimeSec; i++ {
for i := 0; i < loanCoefficient && requiredToken > 0 && trickleTime < targetPeriodTimeSec; i++ {
loan := -ts.tokenCapacity
if loan >= p[i] {
continue
}
roundReserveTokens := p[i] - loan
fillRate := float64(loanCoefficient-i) * float64(fillRate)
if roundReserveTokens > neededTokens {
ts.tokenCapacity -= neededTokens
grantedTokens += neededTokens
if roundReserveTokens > requiredToken {
ts.tokenCapacity -= requiredToken
grantedTokens += requiredToken
trickleTime += grantedTokens / fillRate
neededTokens = 0
requiredToken = 0
} else {
roundReserveTime := roundReserveTokens / fillRate
if roundReserveTime+trickleTime >= targetPeriodTimeSec {
roundTokens := (targetPeriodTimeSec - trickleTime) * fillRate
neededTokens -= roundTokens
requiredToken -= roundTokens
ts.tokenCapacity -= roundTokens
grantedTokens += roundTokens
trickleTime = targetPeriodTimeSec
} else {
grantedTokens += roundReserveTokens
neededTokens -= roundReserveTokens
requiredToken -= roundReserveTokens
ts.tokenCapacity -= roundReserveTokens
trickleTime += roundReserveTime
}
}
}
if neededTokens > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec {
reservedTokens := math.Min(neededTokens+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec)
if requiredToken > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec {
reservedTokens := math.Min(requiredToken+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec)
ts.tokenCapacity -= reservedTokens - grantedTokens
grantedTokens = reservedTokens
}
Expand Down
Loading

0 comments on commit 9993b38

Please sign in to comment.