Commit 15df331
This is an automated cherry-pick of #8663
ref #8619

Signed-off-by: ti-chi-bot <[email protected]>
rleungx authored and ti-chi-bot committed Sep 24, 2024
1 parent eeead6b commit 15df331
Showing 8 changed files with 4,191 additions and 1 deletion.
144 changes: 143 additions & 1 deletion pkg/schedule/schedulers/evict_leader.go
@@ -144,7 +144,18 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
succ = true
last = len(conf.StoreIDWithRanges) == 0
}
<<<<<<< HEAD
return succ, last
=======
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
@@ -163,6 +174,110 @@ func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa
return nil
}

<<<<<<< HEAD
=======
func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) {
conf.RLock()
defer conf.RUnlock()
return EncodeConfig(conf)
}

func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
conf.Lock()
defer conf.Unlock()
newCfg := &evictLeaderSchedulerConfig{}
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error {
conf.RLock()
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
res = err
}
}
return res
}

func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) {
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
return exist, err
}
}
return true, nil
}

func (conf *evictLeaderSchedulerConfig) resumeLeaderTransferIfExist(id uint64) {
conf.RLock()
defer conf.RUnlock()
conf.cluster.ResumeLeaderTransfer(id)
}

func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error {
conf.Lock()
defer conf.Unlock()
if id != 0 {
conf.StoreIDWithRanges[id] = newRanges
}
conf.Batch = batch
err := conf.save()
if err != nil && id != 0 {
_, _ = conf.removeStoreLocked(id)
}
return err
}

func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) {
conf.Lock()
var resp any
last, err := conf.removeStoreLocked(id)
if err != nil {
conf.Unlock()
return resp, err
}

keyRanges := conf.StoreIDWithRanges[id]
err = conf.save()
if err != nil {
conf.resetStoreLocked(id, keyRanges)
conf.Unlock()
return resp, err
}
if !last {
conf.Unlock()
return resp, nil
}
conf.Unlock()
if err := conf.removeSchedulerCb(types.EvictLeaderScheduler.String()); err != nil {
if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
conf.resetStore(id, keyRanges)
}
return resp, err
}
resp = lastStoreDeleteInfo
return resp, nil
}

>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
type evictLeaderScheduler struct {
*BaseScheduler
conf *evictLeaderSchedulerConfig
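
The upstream side of this conflict moves evictLeaderSchedulerConfig to a persist-then-rollback discipline: update and delete mutate the in-memory map under the lock, call save, and undo the mutation if persistence fails (delete restores the removed ranges via resetStoreLocked). A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the PD implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type keyRange struct{ start, end string }

type schedulerConfig struct {
	sync.RWMutex
	storeIDWithRanges map[uint64][]keyRange
	save              func() error // persistence hook, e.g. writing to storage
}

// update mirrors the shape of evictLeaderSchedulerConfig.update: apply the
// change in memory first, then persist; undo the in-memory change on failure.
func (c *schedulerConfig) update(id uint64, ranges []keyRange) error {
	c.Lock()
	defer c.Unlock()
	old, existed := c.storeIDWithRanges[id]
	c.storeIDWithRanges[id] = ranges
	if err := c.save(); err != nil {
		// Roll back so the in-memory state stays consistent with storage.
		if existed {
			c.storeIDWithRanges[id] = old
		} else {
			delete(c.storeIDWithRanges, id)
		}
		return err
	}
	return nil
}

func main() {
	cfg := &schedulerConfig{
		storeIDWithRanges: map[uint64][]keyRange{},
		save:              func() error { return errors.New("storage unavailable") },
	}
	err := cfg.update(1, []keyRange{{"a", "b"}})
	fmt.Println(err, len(cfg.storeIDWithRanges)) // storage unavailable 0
}
```

Note also that delete releases the lock before invoking removeSchedulerCb, which keeps the config mutex out of the scheduler-removal path.
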
@@ -381,6 +496,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
var id uint64
idFloat, ok := input["store_id"].(float64)
if ok {
<<<<<<< HEAD
id = (uint64)(idFloat)
handler.config.mu.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
@@ -389,25 +505,51 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
=======
if batchFloat < 1 || batchFloat > 10 {
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
return
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}
handler.config.mu.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
}

ranges, ok := (input["ranges"]).([]string)
if ok {
<<<<<<< HEAD
args = append(args, ranges...)
} else if exists {
args = append(args, handler.config.getRanges(id)...)
=======
if !inputHasStoreID {
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id"))
return
}
} else if exist {
ranges = handler.config.getRanges(id)
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
}

handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.config.resumeLeaderTransferIfExist(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
<<<<<<< HEAD
=======

// StoreIDWithRanges is only changed in update function.
err = handler.config.update(id, newRanges, batch)
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.")
}
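
The new handler branch validates batch before touching the scheduler config and resumes leader transfer on every early return. A hedged sketch of just the validation step, using only the standard library; the handler name and route here are illustrative, not PD's:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// updateConfig checks the batch field the way the new UpdateConfig branch
// does: values outside [1, 10] are rejected with 400 before any scheduler
// state changes.
func updateConfig(w http.ResponseWriter, r *http.Request) {
	var input map[string]any
	if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if batchFloat, ok := input["batch"].(float64); ok {
		if batchFloat < 1 || batchFloat > 10 {
			http.Error(w, "batch is invalid, it should be in [1, 10]", http.StatusBadRequest)
			return
		}
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	req := httptest.NewRequest(http.MethodPost, "/config", strings.NewReader(`{"batch": 20}`))
	rec := httptest.NewRecorder()
	updateConfig(rec, req)
	fmt.Println(rec.Code, strings.TrimSpace(rec.Body.String())) // 400 batch is invalid, it should be in [1, 10]
}
```
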

12 changes: 12 additions & 0 deletions pkg/schedule/schedulers/grant_leader.go
@@ -290,7 +290,19 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
<<<<<<< HEAD
handler.config.removeStore(id)
=======
handler.config.Lock()
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.Unlock()
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.persist()
if err != nil {
_, _ = handler.config.removeStore(id)
>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663))
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
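
Both schedulers pair PauseLeaderTransfer with a guaranteed ResumeLeaderTransfer on failure paths, as the grant-leader hunk above shows. A self-contained sketch of that symmetry, with a stand-in for sche.SchedulerCluster:

```go
package main

import "fmt"

// cluster is a stand-in for the sche.SchedulerCluster methods used here.
type cluster interface {
	PauseLeaderTransfer(id uint64) error
	ResumeLeaderTransfer(id uint64)
}

type fakeCluster struct{ paused map[uint64]bool }

func (c *fakeCluster) PauseLeaderTransfer(id uint64) error { c.paused[id] = true; return nil }
func (c *fakeCluster) ResumeLeaderTransfer(id uint64)      { delete(c.paused, id) }

// addStore pauses leader transfer up front and resumes it when persisting
// fails, mirroring the error handling in grantLeaderHandler.UpdateConfig.
func addStore(c cluster, id uint64, persist func() error) error {
	if err := c.PauseLeaderTransfer(id); err != nil {
		return err
	}
	if err := persist(); err != nil {
		c.ResumeLeaderTransfer(id) // every failure path must undo the pause
		return err
	}
	return nil
}

func main() {
	c := &fakeCluster{paused: map[uint64]bool{}}
	_ = addStore(c, 1, func() error { return fmt.Errorf("persist failed") })
	fmt.Println(c.paused) // prints map[]: the pause was rolled back
}
```
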
28 changes: 28 additions & 0 deletions tests/pdctl/hot/hot_test.go
@@ -48,6 +48,34 @@ func TestHotTestSuite(t *testing.T) {
suite.Run(t, new(hotTestSuite))
}

<<<<<<< HEAD:tests/pdctl/hot/hot_test.go
=======
func (suite *hotTestSuite) SetupSuite() {
suite.env = pdTests.NewSchedulingTestEnvironment(suite.T(),
func(conf *config.Config, _ string) {
conf.Schedule.MaxStoreDownTime.Duration = time.Hour
conf.Schedule.HotRegionCacheHitsThreshold = 0
},
)
}

func (suite *hotTestSuite) TearDownSuite() {
suite.env.Cleanup()
}

func (suite *hotTestSuite) TearDownTest() {
cleanFunc := func(cluster *pdTests.TestCluster) {
leader := cluster.GetLeaderServer()
hotStat := leader.GetRaftCluster().GetHotStat()
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
hotStat = sche.GetCluster().GetHotStat()
}
hotStat.HotCache.CleanCache()
}
suite.env.RunTestBasedOnMode(cleanFunc)
}

>>>>>>> f3e9d9ad0 (*: let TestEvictLeaderScheduler run in two modes (#8663)):tools/pd-ctl/tests/hot/hot_test.go
func (suite *hotTestSuite) TestHot() {
var start time.Time
start = start.Add(time.Hour)
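
The suite now drives each test through pdTests.SchedulingTestEnvironment and RunTestBasedOnMode, so the same test body can run whether scheduling lives inside PD or in the separate scheduling service. A simplified, self-contained sketch of that call shape; the stand-in environment below simply iterates both modes, while the real one selects modes according to its own configuration:

```go
package main

import "fmt"

type mode string

// testEnv is a simplified stand-in for pdTests.SchedulingTestEnvironment.
type testEnv struct{ modes []mode }

// RunTestBasedOnMode mirrors the call shape used in TearDownTest above: the
// test body receives a cluster handle per configured mode. (Here it iterates
// both modes; the real environment picks modes via its own configuration.)
func (e *testEnv) RunTestBasedOnMode(test func(m mode)) {
	for _, m := range e.modes {
		test(m)
	}
}

func main() {
	env := &testEnv{modes: []mode{"pd", "scheduling-service"}}
	env.RunTestBasedOnMode(func(m mode) {
		// Per-mode assertions would go here; TearDownTest cleans the hot
		// cache between runs so modes do not observe each other's state.
		fmt.Println("running checks in mode:", m)
	})
}
```
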