Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: support patrol region concurrency #8094

Open
wants to merge 43 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d1f4b8a
checker: add patrol region concurrency
lhy1024 Apr 22, 2024
6d0fbd4
speedup drain
lhy1024 Apr 22, 2024
9b43d61
fix config
lhy1024 Apr 22, 2024
b2b4f39
make config
lhy1024 Apr 22, 2024
cb285f6
update
lhy1024 May 16, 2024
2e14405
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-2
lhy1024 May 16, 2024
351ef5c
fix race
lhy1024 May 23, 2024
c198b08
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-2
lhy1024 May 23, 2024
a0ec33d
fix test
lhy1024 May 24, 2024
ab9ef1e
remove batch limit config
lhy1024 May 24, 2024
97a40a9
address comments
lhy1024 May 27, 2024
438efce
address comments
lhy1024 May 27, 2024
c59b47c
address comments
lhy1024 May 28, 2024
9f57397
Merge branch 'master' into patrol-concurrency
lhy1024 Jun 3, 2024
bbc1362
refactor and add patrol region context
lhy1024 Jun 3, 2024
b0eab80
address comments
lhy1024 Jun 3, 2024
5c442a3
add config test
lhy1024 Jun 4, 2024
0d02d8b
add more tests
lhy1024 Jun 4, 2024
9896228
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Jun 12, 2024
a638cce
address comments
lhy1024 Jun 13, 2024
6147373
add some test to cover branches
lhy1024 Jun 18, 2024
2a86197
Merge branch 'master' into patrol-concurrency
lhy1024 Jun 25, 2024
82785c2
address comments
lhy1024 Jul 1, 2024
a21ef83
address comments
lhy1024 Jul 1, 2024
cd1cd8b
address comments
lhy1024 Jul 4, 2024
5668d98
address comments
lhy1024 Jul 4, 2024
78e3ba5
Merge branch 'master' into patrol-concurrency
lhy1024 Jul 4, 2024
cf01076
refactor
lhy1024 Jul 4, 2024
cc51a2e
fix test and add metrics
lhy1024 Jul 4, 2024
9f7406a
fix failpoint
lhy1024 Jul 4, 2024
bd4ca79
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency-4
lhy1024 Jul 16, 2024
ae0778f
fix conflict
lhy1024 Jul 16, 2024
ecb8d8b
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Jul 22, 2024
8259ad0
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Aug 1, 2024
18db300
fix
lhy1024 Aug 1, 2024
5ead31e
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Aug 8, 2024
6bdb436
fix lint
lhy1024 Aug 8, 2024
40a2e02
Merge branch 'master' into patrol-concurrency
lhy1024 Aug 14, 2024
8cd8825
address comments
lhy1024 Aug 29, 2024
bcd5018
address comments
lhy1024 Sep 24, 2024
acb4244
Merge branch 'master' of github.com:tikv/pd into patrol-concurrency
lhy1024 Sep 24, 2024
457da3d
avoid potential data race
lhy1024 Sep 24, 2024
64abc3c
address comments: remove sleep in failpoint
lhy1024 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetPatrolRegionWorkerCount returns the worker count of the patrol.
func (o *PersistConfig) GetPatrolRegionWorkerCount() int {
return o.GetScheduleConfig().PatrolRegionWorkerCount
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
Expand Down
102 changes: 94 additions & 8 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"bytes"
"context"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
Expand All @@ -31,6 +32,7 @@
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)
Expand All @@ -47,6 +49,7 @@
// MaxPatrolScanRegionLimit is the max limit of regions to scan for a batch.
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
patrolRegionChanLen = MaxPatrolScanRegionLimit
)

var (
Expand All @@ -71,6 +74,7 @@
priorityInspector *PriorityInspector
pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix
patrolRegionContext *PatrolRegionContext

// duration is the duration of the last patrol round.
// It's exported, so it should be protected by a mutex.
Expand All @@ -82,6 +86,8 @@
// It's used to update the ticker, so we need to
// record it to avoid updating the ticker frequently.
interval time.Duration
// workerCount is the count of workers to patrol regions.
workerCount int
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
Expand All @@ -104,6 +110,7 @@
priorityInspector: NewPriorityInspector(cluster, conf),
pendingProcessedRegions: pendingProcessedRegions,
suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute),
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
}
Expand All @@ -112,6 +119,9 @@
// PatrolRegions is used to scan regions.
// The checkers will check these regions to decide if they need to do some operations.
func (c *Controller) PatrolRegions() {
c.patrolRegionContext.init(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
defer c.patrolRegionContext.stop()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
start := time.Now()
Expand All @@ -123,11 +133,20 @@
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if c.cluster.IsSchedulingHalted() {
for len(c.patrolRegionContext.regionChan) > 0 {
<-c.patrolRegionContext.regionChan
}
log.Debug("skip patrol regions due to scheduling is halted")
continue
}

// wait for the regionChan to be drained
if len(c.patrolRegionContext.regionChan) > 0 {
continue
}
Comment on lines +145 to +148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need we wait for? If the regionChan is full, it will be blocked.


// Check priority regions first.
c.checkPriorityRegions()
// Check pending processed regions first.
Expand Down Expand Up @@ -160,6 +179,32 @@
}
}

