This is an automated cherry-pick of #8663
ref #8619

Signed-off-by: ti-chi-bot <[email protected]>
rleungx authored and ti-chi-bot committed Sep 24, 2024
1 parent f672cfc commit 77082c0
Showing 8 changed files with 224 additions and 6 deletions.
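This cherry-pick did not apply cleanly, so the committed files below still contain unresolved Git conflict markers. Each conflicted region follows the standard layout, excerpted here from removeStore in pkg/schedule/schedulers/evict_leader.go: the lines between <<<<<<< HEAD and ======= are the release branch's current code, and the lines between ======= and the >>>>>>> line are the incoming change from #8663.

```go
<<<<<<< HEAD
	return succ, last
=======
	return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
```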
144 changes: 143 additions & 1 deletion pkg/schedule/schedulers/evict_leader.go
@@ -144,7 +144,18 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
succ = true
last = len(conf.StoreIDWithRanges) == 0
}
<<<<<<< HEAD
return succ, last
=======
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
@@ -163,6 +174,110 @@ func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa
return nil
}

<<<<<<< HEAD
=======
func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) {
conf.RLock()
defer conf.RUnlock()
return EncodeConfig(conf)
}

func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
conf.Lock()
defer conf.Unlock()
newCfg := &evictLeaderSchedulerConfig{}
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error {
conf.RLock()
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
res = err
}
}
return res
}

func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) {
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
return exist, err
}
}
return true, nil
}

func (conf *evictLeaderSchedulerConfig) resumeLeaderTransferIfExist(id uint64) {
conf.RLock()
defer conf.RUnlock()
conf.cluster.ResumeLeaderTransfer(id)
}

func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error {
conf.Lock()
defer conf.Unlock()
if id != 0 {
conf.StoreIDWithRanges[id] = newRanges
}
conf.Batch = batch
err := conf.save()
if err != nil && id != 0 {
_, _ = conf.removeStoreLocked(id)
}
return err
}

func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) {
conf.Lock()
var resp any
last, err := conf.removeStoreLocked(id)
if err != nil {
conf.Unlock()
return resp, err
}

keyRanges := conf.StoreIDWithRanges[id]
err = conf.save()
if err != nil {
conf.resetStoreLocked(id, keyRanges)
conf.Unlock()
return resp, err
}
if !last {
conf.Unlock()
return resp, nil
}
conf.Unlock()
if err := conf.removeSchedulerCb(types.EvictLeaderScheduler.String()); err != nil {
if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
conf.resetStore(id, keyRanges)
}
return resp, err
}
resp = lastStoreDeleteInfo
return resp, nil
}

>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
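In the delete path above, the config deliberately releases its mutex before invoking removeSchedulerCb, because the callback reaches back into scheduler bookkeeping and holding the lock across it could deadlock; if persisting fails, the store entry is restored via resetStoreLocked. A minimal sketch of this unlock-before-callback pattern, using hypothetical stand-in types rather than the PD ones:

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for evictLeaderSchedulerConfig: a mutex-guarded map plus
// a callback that must not run while the lock is held.
type store struct {
	mu       sync.Mutex
	ranges   map[uint64]string
	removeCb func(name string) error
}

// remove deletes id and, only after unlocking, fires the callback once the
// last entry is gone -- mirroring the ordering in conf.delete above.
func (s *store) remove(id uint64) error {
	s.mu.Lock()
	delete(s.ranges, id)
	last := len(s.ranges) == 0
	s.mu.Unlock() // release before the callback to avoid re-entrant deadlock
	if !last {
		return nil
	}
	return s.removeCb("evict-leader-scheduler")
}

func main() {
	s := &store{
		ranges:   map[uint64]string{1: "a-b"},
		removeCb: func(name string) error { fmt.Println("removed", name); return nil },
	}
	fmt.Println(s.remove(1))
}
```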
type evictLeaderScheduler struct {
*BaseScheduler
conf *evictLeaderSchedulerConfig
@@ -365,6 +480,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
var id uint64
idFloat, ok := input["store_id"].(float64)
if ok {
<<<<<<< HEAD
id = (uint64)(idFloat)
handler.config.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
@@ -373,25 +489,51 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
=======
if batchFloat < 1 || batchFloat > 10 {
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
return
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}
handler.config.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
}

ranges, ok := (input["ranges"]).([]string)
if ok {
<<<<<<< HEAD
args = append(args, ranges...)
} else if exists {
args = append(args, handler.config.getRanges(id)...)
=======
if !inputHasStoreID {
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id"))
return
}
} else if exist {
ranges = handler.config.getRanges(id)
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
<<<<<<< HEAD
=======

// StoreIDWithRanges is only changed in update function.
err = handler.config.update(id, newRanges, batch)
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
}

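On the incoming side of the handler conflict above, input validation happens before any persisted state changes: a batch outside [1, 10] is rejected with 400, and a store whose leader transfer was paused earlier in the request is resumed on every error path. A rough sketch of that validate-then-rollback ordering (names are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
)

// applyUpdate mirrors the handler's ordering: reject bad input first and
// undo the earlier side effect (the paused leader transfer) before
// returning the error.
func applyUpdate(batch float64, resume func()) error {
	if batch < 1 || batch > 10 {
		resume() // roll back the pause taken earlier in the request
		return errors.New("batch is invalid, it should be in [1, 10]")
	}
	return nil // validation passed; the real handler would persist here
}

func main() {
	resume := func() { fmt.Println("resumed leader transfer") }
	fmt.Println(applyUpdate(42, resume))
	fmt.Println(applyUpdate(5, resume))
}
```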
12 changes: 12 additions & 0 deletions pkg/schedule/schedulers/grant_leader.go
@@ -299,7 +299,19 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
<<<<<<< HEAD
handler.config.removeStore(id)
=======
handler.config.Lock()
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.Unlock()
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.persist()
if err != nil {
_, _ = handler.config.removeStore(id)
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
2 changes: 1 addition & 1 deletion tests/server/api/region_test.go
@@ -91,7 +91,7 @@ func (suite *regionTestSuite) TearDownTest() {
return true
})
}
suite.env.RunFuncInTwoModes(cleanFunc)
suite.env.RunTestBasedOnMode(cleanFunc)
}

func (suite *regionTestSuite) TestSplitRegions() {
2 changes: 1 addition & 1 deletion tests/server/api/rule_test.go
@@ -74,7 +74,7 @@ func (suite *ruleTestSuite) TearDownTest() {
err = tu.CheckPostJSON(testDialClient, urlPrefix+"/pd/api/v1/config/placement-rule", data, tu.StatusOK(re))
re.NoError(err)
}
suite.env.RunFuncInTwoModes(cleanFunc)
suite.env.RunTestBasedOnMode(cleanFunc)
}

func (suite *ruleTestSuite) TestSet() {
3 changes: 3 additions & 0 deletions tests/testutil.go
@@ -310,6 +310,7 @@ func (s *SchedulingTestEnvironment) RunTestInAPIMode(test func(*TestCluster)) {
test(s.clusters[apiMode])
}

<<<<<<< HEAD
// RunFuncInTwoModes is to run func in two modes.
func (s *SchedulingTestEnvironment) RunFuncInTwoModes(f func(*TestCluster)) {
if c, ok := s.clusters[pdMode]; ok {
@@ -320,6 +321,8 @@ func (s *SchedulingTestEnvironment) RunFuncInTwoModes(f func(*TestCluster)) {
}
}

=======
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
// Cleanup is to cleanup the environment.
func (s *SchedulingTestEnvironment) Cleanup() {
for _, cluster := range s.clusters {
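RunFuncInTwoModes, removed here in favor of RunTestBasedOnMode (see the renamed call sites in the surrounding test files), simply ran the given function against whichever clusters the environment had created. A rough sketch of that shape, with hypothetical stand-in types:

```go
package main

import "fmt"

type TestCluster struct{ name string }

type mode int

const (
	pdMode mode = iota
	apiMode
)

// SchedulingTestEnvironment mirrors the shape in tests/testutil.go:
// one test cluster per deployment mode.
type SchedulingTestEnvironment struct {
	clusters map[mode]*TestCluster
}

// runInTwoModes invokes the same test body against each cluster that
// exists, as RunFuncInTwoModes did.
func (s *SchedulingTestEnvironment) runInTwoModes(f func(*TestCluster)) {
	if c, ok := s.clusters[pdMode]; ok {
		f(c)
	}
	if c, ok := s.clusters[apiMode]; ok {
		f(c)
	}
}

func main() {
	env := &SchedulingTestEnvironment{clusters: map[mode]*TestCluster{
		pdMode:  {name: "pd"},
		apiMode: {name: "api"},
	}}
	env.runInTwoModes(func(c *TestCluster) { fmt.Println("running against", c.name) })
}
```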
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/config/config_test.go
@@ -96,7 +96,7 @@ func (suite *configTestSuite) TearDownTest() {
err = testutil.CheckPostJSON(testDialClient, urlPrefix+"/pd/api/v1/config/placement-rule", data, testutil.StatusOK(re))
re.NoError(err)
}
suite.env.RunFuncInTwoModes(cleanFunc)
suite.env.RunTestBasedOnMode(cleanFunc)
suite.env.Cleanup()
}

2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/hot/hot_test.go
@@ -71,7 +71,7 @@ func (suite *hotTestSuite) TearDownTest() {
}
hotStat.HotCache.CleanCache()
}
suite.env.RunFuncInTwoModes(cleanFunc)
suite.env.RunTestBasedOnMode(cleanFunc)
}

func (suite *hotTestSuite) TestHot() {
63 changes: 62 additions & 1 deletion tools/pd-ctl/tests/scheduler/scheduler_test.go
@@ -94,7 +94,7 @@ func (suite *schedulerTestSuite) TearDownTest() {
}
}
}
suite.env.RunFuncInTwoModes(cleanFunc)
suite.env.RunTestBasedOnMode(cleanFunc)
suite.env.Cleanup()
}

@@ -722,6 +722,67 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestC
checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
}

<<<<<<< HEAD
=======
func (suite *schedulerTestSuite) TestEvictLeaderScheduler() {
suite.env.RunTestBasedOnMode(suite.checkEvictLeaderScheduler)
}

func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) {
re := suite.Require()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := ctl.GetRootCmd()

stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 3,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}
for _, store := range stores {
pdTests.MustPutStore(re, cluster, store)
}

pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"))
output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
testutil.Eventually(re, func() bool {
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...)
return err == nil && strings.Contains(string(output), "Success!")
})
testutil.Eventually(re, func() bool {
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "show"}...)
return err == nil && !strings.Contains(string(output), "evict-leader-scheduler")
})
}

>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string {
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
