schedulers: transplant the recovery-duration setting to `evict-slow-store` scheduler. (tikv#7225)

ref tikv#7156

This PR transplants the `recovery-duration` setting, introduced in the `evict-slow-trend`
scheduler, into the `evict-slow-store` scheduler.

It's useful and necessary for users who want to use the `evict-slow-store` scheduler
before the `evict-slow-trend` scheduler reaches GA.
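
Once merged, the setting can be tuned through pd-ctl in the same way as for
`evict-slow-trend`. A minimal session sketch, mirroring the pdctl test in this diff
(the PD address is a placeholder):

```
# placeholder PD address; substitute your own cluster endpoint
pd-ctl -u http://127.0.0.1:2379 scheduler add evict-slow-store-scheduler
pd-ctl -u http://127.0.0.1:2379 scheduler config evict-slow-store-scheduler set recovery-duration 100
pd-ctl -u http://127.0.0.1:2379 scheduler config evict-slow-store-scheduler show
```

Internally, the scheduler now serves its own config over HTTP: POST `/config` updates
`recovery-duration` (in seconds) and GET `/list` returns the current config.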

Signed-off-by: lucasliang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
LykxSassinator and ti-chi-bot[bot] authored Oct 19, 2023
1 parent 445319f commit d9dbf77
Showing 5 changed files with 130 additions and 24 deletions.
97 changes: 88 additions & 9 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -15,6 +15,11 @@
package schedulers

import (
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
@@ -23,6 +28,8 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)

@@ -40,8 +47,27 @@ const (
var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule")

type evictSlowStoreSchedulerConfig struct {
storage endpoint.ConfigStorage
EvictedStores []uint64 `json:"evict-stores"`
storage endpoint.ConfigStorage
// Last timestamp of the chosen slow store for eviction.
lastSlowStoreCaptureTS time.Time
// Duration gap for recovering the candidate, unit: s.
RecoveryDurationGap uint64 `json:"recovery-duration"`
EvictedStores []uint64 `json:"evict-stores"`
}

func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
storage: storage,
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
}
}

func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap),
}
}

func (conf *evictSlowStoreSchedulerConfig) Persist() error {
@@ -78,23 +104,77 @@ func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 {
return conf.EvictedStores[0]
}

// readyForRecovery checks whether the last captured candidate is ready for recovery.
func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool {
recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap)
failpoint.Inject("transientRecoveryGap", func() {
recoveryDurationGap = 0
})
return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap
}

func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
conf.EvictedStores = []uint64{id}
conf.lastSlowStoreCaptureTS = time.Now()
return conf.Persist()
}

func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
oldID = conf.evictStore()
if oldID > 0 {
conf.EvictedStores = []uint64{}
conf.lastSlowStoreCaptureTS = time.Time{}
err = conf.Persist()
}
return
}

type evictSlowStoreHandler struct {
rd *render.Render
config *evictSlowStoreSchedulerConfig
}

func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler {
h := &evictSlowStoreHandler{
config: config,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
return router
}

func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
return
}
recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
if !ok {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
return
}
recoveryDurationGap := (uint64)(recoveryDurationGapFloat)
prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap)
atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap)
log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
handler.rd.JSON(w, http.StatusOK, nil)
}

func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

type evictSlowStoreScheduler struct {
*BaseScheduler
conf *evictSlowStoreSchedulerConfig
conf *evictSlowStoreSchedulerConfig
handler http.Handler
}

func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

func (s *evictSlowStoreScheduler) GetName() string {
@@ -168,7 +248,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
// slow node next time.
log.Info("slow store has been removed",
zap.Uint64("store-id", store.GetID()))
} else if store.GetSlowScore() <= slowStoreRecoverThreshold {
} else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() {
log.Info("slow store has been recovered",
zap.Uint64("store-id", store.GetID()))
} else {
@@ -211,11 +291,10 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun

// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores.
func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler {
base := NewBaseScheduler(opController)

s := &evictSlowStoreScheduler{
BaseScheduler: base,
handler := newEvictSlowStoreHandler(conf)
return &evictSlowStoreScheduler{
BaseScheduler: NewBaseScheduler(opController),
conf: conf,
handler: handler,
}
return s
}
4 changes: 4 additions & 0 deletions pkg/schedule/schedulers/evict_slow_store_test.go
@@ -67,6 +67,7 @@ func (suite *evictSlowStoreTestSuite) TearDownTest() {
}

func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() {
suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)"))
storeInfo := suite.tc.GetStore(1)
newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) {
store.GetStoreStats().SlowScore = 100
@@ -113,6 +114,8 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() {
suite.NoError(err)
suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores)
suite.Zero(persistValue.evictStore())
suite.True(persistValue.readyForRecovery())
suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap"))
}

func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {
@@ -124,6 +127,7 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictStore())
suite.False(es2.conf.readyForRecovery())
// prepare with evict store.
suite.es.Prepare(suite.tc)
}
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/init.go
@@ -159,7 +159,7 @@ func schedulersRegister() {
})

RegisterScheduler(EvictSlowStoreType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)}
conf := initEvictSlowStoreSchedulerConfig(storage)
if err := decoder(conf); err != nil {
return nil, err
}
31 changes: 17 additions & 14 deletions tests/pdctl/scheduler/scheduler_test.go
@@ -407,20 +407,23 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
re.Contains(echo, "Success!")

// test evict-slow-trend scheduler config
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-trend-scheduler"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.Contains(echo, "evict-slow-trend-scheduler")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "set", "recovery-duration", "100"}, nil)
re.Contains(echo, "Success!")
conf = make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "show"}, &conf)
re.Equal(100., conf["recovery-duration"])
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-slow-trend-scheduler"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.NotContains(echo, "evict-slow-trend-scheduler")
// test evict-slow-store && evict-slow-trend schedulers config
evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"}
for _, schedulerName := range evictSlownessSchedulers {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.Contains(echo, schedulerName)
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
re.Contains(echo, "Success!")
conf = make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
re.Equal(100., conf["recovery-duration"])
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.NotContains(echo, schedulerName)
}

// test show scheduler with paused and disabled status.
checkSchedulerWithStatusCommand := func(status string, expected []string) {
20 changes: 20 additions & 0 deletions tools/pd-ctl/pdctl/command/scheduler.go
@@ -499,6 +499,7 @@ func NewConfigSchedulerCommand() *cobra.Command {
newConfigGrantHotRegionCommand(),
newConfigBalanceLeaderCommand(),
newSplitBucketCommand(),
newConfigEvictSlowStoreCommand(),
newConfigEvictSlowTrendCommand(),
)
return c
@@ -776,6 +777,25 @@ func setShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []string
cmd.Println("Success!")
}

func newConfigEvictSlowStoreCommand() *cobra.Command {
c := &cobra.Command{
Use: "evict-slow-store-scheduler",
Short: "evict-slow-store-scheduler config",
Run: listSchedulerConfigCommandFunc,
}

c.AddCommand(&cobra.Command{
Use: "show",
Short: "list the config item",
Run: listSchedulerConfigCommandFunc,
}, &cobra.Command{
Use: "set <key> <value>",
Short: "set the config item",
Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) },
})
return c
}

func newConfigEvictSlowTrendCommand() *cobra.Command {
c := &cobra.Command{
Use: "evict-slow-trend-scheduler",