func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
// Note: we reset the ticker here to support updating configuration dynamically.
newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
if c.interval != newInterval {
c.interval = newInterval
ticker.Reset(newInterval)
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
}
}

func (c *Controller) updatePatrolWorkersIfNeeded() {
newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount()
if c.workerCount != newWorkersCount {
oldWorkersCount := c.workerCount
c.workerCount = newWorkersCount
// Stop the old workers and start the new workers.
c.patrolRegionContext.workersCancel()
c.patrolRegionContext.wg.Wait()
c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we adjust the workers more gracefully? For example, if the new worker count is more than the current workers, we can scale out more wroker and no need to build all workers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's necessary, and generally speaking we don't change this configuration very often.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid that it maybe wait some time to stop all and start all wokers.

c.patrolRegionContext.startPatrolRegionWorkers(c)
log.Info("checkers starts patrol regions with new workers count",
zap.Int("old-workers-count", oldWorkersCount),
zap.Int("new-workers-count", newWorkersCount))
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Controller) GetPatrolRegionsDuration() time.Duration {
c.mu.RLock()
Expand All @@ -182,7 +227,7 @@
}

for _, region := range regions {
c.tryAddOperators(region)
c.patrolRegionContext.regionChan <- region
key = region.GetEndKey()
}
return
Expand Down Expand Up @@ -446,13 +491,54 @@
}
}

func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
// Note: we reset the ticker here to support updating configuration dynamically.
newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
if c.interval != newInterval {
c.interval = newInterval
ticker.Reset(newInterval)
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
func (c *Controller) IsPatrolRegionChanEmpty() bool {

Check failure on line 494 in pkg/schedule/checker/checker_controller.go

View workflow job for this annotation

GitHub Actions / statics

exported: exported method Controller.IsPatrolRegionChanEmpty should have comment or be unexported (revive)
if c.patrolRegionContext == nil {
return true

Check warning on line 496 in pkg/schedule/checker/checker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/checker/checker_controller.go#L496

Added line #L496 was not covered by tests
}
return len(c.patrolRegionContext.regionChan) == 0
}

// PatrolRegionContext is used to store the context of patrol regions.
type PatrolRegionContext struct {
workersCtx context.Context
workersCancel context.CancelFunc
regionChan chan *core.RegionInfo
wg sync.WaitGroup
}

func (p *PatrolRegionContext) init(ctx context.Context) {
p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen)
p.workersCtx, p.workersCancel = context.WithCancel(ctx)
}

func (p *PatrolRegionContext) stop() {
log.Debug("closing patrol region workers")
close(p.regionChan)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to add a failpoint to wait for all rest region consumed in regionChan before close regionChan, rather than time.Sleep(100 * time.Millisecond) in L171. This is more stable.

If possible, we can always wait it even if it is not in testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.Sleep(100 * time.Millisecond) in L171 is used to wait that regions to be consumed.

co.PatrolRegions()
re.Empty(oc.GetOperators())

For example, if we enable this failpoint, it will wait 100 ms for goroutine consuming regions. And it will check re.Empty(oc.GetOperators()) immediately after failpoint.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the role of L171. But I think sleep is a destabilizing factor. And it's fine to wait for it to finish consuming, or actively consume all regions before exiting and then exit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove sleep, PTAL

lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
p.workersCancel()
p.wg.Wait()
log.Debug("patrol region workers are closed")
}

func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) {
for i := 0; i < c.workerCount; i++ {
p.wg.Add(1)
go func(i int) {
defer logutil.LogPanic()
defer p.wg.Done()
for {
select {
case region, ok := <-p.regionChan:
if !ok {
log.Debug("region channel is closed", zap.Int("worker-id", i))
return
}
c.tryAddOperators(region)
case <-p.workersCtx.Done():
log.Debug("region worker is closed", zap.Int("worker-id", i))
return
}
}
}(i)
}
}

