Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/controller: record context error and add slowlog about token bucket (#8344) #8354

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ import (
)

const (
controllerConfigPath = "resource_group/controller"
maxRetry = 10
retryInterval = 50 * time.Millisecond
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond
controllerConfigPath = "resource_group/controller"
maxRetry = 10
retryInterval = 50 * time.Millisecond
maxNotificationChanLen = 200
needTokensAmplification = 1.1
trickleReserveDuration = 1250 * time.Millisecond
slowNotifyFilterDuration = 10 * time.Millisecond

watchRetryInterval = 30 * time.Second
)
Expand Down Expand Up @@ -115,7 +116,7 @@ type ResourceGroupsController struct {
calculators []ResourceCalculator

// When a signal is received, it means the number of available token is low.
lowTokenNotifyChan chan struct{}
lowTokenNotifyChan chan notifyMsg
// When a token bucket response received from server, it will be sent to the channel.
tokenResponseChan chan []*rmpb.TokenBucketResponse
// When the token bucket of a resource group is updated, it will be sent to the channel.
Expand Down Expand Up @@ -157,7 +158,7 @@ func NewResourceGroupController(
clientUniqueID: clientUniqueID,
provider: provider,
ruConfig: ruConfig,
lowTokenNotifyChan: make(chan struct{}, 1),
lowTokenNotifyChan: make(chan notifyMsg, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
Expand Down Expand Up @@ -262,7 +263,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{})
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
Expand Down Expand Up @@ -300,11 +301,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-c.lowTokenNotifyChan:
case notifyMsg := <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */)
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
}
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
Expand Down Expand Up @@ -484,7 +485,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
}
}

func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) {
func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) {
c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0)
c.groupsController.Range(func(name, value any) bool {
gc := value.(*groupCostController)
Expand All @@ -496,11 +497,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
return true
})
if len(c.run.currentRequests) > 0 {
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source)
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg)
}
}

func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) {
func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) {
now := time.Now()
req := &rmpb.TokenBucketsRequest{
Requests: requests,
Expand All @@ -518,13 +519,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
if err != nil {
// Don't log any errors caused by the stopper canceling the context.
if !errors.ErrorEqual(err, context.Canceled) {
log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err)
log.Error("[resource group controller] token bucket rpc error", zap.Error(err))
}
resp = nil
failedTokenRequestDuration.Observe(latency.Seconds())
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration {
log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime)))
}
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
Expand Down Expand Up @@ -588,7 +592,7 @@ type groupCostController struct {
// fast path to make once token limit with un-limit burst.
burstable *atomic.Bool

lowRUNotifyChan chan<- struct{}
lowRUNotifyChan chan<- notifyMsg
tokenBucketUpdateChan chan<- *groupCostController

// run contains the state that is updated by the main loop.
Expand Down Expand Up @@ -678,7 +682,7 @@ type tokenCounter struct {
func newGroupCostController(
group *rmpb.ResourceGroup,
mainCfg *RUConfig,
lowRUNotifyChan chan struct{},
lowRUNotifyChan chan notifyMsg,
tokenBucketUpdateChan chan *groupCostController,
) (*groupCostController, error) {
switch group.Mode {
Expand Down Expand Up @@ -797,7 +801,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption))
gc.run.now = newTime
}

Expand Down Expand Up @@ -997,7 +1001,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
cfg.NewRate = 99999999
})
counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess())
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]))
}
}

Expand Down Expand Up @@ -1051,6 +1055,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
timerDuration = (trickleDuration + trickleReserveDuration) / 2
}
counter.notify.mu.Lock()
if counter.notify.setupNotificationTimer != nil {
counter.notify.setupNotificationTimer.Stop()
}
counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
counter.notify.setupNotificationThreshold = 1
Expand Down Expand Up @@ -1221,7 +1228,7 @@ func (gc *groupCostController) onRequestWait(
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- struct{}{}
gc.lowRUNotifyChan <- notifyMsg{}
})
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
},
},
}
ch1 := make(chan struct{})
ch1 := make(chan notifyMsg)
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
re.NoError(err)
Expand Down
43 changes: 27 additions & 16 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Limiter struct {
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
lowTokensNotifyChan chan<- struct{}
lowTokensNotifyChan chan<- notifyMsg
// To prevent too many chan sent, the notifyThreshold is set to 0 after notify.
// So the notifyThreshold cannot show whether the limiter is in the low token state,
// isLowProcess is used to check it.
Expand All @@ -88,6 +88,11 @@ type Limiter struct {
metrics *limiterMetricsCollection
}

// notifyMsg is a message to notify the low token state.
type notifyMsg struct {
startTime time.Time
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
Expand All @@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
limit: r,
last: now,
Expand All @@ -116,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter {
lim := &Limiter{
name: name,
limit: Limit(cfg.NewRate),
Expand All @@ -136,13 +141,14 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDuration time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
err error
}

// OK returns whether the limiter can provide the requested number of tokens
Expand Down Expand Up @@ -217,7 +223,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now
select {
case <-ctx.Done():
return &Reservation{
ok: false,
ok: false,
err: ctx.Err(),
}
default:
}
Expand Down Expand Up @@ -254,7 +261,7 @@ func (lim *Limiter) notify() {
lim.notifyThreshold = 0
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
Expand Down Expand Up @@ -386,10 +393,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDuration: waitDuration,
}
if ok {
r.tokens = n
Expand All @@ -412,7 +419,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
zap.String("name", lim.name))
}
lim.last = last
if lim.limit == 0 {
Expand Down Expand Up @@ -493,7 +501,10 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
if res.err != nil {
return res.needWaitDuration, res.err
}
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
21 changes: 17 additions & 4 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
Expand All @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) {

func TestReconfig(t *testing.T) {
re := require.New(t)
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
Expand All @@ -118,7 +118,7 @@ func TestReconfig(t *testing.T) {
}

func TestNotify(t *testing.T) {
nc := make(chan struct{}, 1)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 1, 0, 0, nc)

args := tokenBucketReconfigureArgs{
Expand All @@ -139,7 +139,7 @@ func TestCancel(t *testing.T) {
ctx := context.Background()
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan struct{}, 1)
nc := make(chan notifyMsg, 1)
lim1 := NewLimiter(t0, 1, 0, 10, nc)
lim2 := NewLimiter(t0, 1, 0, 0, nc)

Expand Down Expand Up @@ -177,3 +177,16 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t5, 15)
checkTokens(re, lim2, t5, 5)
}

func TestCancelErrorOfReservation(t *testing.T) {
re := require.New(t)
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 10, 0, 10, nc)
ctx, cancel := context.WithCancel(context.Background())
cancel()
r := lim.Reserve(ctx, InfDuration, t0, 5)
d, err := WaitReservations(context.Background(), t0, []*Reservation{r})
re.Equal(0*time.Second, d)
re.Error(err)
re.Contains(err.Error(), "context canceled")
}
Loading