From e07be6a4630e4b02ebcdb3d3b7d700e5f93587c3 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Tue, 2 Jul 2024 17:37:27 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #8344 close tikv/pd#8343 Signed-off-by: ti-chi-bot --- .../resource_group/controller/controller.go | 42 ++++++++++++------- .../controller/controller_test.go | 2 +- client/resource_group/controller/limiter.go | 32 +++++++++++--- .../resource_group/controller/limiter_test.go | 21 ++++++++-- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 1834e509696..919bcd40443 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -36,12 +36,20 @@ import ( ) const ( +<<<<<<< HEAD controllerConfigPath = "resource_group/controller" maxRetry = 10 retryInterval = 50 * time.Millisecond maxNotificationChanLen = 200 needTokensAmplification = 1.1 trickleReserveDuration = 1250 * time.Millisecond +======= + controllerConfigPath = "resource_group/controller" + maxNotificationChanLen = 200 + needTokensAmplification = 1.1 + trickleReserveDuration = 1250 * time.Millisecond + slowNotifyFilterDuration = 10 * time.Millisecond +>>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) watchRetryInterval = 30 * time.Second ) @@ -115,7 +123,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator // When a signal is received, it means the number of available token is low. - lowTokenNotifyChan chan struct{} + lowTokenNotifyChan chan notifyMsg // When a token bucket response received from server, it will be sent to the channel. tokenResponseChan chan []*rmpb.TokenBucketResponse // When the token bucket of a resource group is updated, it will be sent to the channel. @@ -157,7 +165,7 @@ func NewResourceGroupController( clientUniqueID: clientUniqueID, provider: provider, ruConfig: ruConfig, - lowTokenNotifyChan: make(chan struct{}, 1), + lowTokenNotifyChan: make(chan notifyMsg, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), opts: opts, @@ -262,7 +270,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{}) } case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { @@ -300,11 +308,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-c.lowTokenNotifyChan: + case notifyMsg := <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) } if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) @@ -484,7 +492,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB } } -func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) { +func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) { c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0) c.groupsController.Range(func(name, value any) bool { gc := value.(*groupCostController) @@ -496,11 +504,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex return true }) if len(c.run.currentRequests) > 0 { - c.sendTokenBucketRequests(ctx, c.run.currentRequests, source) + c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg) } } -func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) { +func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) { now := time.Now() req := &rmpb.TokenBucketsRequest{ Requests: requests, @@ -518,13 +526,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, if err != nil { // Don't log any errors caused by the stopper canceling the context. if !errors.ErrorEqual(err, context.Canceled) { - log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err) + log.Error("[resource group controller] token bucket rpc error", zap.Error(err)) } resp = nil failedTokenRequestDuration.Observe(latency.Seconds()) } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } + if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration { + log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime))) + } logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp }() @@ -588,7 +599,7 @@ type groupCostController struct { // fast path to make once token limit with un-limit burst. burstable *atomic.Bool - lowRUNotifyChan chan<- struct{} + lowRUNotifyChan chan<- notifyMsg tokenBucketUpdateChan chan<- *groupCostController // run contains the state that is updated by the main loop. @@ -678,7 +689,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, mainCfg *RUConfig, - lowRUNotifyChan chan struct{}, + lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { switch group.Mode { @@ -797,7 +808,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -997,7 +1008,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { cfg.NewRate = 99999999 }) counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess()) - log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) + log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) } } @@ -1051,6 +1062,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket timerDuration = (trickleDuration + trickleReserveDuration) / 2 } counter.notify.mu.Lock() + if counter.notify.setupNotificationTimer != nil { + counter.notify.setupNotificationTimer.Stop() + } counter.notify.setupNotificationTimer = time.NewTimer(timerDuration) counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C counter.notify.setupNotificationThreshold = 1 @@ -1221,7 +1235,7 @@ func (gc *groupCostController) onRequestWait( sub(gc.mu.consumption, delta) gc.mu.Unlock() failpoint.Inject("triggerUpdate", func() { - gc.lowRUNotifyChan <- struct{}{} + gc.lowRUNotifyChan <- notifyMsg{} }) return nil, nil, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 3696dcba845..fbd3ab0548f 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -41,7 +41,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController }, }, } - ch1 := make(chan struct{}) + ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index bb1bc18dbfc..073ceff6f54 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -75,7 +75,7 @@ type Limiter struct { // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 - lowTokensNotifyChan chan<- struct{} + lowTokensNotifyChan chan<- notifyMsg // To prevent too many chan sent, the notifyThreshold is set to 0 after notify. // So the notifyThreshold cannot show whether the limiter is in the low token state, // isLowProcess is used to check it. @@ -88,6 +88,11 @@ type Limiter struct { metrics *limiterMetricsCollection } +// notifyMsg is a message to notify the low token state. +type notifyMsg struct { + startTime time.Time +} + // limiterMetricsCollection is a collection of metrics for a limiter. type limiterMetricsCollection struct { lowTokenNotifyCounter prometheus.Counter @@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ limit: r, last: now, @@ -116,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ name: name, limit: Limit(cfg.NewRate), @@ -142,7 +147,13 @@ type Reservation struct { timeToAct time.Time needWaitDurtion time.Duration // This is the Limit at reservation time, it can change later. +<<<<<<< HEAD limit Limit +======= + limit Limit + remainingTokens float64 + err error +>>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) } // OK returns whether the limiter can provide the requested number of tokens @@ -217,7 +228,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now select { case <-ctx.Done(): return &Reservation{ - ok: false, + ok: false, + err: ctx.Err(), } default: } @@ -254,7 +266,7 @@ func (lim *Limiter) notify() { lim.notifyThreshold = 0 lim.isLowProcess = true select { - case lim.lowTokensNotifyChan <- struct{}{}: + case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}: if lim.metrics != nil { lim.metrics.lowTokenNotifyCounter.Inc() } @@ -412,7 +424,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur zap.Float64("notify-threshold", lim.notifyThreshold), zap.Bool("is-low-process", lim.isLowProcess), zap.Int64("burst", lim.burst), - zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + zap.Int("remaining-notify-times", lim.remainingNotifyTimes), + zap.String("name", lim.name)) } lim.last = last if lim.limit == 0 { @@ -493,7 +506,14 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() +<<<<<<< HEAD return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled +======= + if res.err != nil { + return res.needWaitDuration, res.err + } + return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens) +>>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index d8703a1e674..ba4771810d8 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -118,7 +118,7 @@ func TestReconfig(t *testing.T) { } func TestNotify(t *testing.T) { - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) args := tokenBucketReconfigureArgs{ @@ -139,7 +139,7 @@ func TestCancel(t *testing.T) { ctx := context.Background() ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim1 := NewLimiter(t0, 1, 0, 10, nc) lim2 := NewLimiter(t0, 1, 0, 0, nc) @@ -177,3 +177,16 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t5, 15) checkTokens(re, lim2, t5, 5) } + +func TestCancelErrorOfReservation(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 10, 0, 10, nc) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + r := lim.Reserve(ctx, InfDuration, t0, 5) + d, err := WaitReservations(context.Background(), t0, []*Reservation{r}) + re.Equal(0*time.Second, d) + re.Error(err) + re.Contains(err.Error(), "context canceled") +} From dcebb36ceac32f380c10969fc0de414837c24d1b Mon Sep 17 00:00:00 2001 From: Shuning Chen Date: Wed, 3 Jul 2024 17:49:08 +0800 Subject: [PATCH 2/3] fix Signed-off-by: Shuning Chen --- client/resource_group/controller/controller.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 919bcd40443..29f0853cb92 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -36,20 +36,13 @@ import ( ) const ( -<<<<<<< HEAD - controllerConfigPath = "resource_group/controller" - maxRetry = 10 - retryInterval = 50 * time.Millisecond - maxNotificationChanLen = 200 - needTokensAmplification = 1.1 - trickleReserveDuration = 1250 * time.Millisecond -======= controllerConfigPath = "resource_group/controller" + maxRetry = 10 + retryInterval = 50 * time.Millisecond maxNotificationChanLen = 200 needTokensAmplification = 1.1 trickleReserveDuration = 1250 * time.Millisecond slowNotifyFilterDuration = 10 * time.Millisecond ->>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) watchRetryInterval = 30 * time.Second ) From 834e620ff249e2f154ee39f6c4b57f5d4a2dede3 Mon Sep 17 00:00:00 2001 From: Shuning Chen Date: Wed, 3 Jul 2024 19:22:22 +0800 Subject: [PATCH 3/3] fix Signed-off-by: Shuning Chen --- client/resource_group/controller/limiter.go | 31 ++++++++------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 073ceff6f54..1d250c91214 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -141,19 +141,14 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time - needWaitDurtion time.Duration + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDuration time.Duration // This is the Limit at reservation time, it can change later. -<<<<<<< HEAD limit Limit -======= - limit Limit - remainingTokens float64 - err error ->>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) + err error } // OK returns whether the limiter can provide the requested number of tokens @@ -398,10 +393,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, - needWaitDurtion: waitDuration, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDuration: waitDuration, } if ok { r.tokens = n @@ -506,14 +501,10 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() -<<<<<<< HEAD - return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled -======= if res.err != nil { return res.needWaitDuration, res.err } - return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens) ->>>>>>> 49f9b115c (client/controller: record context error and add slowlog about token bucket (#8344)) + return res.needWaitDuration, errs.ErrClientResourceGroupThrottled } delay := res.DelayFrom(now) if delay > longestDelayDuration {