From a5cdd290c40518759a8b3c600db9f6fcaf28f377 Mon Sep 17 00:00:00 2001 From: husharp Date: Thu, 21 Sep 2023 19:30:33 +0800 Subject: [PATCH 1/2] fix Signed-off-by: husharp --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/schedule/coordinator.go | 12 +- pkg/schedule/schedulers/evict_leader.go | 2 +- pkg/schedule/schedulers/scheduler.go | 14 +-- .../schedulers/scheduler_controller.go | 12 +- server/cluster/cluster.go | 8 +- server/cluster/cluster_test.go | 19 +-- server/handler.go | 4 +- tests/server/cluster/cluster_test.go | 115 ++++++++++++++++++ 9 files changed, 156 insertions(+), 32 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b2986f722df..fb18a79e918 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -252,7 +252,7 @@ func (c *Cluster) updateScheduler() { zap.Strings("scheduler-args", scheduler.Args)) continue } - if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { + if err := schedulersController.AddScheduler(c.storage, s, scheduler.Args...); err != nil { log.Error("failed to add scheduler", zap.String("scheduler-name", name), zap.Strings("scheduler-args", scheduler.Args), diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 8cd5567b75c..1e820cba864 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -357,7 +357,7 @@ func (c *Coordinator) driveSlowNodeScheduler() { s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler) if err != nil { log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err)) - } else if err = c.schedulers.AddScheduler(s, args...); err != nil { + } else if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) } } @@ -459,10 +459,10 @@ func (c *Coordinator) InitSchedulers(needRun bool) { } log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { - if err = c.schedulers.AddScheduler(s); err != nil { + if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { + } else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s); err != nil { log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } } @@ -485,14 +485,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { - if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { // Only records the valid scheduler config. scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + } else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } } @@ -532,7 +532,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) // TODO: handle the plugin in API service mode. - if err = c.schedulers.AddScheduler(s); err != nil { + if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil { log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err)) return } diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 2551b9ac9cb..51d99013158 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -181,7 +181,7 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade } } -// EvictStores returns the IDs of the evict-stores. +// EvictStoreIDs returns the IDs of the evict-stores. func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { return s.conf.getStores() } diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 1c624dcd916..ba02c280d40 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -124,16 +124,16 @@ func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.Confi return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } - s, err := fn(oc, storage, dec, removeSchedulerCb...) - if err != nil { - return nil, err - } + return fn(oc, storage, dec, removeSchedulerCb...) +} + +// SaveSchedulerConfig saves the config of the specified scheduler. +func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error { data, err := s.EncodeConfig() if err != nil { - return nil, err + return err } - err = storage.SaveSchedulerConfig(s.GetName(), data) - return s, err + return storage.SaveSchedulerConfig(s.GetName(), data) } // FindSchedulerTypeByName finds the type of the specified name. diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 4d72699b0fe..80e88b87989 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -127,7 +127,7 @@ func (c *Controller) ResetSchedulerMetrics() { } // AddSchedulerHandler adds the HTTP handler for a scheduler. -func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error { +func (c *Controller) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error { c.Lock() defer c.Unlock() @@ -137,6 +137,10 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er } c.schedulerHandlers[name] = scheduler + if err := SaveSchedulerConfig(storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) return nil } @@ -171,7 +175,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error { } // AddScheduler adds a scheduler. -func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { +func (c *Controller) AddScheduler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error { c.Lock() defer c.Unlock() @@ -187,6 +191,10 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { c.wg.Add(1) go c.runScheduler(s) c.schedulers[s.Scheduler.GetName()] = s + if err := SaveSchedulerConfig(storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args) return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d42dbb21ed1..670f7d181d4 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -791,8 +791,8 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { } // AddSchedulerHandler adds a scheduler handler. -func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +func (c *RaftCluster) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddSchedulerHandler(storage, scheduler, args...) } // RemoveSchedulerHandler removes a scheduler handler. @@ -801,8 +801,8 @@ func (c *RaftCluster) RemoveSchedulerHandler(name string) error { } // AddScheduler adds a scheduler. -func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) +func (c *RaftCluster) AddScheduler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddScheduler(storage, scheduler, args...) } // RemoveScheduler removes a scheduler. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index aa826e34406..725f87bc070 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -328,7 +328,7 @@ func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictSchedul if err != nil { return } - if err = cluster.AddScheduler(evictScheduler, args...); err != nil { + if err = cluster.AddScheduler(cluster.storage, evictScheduler, args...); err != nil { return } else if err = cluster.opt.Persist(cluster.GetStorage()); err != nil { return @@ -3035,12 +3035,12 @@ func TestAddScheduler(t *testing.T) { re.Equal(4, int(batch)) gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler) re.NoError(err) - re.NotNil(controller.AddScheduler(gls)) + re.NotNil(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls)) re.NotNil(controller.RemoveScheduler(gls.GetName())) gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(gls)) + re.NoError(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls)) hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) @@ -3087,10 +3087,10 @@ func TestPersistScheduler(t *testing.T) { gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(gls1, "1")) + re.NoError(controller.AddScheduler(storage, gls1, "1")) evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(evict, "2")) + re.NoError(controller.AddScheduler(storage, evict, "2")) re.Len(controller.GetSchedulerNames(), defaultCount+2) sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) @@ -3111,8 +3111,9 @@ func TestPersistScheduler(t *testing.T) { // whether the schedulers added or removed in dynamic way are recorded in opt _, newOpt, err := newTestScheduleConfig() re.NoError(err) - _, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) re.NoError(err) + re.NoError(controller.AddScheduler(storage, shuffle)) // suppose we add a new default enable scheduler sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) defer func() { @@ -3148,10 +3149,10 @@ func TestPersistScheduler(t *testing.T) { re.Len(controller.GetSchedulerNames(), 3) bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) - re.NoError(controller.AddScheduler(bls)) + re.NoError(controller.AddScheduler(storage, bls)) brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) - re.NoError(controller.AddScheduler(brs)) + re.NoError(controller.AddScheduler(storage, brs)) re.Len(controller.GetSchedulerNames(), defaultCount) // the scheduler option should contain 6 items @@ -3200,7 +3201,7 @@ func TestRemoveScheduler(t *testing.T) { gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(gls1, "1")) + re.NoError(controller.AddScheduler(storage, gls1, "1")) re.Len(controller.GetSchedulerNames(), defaultCount+1) sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) diff --git a/server/handler.go b/server/handler.go index ecc337b7193..b355792578d 100644 --- a/server/handler.go +++ b/server/handler.go @@ -226,13 +226,13 @@ func (h *Handler) AddScheduler(name string, args ...string) error { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) if h.s.IsAPIServiceMode() { - if err = c.AddSchedulerHandler(s, args...); err != nil { + if err = c.AddSchedulerHandler(h.s.storage, s, args...); err != nil { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err } log.Info("add scheduler handler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) } else { - if err = c.AddScheduler(s, args...); err != nil { + if err = c.AddScheduler(h.s.storage, s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index e1b04c4ebc1..701eb9b5d69 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/mock/mockid" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/tso" @@ -47,6 +48,7 @@ import ( "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -1275,6 +1277,119 @@ func TestStaleTermHeartbeat(t *testing.T) { re.NoError(err) } +func TestTransferLeaderForScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + // start + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + rc := leaderServer.GetServer().GetRaftCluster() + re.NotNil(rc) + + storesNum := 2 + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + for i := 1; i <= storesNum; i++ { + store := &metapb.Store{ + Id: uint64(i), + Address: "127.0.0.1:" + strconv.Itoa(i), + } + resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store) + re.NoError(err) + re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) + } + // region heartbeat + id := leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Add evict leader scheduler + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + // Check scheduler updated. + schedulersController := rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader to another PD + tc.ResignLeader() + rc.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc1 := leaderServer.GetServer().GetRaftCluster() + rc1.Start(leaderServer.GetServer()) + re.NoError(err) + re.NotNil(rc1) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc1, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated. + schedulersController = rc1.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + rc1.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + rc.Start(leaderServer.GetServer()) + re.NotNil(rc) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated + schedulersController = rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) +} + +func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) { + testutil.Eventually(re, func() bool { + if !exist { + return sc.GetScheduler(schedulers.EvictLeaderName) == nil + } + return sc.GetScheduler(schedulers.EvictLeaderName) != nil + }) +} + +func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) { + handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) +} + func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { for i := 0; i < 3; i++ { regionID, err := id.Alloc() From 122f9f3e2c41dd24352b8f70c1580a87d45dd90b Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 26 Sep 2023 15:26:46 +0800 Subject: [PATCH 2/2] remove redundant storage Signed-off-by: husharp --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/schedule/coordinator.go | 12 ++++++------ .../schedulers/scheduler_controller.go | 10 +++++----- server/cluster/cluster.go | 8 ++++---- server/cluster/cluster_test.go | 18 +++++++++--------- server/handler.go | 4 ++-- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index edce8163c72..20af077d241 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -252,7 +252,7 @@ func (c *Cluster) updateScheduler() { zap.Strings("scheduler-args", scheduler.Args)) continue } - if err := schedulersController.AddScheduler(c.storage, s, scheduler.Args...); err != nil { + if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil { log.Error("failed to add scheduler", zap.String("scheduler-name", name), zap.Strings("scheduler-args", scheduler.Args), diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 1e820cba864..8cd5567b75c 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -357,7 +357,7 @@ func (c *Coordinator) driveSlowNodeScheduler() { s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler) if err != nil { log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err)) - } else if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, args...); err != nil { + } else if err = c.schedulers.AddScheduler(s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) } } @@ -459,10 +459,10 @@ func (c *Coordinator) InitSchedulers(needRun bool) { } log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) if needRun { - if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil { + if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } - } else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s); err != nil { + } else if err = c.schedulers.AddSchedulerHandler(s); err != nil { log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) } } @@ -485,14 +485,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) { log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) if needRun { - if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } else { // Only records the valid scheduler config. scheduleCfg.Schedulers[k] = schedulerCfg k++ } - } else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + } else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) } } @@ -532,7 +532,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) // TODO: handle the plugin in API service mode. - if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil { + if err = c.schedulers.AddScheduler(s); err != nil { log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err)) return } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 80e88b87989..72ae1db10fd 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -127,7 +127,7 @@ func (c *Controller) ResetSchedulerMetrics() { } // AddSchedulerHandler adds the HTTP handler for a scheduler. -func (c *Controller) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error { +func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error { c.Lock() defer c.Unlock() @@ -137,8 +137,8 @@ func (c *Controller) AddSchedulerHandler(storage endpoint.ConfigStorage, schedul } c.schedulerHandlers[name] = scheduler - if err := SaveSchedulerConfig(storage, scheduler); err != nil { - log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { + log.Error("can not save HTTP scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) return err } c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) @@ -175,7 +175,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error { } // AddScheduler adds a scheduler. -func (c *Controller) AddScheduler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error { +func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { c.Lock() defer c.Unlock() @@ -191,7 +191,7 @@ func (c *Controller) AddScheduler(storage endpoint.ConfigStorage, scheduler Sche c.wg.Add(1) go c.runScheduler(s) c.schedulers[s.Scheduler.GetName()] = s - if err := SaveSchedulerConfig(storage, scheduler); err != nil { + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) return err } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 09a9c2f1e6f..22e1b16d822 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -791,8 +791,8 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { } // AddSchedulerHandler adds a scheduler handler. -func (c *RaftCluster) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddSchedulerHandler(storage, scheduler, args...) +func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) } // RemoveSchedulerHandler removes a scheduler handler. @@ -801,8 +801,8 @@ func (c *RaftCluster) RemoveSchedulerHandler(name string) error { } // AddScheduler adds a scheduler. -func (c *RaftCluster) AddScheduler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddScheduler(storage, scheduler, args...) +func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) } // RemoveScheduler removes a scheduler. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 725f87bc070..33236c5d40c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -328,7 +328,7 @@ func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictSchedul if err != nil { return } - if err = cluster.AddScheduler(cluster.storage, evictScheduler, args...); err != nil { + if err = cluster.AddScheduler(evictScheduler, args...); err != nil { return } else if err = cluster.opt.Persist(cluster.GetStorage()); err != nil { return @@ -3035,12 +3035,12 @@ func TestAddScheduler(t *testing.T) { re.Equal(4, int(batch)) gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler) re.NoError(err) - re.NotNil(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls)) + re.NotNil(controller.AddScheduler(gls)) re.NotNil(controller.RemoveScheduler(gls.GetName())) gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls)) + re.NoError(controller.AddScheduler(gls)) hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) re.NoError(err) @@ -3087,10 +3087,10 @@ func TestPersistScheduler(t *testing.T) { gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(storage, gls1, "1")) + re.NoError(controller.AddScheduler(gls1, "1")) evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(storage, evict, "2")) + re.NoError(controller.AddScheduler(evict, "2")) re.Len(controller.GetSchedulerNames(), defaultCount+2) sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) @@ -3113,7 +3113,7 @@ func TestPersistScheduler(t *testing.T) { re.NoError(err) shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) re.NoError(err) - re.NoError(controller.AddScheduler(storage, shuffle)) + re.NoError(controller.AddScheduler(shuffle)) // suppose we add a new default enable scheduler sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) defer func() { @@ -3149,10 +3149,10 @@ func TestPersistScheduler(t *testing.T) { re.Len(controller.GetSchedulerNames(), 3) bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) re.NoError(err) - re.NoError(controller.AddScheduler(storage, bls)) + re.NoError(controller.AddScheduler(bls)) brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) - re.NoError(controller.AddScheduler(storage, brs)) + re.NoError(controller.AddScheduler(brs)) re.Len(controller.GetSchedulerNames(), defaultCount) // the scheduler option should contain 6 items @@ -3201,7 +3201,7 @@ func TestRemoveScheduler(t *testing.T) { gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) re.NoError(err) - re.NoError(controller.AddScheduler(storage, gls1, "1")) + re.NoError(controller.AddScheduler(gls1, "1")) re.Len(controller.GetSchedulerNames(), defaultCount+1) sches, _, err := storage.LoadAllSchedulerConfigs() re.NoError(err) diff --git a/server/handler.go b/server/handler.go index b355792578d..ecc337b7193 100644 --- a/server/handler.go +++ b/server/handler.go @@ -226,13 +226,13 @@ func (h *Handler) AddScheduler(name string, args ...string) error { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) if h.s.IsAPIServiceMode() { - if err = c.AddSchedulerHandler(h.s.storage, s, args...); err != nil { + if err = c.AddSchedulerHandler(s, args...); err != nil { log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err } log.Info("add scheduler handler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) } else { - if err = c.AddScheduler(h.s.storage, s, args...); err != nil { + if err = c.AddScheduler(s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) return err }