diff --git a/errors.toml b/errors.toml
index a61c23a6fbd..a7e039564d7 100644
--- a/errors.toml
+++ b/errors.toml
@@ -141,9 +141,14 @@ error = '''
 can not remove store %d since there are no extra up store to store the leader
 '''
 
-["PD:core:ErrPauseLeaderTransfer"]
+["PD:core:ErrPauseLeaderTransferIn"]
 error = '''
-store %v is paused for leader transfer
+store %v is paused for leader transfer in
+'''
+
+["PD:core:ErrPauseLeaderTransferOut"]
+error = '''
+store %v is paused for leader transfer out
 '''
 
 ["PD:core:ErrSlowStoreEvicted"]
diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go
index f0b23bd6434..9c307509937 100644
--- a/pkg/core/basic_cluster.go
+++ b/pkg/core/basic_cluster.go
@@ -137,8 +137,10 @@ type StoreSetInformer interface {
 
 // StoreSetController is used to control stores' status.
 type StoreSetController interface {
-	PauseLeaderTransfer(id uint64) error
-	ResumeLeaderTransfer(id uint64)
+	PauseLeaderTransferIn(id uint64) error
+	ResumeLeaderTransferIn(id uint64)
+	PauseLeaderTransferOut(id uint64) error
+	ResumeLeaderTransferOut(id uint64)
 
 	SlowStoreEvicted(id uint64) error
 	SlowStoreRecovered(id uint64)
diff --git a/pkg/core/store.go b/pkg/core/store.go
index 5baedafdb05..3c37b5d22c0 100644
--- a/pkg/core/store.go
+++ b/pkg/core/store.go
@@ -51,22 +51,23 @@ const (
 type StoreInfo struct {
 	meta *metapb.Store
 	*storeStats
-	pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
-	slowStoreEvicted    bool // this store has been evicted as a slow store, should not transfer leader to it
-	slowTrendEvicted    bool // this store has been evicted as a slow store by trend, should not transfer leader to it
-	leaderCount         int
-	regionCount         int
-	learnerCount        int
-	witnessCount        int
-	leaderSize          int64
-	regionSize          int64
-	pendingPeerCount    int
-	lastPersistTime     time.Time
-	leaderWeight        float64
-	regionWeight        float64
-	limiter             storelimit.StoreLimit
-	minResolvedTS       uint64
-	lastAwakenTime      time.Time
+	pauseLeaderTransferIn  bool // not allowed to be used as target of transfer leader
+	pauseLeaderTransferOut bool // not allowed to be used as source of transfer leader
+	slowStoreEvicted       bool // this store has been evicted as a slow store, should not transfer leader to it
+	slowTrendEvicted       bool // this store has been evicted as a slow store by trend, should not transfer leader to it
+	leaderCount            int
+	regionCount            int
+	learnerCount           int
+	witnessCount           int
+	leaderSize             int64
+	regionSize             int64
+	pendingPeerCount       int
+	lastPersistTime        time.Time
+	leaderWeight           float64
+	regionWeight           float64
+	limiter                storelimit.StoreLimit
+	minResolvedTS          uint64
+	lastAwakenTime         time.Time
 }
 
 // NewStoreInfo creates StoreInfo with meta data.
@@ -138,10 +139,16 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
 	return &store
 }
 
-// AllowLeaderTransfer returns if the store is allowed to be selected
-// as source or target of transfer leader.
-func (s *StoreInfo) AllowLeaderTransfer() bool {
-	return !s.pauseLeaderTransfer
+// AllowLeaderTransferIn returns if the store is allowed to be selected
+// as target of transfer leader.
+func (s *StoreInfo) AllowLeaderTransferIn() bool {
+	return !s.pauseLeaderTransferIn
+}
+
+// AllowLeaderTransferOut returns if the store is allowed to be selected
+// as source of transfer leader.
+func (s *StoreInfo) AllowLeaderTransferOut() bool {
+	return !s.pauseLeaderTransferOut
 }
 
 // EvictedAsSlowStore returns if the store should be evicted as a slow store.
@@ -775,24 +782,55 @@ func (s *StoresInfo) ResetStores() {
 	s.stores = make(map[uint64]*StoreInfo)
 }
 
-// PauseLeaderTransfer pauses a StoreInfo with storeID.
-func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
+// PauseLeaderTransferIn pauses a StoreInfo with storeID. The store can not be selected
+// as target of TransferLeader again.
+func (s *StoresInfo) PauseLeaderTransferIn(storeID uint64) error {
+	s.Lock()
+	defer s.Unlock()
+	store, ok := s.stores[storeID]
+	if !ok {
+		return errs.ErrStoreNotFound.FastGenByArgs(storeID)
+	}
+	if !store.AllowLeaderTransferIn() {
+		return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
+	}
+	s.stores[storeID] = store.Clone(PauseLeaderTransferIn())
+	return nil
+}
+
+// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected
+// as target of TransferLeader again.
+func (s *StoresInfo) ResumeLeaderTransferIn(storeID uint64) {
+	s.Lock()
+	defer s.Unlock()
+	store, ok := s.stores[storeID]
+	if !ok {
+		log.Warn("try to clean a store's pause state, but it is not found. It may have been cleaned up",
+			zap.Uint64("store-id", storeID))
+		return
+	}
+	s.stores[storeID] = store.Clone(ResumeLeaderTransferIn())
+}
+
+// PauseLeaderTransferOut pauses a StoreInfo with storeID. The store can not be selected
+// as source of TransferLeader again.
+func (s *StoresInfo) PauseLeaderTransferOut(storeID uint64) error {
 	s.Lock()
 	defer s.Unlock()
 	store, ok := s.stores[storeID]
 	if !ok {
 		return errs.ErrStoreNotFound.FastGenByArgs(storeID)
 	}
-	if !store.AllowLeaderTransfer() {
-		return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
+	if !store.AllowLeaderTransferOut() {
+		return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
 	}
-	s.stores[storeID] = store.Clone(PauseLeaderTransfer())
+	s.stores[storeID] = store.Clone(PauseLeaderTransferOut())
 	return nil
 }
 
-// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
-// as source or target of TransferLeader again.
-func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
+// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected
+// as source of TransferLeader again.
+func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) {
 	s.Lock()
 	defer s.Unlock()
 	store, ok := s.stores[storeID]
@@ -801,7 +839,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
 		zap.Uint64("store-id", storeID))
 		return
 	}
-	s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
+	s.stores[storeID] = store.Clone(ResumeLeaderTransferOut())
 }
 
 // SlowStoreEvicted marks a store as a slow store and prevents transferring
diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go
index 93b25562731..40d84b5a2c3 100644
--- a/pkg/core/store_option.go
+++ b/pkg/core/store_option.go
@@ -98,19 +98,31 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
 	}
 }
 
-// PauseLeaderTransfer prevents the store from been selected as source or
-// target store of TransferLeader.
-func PauseLeaderTransfer() StoreCreateOption {
+// PauseLeaderTransferIn prevents the store from being selected as target store of TransferLeader.
+func PauseLeaderTransferIn() StoreCreateOption {
 	return func(store *StoreInfo) {
-		store.pauseLeaderTransfer = true
+		store.pauseLeaderTransferIn = true
 	}
 }
 
-// ResumeLeaderTransfer cleans a store's pause state. The store can be selected
-// as source or target of TransferLeader again.
-func ResumeLeaderTransfer() StoreCreateOption {
+// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected as target of TransferLeader again.
+func ResumeLeaderTransferIn() StoreCreateOption {
 	return func(store *StoreInfo) {
-		store.pauseLeaderTransfer = false
+		store.pauseLeaderTransferIn = false
+	}
+}
+
+// PauseLeaderTransferOut prevents the store from being selected as source store of TransferLeader.
+func PauseLeaderTransferOut() StoreCreateOption {
+	return func(store *StoreInfo) {
+		store.pauseLeaderTransferOut = true
+	}
+}
+
+// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected as source of TransferLeader again.
+func ResumeLeaderTransferOut() StoreCreateOption {
+	return func(store *StoreInfo) {
+		store.pauseLeaderTransferOut = false
 	}
 }
 
diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go
index 1f56a821032..29065c7c13d 100644
--- a/pkg/errs/errno.go
+++ b/pkg/errs/errno.go
@@ -71,7 +71,8 @@ var (
 var (
 	ErrWrongRangeKeys      = errors.Normalize("wrong range keys", errors.RFCCodeText("PD:core:ErrWrongRangeKeys"))
 	ErrStoreNotFound       = errors.Normalize("store %v not found", errors.RFCCodeText("PD:core:ErrStoreNotFound"))
-	ErrPauseLeaderTransfer = errors.Normalize("store %v is paused for leader transfer", errors.RFCCodeText("PD:core:ErrPauseLeaderTransfer"))
+	ErrPauseLeaderTransferIn  = errors.Normalize("store %v is paused for leader transfer in", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferIn"))
+	ErrPauseLeaderTransferOut = errors.Normalize("store %v is paused for leader transfer out", errors.RFCCodeText("PD:core:ErrPauseLeaderTransferOut"))
 	ErrStoreRemoved        = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreRemoved"))
 	ErrStoreDestroyed      = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
 	ErrStoreUnhealthy      = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index bbd4fbb6811..f64bcb33a78 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -567,9 +567,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
 func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
 	store := mc.GetStore(storeID)
 	if enableEvictLeader {
-		mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
+		mc.PutStore(store.Clone(core.PauseLeaderTransferIn()))
 	} else {
-		mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
+		mc.PutStore(store.Clone(core.ResumeLeaderTransferIn()))
 	}
 }
 
diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go
index e2846e6c9a6..4ea59935109 100644
--- a/pkg/schedule/filter/filters.go
+++ b/pkg/schedule/filter/filters.go
@@ -390,8 +390,17 @@ func (f *StoreStateFilter) isRemoving(_ config.SharedConfigProvider, store *core
 	return statusOK
 }
 
-func (f *StoreStateFilter) pauseLeaderTransfer(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
-	if !store.AllowLeaderTransfer() {
+func (f *StoreStateFilter) pauseLeaderTransferIn(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
+	if !store.AllowLeaderTransferIn() {
+		f.Reason = storeStatePauseLeader
+		return statusStoreRejectLeader
+	}
+	f.Reason = storeStateOK
+	return statusOK
+}
+
+func (f *StoreStateFilter) pauseLeaderTransferOut(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
+	if !store.AllowLeaderTransferOut() {
 		f.Reason = storeStatePauseLeader
 		return statusStoreRejectLeader
 	}
@@ -511,13 +520,13 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
 	var funcs []conditionFunc
 	switch typ {
 	case leaderSource:
-		funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
+		funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransferOut, f.isDisconnected}
 	case regionSource:
 		funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
 	case witnessSource:
 		funcs = []conditionFunc{f.isBusy}
 	case leaderTarget:
-		funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransfer,
+		funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.pauseLeaderTransferIn,
 			f.slowStoreEvicted, f.slowTrendEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
 	case regionTarget:
 		funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index 60dbee79dc4..3868d508f84 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -490,8 +490,17 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
 		balanceLeaderNoLeaderRegionCounter.Inc()
 		return nil
 	}
-	finalFilters := l.filters
+	// Check if the source store is available as a source.
 	conf := solver.GetSchedulerConfig()
+	if filter.NewCandidates([]*core.StoreInfo{solver.Source}).
+		FilterSource(conf, nil, l.filterCounter, l.filters...).Len() == 0 {
+		log.Debug("store cannot be used as source", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.Source.GetID()))
+		balanceLeaderNoSourceStoreCounter.Inc()
+		return nil
+	}
+
+	// Check if the target store is available as a target.
+	finalFilters := l.filters
 	if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
 		finalFilters = append(l.filters, leaderFilter)
 	}
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index a7d656a3e42..5d561ddbc4b 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -97,14 +97,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
 	_, exists := conf.StoreIDWithRanges[id]
 	if exists {
 		delete(conf.StoreIDWithRanges, id)
-		conf.cluster.ResumeLeaderTransfer(id)
+		conf.cluster.ResumeLeaderTransferIn(id)
 		return len(conf.StoreIDWithRanges) == 0, nil
 	}
 	return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
 }
 
 func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
-	if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
+	if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
 		log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
 	}
 	conf.StoreIDWithRanges[id] = keyRange
@@ -138,7 +138,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
 	if err := conf.load(newCfg); err != nil {
 		return err
 	}
-	pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
+	pauseAndResumeLeaderTransferIn(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
 	conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
 	conf.Batch = newCfg.Batch
 	return nil
@@ -149,7 +149,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
 	defer conf.RUnlock()
 	var res error
 	for id := range conf.StoreIDWithRanges {
-		if err := cluster.PauseLeaderTransfer(id); err != nil {
+		if err := cluster.PauseLeaderTransferIn(id); err != nil {
 			res = err
 		}
 	}
@@ -160,7 +160,7 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
 	conf.RLock()
 	defer conf.RUnlock()
 	for id := range conf.StoreIDWithRanges {
-		cluster.ResumeLeaderTransfer(id)
+		cluster.ResumeLeaderTransferIn(id)
 	}
 }
 
@@ -168,7 +168,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id ui
 	conf.RLock()
 	defer conf.RUnlock()
 	if _, exist := conf.StoreIDWithRanges[id]; !exist {
-		if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
+		if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
 			return exist, err
 		}
 	}
diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go
index d23fc2f8ff8..ed2b8d0afc0 100644
--- a/pkg/schedule/schedulers/evict_slow_store.go
+++ b/pkg/schedule/schedulers/evict_slow_store.go
@@ -201,7 +201,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
 	for _, id := range newCfg.EvictedStores {
 		new[id] = struct{}{}
 	}
-	pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
+	pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
 	s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
 	s.conf.EvictedStores = newCfg.EvictedStores
 	return nil
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
index 8fd76bdccd4..ae6311076df 100644
--- a/pkg/schedule/schedulers/evict_slow_trend.go
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -301,7 +301,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error {
 	for _, id := range newCfg.EvictedStores {
 		new[id] = struct{}{}
 	}
-	pauseAndResumeLeaderTransfer(s.conf.cluster, old, new)
+	pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
 	s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
 	s.conf.EvictedStores = newCfg.EvictedStores
 	return nil
diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go
index 5dbb6eef5f6..521acf53045 100644
--- a/pkg/schedule/schedulers/grant_leader.go
+++ b/pkg/schedule/schedulers/grant_leader.go
@@ -99,7 +99,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
 	succ, last = false, false
 	if exists {
 		delete(conf.StoreIDWithRanges, id)
-		conf.cluster.ResumeLeaderTransfer(id)
+		conf.cluster.ResumeLeaderTransferOut(id)
 		succ = true
 		last = len(conf.StoreIDWithRanges) == 0
 	}
@@ -109,7 +109,7 @@ func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.Ke
 	conf.Lock()
 	defer conf.Unlock()
-	if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
+	if err := conf.cluster.PauseLeaderTransferOut(id); err != nil {
 		log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
 	}
 	conf.StoreIDWithRanges[id] = keyRange
@@ -171,7 +171,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
 	if err := s.conf.load(newCfg); err != nil {
 		return err
 	}
-	pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
+	pauseAndResumeLeaderTransferOut(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
 	s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
 	return nil
 }
@@ -182,7 +182,7 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro
 	defer s.conf.RUnlock()
 	var res error
 	for id := range s.conf.StoreIDWithRanges {
-		if err := cluster.PauseLeaderTransfer(id); err != nil {
+		if err := cluster.PauseLeaderTransferOut(id); err != nil {
 			res = err
 		}
 	}
@@ -194,7 +194,7 @@ func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.conf.RLock()
 	defer s.conf.RUnlock()
 	for id := range s.conf.StoreIDWithRanges {
-		cluster.ResumeLeaderTransfer(id)
+		cluster.ResumeLeaderTransferOut(id)
 	}
 }
 
@@ -252,7 +252,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
 	id = (uint64)(idFloat)
 	handler.config.RLock()
 	if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
-		if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
+		if err := handler.config.cluster.PauseLeaderTransferOut(id); err != nil {
 			handler.config.RUnlock()
 			handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
 			return
diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go
index a518a167af7..4afc4605f52 100644
--- a/pkg/schedule/schedulers/metrics.go
+++ b/pkg/schedule/schedulers/metrics.go
@@ -226,6 +226,7 @@ var (
 	balanceLeaderScheduleCounter         = balanceLeaderCounterWithEvent("schedule")
 	balanceLeaderNoLeaderRegionCounter   = balanceLeaderCounterWithEvent("no-leader-region")
 	balanceLeaderRegionHotCounter        = balanceLeaderCounterWithEvent("region-hot")
+	balanceLeaderNoSourceStoreCounter    = balanceLeaderCounterWithEvent("no-source-store")
 	balanceLeaderNoTargetStoreCounter    = balanceLeaderCounterWithEvent("no-target-store")
 	balanceLeaderNoFollowerRegionCounter = balanceLeaderCounterWithEvent("no-follower-region")
 	balanceLeaderSkipCounter             = balanceLeaderCounterWithEvent("skip")
diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go
index 7cbfe714aa9..40f2cf7bc97 100644
--- a/pkg/schedule/schedulers/utils.go
+++ b/pkg/schedule/schedulers/utils.go
@@ -391,20 +391,38 @@ func (q *retryQuota) gc(keepStores []*core.StoreInfo) {
 	}
 }
 
-// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer.
-func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, old, new map[uint64]T) {
+// pauseAndResumeLeaderTransferIn checks the old and new store IDs, and pauses or resumes leader transfer in.
+func pauseAndResumeLeaderTransferIn[T any](cluster *core.BasicCluster, old, new map[uint64]T) {
 	for id := range old {
 		if _, ok := new[id]; ok {
 			continue
 		}
-		cluster.ResumeLeaderTransfer(id)
+		cluster.ResumeLeaderTransferIn(id)
 	}
 	for id := range new {
 		if _, ok := old[id]; ok {
 			continue
 		}
-		if err := cluster.PauseLeaderTransfer(id); err != nil {
-			log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
+		if err := cluster.PauseLeaderTransferIn(id); err != nil {
+			log.Error("pause leader transfer in failed", zap.Uint64("store-id", id), errs.ZapError(err))
+		}
+	}
+}
+
+// pauseAndResumeLeaderTransferOut checks the old and new store IDs, and pauses or resumes leader transfer out.
+func pauseAndResumeLeaderTransferOut[T any](cluster *core.BasicCluster, old, new map[uint64]T) { + for id := range old { + if _, ok := new[id]; ok { + continue + } + cluster.ResumeLeaderTransferOut(id) + } + for id := range new { + if _, ok := old[id]; ok { + continue + } + if err := cluster.PauseLeaderTransferOut(id); err != nil { + log.Error("pause leader transfer out failed", zap.Uint64("store-id", id), errs.ZapError(err)) } } } diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index 4e3e2fa2c7a..e63f8141638 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -46,7 +46,7 @@ func (tikvCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) } switch kind { case constant.LeaderKind: - return info.AllowLeaderTransfer() + return info.AllowLeaderTransferIn() || info.AllowLeaderTransferOut() case constant.RegionKind: return true } diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index c20cfd41814..2129942c3a5 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -185,7 +185,7 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro defer s.conf.mu.RUnlock() var res error for id := range s.conf.StoreIDWitRanges { - if err := cluster.PauseLeaderTransfer(id); err != nil { + if err := cluster.PauseLeaderTransferIn(id); err != nil { res = err } } @@ -197,7 +197,7 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWitRanges { - cluster.ResumeLeaderTransfer(id) + cluster.ResumeLeaderTransferIn(id) } } @@ -258,7 +258,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R if ok { id = (uint64)(idFloat) if _, exists = handler.config.StoreIDWitRanges[id]; !exists { - if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { + if err := handler.config.cluster.PauseLeaderTransferIn(id); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -307,7 +307,7 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R return } delete(handler.config.StoreIDWitRanges, id) - handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.cluster.ResumeLeaderTransferIn(id) handler.config.mu.Unlock() if err := handler.config.Persist(); err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 9a9420988a1..ee371d21aef 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1815,15 +1815,15 @@ func TestStores(t *testing.T) { for i, store := range stores { id := store.GetID() re.Nil(cache.GetStore(id)) - re.Error(cache.PauseLeaderTransfer(id)) + re.Error(cache.PauseLeaderTransferIn(id)) cache.PutStore(store) re.Equal(store, cache.GetStore(id)) re.Equal(i+1, cache.GetStoreCount()) - re.NoError(cache.PauseLeaderTransfer(id)) - re.False(cache.GetStore(id).AllowLeaderTransfer()) - re.Error(cache.PauseLeaderTransfer(id)) - cache.ResumeLeaderTransfer(id) - re.True(cache.GetStore(id).AllowLeaderTransfer()) + re.NoError(cache.PauseLeaderTransferIn(id)) + re.False(cache.GetStore(id).AllowLeaderTransferIn()) + re.Error(cache.PauseLeaderTransferIn(id)) + cache.ResumeLeaderTransferIn(id) + re.True(cache.GetStore(id).AllowLeaderTransferIn()) } re.Equal(int(n), cache.GetStoreCount()) diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go 
b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 3a6e29f3586..1228c614893 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -174,19 +174,19 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} checkStorePause := func(changedStores []uint64, schedulerName string) { - status := func() string { - switch schedulerName { - case "evict-leader-scheduler": - return "paused" - case "grant-leader-scheduler": - return "resumed" - default: - re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) - return "" - } - }() for _, store := range stores { - isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + storeInfo := cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()) + status, isStorePaused := func() (string, bool) { + switch schedulerName { + case "evict-leader-scheduler": + return "paused", !storeInfo.AllowLeaderTransferIn() + case "grant-leader-scheduler": + return "paused", !storeInfo.AllowLeaderTransferOut() + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "", false + } + }() if slice.AnyOf(changedStores, func(i int) bool { return store.GetId() == changedStores[i] }) { @@ -197,7 +197,14 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) } if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + switch schedulerName { + case "evict-leader-scheduler": + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransferIn()) + case "grant-leader-scheduler": + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransferOut()) + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + } } } }
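
The change above is easiest to read as two independent switches per store. Below is a minimal, self-contained sketch of the intended semantics (illustrative only: plain Go with invented names, not pd's actual StoreInfo/BasicCluster API): evict-leader wants leaders drained off a store, so it pauses transfer in; grant-leader wants leaders pinned to a store, so it pauses transfer out; and a config reload reconciles the paused set by diffing the old and new store maps, the way pauseAndResumeLeaderTransferIn/Out do in pkg/schedule/schedulers/utils.go.

// Illustrative sketch only; the types and method names here are invented
// stand-ins for pd's StoreInfo pause flags and BasicCluster methods.
package main

import "fmt"

// store keeps just the two pause flags that this patch splits apart.
type store struct {
	pausedIn  bool // must not be chosen as the target of a leader transfer
	pausedOut bool // must not be chosen as the source of a leader transfer
}

type cluster struct {
	stores map[uint64]*store
}

func (c *cluster) pauseLeaderTransferIn(id uint64)   { c.stores[id].pausedIn = true }
func (c *cluster) resumeLeaderTransferIn(id uint64)  { c.stores[id].pausedIn = false }
func (c *cluster) pauseLeaderTransferOut(id uint64)  { c.stores[id].pausedOut = true }
func (c *cluster) resumeLeaderTransferOut(id uint64) { c.stores[id].pausedOut = false }

// reconcileIn mirrors the shape of pauseAndResumeLeaderTransferIn: stores that
// left the scheduler's config get resumed, stores that joined it get paused.
func (c *cluster) reconcileIn(prev, next map[uint64]struct{}) {
	for id := range prev {
		if _, ok := next[id]; !ok {
			c.resumeLeaderTransferIn(id)
		}
	}
	for id := range next {
		if _, ok := prev[id]; !ok {
			c.pauseLeaderTransferIn(id)
		}
	}
}

func main() {
	c := &cluster{stores: map[uint64]*store{1: {}, 2: {}, 3: {}}}

	// evict-leader on store 1: leaders must drain off it, so it is blocked as
	// a target while it stays usable as a source.
	c.pauseLeaderTransferIn(1)

	// grant-leader on store 2: leaders must stay on it, so it is blocked as a
	// source while it stays usable as a target.
	c.pauseLeaderTransferOut(2)

	for _, id := range []uint64{1, 2, 3} {
		s := c.stores[id]
		fmt.Printf("store %d: may receive leaders=%v, may give up leaders=%v\n",
			id, !s.pausedIn, !s.pausedOut)
	}

	// A config reload that moves evict-leader from store 1 to store 3 resumes
	// store 1 and pauses store 3: exactly one transition per changed store.
	c.reconcileIn(map[uint64]struct{}{1: {}}, map[uint64]struct{}{3: {}})
	fmt.Println("after reload: store 1 pausedIn =", c.stores[1].pausedIn,
		"| store 3 pausedIn =", c.stores[3].pausedIn)
}

With the single flag split this way, every old AllowLeaderTransfer() call site has to pick a direction, which is why the filter gains separate pauseLeaderTransferIn/Out conditions and balance-leader adds an explicit source-side check backed by the new no-source-store counter.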