Skip to content

Commit

Permalink
move ticker to PatrolRegionContext
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 31, 2024
1 parent 2b2570a commit 339d545
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
DefaultPendingRegionCacheSize = 100000
// It takes about 1.3 minutes(1000000/128*10/60/1000) to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms).
patrolScanRegionLimit = 128
suspectRegionLimit = 1024
)

var (
Expand Down Expand Up @@ -92,16 +93,15 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
// The checkers will check these regions to decide if they need to do some operations.
func (c *Controller) PatrolRegions() {
c.patrolRegionContext.init(c.cluster)
ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
defer ticker.Stop()
defer c.patrolRegionContext.stop()
var (
key []byte
regions []*core.RegionInfo
)
for {
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
case <-c.patrolRegionContext.ticker.C:
c.patrolRegionContext.updateTickerIfNeeded()
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.patrolRegionContext.setPatrolRegionsDuration(0)
Expand Down Expand Up @@ -131,16 +131,6 @@ func (c *Controller) PatrolRegions() {
}
}

// updateTickerIfNeeded resets the patrol ticker when the configured
// patrol interval has changed since the last tick, so that dynamic
// configuration updates take effect without restarting the loop.
func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
	// Note: we reset the ticker here to support updating configuration dynamically.
	interval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
	if interval == c.patrolRegionContext.interval {
		return
	}
	c.patrolRegionContext.interval = interval
	ticker.Reset(interval)
	log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", interval))
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Controller) GetPatrolRegionsDuration() time.Duration {
return c.patrolRegionContext.getPatrolRegionsDuration()
Expand Down Expand Up @@ -345,8 +335,7 @@ func (c *Controller) CheckSuspectRanges() {
if !success {
continue
}
limit := 1024
regions := c.cluster.ScanRegions(keyRange[0], keyRange[1], limit)
regions := c.cluster.ScanRegions(keyRange[0], keyRange[1], suspectRegionLimit)
if len(regions) == 0 {
continue
}
Expand Down Expand Up @@ -420,6 +409,8 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) {

// PatrolRegionContext is used to store the context of patrol regions.
type PatrolRegionContext struct {
cluster sche.CheckerCluster
ticker *time.Ticker
// config
interval time.Duration
// status
Expand All @@ -431,10 +422,27 @@ type PatrolRegionContext struct {
}

// init prepares the context for a new patrol round: it stores the
// cluster handle, reads the configured patrol interval, starts the
// ticker that paces the patrol loop, and records the round start time.
func (p *PatrolRegionContext) init(cluster sche.CheckerCluster) {
	p.cluster = cluster
	p.interval = cluster.GetCheckerConfig().GetPatrolRegionInterval()
	// Create the ticker here; PatrolRegions selects on p.ticker.C and
	// stop() calls p.ticker.Stop(). Without this, p.ticker stays nil
	// and the patrol loop panics with a nil pointer dereference.
	p.ticker = time.NewTicker(p.interval)
	p.patrolRoundStartTime = time.Now()
}

// stop halts the patrol ticker, if one was ever started.
// Safe to call on a context whose init was never run.
func (p *PatrolRegionContext) stop() {
	if p.ticker == nil {
		return
	}
	p.ticker.Stop()
}

// updateTickerIfNeeded resets the patrol ticker when the configured
// patrol interval differs from the one currently in use, so dynamic
// configuration changes take effect without restarting the loop.
func (p *PatrolRegionContext) updateTickerIfNeeded() {
	// Note: we reset the ticker here to support updating configuration dynamically.
	interval := p.cluster.GetCheckerConfig().GetPatrolRegionInterval()
	if interval == p.interval {
		return
	}
	p.interval = interval
	p.ticker.Reset(interval)
	log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", interval))
}

func (p *PatrolRegionContext) getPatrolRegionsDuration() time.Duration {
p.mu.RLock()
defer p.mu.RUnlock()
Expand Down

0 comments on commit 339d545

Please sign in to comment.