From 17451bbb4c2dec637e33634b19a2a8051b9ae996 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 18 Sep 2023 16:37:40 +0800 Subject: [PATCH] Fix the evict-leader-scheduler sync Signed-off-by: JmPotato --- pkg/mcs/scheduling/server/cluster.go | 1 + pkg/schedule/schedulers/evict_leader.go | 13 +++ server/handler.go | 8 +- .../mcs/scheduling/server_test.go | 93 +++++++++++++------ tests/server/api/testutil.go | 25 ++++- 5 files changed, 110 insertions(+), 30 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 917831ba9cad..eb6876db0a19 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -204,6 +204,7 @@ func (c *Cluster) updateScheduler() { log.Info("cluster is closing, stop listening the schedulers updating notifier") return case <-notifier: + // This is triggered by the watcher when the schedulers are updated. } log.Info("schedulers updating notifier is triggered, try to update the scheduler") diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 1989c42ba6f9..2551b9ac9cbb 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -218,6 +218,19 @@ func (s *evictLeaderScheduler) ReloadConfig() error { if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { return err } + // Resume and pause the leader transfer for each store. + for id := range s.conf.StoreIDWithRanges { + if _, ok := newCfg.StoreIDWithRanges[id]; ok { + continue + } + s.conf.cluster.ResumeLeaderTransfer(id) + } + for id := range newCfg.StoreIDWithRanges { + if _, ok := s.conf.StoreIDWithRanges[id]; ok { + continue + } + s.conf.cluster.PauseLeaderTransfer(id) + } s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges return nil } diff --git a/server/handler.go b/server/handler.go index a90f8e3f04f3..29373dc286ec 100644 --- a/server/handler.go +++ b/server/handler.go @@ -231,7 +231,13 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } - s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().GetSchedulersController().RemoveScheduler) + var removeSchedulerCb func(string) error + if h.s.IsAPIServiceMode() { + removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler + } else { + removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler + } + s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), removeSchedulerCb) if err != nil { return err } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 187ba54dfcb0..7b69e66dd687 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -17,12 +17,14 @@ package scheduling import ( "context" "fmt" + "net/http" "testing" "time" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/schedulers" @@ -196,25 +198,15 @@ func (suite *serverTestSuite) TestSchedulerSync() { defer tc.Destroy() tc.WaitForPrimaryServing(re) schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController() - re.Len(schedulersController.GetSchedulerNames(), 5) - re.Nil(schedulersController.GetScheduler(schedulers.EvictLeaderName)) + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, false) // Add a new evict-leader-scheduler through the API server. api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ "store_id": 1, }) // Check if the evict-leader-scheduler is added. - testutil.Eventually(re, func() bool { - return len(schedulersController.GetSchedulerNames()) == 6 && - schedulersController.GetScheduler(schedulers.EvictLeaderName) != nil - }) - handler, ok := schedulersController.GetSchedulerHandlers()[schedulers.EvictLeaderName] - re.True(ok) - h, ok := handler.(interface { - EvictStoreIDs() []uint64 - }) - re.True(ok) - re.ElementsMatch(h.EvictStoreIDs(), []uint64{1}) - // Update the evict-leader-scheduler through the API server. + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + // Add a store_id to the evict-leader-scheduler through the API server. err = suite.pdLeader.GetServer().GetRaftCluster().PutStore( &metapb.Store{ Id: 2, @@ -229,25 +221,70 @@ func (suite *serverTestSuite) TestSchedulerSync() { api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ "store_id": 2, }) - var evictStoreIDs []uint64 - testutil.Eventually(re, func() bool { - evictStoreIDs = h.EvictStoreIDs() - return len(evictStoreIDs) == 2 + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + // Delete a store_id from the evict-leader-scheduler through the API server. + api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{2}) + // Add a store_id to the evict-leader-scheduler through the API server by the scheduler handler. + api.MustCallSchedulerConfigAPI(re, http.MethodPost, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"config"}, map[string]interface{}{ + "name": schedulers.EvictLeaderName, + "store_id": 1, }) - re.ElementsMatch(evictStoreIDs, []uint64{1, 2}) + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + // Delete a store_id from the evict-leader-scheduler through the API server by the scheduler handler. + api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "2"}, nil) + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + // If the last store is deleted, the scheduler should be removed. + api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "1"}, nil) + // Check if the scheduler is removed. + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, false) + + // Delete the evict-leader-scheduler through the API server by removing the last store_id. + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) - testutil.Eventually(re, func() bool { - evictStoreIDs = h.EvictStoreIDs() - return len(evictStoreIDs) == 1 + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, false) + + // Delete the evict-leader-scheduler through the API server. + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, }) - re.ElementsMatch(evictStoreIDs, []uint64{2}) - // Remove the evict-leader-scheduler through the API server. + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) - // Check if the scheduler is removed. + checkSchedulerExist(re, schedulersController, schedulers.EvictLeaderName, false) + + // TODO: test more schedulers. +} + +func checkSchedulerExist(re *require.Assertions, sc *schedulers.Controller, schedulerName string, exist bool) { + re.NotEmpty(schedulerName) testutil.Eventually(re, func() bool { - return len(schedulersController.GetSchedulerNames()) == 5 && - schedulersController.GetScheduler(schedulers.EvictLeaderName) == nil + if !exist { + return sc.GetScheduler(schedulerName) == nil + } + return sc.GetScheduler(schedulerName) != nil }) +} - // TODO: test more schedulers. +func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) { + handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) } diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go index 7a33bf39048f..c6c2cc79611e 100644 --- a/tests/server/api/testutil.go +++ b/tests/server/api/testutil.go @@ -20,11 +20,15 @@ import ( "fmt" "io" "net/http" + "path" "github.com/stretchr/testify/require" ) -const schedulersPrefix = "/pd/api/v1/schedulers" +const ( + schedulersPrefix = "/pd/api/v1/schedulers" + schedulerConfigPrefix = "/pd/api/v1/scheduler-config" +) // dialClient used to dial http request. var dialClient = &http.Client{ @@ -68,3 +72,22 @@ func MustDeleteScheduler(re *require.Assertions, serverAddr, schedulerName strin re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode, string(data)) } + +// MustCallSchedulerConfigAPI calls a scheduler config with HTTP API with the given args. +func MustCallSchedulerConfigAPI( + re *require.Assertions, + method, serverAddr, schedulerName string, args []string, + input map[string]interface{}, +) { + data, err := json.Marshal(input) + re.NoError(err) + args = append([]string{schedulerConfigPrefix, schedulerName}, args...) + httpReq, err := http.NewRequest(method, fmt.Sprintf("%s%s", serverAddr, path.Join(args...)), bytes.NewBuffer(data)) + re.NoError(err) + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err = io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) +}