Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: replace pauseLeader with two flags and add source filter to transferIn #8623

Merged
merged 11 commits into from
Nov 11, 2024
9 changes: 7 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,14 @@ error = '''
can not remove store %d since there are no extra up store to store the leader
'''

["PD:core:ErrPauseLeaderTransfer"]
["PD:core:ErrPauseLeaderTransferIn"]
error = '''
store %v is paused for leader transfer
store %v is paused for leader transfer in
'''

["PD:core:ErrPauseLeaderTransferOut"]
error = '''
store %v is paused for leader transfer out
'''

["PD:core:ErrSlowStoreEvicted"]
Expand Down
6 changes: 4 additions & 2 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ type StoreSetInformer interface {

// StoreSetController is used to control stores' status.
type StoreSetController interface {
PauseLeaderTransfer(id uint64) error
ResumeLeaderTransfer(id uint64)
PauseLeaderTransferIn(id uint64) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it something like `PauseLeaderTransferIn(id uint64, d direction) error`?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, WDYT?

ResumeLeaderTransferIn(id uint64)
PauseLeaderTransferOut(id uint64) error
ResumeLeaderTransferOut(id uint64)

SlowStoreEvicted(id uint64) error
SlowStoreRecovered(id uint64)
Expand Down
96 changes: 67 additions & 29 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,23 @@
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransferIn bool // not allow to be used as target of transfer leader
pauseLeaderTransferOut bool // not allow to be used as source of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -138,10 +139,16 @@
return &store
}

// AllowLeaderTransfer returns if the store is allowed to be selected
// as source or target of transfer leader.
func (s *StoreInfo) AllowLeaderTransfer() bool {
return !s.pauseLeaderTransfer
// AllowLeaderTransferIn reports whether the store may be selected as the
// target of a leader transfer. It is the negation of the store's
// pauseLeaderTransferIn flag.
func (s *StoreInfo) AllowLeaderTransferIn() bool {
	paused := s.pauseLeaderTransferIn
	return !paused
}

// AllowLeaderTransferOut reports whether the store may be selected as the
// source of a leader transfer. It is the negation of the store's
// pauseLeaderTransferOut flag.
func (s *StoreInfo) AllowLeaderTransferOut() bool {
	paused := s.pauseLeaderTransferOut
	return !paused
}

// EvictedAsSlowStore returns if the store should be evicted as a slow store.
Expand Down Expand Up @@ -775,24 +782,55 @@
s.stores = make(map[uint64]*StoreInfo)
}

// PauseLeaderTransfer pauses a StoreInfo with storeID.
func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
// PauseLeaderTransferIn marks the store identified by storeID as paused for
// incoming leader transfers, so it can no longer be selected as the target of
// TransferLeader.
//
// The receiver's lock is held for the whole call. It returns
// ErrStoreNotFound when storeID is not registered, and
// ErrPauseLeaderTransferIn when the store is already paused for transfer in.
// On success the stored entry is replaced by a clone carrying the pause flag.
func (s *StoresInfo) PauseLeaderTransferIn(storeID uint64) error {
	s.Lock()
	defer s.Unlock()

	current, found := s.stores[storeID]
	switch {
	case !found:
		return errs.ErrStoreNotFound.FastGenByArgs(storeID)
	case !current.AllowLeaderTransferIn():
		// Already paused: report it instead of silently pausing twice.
		return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
	}
	s.stores[storeID] = current.Clone(PauseLeaderTransferIn())
	return nil
}

// ResumeLeaderTransferIn clears the transfer-in pause state of the store
// identified by storeID, so it can be selected as the target of
// TransferLeader again.
//
// The receiver's lock is held for the whole call. When the store is not
// registered, a warning is logged and nothing else happens; otherwise the
// stored entry is replaced by a clone with the pause flag cleared.
func (s *StoresInfo) ResumeLeaderTransferIn(storeID uint64) {
	s.Lock()
	defer s.Unlock()

	if current, found := s.stores[storeID]; found {
		s.stores[storeID] = current.Clone(ResumeLeaderTransferIn())
		return
	}
	log.Warn("try to clean a store's pause state, but it is not found. It may be cleanup",
		zap.Uint64("store-id", storeID))
}

// PauseLeaderTransferOut pauses a StoreInfo with storeID. The store can not be selected
// as source of TransferLeader again.
func (s *StoresInfo) PauseLeaderTransferOut(storeID uint64) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransfer() {
return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
if !store.AllowLeaderTransferOut() {
return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)

Check warning on line 825 in pkg/core/store.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/store.go#L825

Added line #L825 was not covered by tests
}
s.stores[storeID] = store.Clone(PauseLeaderTransfer())
s.stores[storeID] = store.Clone(PauseLeaderTransferOut())
return nil
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected
// as source of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
Expand All @@ -801,7 +839,7 @@
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
s.stores[storeID] = store.Clone(ResumeLeaderTransferOut())
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
Expand Down
28 changes: 20 additions & 8 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,31 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
}
}

