diff --git a/pkg/cache/priority_queue.go b/pkg/cache/priority_queue.go index 31a0ee77f62c..8598c15820db 100644 --- a/pkg/cache/priority_queue.go +++ b/pkg/cache/priority_queue.go @@ -79,6 +79,18 @@ func (pq *PriorityQueue) Get(id uint64) *Entry { return pq.items[id] } +// GetAndCheckPriority find entry by id from queue and judge whether it is the priority +func (pq *PriorityQueue) GetAndCheckPriority(id uint64, priority int) *Entry { + pq.mutex.RLock() + defer pq.mutex.RUnlock() + if v, ok := pq.items[id]; ok { + if v.Priority == priority { + return v + } + } + return nil +} + // peek return the highest priority entry // It is used test only func (pq *PriorityQueue) peek() *Entry { diff --git a/pkg/schedule/checker/priority_inspector.go b/pkg/schedule/checker/priority_inspector.go index 0bcbcc810d0a..cb7b1abb0df0 100644 --- a/pkg/schedule/checker/priority_inspector.go +++ b/pkg/schedule/checker/priority_inspector.go @@ -22,6 +22,7 @@ import ( "github.com/tikv/pd/pkg/schedule/config" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" ) // the default value of priority queue size @@ -45,13 +46,16 @@ func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfig // RegionPriorityEntry records region priority info type RegionPriorityEntry struct { + syncutil.RWMutex Attempt int Last time.Time regionID uint64 } // ID implement PriorityQueueItem interface -func (r RegionPriorityEntry) ID() uint64 { +func (r *RegionPriorityEntry) ID() uint64 { + r.RLock() + defer r.RUnlock() return r.regionID } @@ -100,10 +104,12 @@ func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (mak // it's Attempt will increase if region's priority equal last func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) { if priority < 0 { - if entry := p.queue.Get(regionID); entry != nil && entry.Priority == priority { + if entry := p.queue.GetAndCheckPriority(regionID, priority); entry != nil { e := entry.Value.(*RegionPriorityEntry) + e.Lock() e.Attempt++ e.Last = time.Now() + e.Unlock() p.queue.Put(priority, e) } else { entry := NewRegionEntry(regionID) @@ -119,11 +125,13 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) { entries := p.queue.Elems() for _, e := range entries { re := e.Value.(*RegionPriorityEntry) + re.RLock() // avoid to some priority region occupy checker, region don't need check on next check interval // the next run time is : last_time+retry*10*patrol_region_interval if t := re.Last.Add(time.Duration(re.Attempt*10) * p.conf.GetPatrolRegionInterval()); t.Before(time.Now()) { ids = append(ids, re.regionID) } + re.RUnlock() } return }