Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Aug 2, 2024
1 parent 87ec788 commit ede4112
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 148 deletions.
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,12 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.SchedulerTypeCompatibleMapReverse[scheduler.Type]
s, err := schedulers.CreateScheduler(
scheduler.Type,
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,24 @@ package mockconfig

import (
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/server/config"
)

// oldName2NewType is used to convert old scheduler name to new scheduler type.
// It is just used for testing.
var oldName2NewType = map[string]types.CheckerSchedulerType{
"balance-leader": types.BalanceLeaderScheduler,
"balance-region": types.BalanceRegionScheduler,
"hot-region": types.BalanceHotRegionScheduler,
"evict-slow-store": types.EvictLeaderScheduler,
}

// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
sc.RegisterScheduler(oldName2NewType[d.Type])
}
c := config.NewConfig()
c.Adjust(nil, false)
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ const RejectLeader = "reject-leader"
var schedulerMap sync.Map

// RegisterScheduler registers the scheduler type.
func RegisterScheduler(typ string) {
func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(name string) bool {
_, ok := schedulerMap.Load(name)
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
_, ok := schedulerMap.Load(typ)
return ok
}

Expand Down
32 changes: 21 additions & 11 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -187,7 +188,7 @@ func (c *Coordinator) driveSlowNodeScheduler() {
// If the cluster was set up with `raft-kv2` engine, this cluster should
// enable `evict-slow-trend` scheduler as default.
if c.GetCluster().GetStoreConfig().IsRaftKV2() {
typ := schedulers.EvictSlowTrendType
typ := types.EvictSlowTrendScheduler
args := []string{}

s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
Expand Down Expand Up @@ -288,20 +289,23 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
s, err := schedulers.CreateScheduler(types.SchedulerStr2Type[cfg.Type], c.opController,
c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else {
log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler handler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}
Expand All @@ -316,16 +320,22 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.schedulers.RemoveScheduler)
tp := types.SchedulerStr2Type[schedulerCfg.Type]
s, err := schedulers.CreateScheduler(tp, c.opController,
c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(tp, schedulerCfg.Args), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("scheduler-type", tp),
zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
continue
}

if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()),
zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil &&
!errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name",
s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
Expand Down Expand Up @@ -362,7 +372,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
log.Error("GetFunction SchedulerType error", errs.ZapError(err))
return
}
schedulerType := SchedulerType.(func() string)
schedulerType := SchedulerType.(func() types.CheckerSchedulerType)
// get func: SchedulerArgs from plugin
SchedulerArgs, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerArgs")
if err != nil {
Expand All @@ -373,7 +383,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
// create and add user scheduler
s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("scheduler-type", schedulerType()), errs.ZapError(err))
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
Expand Down
12 changes: 6 additions & 6 deletions pkg/schedule/schedulers/hot_region_rank_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
sche, err := CreateScheduler(writeType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.types = []resourceType{writePeer}
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
sche, err := CreateScheduler(writeType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.types = []resourceType{writePeer}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
sche, err := CreateScheduler(writeType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.types = []resourceType{writePeer}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
sche, err := CreateScheduler(readType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.conf.SetDstToleranceRatio(0.0)
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestSkipUniformStore(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
hb, err := CreateScheduler(readType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
Expand Down Expand Up @@ -419,7 +419,7 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo
addOtherRegions func(*mockcluster.Cluster, *hotScheduler)) []*operator.Operator {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
sche, err := CreateScheduler(readType, oc, storage.NewStorageWithMemoryBackend(), nil, nil)
re.NoError(err)
hb := sche.(*hotScheduler)
hb.conf.SetSrcToleranceRatio(1)
Expand Down
Loading

0 comments on commit ede4112

Please sign in to comment.