
Commit

refactor
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed May 24, 2024
1 parent a2d31b6 commit e71f635
Showing 5 changed files with 19 additions and 21 deletions.
13 changes: 0 additions & 13 deletions pkg/cache/priority_queue.go
@@ -16,7 +16,6 @@ package cache
 
 import (
 	"github.com/tikv/pd/pkg/btree"
-	"github.com/tikv/pd/pkg/utils/syncutil"
 )
 
 // defaultDegree default btree degree, the depth is h<log(degree)(capacity+1)/2
@@ -27,7 +26,6 @@ type PriorityQueue struct {
 	items    map[uint64]*Entry
 	btree    *btree.BTreeG[*Entry]
 	capacity int
-	mutex    syncutil.RWMutex
 }
 
 // NewPriorityQueue construct of priority queue
@@ -46,8 +44,6 @@ type PriorityQueueItem interface {
 
 // Put put value with priority into queue
 func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
-	pq.mutex.Lock()
-	defer pq.mutex.Unlock()
 	id := value.ID()
 	entry, ok := pq.items[id]
 	if !ok {
@@ -58,9 +54,7 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
 			if !found || !min.Less(entry) {
 				return false
 			}
-			pq.mutex.Unlock()
 			pq.Remove(min.Value.ID())
-			pq.mutex.Lock()
 		}
 	} else if entry.Priority != priority { // delete before update
 		pq.btree.Delete(entry)
@@ -74,8 +68,6 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
 
 // Get find entry by id from queue
 func (pq *PriorityQueue) Get(id uint64) *Entry {
-	pq.mutex.RLock()
-	defer pq.mutex.RUnlock()
 	return pq.items[id]
 }
 
@@ -92,7 +84,6 @@ func (pq *PriorityQueue) GetAndCheckPriority(id uint64, priority int) *Entry {
 }
 
 // peek return the highest priority entry
-// It is used test only
 func (pq *PriorityQueue) peek() *Entry {
 	if max, ok := pq.btree.Max(); ok {
 		return max
@@ -110,8 +101,6 @@ func (pq *PriorityQueue) tail() *Entry {
 
 // Elems return all elements in queue
 func (pq *PriorityQueue) Elems() []*Entry {
-	pq.mutex.RLock()
-	defer pq.mutex.RUnlock()
 	rs := make([]*Entry, pq.Len())
 	count := 0
 	pq.btree.Descend(func(i *Entry) bool {
@@ -124,8 +113,6 @@ func (pq *PriorityQueue) Elems() []*Entry {
 
 // Remove remove value from queue
 func (pq *PriorityQueue) Remove(id uint64) {
-	pq.mutex.Lock()
-	defer pq.mutex.Unlock()
 	if v, ok := pq.items[id]; ok {
 		pq.btree.Delete(v)
 		delete(pq.items, id)
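Note on the change above: with the RWMutex removed, PriorityQueue is no longer synchronized internally, so concurrent callers must provide their own locking (see priority_inspector.go below). For context, the Put path shown keeps its capacity rule: when the queue is at capacity, a new entry is accepted only if the queue's minimum entry ranks below it, and that minimum is then evicted. A simplified, self-contained sketch of that rule follows; it uses a sorted slice instead of the B-tree, treats larger values as higher priority, and omits existing-id updates, so the names and ordering are illustrative rather than pd's exact API.

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	id       uint64
	priority int
}

// boundedQueue keeps at most capacity entries, highest priority first.
// Put mirrors the eviction rule above: when full, the new entry is accepted
// only if it outranks the current minimum, which is then evicted.
type boundedQueue struct {
	capacity int
	entries  []entry
}

func (q *boundedQueue) Put(priority int, id uint64) bool {
	if len(q.entries) >= q.capacity {
		min := q.entries[len(q.entries)-1] // lowest priority is last
		if min.priority >= priority {
			return false // incoming entry does not outrank the minimum
		}
		q.entries = q.entries[:len(q.entries)-1] // evict the minimum
	}
	q.entries = append(q.entries, entry{id: id, priority: priority})
	sort.Slice(q.entries, func(i, j int) bool {
		return q.entries[i].priority > q.entries[j].priority
	})
	return true
}

func main() {
	q := &boundedQueue{capacity: 2}
	fmt.Println(q.Put(1, 100)) // true
	fmt.Println(q.Put(3, 101)) // true
	fmt.Println(q.Put(2, 102)) // true: evicts id 100 (priority 1)
	fmt.Println(q.Put(1, 103)) // false: does not outrank the minimum
}
```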
8 changes: 8 additions & 0 deletions pkg/schedule/checker/priority_inspector.go
@@ -30,6 +30,7 @@ const defaultPriorityQueueSize = 1280
 
 // PriorityInspector ensures high priority region should run first
 type PriorityInspector struct {
+	syncutil.Mutex
 	cluster sche.CheckerCluster
 	conf    config.CheckerConfigProvider
 	queue   *cache.PriorityQueue
@@ -103,6 +104,8 @@ func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (mak
 // it will remove if region's priority equal 0
 // it's Attempt will increase if region's priority equal last
 func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) {
+	p.Lock()
+	defer p.Unlock()
 	if priority < 0 {
 		if entry := p.queue.GetAndCheckPriority(regionID, priority); entry != nil {
 			e := entry.Value.(*RegionPriorityEntry)
@@ -122,6 +125,9 @@ func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) {
 
 // GetPriorityRegions returns all regions in priority queue that needs rerun
 func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {
+	// we modify the queue entry in this function, so we need to lock it
+	p.Lock()
+	defer p.Unlock()
 	entries := p.queue.Elems()
 	for _, e := range entries {
 		re := e.Value.(*RegionPriorityEntry)
@@ -138,5 +144,7 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {
 
 // RemovePriorityRegion removes priority region from priority queue
 func (p *PriorityInspector) RemovePriorityRegion(regionID uint64) {
+	p.Lock()
+	defer p.Unlock()
 	p.queue.Remove(regionID)
 }
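The embedded syncutil.Mutex above gives PriorityInspector Lock/Unlock methods on the struct itself, so every method that touches the now-unsynchronized queue takes the lock first. Below is a minimal sketch of the same embedding pattern; it substitutes the standard library's sync.Mutex for pd's syncutil.Mutex and a plain map for *cache.PriorityQueue, so the types are stand-ins rather than pd code.

```go
package main

import (
	"fmt"
	"sync"
)

// inspector mirrors the PriorityInspector pattern: the embedded mutex provides
// Lock/Unlock directly, and every method guarding the queue takes it first.
type inspector struct {
	sync.Mutex                // stand-in for syncutil.Mutex
	queue map[uint64]int      // stand-in for *cache.PriorityQueue
}

func (p *inspector) addOrRemoveRegion(priority int, regionID uint64) {
	p.Lock()
	defer p.Unlock()
	if priority < 0 {
		delete(p.queue, regionID) // negative priority removes the region
		return
	}
	p.queue[regionID] = priority
}

func (p *inspector) priorityRegions() []uint64 {
	p.Lock()
	defer p.Unlock()
	ids := make([]uint64, 0, len(p.queue))
	for id := range p.queue {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	p := &inspector{queue: make(map[uint64]int)}
	p.addOrRemoveRegion(1, 100)
	p.addOrRemoveRegion(2, 101)
	fmt.Println(p.priorityRegions()) // two regions queued
	p.addOrRemoveRegion(-1, 100)
	fmt.Println(p.priorityRegions()) // only region 101 remains
}
```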
12 changes: 7 additions & 5 deletions pkg/schedule/coordinator.go
@@ -160,10 +160,15 @@ func (c *Coordinator) PatrolRegions() {
 	ticker := time.NewTicker(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
 	defer ticker.Stop()
 
-	workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
 	regionChan := make(chan *core.RegionInfo, patrolRegionChanLen)
 	quit := make(chan bool)
 	var wg sync.WaitGroup
+	defer func() {
+		close(regionChan)
+		close(quit)
+		wg.Wait()
+	}()
+	workersCount := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
 	c.startPatrolRegionWorkers(workersCount, regionChan, quit, &wg)
 
 	log.Info("coordinator starts patrol regions")
@@ -219,15 +224,12 @@ func (c *Coordinator) PatrolRegions() {
 				start = time.Now()
 			}
 			failpoint.Inject("break-patrol", func() {
-				failpoint.Break()
+				failpoint.Return()
 			})
 		case <-c.ctx.Done():
 			patrolCheckRegionsGauge.Set(0)
 			c.setPatrolRegionsDuration(0)
 			log.Info("patrol regions has been stopped")
-			close(regionChan)
-			close(quit)
-			wg.Wait()
 			return
 		}
 	}
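Moving the channel close and wg.Wait() into a defer means the patrol workers are shut down on every exit path, including the early exit taken by the "break-patrol" failpoint (now failpoint.Return() instead of failpoint.Break()), rather than only on ctx.Done(). The sketch below shows the same worker-pool-with-deferred-shutdown shape in a self-contained form; the worker count, channel size, and work items are illustrative, not pd's values.

```go
package main

import (
	"fmt"
	"sync"
)

// startWorkers launches n workers that drain the work channel until it is
// closed or the quit channel is closed, analogous to startPatrolRegionWorkers.
func startWorkers(n int, work <-chan int, quit <-chan struct{}, wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case item, ok := <-work:
					if !ok {
						return // work channel closed: shut down
					}
					fmt.Printf("worker %d handled item %d\n", id, item)
				case <-quit:
					return
				}
			}
		}(i)
	}
}

func patrol() {
	work := make(chan int, 8)
	quit := make(chan struct{})
	var wg sync.WaitGroup
	// The deferred cleanup runs on every return path, so the workers are
	// always signalled to stop and waited for, as in the refactored PatrolRegions.
	defer func() {
		close(work)
		close(quit)
		wg.Wait()
	}()
	startWorkers(2, work, quit, &wg)

	for i := 0; i < 5; i++ {
		work <- i
	}
	// An early return here (analogous to failpoint.Return) still triggers the cleanup.
}

func main() {
	patrol()
}
```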
3 changes: 2 additions & 1 deletion server/cluster/cluster_test.go
@@ -2833,6 +2833,7 @@ func TestCheckCache(t *testing.T) {
 		cfg.ReplicaScheduleLimit = 0
 	}, nil, nil, re)
 	defer cleanup()
+	oc := co.GetOperatorController()
 
 	re.NoError(tc.addRegionStore(1, 0))
 	re.NoError(tc.addRegionStore(2, 0))
@@ -2845,6 +2846,7 @@ func TestCheckCache(t *testing.T) {
 	// case 1: operator cannot be created due to replica-schedule-limit restriction
 	co.GetWaitGroup().Add(1)
 	co.PatrolRegions()
+	re.Empty(oc.GetOperators())
 	re.Len(co.GetCheckerController().GetWaitingRegions(), 1)
 
 	// cancel the replica-schedule-limit restriction
@@ -2853,7 +2855,6 @@ func TestCheckCache(t *testing.T) {
 	tc.SetScheduleConfig(cfg)
 	co.GetWaitGroup().Add(1)
 	co.PatrolRegions()
-	oc := co.GetOperatorController()
 	re.Len(oc.GetOperators(), 1)
 	re.Empty(co.GetCheckerController().GetWaitingRegions())
 
4 changes: 2 additions & 2 deletions tools/pd-simulator/main.go
@@ -49,8 +49,8 @@ var (
 	serverLogLevel              = flag.String("serverLog", "info", "pd server log level")
 	simLogLevel                 = flag.String("simLog", "info", "simulator log level")
 	simLogFile                  = flag.String("log-file", "", "simulator log file")
-	regionNum                   = flag.Int("regionNum", 50000, "regionNum of one store")
-	storeNum                    = flag.Int("storeNum", 20, "storeNum")
+	regionNum                   = flag.Int("regionNum", 0, "regionNum of one store")
+	storeNum                    = flag.Int("storeNum", 0, "storeNum")
 	enableTransferRegionCounter = flag.Bool("enableTransferRegionCounter", false, "enableTransferRegionCounter")
 	statusAddress               = flag.String("status-addr", "0.0.0.0:20180", "status address")
 )
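The simulator's regionNum and storeNum defaults drop to 0 here. A plausible reading, not shown in this diff, is that 0 now means "unset" so the selected simulation case or config can supply its own store and region counts; the fallback helper below is therefore only an assumption for illustration, with the hypothetical case defaults taken from the old flag values.

```go
package main

import (
	"flag"
	"fmt"
)

var (
	// Zero defaults, as in the change above.
	regionNum = flag.Int("regionNum", 0, "regionNum of one store")
	storeNum  = flag.Int("storeNum", 0, "storeNum")
)

// pickOrDefault is a hypothetical helper: a zero flag value falls back to a
// per-case default (this fallback is an assumption, not code from this commit).
func pickOrDefault(flagValue, caseDefault int) int {
	if flagValue > 0 {
		return flagValue
	}
	return caseDefault
}

func main() {
	flag.Parse()
	stores := pickOrDefault(*storeNum, 20)      // hypothetical case default
	regions := pickOrDefault(*regionNum, 50000) // hypothetical case default
	fmt.Printf("simulating %d stores with %d regions per store\n", stores, regions)
}
```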