Expand Down
33 changes: 23 additions & 10 deletions pkg/schedule/checker/priority_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// defaultPriorityQueueSize is the default value of priority queue size.
Expand All @@ -31,16 +32,20 @@ const defaultPriorityQueueSize = 1280
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
mu struct {
syncutil.RWMutex
queue *cache.PriorityQueue
}
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
res := &PriorityInspector{
cluster: cluster,
conf: conf,
queue: cache.NewPriorityQueue(defaultPriorityQueueSize),
}
res.mu.queue = cache.NewPriorityQueue(defaultPriorityQueueSize)
return res
}

// RegionPriorityEntry records region priority info.
Expand Down Expand Up @@ -99,24 +104,28 @@ func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (mak
// It will remove if region's priority equal 0.
// It's Attempt will increase if region's priority equal last.
func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
if priority < 0 {
if entry := p.queue.Get(regionID); entry != nil && entry.Priority == priority {
if entry := p.mu.queue.Get(regionID); entry != nil && entry.Priority == priority {
e := entry.Value.(*RegionPriorityEntry)
e.Attempt++
e.Last = time.Now()
p.queue.Put(priority, e)
p.mu.queue.Put(priority, e)
} else {
entry := NewRegionEntry(regionID)
p.queue.Put(priority, entry)
p.mu.queue.Put(priority, entry)
}
} else {
p.queue.Remove(regionID)
p.mu.queue.Remove(regionID)
}
}

// GetPriorityRegions returns all regions in priority queue that needs rerun.
func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {
entries := p.queue.Elems()
p.mu.RLock()
defer p.mu.RUnlock()
entries := p.mu.queue.Elems()
for _, e := range entries {
re := e.Value.(*RegionPriorityEntry)
// avoid to some priority region occupy checker, region don't need check on next check interval
Expand All @@ -130,11 +139,15 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {

// RemovePriorityRegion removes priority region from priority queue.
func (p *PriorityInspector) RemovePriorityRegion(regionID uint64) {
p.queue.Remove(regionID)
p.mu.Lock()
defer p.mu.Unlock()
p.mu.queue.Remove(regionID)
}

// getQueueLen returns the length of priority queue.
// it's only used for test.
func (p *PriorityInspector) getQueueLen() int {
return p.queue.Len()
p.mu.RLock()
defer p.mu.RUnlock()
return p.mu.queue.Len()
}
6 changes: 6 additions & 0 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -650,6 +651,7 @@ func (c *RuleChecker) handleFilterState(region *core.RegionInfo, filterByTempSta
}

type recorder struct {
syncutil.RWMutex
offlineLeaderCounter map[uint64]uint64
lastUpdateTime time.Time
}
Expand All @@ -662,10 +664,14 @@ func newRecord() *recorder {
}

func (o *recorder) getOfflineLeaderCount(storeID uint64) uint64 {
o.RLock()
defer o.RUnlock()
return o.offlineLeaderCounter[storeID]
}

func (o *recorder) incOfflineLeaderCount(storeID uint64) {
o.Lock()
defer o.Unlock()
o.offlineLeaderCounter[storeID] += 1
o.lastUpdateTime = time.Now()
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
defaultRegionScoreFormulaVersion = "v2"
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitVersion = "v1"
defaultPatrolRegionWorkerCount = 1
maxPatrolRegionWorkerCount = 8

// DefaultSplitMergeInterval is the default value of config split merge interval.
DefaultSplitMergeInterval = time.Hour
defaultSwitchWitnessInterval = time.Hour
Expand Down Expand Up @@ -306,6 +309,9 @@ type ScheduleConfig struct {
// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`

// PatrolRegionWorkerCount is the number of workers to patrol region.
PatrolRegionWorkerCount int `toml:"patrol-region-worker-count" json:"patrol-region-worker-count"`
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -374,6 +380,9 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
if !meta.IsDefined("store-limit-version") {
configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion)
}
if !meta.IsDefined("patrol-region-worker-count") {
configutil.AdjustInt(&c.PatrolRegionWorkerCount, defaultPatrolRegionWorkerCount)
}

if !meta.IsDefined("enable-joint-consensus") {
c.EnableJointConsensus = defaultEnableJointConsensus
Expand Down Expand Up @@ -518,6 +527,9 @@ func (c *ScheduleConfig) Validate() error {
if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 {
return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set")
}
if c.PatrolRegionWorkerCount > maxPatrolRegionWorkerCount || c.PatrolRegionWorkerCount < 1 {
return errors.Errorf("patrol-region-worker-count should be between 1 and %d", maxPatrolRegionWorkerCount)
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type CheckerConfigProvider interface {
GetIsolationLevel() string
GetSplitMergeInterval() time.Duration
GetPatrolRegionInterval() time.Duration
GetPatrolRegionWorkerCount() int
GetMaxMergeRegionSize() uint64
GetMaxMergeRegionKeys() uint64
GetReplicaScheduleLimit() uint64
Expand Down
Loading
Loading