Skip to content

Commit

Permalink
mcs/resourcemanager: delete expire tokenSlot (tikv#7344)
Browse files Browse the repository at this point in the history
close tikv#7346

Signed-off-by: guo-shaoge <[email protected]>
  • Loading branch information
guo-shaoge authored Nov 10, 2023
1 parent 0c35227 commit f1cee6c
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/gogo/protobuf/proto"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const (
Expand All @@ -31,6 +33,7 @@ const (
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
)

// GroupTokenBucket is a token bucket for a resource group.
Expand Down Expand Up @@ -62,6 +65,7 @@ type TokenSlot struct {
// tokenCapacity is the number of tokens in the slot.
tokenCapacity float64
lastTokenCapacity float64
lastReqTime time.Time
}

// GroupTokenBucketState is the running state of TokenBucket.
Expand All @@ -75,7 +79,8 @@ type GroupTokenBucketState struct {
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate.
settingChanged bool
settingChanged bool
lastCheckExpireSlot time.Time
}

// Clone returns the copy of GroupTokenBucketState
Expand All @@ -95,6 +100,7 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
Initialized: gts.Initialized,
tokenSlots: tokenSlots,
clientConsumptionTokensSum: gts.clientConsumptionTokensSum,
lastCheckExpireSlot: gts.lastCheckExpireSlot,
}
}

Expand All @@ -119,16 +125,18 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
clientUniqueID uint64,
settings *rmpb.TokenLimitSettings,
requiredToken, elapseTokens float64) {
now := time.Now()
slot, exist := gts.tokenSlots[clientUniqueID]
if !exist {
// Only slots that require a positive number will be considered alive,
// but still need to allocate the elapsed tokens as well.
if requiredToken != 0 {
slot = &TokenSlot{}
slot = &TokenSlot{lastReqTime: now}
gts.tokenSlots[clientUniqueID] = slot
gts.clientConsumptionTokensSum = 0
}
} else {
slot.lastReqTime = now
if gts.clientConsumptionTokensSum >= maxAssignTokens {
gts.clientConsumptionTokensSum = 0
}
Expand All @@ -139,6 +147,16 @@ func (gts *GroupTokenBucketState) balanceSlotTokens(
}
}

if time.Since(gts.lastCheckExpireSlot) >= slotExpireTimeout {
gts.lastCheckExpireSlot = now
for clientUniqueID, slot := range gts.tokenSlots {
if time.Since(slot.lastReqTime) >= slotExpireTimeout {
delete(gts.tokenSlots, clientUniqueID)
log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime),
zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots)))
}
}
}
if len(gts.tokenSlots) == 0 {
return
}
Expand Down Expand Up @@ -264,6 +282,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) {
lastTokenCapacity: gtb.Tokens,
}
gtb.LastUpdate = &now
gtb.lastCheckExpireSlot = now
gtb.Initialized = true
}

Expand Down

0 comments on commit f1cee6c

Please sign in to comment.