From e295e628f485854c3e9f8e092e86e0eae8661e39 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 14 Sep 2023 15:00:11 +0800 Subject: [PATCH] scheduling: sync schedulers from the API server (#7076) ref tikv/pd#5839 - Sync the schedulers from the API server. - Dynamically reload the scheduler config. Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/cluster.go | 98 +++++++++++++++++++ pkg/mcs/scheduling/server/config/config.go | 35 ++++++- pkg/mcs/scheduling/server/config/watcher.go | 39 +++++++- pkg/mcs/scheduling/server/server.go | 2 + pkg/schedule/coordinator.go | 54 ++++++---- pkg/schedule/schedulers/balance_leader.go | 21 +++- pkg/schedule/schedulers/balance_witness.go | 21 +++- pkg/schedule/schedulers/base_scheduler.go | 5 + pkg/schedule/schedulers/evict_leader.go | 20 +++- pkg/schedule/schedulers/evict_slow_store.go | 2 +- .../schedulers/evict_slow_store_test.go | 2 +- pkg/schedule/schedulers/evict_slow_trend.go | 2 +- .../schedulers/evict_slow_trend_test.go | 2 +- pkg/schedule/schedulers/grant_hot_region.go | 2 +- pkg/schedule/schedulers/grant_leader.go | 20 +++- pkg/schedule/schedulers/hot_region_config.go | 2 +- pkg/schedule/schedulers/hot_region_test.go | 4 +- pkg/schedule/schedulers/scatter_range.go | 15 ++- pkg/schedule/schedulers/scheduler.go | 4 +- .../schedulers/scheduler_controller.go | 12 ++- pkg/schedule/schedulers/shuffle_region.go | 19 ++++ .../schedulers/shuffle_region_config.go | 2 +- pkg/schedule/schedulers/split_bucket.go | 15 ++- .../schedulers/transfer_witness_leader.go | 18 ++-- pkg/storage/endpoint/config.go | 32 +++--- pkg/storage/endpoint/key_path.go | 28 +++--- plugin/scheduler_example/evict_leader.go | 2 +- server/cluster/cluster_test.go | 8 +- server/server.go | 2 +- .../mcs/scheduling/config_test.go | 32 +++--- .../mcs/scheduling/server_test.go | 69 +++++++++++++ 31 files changed, 483 insertions(+), 106 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 28a04c5640a..61c85ba6fbd 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -10,16 +10,20 @@ import ( "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -175,6 +179,100 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { return c.apiServerLeader.CompareAndSwap(old, new) } +// UpdateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion. +func (c *Cluster) UpdateScheduler() { + defer logutil.LogPanic() + + // Make sure the coordinator has initialized all the existing schedulers. + c.waitSchedulersInitialized() + // Establish a notifier to listen the schedulers updating. + notifier := make(chan struct{}, 1) + // Make sure the check will be triggered once later. 
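+ // The notifier channel is buffered with capacity one, so this initial send
+ // succeeds immediately, before the receiving loop below starts draining it.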
+ notifier <- struct{}{} + c.persistConfig.SetSchedulersUpdatingNotifier(notifier) + for { + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop listening the schedulers updating notifier") + return + case <-notifier: + } + + log.Info("schedulers updating notifier is triggered, try to update the scheduler") + var ( + schedulersController = c.coordinator.GetSchedulersController() + latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers + ) + // Create the newly added schedulers. + for _, scheduler := range latestSchedulersConfig { + s, err := schedulers.CreateScheduler( + scheduler.Type, + c.coordinator.GetOperatorController(), + c.storage, + schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args), + schedulersController.RemoveScheduler, + ) + if err != nil { + log.Error("failed to create scheduler", + zap.String("scheduler-type", scheduler.Type), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + name := s.GetName() + if existed, _ := schedulersController.IsSchedulerExisted(name); existed { + log.Info("scheduler has already existed, skip adding it", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + continue + } + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + log.Error("failed to add scheduler", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args), + errs.ZapError(err)) + continue + } + log.Info("add scheduler successfully", + zap.String("scheduler-name", name), + zap.Strings("scheduler-args", scheduler.Args)) + } + // Remove the deleted schedulers. + for _, name := range schedulersController.GetSchedulerNames() { + scheduler := schedulersController.GetScheduler(name) + if slice.AnyOf(latestSchedulersConfig, func(i int) bool { + return latestSchedulersConfig[i].Type == scheduler.GetType() + }) { + continue + } + if err := schedulersController.RemoveScheduler(name); err != nil { + log.Error("failed to remove scheduler", + zap.String("scheduler-name", name), + errs.ZapError(err)) + continue + } + log.Info("remove scheduler successfully", + zap.String("scheduler-name", name)) + } + } +} + +func (c *Cluster) waitSchedulersInitialized() { + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + if c.coordinator.AreSchedulersInitialized() { + return + } + select { + case <-c.ctx.Done(): + log.Info("cluster is closing, stop waiting the schedulers initialization") + return + case <-ticker.C: + } + } +} + // TODO: implement the following methods // UpdateRegionsLabelLevelStats updates the status of the region label level by types. diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 5b80612744b..82c15632b3d 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -203,6 +203,9 @@ type PersistConfig struct { schedule atomic.Value replication atomic.Value storeConfig atomic.Value + // schedulersUpdatingNotifier is used to notify that the schedulers have been updated. + // Store as `chan<- struct{}`. + schedulersUpdatingNotifier atomic.Value } // NewPersistConfig creates a new PersistConfig instance. @@ -217,6 +220,19 @@ func NewPersistConfig(cfg *Config) *PersistConfig { return o } +// SetSchedulersUpdatingNotifier sets the schedulers updating notifier. 
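+// The notifier is kept in an atomic.Value so it can be installed after the
+// config is created and read safely by SetScheduleConfig from other goroutines.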
+func (o *PersistConfig) SetSchedulersUpdatingNotifier(notifier chan<- struct{}) { + o.schedulersUpdatingNotifier.Store(notifier) +} + +func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} { + v := o.schedulersUpdatingNotifier.Load() + if v == nil { + return nil + } + return v.(chan<- struct{}) +} + // GetClusterVersion returns the cluster version. func (o *PersistConfig) GetClusterVersion() *semver.Version { return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion)) @@ -232,12 +248,19 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig { return o.schedule.Load().(*sc.ScheduleConfig) } -// SetScheduleConfig sets the scheduling configuration. +// SetScheduleConfig sets the scheduling configuration dynamically. func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { + old := o.GetScheduleConfig() o.schedule.Store(cfg) + // The coordinator is not aware of the underlying scheduler config changes, however, it + // should react on the scheduler number changes to handle the add/remove scheduler events. + if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil && + len(old.Schedulers) != len(cfg.Schedulers) { + notifier <- struct{}{} + } } -// AdjustScheduleCfg adjusts the schedule config. +// AdjustScheduleCfg adjusts the schedule config during the initialization. func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) { // In case we add new default schedulers. for _, ps := range sc.DefaultSchedulers { @@ -616,8 +639,13 @@ func (o *PersistConfig) IsRaftKV2() bool { // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. +// This method is a no-op since we only use configurations derived from one-way synchronization from API server now. func (o *PersistConfig) AddSchedulerCfg(string, []string) {} +// RemoveSchedulerCfg removes the scheduler configurations. +// This method is a no-op since we only use configurations derived from one-way synchronization from API server now. +func (o *PersistConfig) RemoveSchedulerCfg(tp string) {} + // CheckLabelProperty checks if the label property is satisfied. func (o *PersistConfig) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { return false @@ -633,6 +661,3 @@ func (o *PersistConfig) IsTraceRegionFlow() bool { func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error { return nil } - -// RemoveSchedulerCfg removes the scheduler configurations. -func (o *PersistConfig) RemoveSchedulerCfg(tp string) {} diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index d65f1a6b553..b413e243c2b 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -19,10 +19,12 @@ import ( "encoding/json" "strings" "sync" + "sync/atomic" "github.com/coreos/go-semver/semver" "github.com/pingcap/log" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -50,10 +52,14 @@ type Watcher struct { configWatcher *etcdutil.LoopWatcher schedulerConfigWatcher *etcdutil.LoopWatcher + // Some data, like the global schedule config, should be loaded into `PersistConfig`. *PersistConfig // Some data, like the scheduler configs, should be loaded into the storage // to make sure the coordinator could access them correctly. 
storage storage.Storage + // schedulersController is used to trigger the scheduler's config reloading. + // Store as `*schedulers.Controller`. + schedulersController atomic.Value } type persistedConfig struct { @@ -92,6 +98,19 @@ func NewWatcher( return cw, nil } +// SetSchedulersController sets the schedulers controller. +func (cw *Watcher) SetSchedulersController(sc *schedulers.Controller) { + cw.schedulersController.Store(sc) +} + +func (cw *Watcher) getSchedulersController() *schedulers.Controller { + sc := cw.schedulersController.Load() + if sc == nil { + return nil + } + return sc.(*schedulers.Controller) +} + func (cw *Watcher) initializeConfigWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { cfg := &persistedConfig{} @@ -126,13 +145,23 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - return cw.storage.SaveScheduleConfig( - strings.TrimPrefix(string(kv.Key), prefixToTrim), - kv.Value, - ) + name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + err := cw.storage.SaveSchedulerConfig(name, kv.Value) + if err != nil { + log.Warn("failed to save scheduler config", + zap.String("event-kv-key", string(kv.Key)), + zap.String("trimmed-key", name), + zap.Error(err)) + return err + } + // Ensure the scheduler config could be updated as soon as possible. + if sc := cw.getSchedulersController(); sc != nil { + return sc.ReloadSchedulerConfig(name) + } + return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - return cw.storage.RemoveScheduleConfig( + return cw.storage.RemoveSchedulerConfig( strings.TrimPrefix(string(kv.Key), prefixToTrim), ) } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 8a74abf4b5e..330085e3b82 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -442,6 +442,8 @@ func (s *Server) startCluster(context.Context) error { if err != nil { return err } + s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController()) + go s.cluster.UpdateScheduler() go s.GetCoordinator().RunUntilStop() return nil } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index ab0500d0445..8cd5567b75c 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -69,9 +69,12 @@ var ( type Coordinator struct { syncutil.RWMutex - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + schedulersInitialized bool + cluster sche.ClusterInformer prepareChecker *prepareChecker checkers *checker.Controller @@ -91,19 +94,34 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController) checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController) return &Coordinator{ - ctx: ctx, - cancel: cancel, - cluster: cluster, - prepareChecker: newPrepareChecker(), - checkers: checkers, - regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions), - regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions), - schedulers: schedulers, - opController: opController, - hbStreams: hbStreams, - pluginInterface: 
NewPluginInterface(), - diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()), - } + ctx: ctx, + cancel: cancel, + schedulersInitialized: false, + cluster: cluster, + prepareChecker: newPrepareChecker(), + checkers: checkers, + regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions), + regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions), + schedulers: schedulers, + opController: opController, + hbStreams: hbStreams, + pluginInterface: NewPluginInterface(), + diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()), + } +} + +// markSchedulersInitialized marks the scheduler initialization is finished. +func (c *Coordinator) markSchedulersInitialized() { + c.Lock() + defer c.Unlock() + c.schedulersInitialized = true +} + +// AreSchedulersInitialized returns whether the schedulers have been initialized. +func (c *Coordinator) AreSchedulersInitialized() bool { + c.RLock() + defer c.RUnlock() + return c.schedulersInitialized } // GetWaitingRegions returns the regions in the waiting list. @@ -399,7 +417,7 @@ func (c *Coordinator) InitSchedulers(needRun bool) { err error ) for i := 0; i < maxLoadConfigRetries; i++ { - scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig() + scheduleNames, configs, err = c.cluster.GetStorage().LoadAllSchedulerConfigs() select { case <-c.ctx.Done(): log.Info("init schedulers has been stopped") @@ -485,6 +503,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) { if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil { log.Error("cannot persist schedule config", errs.ZapError(err)) } + + c.markSchedulersInitialized() } // LoadPlugin load user plugin diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 70f1cf5cb53..e5516317f46 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -124,7 +124,7 @@ func (conf *balanceLeaderSchedulerConfig) persistLocked() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(BalanceLeaderName, data) + return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data) } type balanceLeaderHandler struct { @@ -215,6 +215,25 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(l.conf) } +func (l *balanceLeaderScheduler) ReloadConfig() error { + l.conf.mu.Lock() + defer l.conf.mu.Unlock() + cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &balanceLeaderSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + l.conf.Ranges = newCfg.Ranges + l.conf.Batch = newCfg.Batch + return nil +} + func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index b60c6d81fa0..9bd8a592ba1 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -110,7 +110,7 @@ func (conf *balanceWitnessSchedulerConfig) persistLocked() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(BalanceWitnessName, 
data) + return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data) } type balanceWitnessHandler struct { @@ -210,6 +210,25 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(b.conf) } +func (b *balanceWitnessScheduler) ReloadConfig() error { + b.conf.mu.Lock() + defer b.conf.mu.Unlock() + cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &balanceWitnessSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + b.conf.Ranges = newCfg.Ranges + b.conf.Batch = newCfg.Batch + return nil +} + func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit() if !allowed { diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 8da0f13626b..6e712c18fe3 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -82,6 +82,11 @@ func (s *BaseScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(nil) } +// ReloadConfig reloads the config from the storage. +// By default, the scheduler does not need to reload the config +// if it doesn't support the dynamic configuration. +func (s *BaseScheduler) ReloadConfig() error { return nil } + // GetNextInterval return the next interval for the scheduler func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration { return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth) diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index b307bf5fb73..1989c42ba6f 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -115,7 +115,7 @@ func (conf *evictLeaderSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *evictLeaderSchedulerConfig) getSchedulerName() string { @@ -204,6 +204,24 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *evictLeaderScheduler) ReloadConfig() error { + s.conf.mu.Lock() + defer s.conf.mu.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictLeaderSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges + return nil +} + func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index ead92378398..a6665c3e5e7 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -53,7 +53,7 @@ func (conf *evictSlowStoreSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *evictSlowStoreSchedulerConfig) getSchedulerName() string { diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 58a44118048..0b0c1d9ad39 100644 --- 
a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -99,7 +99,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { suite.Zero(es2.conf.evictStore()) // check the value from storage. - sches, vs, err := es2.conf.storage.LoadAllScheduleConfig() + sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() suite.NoError(err) valueStr := "" for id, sche := range sches { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 8737e3b8619..38a6141cd26 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -68,7 +68,7 @@ func (conf *evictSlowTrendSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *evictSlowTrendSchedulerConfig) getSchedulerName() string { diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index 9320c3ad422..2ff86524bdc 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -181,7 +181,7 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrend() { suite.Zero(es2.conf.evictedStore()) // check the value from storage. - sches, vs, err := es2.conf.storage.LoadAllScheduleConfig() + sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() suite.NoError(err) valueStr := "" for id, sche := range sches { diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 4c8051de677..5a68da069b8 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -105,7 +105,7 @@ func (conf *grantHotRegionSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *grantHotRegionSchedulerConfig) getSchedulerName() string { diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 08856d101cc..7d1ff2f616c 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -95,7 +95,7 @@ func (conf *grantLeaderSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *grantLeaderSchedulerConfig) getSchedulerName() string { @@ -178,6 +178,24 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } +func (s *grantLeaderScheduler) ReloadConfig() error { + s.conf.mu.Lock() + defer s.conf.mu.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &grantLeaderSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges + return nil +} + func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index e0fd47c6447..2ff78748f02 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -445,7 +445,7 @@ func 
(conf *hotRegionSchedulerConfig) persistLocked() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(HotRegionName, data) + return conf.storage.SaveSchedulerConfig(HotRegionName, data) } func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index b29898cd9b9..d8f9bbc532c 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -2456,7 +2456,7 @@ func TestCompatibilityConfig(t *testing.T) { "dst-tolerance-ratio": 1.05, }) re.NoError(err) - err = storage.SaveScheduleConfig(HotRegionName, data) + err = storage.SaveSchedulerConfig(HotRegionName, data) re.NoError(err) hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data)) re.NoError(err) @@ -2472,7 +2472,7 @@ func TestCompatibilityConfig(t *testing.T) { cfg.WriteLeaderPriorities = []string{"query", "key"} data, err = EncodeConfig(cfg) re.NoError(err) - err = storage.SaveScheduleConfig(HotRegionName, data) + err = storage.SaveSchedulerConfig(HotRegionName, data) re.NoError(err) hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data)) re.NoError(err) diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index c0e8e900795..e301b4c6e76 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -87,7 +87,7 @@ func (conf *scatterRangeSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *scatterRangeSchedulerConfig) GetRangeName() string { @@ -166,6 +166,19 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(l.config) } +func (l *scatterRangeScheduler) ReloadConfig() error { + l.config.mu.Lock() + defer l.config.mu.Unlock() + cfgData, err := l.config.storage.LoadSchedulerConfig(l.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + return DecodeConfig([]byte(cfgData), l.config) +} + func (l *scatterRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { return l.allowBalanceLeader(cluster) || l.allowBalanceRegion(cluster) } diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 909acb3494c..1c624dcd916 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -38,6 +38,8 @@ type Scheduler interface { // GetType should in accordance with the name passing to RegisterScheduler() GetType() string EncodeConfig() ([]byte, error) + // ReloadConfig reloads the config from the storage. 
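+ // Schedulers that support dynamic configuration overwrite their in-memory
+ // fields with the persisted values; the others inherit the no-op default
+ // provided by BaseScheduler.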
+ ReloadConfig() error GetMinInterval() time.Duration GetNextInterval(interval time.Duration) time.Duration Prepare(cluster sche.SchedulerCluster) error @@ -130,7 +132,7 @@ func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.Confi if err != nil { return nil, err } - err = storage.SaveScheduleConfig(s.GetName(), data) + err = storage.SaveSchedulerConfig(s.GetName(), data) return s, err } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index c8f07f56678..4d72699b0fe 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -160,7 +160,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error { return err } - if err := c.storage.RemoveScheduleConfig(name); err != nil { + if err := c.storage.RemoveSchedulerConfig(name); err != nil { log.Error("can not remove the scheduler config", errs.ZapError(err)) return err } @@ -210,7 +210,7 @@ func (c *Controller) RemoveScheduler(name string) error { return err } - if err := c.storage.RemoveScheduleConfig(name); err != nil { + if err := c.storage.RemoveSchedulerConfig(name); err != nil { log.Error("can not remove the scheduler config", errs.ZapError(err)) return err } @@ -253,6 +253,14 @@ func (c *Controller) PauseOrResumeScheduler(name string, t int64) error { return err } +// ReloadSchedulerConfig reloads a scheduler's config if it exists. +func (c *Controller) ReloadSchedulerConfig(name string) error { + if exist, _ := c.IsSchedulerExisted(name); !exist { + return nil + } + return c.GetScheduler(name).ReloadConfig() +} + // IsSchedulerAllowed returns whether a scheduler is allowed to schedule, a scheduler is not allowed to schedule if it is paused or blocked by unsafe recovery. 
func (c *Controller) IsSchedulerAllowed(name string) (bool, error) { c.RLock() diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index 08570fe9f20..f1d35e80925 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -80,6 +80,25 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) { return s.conf.EncodeConfig() } +func (s *shuffleRegionScheduler) ReloadConfig() error { + s.conf.Lock() + defer s.conf.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &shuffleRegionSchedulerConfig{} + if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + s.conf.Roles = newCfg.Roles + s.conf.Ranges = newCfg.Ranges + return nil +} + func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetSchedulerConfig().GetRegionScheduleLimit() if !allowed { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index b16f20af8c9..f503a6f67c7 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -109,5 +109,5 @@ func (conf *shuffleRegionSchedulerConfig) persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(ShuffleRegionName, data) + return conf.storage.SaveSchedulerConfig(ShuffleRegionName, data) } diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index b5dbec7ecd7..5e75bded9b4 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -84,7 +84,7 @@ func (conf *splitBucketSchedulerConfig) persistLocked() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(SplitBucketName, data) + return conf.storage.SaveSchedulerConfig(SplitBucketName, data) } type splitBucketScheduler struct { @@ -172,6 +172,19 @@ func (s *splitBucketScheduler) GetType() string { return SplitBucketType } +func (s *splitBucketScheduler) ReloadConfig() error { + s.conf.mu.Lock() + defer s.conf.mu.Unlock() + cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + return DecodeConfig([]byte(cfgData), s.conf) +} + // ServerHTTP implement Http server. func (s *splitBucketScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) diff --git a/pkg/schedule/schedulers/transfer_witness_leader.go b/pkg/schedule/schedulers/transfer_witness_leader.go index d31bb7c1c23..2586065ea80 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader.go +++ b/pkg/schedule/schedulers/transfer_witness_leader.go @@ -46,37 +46,37 @@ var ( transferWitnessLeaderNoTargetStoreCounter = schedulerCounter.WithLabelValues(TransferWitnessLeaderName, "no-target-store") ) -type trasferWitnessLeaderScheduler struct { +type transferWitnessLeaderScheduler struct { *BaseScheduler regions chan *core.RegionInfo } // newTransferWitnessLeaderScheduler creates an admin scheduler that transfers witness leader of a region. 
func newTransferWitnessLeaderScheduler(opController *operator.Controller) Scheduler { - return &trasferWitnessLeaderScheduler{ + return &transferWitnessLeaderScheduler{ BaseScheduler: NewBaseScheduler(opController), regions: make(chan *core.RegionInfo, transferWitnessLeaderRecvMaxRegionSize), } } -func (s *trasferWitnessLeaderScheduler) GetName() string { +func (s *transferWitnessLeaderScheduler) GetName() string { return TransferWitnessLeaderName } -func (s *trasferWitnessLeaderScheduler) GetType() string { +func (s *transferWitnessLeaderScheduler) GetType() string { return TransferWitnessLeaderType } -func (s *trasferWitnessLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { +func (s *transferWitnessLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { return true } -func (s *trasferWitnessLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { +func (s *transferWitnessLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { transferWitnessLeaderCounter.Inc() return s.scheduleTransferWitnessLeaderBatch(s.GetName(), s.GetType(), cluster, transferWitnessLeaderBatchSize), nil } -func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster sche.SchedulerCluster, batchSize int) []*operator.Operator { +func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeaderBatch(name, typ string, cluster sche.SchedulerCluster, batchSize int) []*operator.Operator { var ops []*operator.Operator batchLoop: for i := 0; i < batchSize; i++ { @@ -99,7 +99,7 @@ batchLoop: return ops } -func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { +func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) { var filters []filter.Filter unhealthyPeerStores := make(map[uint64]struct{}) for _, peer := range region.GetDownPeers() { @@ -127,5 +127,5 @@ func (s *trasferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ // RecvRegionInfo receives a checked region from coordinator func RecvRegionInfo(s Scheduler) chan<- *core.RegionInfo { - return s.(*trasferWitnessLeaderScheduler).regions + return s.(*transferWitnessLeaderScheduler).regions } diff --git a/pkg/storage/endpoint/config.go b/pkg/storage/endpoint/config.go index 9104e218f98..db5565a4b90 100644 --- a/pkg/storage/endpoint/config.go +++ b/pkg/storage/endpoint/config.go @@ -24,11 +24,14 @@ import ( // ConfigStorage defines the storage operations on the config. type ConfigStorage interface { + // Persisted config will be stored in the storage. LoadConfig(cfg interface{}) (bool, error) SaveConfig(cfg interface{}) error - LoadAllScheduleConfig() ([]string, []string, error) - SaveScheduleConfig(scheduleName string, data []byte) error - RemoveScheduleConfig(scheduleName string) error + // Each scheduler has its own customized config, so we need to store them separately. 
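+ // They are saved under the scheduler_config key prefix, one entry per scheduler name.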
+ LoadAllSchedulerConfigs() ([]string, []string, error) + LoadSchedulerConfig(schedulerName string) (string, error) + SaveSchedulerConfig(schedulerName string, data []byte) error + RemoveSchedulerConfig(schedulerName string) error } var _ ConfigStorage = (*StorageEndpoint)(nil) @@ -55,9 +58,9 @@ func (se *StorageEndpoint) SaveConfig(cfg interface{}) error { return se.Save(configPath, string(value)) } -// LoadAllScheduleConfig loads all schedulers' config. -func (se *StorageEndpoint) LoadAllScheduleConfig() ([]string, []string, error) { - prefix := customScheduleConfigPath + "/" +// LoadAllSchedulerConfigs loads all schedulers' config. +func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) { + prefix := customSchedulerConfigPath + "/" keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 1000) for i, key := range keys { keys[i] = strings.TrimPrefix(key, prefix) @@ -65,12 +68,17 @@ func (se *StorageEndpoint) LoadAllScheduleConfig() ([]string, []string, error) { return keys, values, err } -// SaveScheduleConfig saves the config of scheduler. -func (se *StorageEndpoint) SaveScheduleConfig(scheduleName string, data []byte) error { - return se.Save(scheduleConfigPath(scheduleName), string(data)) +// LoadSchedulerConfig loads the config of the given scheduler. +func (se *StorageEndpoint) LoadSchedulerConfig(schedulerName string) (string, error) { + return se.Load(schedulerConfigPath(schedulerName)) } -// RemoveScheduleConfig removes the config of scheduler. -func (se *StorageEndpoint) RemoveScheduleConfig(scheduleName string) error { - return se.Remove(scheduleConfigPath(scheduleName)) +// SaveSchedulerConfig saves the config of the given scheduler. +func (se *StorageEndpoint) SaveSchedulerConfig(schedulerName string, data []byte) error { + return se.Save(schedulerConfigPath(schedulerName), string(data)) +} + +// RemoveSchedulerConfig removes the config of the given scheduler. +func (se *StorageEndpoint) RemoveSchedulerConfig(schedulerName string) error { + return se.Remove(schedulerConfigPath(schedulerName)) } diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 4b67441a5ac..cac40db29c5 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -25,17 +25,17 @@ import ( ) const ( - pdRootPath = "/pd" - clusterPath = "raft" - configPath = "config" - serviceMiddlewarePath = "service_middleware" - schedulePath = "schedule" - gcPath = "gc" - rulesPath = "rules" - ruleGroupPath = "rule_group" - regionLabelPath = "region_label" - replicationPath = "replication_mode" - customScheduleConfigPath = "scheduler_config" + pdRootPath = "/pd" + clusterPath = "raft" + configPath = "config" + serviceMiddlewarePath = "service_middleware" + schedulePath = "schedule" + gcPath = "gc" + rulesPath = "rules" + ruleGroupPath = "rule_group" + regionLabelPath = "region_label" + replicationPath = "replication_mode" + customSchedulerConfigPath = "scheduler_config" // GCWorkerServiceSafePointID is the service id of GC worker. GCWorkerServiceSafePointID = "gc_worker" minResolvedTS = "min_resolved_ts" @@ -94,7 +94,7 @@ func ConfigPath(clusterID uint64) string { // SchedulerConfigPathPrefix returns the path prefix to save the scheduler config. func SchedulerConfigPathPrefix(clusterID uint64) string { - return path.Join(PDRootPath(clusterID), customScheduleConfigPath) + return path.Join(PDRootPath(clusterID), customSchedulerConfigPath) } // RulesPathPrefix returns the path prefix to save the placement rules. 
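As a rough, self-contained sketch of how the renamed path helpers compose (the "scheduler_config" constant and the "/pd" root are taken from this file; the cluster ID and scheduler name below are only examples):

package main

import (
	"fmt"
	"path"
)

func main() {
	const customSchedulerConfigPath = "scheduler_config"
	// PDRootPath joins pdRootPath ("/pd") with the cluster ID.
	root := path.Join("/pd", "7078754781941478180")
	// SchedulerConfigPathPrefix: the etcd prefix the scheduling server watches.
	fmt.Println(path.Join(root, customSchedulerConfigPath)) // /pd/7078754781941478180/scheduler_config
	// schedulerConfigPath: the storage-relative key for one scheduler's config.
	fmt.Println(path.Join(customSchedulerConfigPath, "evict-leader-scheduler")) // scheduler_config/evict-leader-scheduler
}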
@@ -112,8 +112,8 @@ func RegionLabelPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), regionLabelPath) } -func scheduleConfigPath(scheduleName string) string { - return path.Join(customScheduleConfigPath, scheduleName) +func schedulerConfigPath(schedulerName string) string { + return path.Join(customSchedulerConfigPath, schedulerName) } // StorePath returns the store meta info key path with the given store ID. diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 91b9e518089..8919d1bdb4b 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -133,7 +133,7 @@ func (conf *evictLeaderSchedulerConfig) Persist() error { if err != nil { return err } - return conf.storage.SaveScheduleConfig(name, data) + return conf.storage.SaveSchedulerConfig(name, data) } func (conf *evictLeaderSchedulerConfig) getScheduleName() string { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 605fd222502..ea8d27b155f 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -3088,7 +3088,7 @@ func TestPersistScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(evict, "2")) re.Len(controller.GetSchedulerNames(), defaultCount+2) - sches, _, err := storage.LoadAllScheduleConfig() + sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) re.Len(sches, defaultCount+2) @@ -3117,7 +3117,7 @@ func TestPersistScheduler(t *testing.T) { re.Len(newOpt.GetSchedulers(), defaultCount) re.NoError(newOpt.Reload(storage)) // only remains 3 items with independent config. - sches, _, err = storage.LoadAllScheduleConfig() + sches, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) re.Len(sches, 3) @@ -3198,7 +3198,7 @@ func TestRemoveScheduler(t *testing.T) { re.NoError(err) re.NoError(controller.AddScheduler(gls1, "1")) re.Len(controller.GetSchedulerNames(), defaultCount+1) - sches, _, err := storage.LoadAllScheduleConfig() + sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) re.Len(sches, defaultCount+1) @@ -3210,7 +3210,7 @@ func TestRemoveScheduler(t *testing.T) { re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) // all removed - sches, _, err = storage.LoadAllScheduleConfig() + sches, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) re.Empty(sches) re.Empty(controller.GetSchedulerNames()) diff --git a/server/server.go b/server/server.go index 2a076923caf..7c19d8ff7c5 100644 --- a/server/server.go +++ b/server/server.go @@ -938,7 +938,7 @@ func (s *Server) GetConfig() *config.Config { if s.storage == nil { return cfg } - sches, configs, err := s.storage.LoadAllScheduleConfig() + sches, configs, err := s.storage.LoadAllSchedulerConfigs() if err != nil { return cfg } diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index 14c99baa6f5..8b8e284f765 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -147,14 +147,14 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { ) re.NoError(err) // Get all default scheduler names. 
- var namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() + var namesFromAPIServer, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() testutil.Eventually(re, func() bool { return len(namesFromAPIServer) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. var namesFromSchedulingServer []string testutil.Eventually(re, func() bool { - namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + namesFromSchedulingServer, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) return len(namesFromSchedulingServer) == len(namesFromAPIServer) }) @@ -165,10 +165,11 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { }) // Check the new scheduler's config. testutil.Eventually(re, func() bool { - namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + namesFromSchedulingServer, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) return slice.Contains(namesFromSchedulingServer, schedulers.EvictLeaderName) }) + assertEvictLeaderStoreIDs(re, storage, []uint64{1}) // Update the scheduler by adding a store. err = suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( &metapb.Store{ @@ -192,7 +193,7 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { api.MustDeleteScheduler(re, suite.pdLeaderServer.GetAddr(), schedulers.EvictLeaderName) // Check the removed scheduler's config. testutil.Eventually(re, func() bool { - namesFromSchedulingServer, _, err = storage.LoadAllScheduleConfig() + namesFromSchedulingServer, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) return !slice.Contains(namesFromSchedulingServer, schedulers.EvictLeaderName) }) @@ -202,24 +203,15 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { func assertEvictLeaderStoreIDs( re *require.Assertions, storage *endpoint.StorageEndpoint, storeIDs []uint64, ) { - var ( - namesFromSchedulingServer, configs []string - err error - evictLeaderCfg struct { - StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` - } - ) + var evictLeaderCfg struct { + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` + } testutil.Eventually(re, func() bool { - namesFromSchedulingServer, configs, err = storage.LoadAllScheduleConfig() + cfg, err := storage.LoadSchedulerConfig(schedulers.EvictLeaderName) + re.NoError(err) + err = schedulers.DecodeConfig([]byte(cfg), &evictLeaderCfg) re.NoError(err) - for idx, name := range namesFromSchedulingServer { - if name == schedulers.EvictLeaderName { - err = schedulers.DecodeConfig([]byte(configs[idx]), &evictLeaderCfg) - re.NoError(err) - return len(evictLeaderCfg.StoreIDWithRanges) == len(storeIDs) - } - } - return false + return len(evictLeaderCfg.StoreIDWithRanges) == len(storeIDs) }) // Validate the updated scheduler's config. 
for _, storeID := range storeIDs { diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index f4785a013d9..187ba54dfcb 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -16,6 +16,7 @@ package scheduling import ( "context" + "fmt" "testing" "time" @@ -24,9 +25,11 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/suite" mcs "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "go.uber.org/goleak" ) @@ -104,6 +107,9 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re.NoError(err) re.Greater(id1, id) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) + // Update the pdLeader in test suite. + suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) + suite.backendEndpoints = suite.pdLeader.GetAddr() } func (suite *serverTestSuite) TestPrimaryChange() { @@ -182,3 +188,66 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() { store.GetStoreStats().GetBytesRead() == uint64(99) }) } + +func (suite *serverTestSuite) TestSchedulerSync() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 5) + re.Nil(schedulersController.GetScheduler(schedulers.EvictLeaderName)) + // Add a new evict-leader-scheduler through the API server. + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + // Check if the evict-leader-scheduler is added. + testutil.Eventually(re, func() bool { + return len(schedulersController.GetSchedulerNames()) == 6 && + schedulersController.GetScheduler(schedulers.EvictLeaderName) != nil + }) + handler, ok := schedulersController.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + re.ElementsMatch(h.EvictStoreIDs(), []uint64{1}) + // Update the evict-leader-scheduler through the API server. + err = suite.pdLeader.GetServer().GetRaftCluster().PutStore( + &metapb.Store{ + Id: 2, + Address: "mock://2", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Version: "7.0.0", + }, + ) + re.NoError(err) + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == 2 + }) + re.ElementsMatch(evictStoreIDs, []uint64{1, 2}) + api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == 1 + }) + re.ElementsMatch(evictStoreIDs, []uint64{2}) + // Remove the evict-leader-scheduler through the API server. + api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) + // Check if the scheduler is removed. 
+ testutil.Eventually(re, func() bool { + return len(schedulersController.GetSchedulerNames()) == 5 && + schedulersController.GetScheduler(schedulers.EvictLeaderName) == nil + }) + + // TODO: test more schedulers. +}
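For reference, the notifier handshake at the heart of this patch boils down to a small pattern: a capacity-one channel is pre-filled to force one initial sync, the config setter pokes it whenever the scheduler count changes, and a single loop drains it and reconciles. The following sketch is illustrative only; configHolder, set, and get are hypothetical stand-ins for PersistConfig.SetScheduleConfig and the reconciliation in Cluster.UpdateScheduler.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// configHolder is a hypothetical stand-in for PersistConfig: it keeps the
// latest scheduler list and pokes the notifier when the count changes.
type configHolder struct {
	mu         sync.Mutex
	schedulers []string
	notifier   chan<- struct{}
}

func (h *configHolder) set(schedulers []string) {
	h.mu.Lock()
	changed := len(schedulers) != len(h.schedulers)
	h.schedulers = schedulers
	h.mu.Unlock()
	if changed {
		h.notifier <- struct{}{} // blocking send, as in SetScheduleConfig
	}
}

func (h *configHolder) get() []string {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.schedulers
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	notifier := make(chan struct{}, 1)
	notifier <- struct{}{} // pre-fill so the first loop iteration always syncs
	holder := &configHolder{notifier: notifier}

	go func() {
		holder.set([]string{"balance-leader-scheduler"})
		holder.set([]string{"balance-leader-scheduler", "evict-leader-scheduler"})
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-notifier:
			// Here Cluster.UpdateScheduler diffs the latest config against the
			// running schedulers and adds or removes them accordingly.
			fmt.Println("sync schedulers:", holder.get())
		}
	}
}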