diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index 9c307509937..f7c3c5e93b1 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -14,7 +14,11 @@ package core -import "bytes" +import ( + "bytes" + + "github.com/tikv/pd/pkg/core/constant" +) // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { @@ -137,10 +141,8 @@ type StoreSetInformer interface { // StoreSetController is used to control stores' status. type StoreSetController interface { - PauseLeaderTransferIn(id uint64) error - ResumeLeaderTransferIn(id uint64) - PauseLeaderTransferOut(id uint64) error - ResumeLeaderTransferOut(id uint64) + PauseLeaderTransfer(id uint64, d constant.Direction) error + ResumeLeaderTransfer(id uint64, d constant.Direction) SlowStoreEvicted(id uint64) error SlowStoreRecovered(id uint64) diff --git a/pkg/core/constant/kind.go b/pkg/core/constant/kind.go index d8059a306e7..178cbf766e4 100644 --- a/pkg/core/constant/kind.go +++ b/pkg/core/constant/kind.go @@ -155,3 +155,13 @@ func StringToKeyType(input string) KeyType { panic("invalid key type: " + input) } } + +// Direction distinguishes different kinds of direction. +type Direction int + +const ( + // In indicates that the direction is in. + In Direction = iota + // Out indicates that the direction is out. + Out +) diff --git a/pkg/core/store.go b/pkg/core/store.go index 582e39373a5..b8881d9b162 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -782,55 +782,32 @@ func (s *StoresInfo) ResetStores() { s.stores = make(map[uint64]*StoreInfo) } -// PauseLeaderTransferIn pauses a StoreInfo with storeID. The store can not be selected -// as target of TransferLeader again. -func (s *StoresInfo) PauseLeaderTransferIn(storeID uint64) error { +// PauseLeaderTransfer pauses a StoreInfo with storeID. The store can not be selected +// as source or target of TransferLeader. +func (s *StoresInfo) PauseLeaderTransfer(storeID uint64, direction constant.Direction) error { s.Lock() defer s.Unlock() store, ok := s.stores[storeID] if !ok { return errs.ErrStoreNotFound.FastGenByArgs(storeID) } - if !store.AllowLeaderTransferIn() { - return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID) + switch direction { + case constant.In: + if !store.AllowLeaderTransferIn() { + return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID) + } + case constant.Out: + if !store.AllowLeaderTransferOut() { + return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID) + } } - s.stores[storeID] = store.Clone(PauseLeaderTransferIn()) + s.stores[storeID] = store.Clone(PauseLeaderTransfer(direction)) return nil } // ResumeLeaderTransferIn cleans a store's pause state. The store can be selected // as target of TransferLeader again. -func (s *StoresInfo) ResumeLeaderTransferIn(storeID uint64) { - s.Lock() - defer s.Unlock() - store, ok := s.stores[storeID] - if !ok { - log.Warn("try to clean a store's pause state, but it is not found. It may be cleanup", - zap.Uint64("store-id", storeID)) - return - } - s.stores[storeID] = store.Clone(ResumeLeaderTransferIn()) -} - -// PauseLeaderTransferOut pauses a StoreInfo with storeID. The store can not be selected -// as source of TransferLeader again. -func (s *StoresInfo) PauseLeaderTransferOut(storeID uint64) error { - s.Lock() - defer s.Unlock() - store, ok := s.stores[storeID] - if !ok { - return errs.ErrStoreNotFound.FastGenByArgs(storeID) - } - if !store.AllowLeaderTransferOut() { - return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID) - } - s.stores[storeID] = store.Clone(PauseLeaderTransferOut()) - return nil -} - -// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected -// as source of TransferLeader again. -func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) { +func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64, direction constant.Direction) { s.Lock() defer s.Unlock() store, ok := s.stores[storeID] @@ -839,7 +816,7 @@ func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) { zap.Uint64("store-id", storeID)) return } - s.stores[storeID] = store.Clone(ResumeLeaderTransferOut()) + s.stores[storeID] = store.Clone(ResumeLeaderTransfer(direction)) } // SlowStoreEvicted marks a store as a slow store and prevents transferring diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 40d84b5a2c3..542d3ec84a0 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/utils/typeutil" ) @@ -98,17 +99,27 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr } } -// PauseLeaderTransferIn prevents the store from been selected as target store of TransferLeader. -func PauseLeaderTransferIn() StoreCreateOption { +// PauseLeaderTransfer prevents the store from been selected as source or target store of TransferLeader. +func PauseLeaderTransfer(d constant.Direction) StoreCreateOption { return func(store *StoreInfo) { - store.pauseLeaderTransferIn = true + switch d { + case constant.In: + store.pauseLeaderTransferIn = true + case constant.Out: + store.pauseLeaderTransferOut = true + } } } -// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected as target of TransferLeader again. -func ResumeLeaderTransferIn() StoreCreateOption { +// ResumeLeaderTransfer cleans a store's pause state. The store can be selected as source or target of TransferLeader again. +func ResumeLeaderTransfer(d constant.Direction) StoreCreateOption { return func(store *StoreInfo) { - store.pauseLeaderTransferIn = false + switch d { + case constant.In: + store.pauseLeaderTransferIn = false + case constant.Out: + store.pauseLeaderTransferOut = false + } } } diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index f64bcb33a78..769080aac30 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mock/mockid" @@ -567,9 +568,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) { func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) { store := mc.GetStore(storeID) if enableEvictLeader { - mc.PutStore(store.Clone(core.PauseLeaderTransferIn())) + mc.PutStore(store.Clone(core.PauseLeaderTransfer(constant.In))) } else { - mc.PutStore(store.Clone(core.ResumeLeaderTransferIn())) + mc.PutStore(store.Clone(core.ResumeLeaderTransfer(constant.In))) } } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 5d561ddbc4b..4aeee029b4a 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -97,14 +97,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro _, exists := conf.StoreIDWithRanges[id] if exists { delete(conf.StoreIDWithRanges, id) - conf.cluster.ResumeLeaderTransferIn(id) + conf.cluster.ResumeLeaderTransfer(id, constant.In) return len(conf.StoreIDWithRanges) == 0, nil } return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() } func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { - if err := conf.cluster.PauseLeaderTransferIn(id); err != nil { + if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) } conf.StoreIDWithRanges[id] = keyRange @@ -138,7 +138,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error { if err := conf.load(newCfg); err != nil { return err } - pauseAndResumeLeaderTransferIn(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + pauseAndResumeLeaderTransfer(conf.cluster, constant.In, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) conf.StoreIDWithRanges = newCfg.StoreIDWithRanges conf.Batch = newCfg.Batch return nil @@ -149,7 +149,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul defer conf.RUnlock() var res error for id := range conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransferIn(id); err != nil { + if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil { res = err } } @@ -160,7 +160,7 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu conf.RLock() defer conf.RUnlock() for id := range conf.StoreIDWithRanges { - cluster.ResumeLeaderTransferIn(id) + cluster.ResumeLeaderTransfer(id, constant.In) } } @@ -168,7 +168,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id ui conf.RLock() defer conf.RUnlock() if _, exist := conf.StoreIDWithRanges[id]; !exist { - if err := conf.cluster.PauseLeaderTransferIn(id); err != nil { + if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil { return exist, err } } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index ed2b8d0afc0..dfbe6db1dfb 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -201,7 +202,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error { for _, id := range newCfg.EvictedStores { new[id] = struct{}{} } - pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new) s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap s.conf.EvictedStores = newCfg.EvictedStores return nil diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index ae6311076df..0c223a000fb 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" @@ -301,7 +302,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error { for _, id := range newCfg.EvictedStores { new[id] = struct{}{} } - pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new) s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap s.conf.EvictedStores = newCfg.EvictedStores return nil diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 521acf53045..b60abf13dc9 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -99,7 +99,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last succ, last = false, false if exists { delete(conf.StoreIDWithRanges, id) - conf.cluster.ResumeLeaderTransferOut(id) + conf.cluster.ResumeLeaderTransfer(id, constant.Out) succ = true last = len(conf.StoreIDWithRanges) == 0 } @@ -109,7 +109,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { conf.Lock() defer conf.Unlock() - if err := conf.cluster.PauseLeaderTransferOut(id); err != nil { + if err := conf.cluster.PauseLeaderTransfer(id, constant.Out); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) } conf.StoreIDWithRanges[id] = keyRange @@ -171,7 +171,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error { if err := s.conf.load(newCfg); err != nil { return err } - pauseAndResumeLeaderTransferOut(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + pauseAndResumeLeaderTransfer(s.conf.cluster, constant.Out, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges return nil } @@ -182,7 +182,7 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro defer s.conf.RUnlock() var res error for id := range s.conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransferOut(id); err != nil { + if err := cluster.PauseLeaderTransfer(id, constant.Out); err != nil { res = err } } @@ -194,7 +194,7 @@ func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.RLock() defer s.conf.RUnlock() for id := range s.conf.StoreIDWithRanges { - cluster.ResumeLeaderTransferOut(id) + cluster.ResumeLeaderTransfer(id, constant.Out) } } @@ -252,7 +252,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R id = (uint64)(idFloat) handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { - if err := handler.config.cluster.PauseLeaderTransferOut(id); err != nil { + if err := handler.config.cluster.PauseLeaderTransfer(id, constant.Out); err != nil { handler.config.RUnlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index 40f2cf7bc97..63ed39025ab 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -391,38 +391,20 @@ func (q *retryQuota) gc(keepStores []*core.StoreInfo) { } } -// pauseAndResumeLeaderTransferIn checks the old and new store IDs, and pause or resume the leader transfer in. -func pauseAndResumeLeaderTransferIn[T any](cluster *core.BasicCluster, old, new map[uint64]T) { +// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer in or out. +func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, direction constant.Direction, old, new map[uint64]T) { for id := range old { if _, ok := new[id]; ok { continue } - cluster.ResumeLeaderTransferIn(id) + cluster.ResumeLeaderTransfer(id, direction) } for id := range new { if _, ok := old[id]; ok { continue } - if err := cluster.PauseLeaderTransferIn(id); err != nil { + if err := cluster.PauseLeaderTransfer(id, direction); err != nil { log.Error("pause leader transfer in failed", zap.Uint64("store-id", id), errs.ZapError(err)) } } } - -// pauseAndResumeLeaderTransferOut checks the old and new store IDs, and pause or resume the leader transfer out. -func pauseAndResumeLeaderTransferOut[T any](cluster *core.BasicCluster, old, new map[uint64]T) { - for id := range old { - if _, ok := new[id]; ok { - continue - } - cluster.ResumeLeaderTransferOut(id) - } - for id := range new { - if _, ok := old[id]; ok { - continue - } - if err := cluster.PauseLeaderTransferOut(id); err != nil { - log.Error("pause leader transfer out failed", zap.Uint64("store-id", id), errs.ZapError(err)) - } - } -} diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 2129942c3a5..f5b083eac92 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -185,7 +185,7 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro defer s.conf.mu.RUnlock() var res error for id := range s.conf.StoreIDWitRanges { - if err := cluster.PauseLeaderTransferIn(id); err != nil { + if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil { res = err } } @@ -197,7 +197,7 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWitRanges { - cluster.ResumeLeaderTransferIn(id) + cluster.ResumeLeaderTransfer(id, constant.In) } } @@ -258,7 +258,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R if ok { id = (uint64)(idFloat) if _, exists = handler.config.StoreIDWitRanges[id]; !exists { - if err := handler.config.cluster.PauseLeaderTransferIn(id); err != nil { + if err := handler.config.cluster.PauseLeaderTransfer(id, constant.In); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -307,7 +307,7 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R return } delete(handler.config.StoreIDWitRanges, id) - handler.config.cluster.ResumeLeaderTransferIn(id) + handler.config.cluster.ResumeLeaderTransfer(id, constant.In) handler.config.mu.Unlock() if err := handler.config.Persist(); err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index ee371d21aef..fac44c56402 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1815,14 +1815,14 @@ func TestStores(t *testing.T) { for i, store := range stores { id := store.GetID() re.Nil(cache.GetStore(id)) - re.Error(cache.PauseLeaderTransferIn(id)) + re.Error(cache.PauseLeaderTransfer(id, constant.In)) cache.PutStore(store) re.Equal(store, cache.GetStore(id)) re.Equal(i+1, cache.GetStoreCount()) - re.NoError(cache.PauseLeaderTransferIn(id)) + re.NoError(cache.PauseLeaderTransfer(id, constant.In)) re.False(cache.GetStore(id).AllowLeaderTransferIn()) - re.Error(cache.PauseLeaderTransferIn(id)) - cache.ResumeLeaderTransferIn(id) + re.Error(cache.PauseLeaderTransfer(id, constant.In)) + cache.ResumeLeaderTransfer(id, constant.In) re.True(cache.GetStore(id).AllowLeaderTransferIn()) } re.Equal(int(n), cache.GetStoreCount())