diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index f47953ee32a..36ef99cebe8 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -140,7 +140,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator // When a signal is received, it means the number of available token is low. - lowTokenNotifyChan chan NotifyMsg + lowTokenNotifyChan chan notifyMsg // When a token bucket response received from server, it will be sent to the channel. tokenResponseChan chan []*rmpb.TokenBucketResponse // When the token bucket of a resource group is updated, it will be sent to the channel. @@ -182,7 +182,7 @@ func NewResourceGroupController( clientUniqueID: clientUniqueID, provider: provider, ruConfig: ruConfig, - lowTokenNotifyChan: make(chan NotifyMsg, 1), + lowTokenNotifyChan: make(chan notifyMsg, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), opts: opts, @@ -288,7 +288,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, NotifyMsg{}) + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{}) } case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { @@ -509,7 +509,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB } } -func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg NotifyMsg) { +func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) { c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0) c.groupsController.Range(func(_, value any) bool { gc := value.(*groupCostController) @@ -525,7 +525,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex } } -func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg NotifyMsg) { +func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) { now := time.Now() req := &rmpb.TokenBucketsRequest{ Requests: requests, @@ -550,8 +550,8 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } - if !notifyMsg.StartTime.IsZero() && time.Since(notifyMsg.StartTime) > slowNotifyFilterDuration { - log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.StartTime))) + if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration { + log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime))) } logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp @@ -648,7 +648,7 @@ type groupCostController struct { // fast path to make once token limit with un-limit burst. burstable *atomic.Bool - lowRUNotifyChan chan<- NotifyMsg + lowRUNotifyChan chan<- notifyMsg tokenBucketUpdateChan chan<- *groupCostController // run contains the state that is updated by the main loop. @@ -738,7 +738,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, mainCfg *RUConfig, - lowRUNotifyChan chan NotifyMsg, + lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { switch group.Mode { @@ -1286,7 +1286,7 @@ func (gc *groupCostController) onRequestWait( sub(gc.mu.consumption, delta) gc.mu.Unlock() failpoint.Inject("triggerUpdate", func() { - gc.lowRUNotifyChan <- NotifyMsg{} + gc.lowRUNotifyChan <- notifyMsg{} }) return nil, nil, waitDuration, 0, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 3b36cb43ef7..9bbe34aaf7e 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController JobTypes: []string{"lightning", "br"}, }, } - ch1 := make(chan NotifyMsg) + ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index a2dd2dae2a9..985b03761fd 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -75,7 +75,7 @@ type Limiter struct { // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 - lowTokensNotifyChan chan<- NotifyMsg + lowTokensNotifyChan chan<- notifyMsg // To prevent too many chan sent, the notifyThreshold is set to 0 after notify. // So the notifyThreshold cannot show whether the limiter is in the low token state, // isLowProcess is used to check it. @@ -88,9 +88,9 @@ type Limiter struct { metrics *limiterMetricsCollection } -// NotifyMsg is a message to notify the low token state. -type NotifyMsg struct { - StartTime time.Time +// notifyMsg is a message to notify the low token state. +type notifyMsg struct { + startTime time.Time } // limiterMetricsCollection is a collection of metrics for a limiter. @@ -107,7 +107,7 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- NotifyMsg) *Limiter { +func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ limit: r, last: now, @@ -121,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- NotifyMsg) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ name: name, limit: Limit(cfg.NewRate), @@ -262,7 +262,7 @@ func (lim *Limiter) notify() { lim.notifyThreshold = 0 lim.isLowProcess = true select { - case lim.lowTokensNotifyChan <- NotifyMsg{StartTime: time.Now()}: + case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}: if lim.metrics != nil { lim.metrics.lowTokenNotifyCounter.Inc() } diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index ccea59a656c..d8f9ba12592 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -126,7 +126,7 @@ func TestReconfig(t *testing.T) { } func TestNotify(t *testing.T) { - nc := make(chan NotifyMsg, 1) + nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) args := tokenBucketReconfigureArgs{ @@ -147,7 +147,7 @@ func TestCancel(t *testing.T) { ctx := context.Background() ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) - nc := make(chan NotifyMsg, 1) + nc := make(chan notifyMsg, 1) lim1 := NewLimiter(t0, 1, 0, 10, nc) lim2 := NewLimiter(t0, 1, 0, 0, nc) @@ -189,7 +189,7 @@ func TestCancel(t *testing.T) { func TestCancelErrorOfReservation(t *testing.T) { re := require.New(t) - nc := make(chan NotifyMsg, 1) + nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 10, 0, 10, nc) ctx, cancel := context.WithCancel(context.Background()) cancel()