diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index 3c3f0603408..a5c67856df8 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -181,7 +181,7 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade
 	}
 }
 
-// EvictStores returns the IDs of the evict-stores.
+// EvictStoreIDs returns the IDs of the evict-stores.
 func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
 	return s.conf.getStores()
 }
diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go
index 1c624dcd916..ba02c280d40 100644
--- a/pkg/schedule/schedulers/scheduler.go
+++ b/pkg/schedule/schedulers/scheduler.go
@@ -124,16 +124,16 @@ func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.Confi
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}
 
-	s, err := fn(oc, storage, dec, removeSchedulerCb...)
-	if err != nil {
-		return nil, err
-	}
+	return fn(oc, storage, dec, removeSchedulerCb...)
+}
+
+// SaveSchedulerConfig saves the config of the specified scheduler.
+func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
 	data, err := s.EncodeConfig()
 	if err != nil {
-		return nil, err
+		return err
 	}
-	err = storage.SaveSchedulerConfig(s.GetName(), data)
-	return s, err
+	return storage.SaveSchedulerConfig(s.GetName(), data)
 }
 
 // FindSchedulerTypeByName finds the type of the specified name.
diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go
index 4f22f50f81d..46b4947b6cd 100644
--- a/pkg/schedule/schedulers/scheduler_controller.go
+++ b/pkg/schedule/schedulers/scheduler_controller.go
@@ -138,6 +138,10 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er
 	}
 
 	c.schedulerHandlers[name] = scheduler
+	if err := SaveSchedulerConfig(c.storage, scheduler); err != nil {
+		log.Error("can not save HTTP scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
 	return nil
 }
@@ -188,6 +192,10 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.Scheduler.GetName()] = s
+	if err := SaveSchedulerConfig(c.storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args)
 	return nil
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index aa826e34406..33236c5d40c 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -3111,8 +3111,9 @@ func TestPersistScheduler(t *testing.T) {
 	// whether the schedulers added or removed in dynamic way are recorded in opt
 	_, newOpt, err := newTestScheduleConfig()
 	re.NoError(err)
-	_, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
+	shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
+	re.NoError(controller.AddScheduler(shuffle))
 	// suppose we add a new default enable scheduler
 	sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"})
 	defer func() {
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index e1b04c4ebc1..701eb9b5d69 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
+	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/syncer"
 	"github.com/tikv/pd/pkg/tso"
@@ -47,6 +48,7 @@ import (
 	"github.com/tikv/pd/server/cluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/tests"
+	"github.com/tikv/pd/tests/server/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -1275,6 +1277,119 @@ func TestStaleTermHeartbeat(t *testing.T) {
 	re.NoError(err)
 }
 
+func TestTransferLeaderForScheduler(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	// start
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+	rc := leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Add evict leader scheduler
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
+	// Check scheduler updated.
+	schedulersController := rc.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	rc1.Start(leaderServer.GetServer())
+	re.NoError(err)
+	re.NotNil(rc1)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc1, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Check scheduler updated.
+	schedulersController = rc1.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	rc.Start(leaderServer.GetServer())
+	re.NotNil(rc)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Check scheduler updated
+	schedulersController = rc.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
+}
+
+func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
+	testutil.Eventually(re, func() bool {
+		if !exist {
+			return sc.GetScheduler(schedulers.EvictLeaderName) == nil
+		}
+		return sc.GetScheduler(schedulers.EvictLeaderName) != nil
+	})
+}
+
+func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) {
+	handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	re.True(ok)
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	re.True(ok)
+	var evictStoreIDs []uint64
+	testutil.Eventually(re, func() bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+	re.ElementsMatch(evictStoreIDs, expected)
+}
+
 func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
 	for i := 0; i < 3; i++ {
 		regionID, err := id.Alloc()
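
Note (illustration, not part of the patch): a minimal sketch of the caller-side pattern this change moves to, assuming the same kinds of oc, storage, and controller values the tests above already construct; addShuffleRegion is a hypothetical wrapper name, not something introduced by the diff. CreateScheduler now only constructs the scheduler, and persisting its configuration happens inside Controller.AddScheduler (and AddSchedulerHandler) through the new SaveSchedulerConfig helper, which is what lets the evict-leader configuration survive the PD leader transfers exercised in TestTransferLeaderForScheduler.

	package example // hypothetical package, for illustration only

	import (
		"github.com/tikv/pd/pkg/schedule/operator"
		"github.com/tikv/pd/pkg/schedule/schedulers"
		"github.com/tikv/pd/pkg/storage/endpoint" // assumed import path for endpoint.ConfigStorage
	)

	// addShuffleRegion is a hypothetical caller, not a function added by this patch.
	func addShuffleRegion(oc *operator.Controller, storage endpoint.ConfigStorage, controller *schedulers.Controller) error {
		// After this patch, CreateScheduler only builds the scheduler and writes
		// nothing to storage.
		shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage,
			schedulers.ConfigJSONDecoder([]byte("null")))
		if err != nil {
			return err
		}
		// AddScheduler starts the scheduler and, with this patch, also calls
		// SaveSchedulerConfig(c.storage, scheduler) before AddSchedulerCfg updates
		// the cluster-level scheduler list, so the config is persisted.
		return controller.AddScheduler(shuffle)
	}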