diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 0e6c2281f2f..7e21919b214 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -379,7 +379,7 @@ func (c *Coordinator) Run() { } } log.Info("Coordinator starts to run schedulers") - c.initSchedulers() + c.InitSchedulers(true) c.wg.Add(4) // Starts to patrol regions. @@ -391,7 +391,8 @@ func (c *Coordinator) Run() { go c.driveSlowNodeScheduler() } -func (c *Coordinator) initSchedulers() { +// InitSchedulers initializes schedulers. +func (c *Coordinator) InitSchedulers(needRun bool) { var ( scheduleNames []string configs []string @@ -401,7 +402,7 @@ func (c *Coordinator) initSchedulers() { scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig() select { case <-c.ctx.Done(): - log.Info("Coordinator stops running") + log.Info("init schedulers has been stopped") return default: } @@ -439,8 +440,10 @@ func (c *Coordinator) initSchedulers() { continue } log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) - if err = c.schedulers.AddScheduler(s); err != nil { - log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + if needRun { + if err = c.schedulers.AddScheduler(s); err != nil { + log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -461,12 +464,14 @@ func (c *Coordinator) initSchedulers() { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) - if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) - } else { - // Only records the valid scheduler config. - scheduleCfg.Schedulers[k] = schedulerCfg - k++ + if needRun { + if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + // Only records the valid scheduler config. + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 90d37bf66c4..dc07b260d87 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -323,21 +323,21 @@ func (c *RaftCluster) Start(s Server) error { log.Error("load external timestamp meets error", zap.Error(err)) } - // bootstrap keyspace group manager after starting other parts successfully. - // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. if s.IsAPIServiceMode() { + // bootstrap keyspace group manager after starting other parts successfully. + // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. err = c.keyspaceGroupManager.Bootstrap(c.ctx) if err != nil { return err } - } - - if !s.IsAPIServiceMode() { + c.initSchedulers() + } else { c.wg.Add(3) go c.runCoordinator() go c.runStatsBackgroundJobs() go c.runMetricsCollectionJob() } + c.wg.Add(7) go c.runNodeStateCheckJob() go c.syncRegions() @@ -849,6 +849,10 @@ func (c *RaftCluster) GetOpts() sc.ConfProvider { return c.opt } +func (c *RaftCluster) initSchedulers() { + c.coordinator.InitSchedulers(false) +} + // GetScheduleConfig returns scheduling configurations. func (c *RaftCluster) GetScheduleConfig() *sc.ScheduleConfig { return c.opt.GetScheduleConfig() diff --git a/server/handler.go b/server/handler.go index 02ec6da4808..d52d1c3e069 100644 --- a/server/handler.go +++ b/server/handler.go @@ -236,14 +236,18 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) - if err = c.AddScheduler(s, args...); err != nil { - log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) - } else if err = h.opt.Persist(c.GetStorage()); err != nil { + if !h.s.IsAPIServiceMode() { + if err = c.AddScheduler(s, args...); err != nil { + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) + return err + } + } + if err = h.opt.Persist(c.GetStorage()); err != nil { log.Error("can not persist scheduler config", errs.ZapError(err)) - } else { - log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + return err } - return err + log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + return nil } // RemoveScheduler removes a scheduler by name. @@ -252,10 +256,23 @@ func (h *Handler) RemoveScheduler(name string) error { if err != nil { return err } - if err = c.RemoveScheduler(name); err != nil { - log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + if !h.s.IsAPIServiceMode() { + if err = c.RemoveScheduler(name); err != nil { + log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + } else { + log.Info("remove scheduler successfully", zap.String("scheduler-name", name)) + } } else { - log.Info("remove scheduler successfully", zap.String("scheduler-name", name)) + conf := c.GetSchedulerConfig() + if err := conf.Persist(c.GetStorage()); err != nil { + log.Error("the option can not persist scheduler config", errs.ZapError(err)) + return err + } + + if err := c.GetStorage().RemoveScheduleConfig(name); err != nil { + log.Error("can not remove the scheduler config", errs.ZapError(err)) + return err + } } return err } diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index ef52203e349..16a701b3e6c 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -137,17 +137,10 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { re.NoError(err) // Get all default scheduler names. var ( - schedulerNames []string - schedulerController = suite.pdLeaderServer.GetRaftCluster().GetCoordinator().GetSchedulersController() + schedulerNames, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() ) testutil.Eventually(re, func() bool { - schedulerNames = schedulerController.GetSchedulerNames() targetCount := len(sc.DefaultSchedulers) - // In the previous case, StoreConfig of raft-kv2 has been persisted. So, it might - // have EvictSlowTrendName. - if exists, _ := schedulerController.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists { - targetCount += 1 - } return len(schedulerNames) == targetCount }) // Check all default schedulers' configs.