diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 28309acddf5..ec068b9550b 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -103,6 +103,13 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() } +func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) { + conf.Lock() + defer conf.Unlock() + // if the store is not existed, no need to resume leader transfer + _, _ = conf.removeStoreLocked(id) +} + func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { if err := conf.cluster.PauseLeaderTransfer(id); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) @@ -408,6 +415,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R batchFloat, ok := input["batch"].(float64) if ok { if batchFloat < 1 || batchFloat > 10 { + handler.config.removeStore(id) handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") return } @@ -417,6 +425,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R ranges, ok := (input["ranges"]).([]string) if ok { if !inputHasStoreID { + handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) return } @@ -426,6 +435,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R newRanges, err = getKeyRanges(ranges) if err != nil { + handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 55b51a14cba..a15b14d210a 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -271,12 +271,13 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R err := handler.config.buildWithArgs(args) if err != nil { + _, _ = handler.config.removeStore(id) handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.persist() if err != nil { - handler.config.removeStore(id) + _, _ = handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index ff3cd861973..901dd94c983 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -275,11 +275,18 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R err := handler.config.BuildWithArgs(args) if err != nil { + handler.config.mu.Lock() + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() handler.rd.JSON(w, http.StatusBadRequest, err.Error()) return } err = handler.config.Persist() if err != nil { + handler.config.mu.Lock() + delete(handler.config.StoreIDWitRanges, id) + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -301,7 +308,7 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R handler.config.mu.Lock() defer handler.config.mu.Unlock() - _, exists := handler.config.StoreIDWitRanges[id] + ranges, exists := handler.config.StoreIDWitRanges[id] if !exists { handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) return @@ -309,14 +316,12 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R delete(handler.config.StoreIDWitRanges, id) handler.config.cluster.ResumeLeaderTransfer(id) - handler.config.mu.Unlock() if err := handler.config.Persist(); err != nil { - handler.config.mu.Lock() + handler.config.StoreIDWitRanges[id] = ranges + _ = handler.config.cluster.PauseLeaderTransfer(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.config.mu.Lock() - var resp any if len(handler.config.StoreIDWitRanges) == 0 { resp = noStoreInSchedulerInfo diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 0f14c48f091..63d0f091b8c 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -815,6 +815,60 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestC checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") } +func (suite *schedulerTestSuite) TestEvictLeaderScheduler() { + // FIXME: API mode may have the problem + suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler) +} + +func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") +} + func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string { output, err := tests.ExecuteCommand(cmd, args...) re.NoError(err)