// PauseLeaderTransfer prevents the store from being selected as source or
// target store of TransferLeader.
func PauseLeaderTransfer() StoreCreateOption {
// PauseLeaderTransferIn prevents the store from being selected as target store of TransferLeader.
func PauseLeaderTransferIn() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = true
store.pauseLeaderTransferIn = true
}
}

// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
// as source or target of TransferLeader again.
func ResumeLeaderTransfer() StoreCreateOption {
// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected as target of TransferLeader again.
func ResumeLeaderTransferIn() StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransfer = false
store.pauseLeaderTransferIn = false
}
}

// PauseLeaderTransferOut returns an option that sets the store's
// pauseLeaderTransferOut flag, preventing the store from being selected as the
// source store of TransferLeader.
func PauseLeaderTransferOut() StoreCreateOption {
	return func(info *StoreInfo) { info.pauseLeaderTransferOut = true }
}

// ResumeLeaderTransferOut returns an option that clears the store's
// pauseLeaderTransferOut flag, so the store can be selected as the source of
// TransferLeader again.
func ResumeLeaderTransferOut() StoreCreateOption {
	return func(info *StoreInfo) { info.pauseLeaderTransferOut = false }
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ var (
var (
ErrWrongRangeKeys = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
ErrStoreNotFound = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
ErrPauseLeaderTransferIn = errors.Normalize("store %v is paused for leader transfer in", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferIn"))
ErrPauseLeaderTransferOut = errors.Normalize("store %v is paused for leader transfer out", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferOut"))
ErrStoreRemoved = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreRemoved"))
ErrStoreDestroyed = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
ErrStoreUnhealthy = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
mc.PutStore(store.Clone(core.PauseLeaderTransferIn()))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
mc.PutStore(store.Clone(core.ResumeLeaderTransferIn()))

Check warning on line 572 in pkg/mock/mockcluster/mockcluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mock/mockcluster/mockcluster.go#L572

Added line #L572 was not covered by tests
}
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,17 @@ func (f *StoreStateFilter) isRemoving(_ config.SharedConfigProvider, store *core
return statusOK
}

func (f *StoreStateFilter) pauseLeaderTransfer(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransfer() {
// pauseLeaderTransferIn checks whether the store is paused as a leader
// transfer target. When the store allows leader transfer in, it records
// storeStateOK as the filter reason and returns statusOK; otherwise it records
// storeStatePauseLeader and returns statusStoreRejectLeader. The config
// provider argument is not used.
func (f *StoreStateFilter) pauseLeaderTransferIn(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
	if store.AllowLeaderTransferIn() {
		f.Reason = storeStateOK
		return statusOK
	}
	f.Reason = storeStatePauseLeader
	return statusStoreRejectLeader
}

func (f *StoreStateFilter) pauseLeaderTransferOut(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if !store.AllowLeaderTransferOut() {
f.Reason = storeStatePauseLeader
return statusStoreRejectLeader
}
Expand Down Expand Up @@ -511,13 +520,13 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransferOut, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case witnessSource:
funcs = []conditionFunc{f.isBusy}
case leaderTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransfer,
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransferIn,
f.slowStoreEvicted, f.slowTrendEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
Expand Down
11 changes: 10 additions & 1 deletion pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,17 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
balanceLeaderNoLeaderRegionCounter.Inc()
return nil
}
finalFilters := l.filters
// Check if the source store is available as a source.
conf := solver.GetSchedulerConfig()
if filter.NewCandidates([]*core.StoreInfo{solver.Source}).
FilterSource(conf, nil, l.filterCounter, l.filters...).Len() == 0 {
log.Debug("store cannot be used as source", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.Source.GetID()))
balanceLeaderNoSourceStoreCounter.Inc()
return nil
}

// Check if the target store is available as a target.
finalFilters := l.filters
if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
finalFilters = append(l.filters, leaderFilter)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
_, exists := conf.StoreIDWithRanges[id]
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransfer(id)
conf.cluster.ResumeLeaderTransferIn(id)
return len(conf.StoreIDWithRanges) == 0, nil
}
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
Expand Down Expand Up @@ -138,7 +138,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
pauseAndResumeLeaderTransferIn(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
Expand All @@ -149,7 +149,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
if err := cluster.PauseLeaderTransferIn(id); err != nil {
res = err
}
}
Expand All @@ -160,15 +160,15 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
cluster.ResumeLeaderTransferIn(id)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
return exist, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
Loading