Skip to content

Commit

Permalink
scheduler: replace pauseLeader with two flags and add source filter t…
Browse files Browse the repository at this point in the history
…o transferIn

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 13, 2024
1 parent 0ca83cf commit c0b3e8a
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 92 deletions.
9 changes: 7 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ error = '''
can not remove store %d since there are no extra up store to store the leader
'''

["PD:core:ErrPauseLeaderTransfer"]
["PD:core:ErrPauseLeaderTransferIn"]
error = '''
store %v is paused for leader transfer
store %v is paused for leader transfer in
'''

["PD:core:ErrPauseLeaderTransferOut"]
error = '''
store %v is paused for leader transfer out
'''

["PD:core:ErrSlowStoreEvicted"]
Expand Down
6 changes: 4 additions & 2 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ type StoreSetInformer interface {

// StoreSetController is used to control stores' status.
type StoreSetController interface {
PauseLeaderTransfer(id uint64) error
ResumeLeaderTransfer(id uint64)
PauseLeaderTransferIn(id uint64) error
ResumeLeaderTransferIn(id uint64)
PauseLeaderTransferOut(id uint64) error
ResumeLeaderTransferOut(id uint64)

SlowStoreEvicted(id uint64) error
SlowStoreRecovered(id uint64)
Expand Down
96 changes: 67 additions & 29 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,23 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransferIn bool // not allow to be used as target of transfer leader
pauseLeaderTransferOut bool // not allow to be used as source of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -138,10 +139,16 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
return &store
}

// AllowLeaderTransfer returns if the store is allowed to be selected
// as source or target of transfer leader.
func (s *StoreInfo) AllowLeaderTransfer() bool {
return !s.pauseLeaderTransfer
// AllowLeaderTransferIn returns if the store is allowed to be selected
// as target of transfer leader.
func (s *StoreInfo) AllowLeaderTransferIn() bool {
return !s.pauseLeaderTransferIn
}

// AllowLeaderTransferOut returns if the store is allowed to be selected
// as source of transfer leader.
func (s *StoreInfo) AllowLeaderTransferOut() bool {
return !s.pauseLeaderTransferOut
}

// EvictedAsSlowStore returns if the store should be evicted as a slow store.
Expand Down Expand Up @@ -775,24 +782,55 @@ func (s *StoresInfo) ResetStores() {
s.stores = make(map[uint64]*StoreInfo)
}

// PauseLeaderTransfer pauses a StoreInfo with storeID.
func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
// PauseLeaderTransferIn pauses a StoreInfo with storeID. The store can not be selected
// as target of TransferLeader again.
func (s *StoresInfo) PauseLeaderTransferIn(storeID uint64) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransferIn() {
return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
}
s.stores[storeID] = store.Clone(PauseLeaderTransferIn())
return nil
}

// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected
// as target of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransferIn(storeID uint64) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
log.Warn("try to clean a store's pause state, but it is not found. It may be cleanup",
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransferIn())
}

// PauseLeaderTransferOut pauses a StoreInfo with storeID. The store can not be selected
// as source of TransferLeader again.
func (s *StoresInfo) PauseLeaderTransferOut(storeID uint64) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransfer() {
return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
if !store.AllowLeaderTransferOut() {
return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
}
s.stores[storeID] = store.Clone(PauseLeaderTransfer())
s.stores[storeID] = store.Clone(PauseLeaderTransferOut())
return nil
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected
// as source of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
Expand All @@ -801,7 +839,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
s.stores[storeID] = store.Clone(ResumeLeaderTransferOut())
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
Expand Down
28 changes: 20 additions & 8 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,31 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
}
}

// PauseLeaderTransfer prevents the store from been selected as source or
// target store of TransferLeader.
func PauseLeaderTransfer() StoreCreateOption {
// PauseLeaderTransferIn prevents the store from been selected as target store of TransferLeader.
func PauseLeaderTransferIn() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = true
store.pauseLeaderTransferIn = true
}
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func ResumeLeaderTransfer() StoreCreateOption {
// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected as target of TransferLeader again.
func ResumeLeaderTransferIn() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = false
store.pauseLeaderTransferIn = false
}
}

// PauseLeaderTransferOut prevents the store from been selected as source store of TransferLeader.
func PauseLeaderTransferOut() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransferOut = true
}
}

// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected as source of TransferLeader again.
func ResumeLeaderTransferOut() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransferOut = false
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ var (
var (
ErrWrongRangeKeys = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
ErrStoreNotFound = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
ErrPauseLeaderTransferIn = errors.Normalize("store %v is paused for leader transfer in", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferIn"))
ErrPauseLeaderTransferOut = errors.Normalize("store %v is paused for leader transfer out", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferOut"))
ErrStoreRemoved = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreRemoved"))
ErrStoreDestroyed = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
ErrStoreUnhealthy = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
mc.PutStore(store.Clone(core.PauseLeaderTransferIn()))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
mc.PutStore(store.Clone(core.ResumeLeaderTransferIn()))
}
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,17 @@ func (f *StoreStateFilter) isRemoving(_ config.SharedConfigProvider, store *core
return statusOK
}

func (f *StoreStateFilter) pauseLeaderTransfer(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransfer() {
func (f *StoreStateFilter) pauseLeaderTransferIn(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransferIn() {
f.Reason = storeStatePauseLeader
return statusStoreRejectLeader
}
f.Reason = storeStateOK
return statusOK
}

func (f *StoreStateFilter) pauseLeaderTransferOut(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransferOut() {
f.Reason = storeStatePauseLeader
return statusStoreRejectLeader
}
Expand Down Expand Up @@ -511,13 +520,13 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransferOut, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case witnessSource:
funcs = []conditionFunc{f.isBusy}
case leaderTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransfer,
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransferIn,
f.slowStoreEvicted, f.slowTrendEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
Expand Down
11 changes: 10 additions & 1 deletion pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,17 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
balanceLeaderNoLeaderRegionCounter.Inc()
return nil
}
finalFilters := l.filters
// Check if the source store is available as a source.
conf := solver.GetSchedulerConfig()
if filter.NewCandidates([]*core.StoreInfo{solver.Source}).
FilterSource(conf, nil, l.filterCounter, l.filters...).Len() == 0 {
log.Debug("store cannot be used as source", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.Source.GetID()))
balanceLeaderNoSourceStoreCounter.Inc()
return nil
}

// Check if the target store is available as a target.
finalFilters := l.filters
if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
finalFilters = append(l.filters, leaderFilter)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
_, exists := conf.StoreIDWithRanges[id]
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransfer(id)
conf.cluster.ResumeLeaderTransferIn(id)
return len(conf.StoreIDWithRanges) == 0, nil
}
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
Expand Down Expand Up @@ -138,7 +138,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
pauseAndResumeLeaderTransferIn(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
Expand All @@ -149,7 +149,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
if err := cluster.PauseLeaderTransferIn(id); err != nil {
res = err
}
}
Expand All @@ -160,15 +160,15 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
cluster.ResumeLeaderTransferIn(id)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
return exist, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
Loading

0 comments on commit c0b3e8a

Please sign in to comment.