Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: instead scheduler type string of types.CheckerSchedulerType #8485

Merged
merged 18 commits into from
Aug 8, 2024
Merged
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,12 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStr2Type[scheduler.Type]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does ConvertOldStrToType sound to you? Variable names with embedded numbers are not common in PD.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are same for me. I can update it

s, err := schedulers.CreateScheduler(
scheduler.Type,
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ package mockconfig

import (
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/server/config"
)

// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
sc.RegisterScheduler(types.ConvertOldStr2Type[d.Type])
}
c := config.NewConfig()
c.Adjust(nil, false)
Expand Down
9 changes: 5 additions & 4 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -571,10 +572,10 @@ type SchedulerConfig struct {
// If these schedulers are not in the persistent configuration, they
// will be created automatically when reloading.
var DefaultSchedulers = SchedulerConfigs{
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "hot-region"},
{Type: "evict-slow-store"},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceRegionScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceLeaderScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.BalanceHotRegionScheduler]},
{Type: types.SchedulerTypeCompatibleMap[types.EvictSlowStoreScheduler]},
}

// IsDefaultScheduler checks whether the scheduler is enabled by default.
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
var schedulerMap sync.Map

// RegisterScheduler registers the scheduler type.
func RegisterScheduler(typ string) {
func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(name string) bool {
_, ok := schedulerMap.Load(name)
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
_, ok := schedulerMap.Load(typ)

Check warning on line 42 in pkg/schedule/config/config_provider.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/config/config_provider.go#L41-L42

Added lines #L41 - L42 were not covered by tests
return ok
}

Expand Down
34 changes: 22 additions & 12 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -187,7 +188,7 @@
// If the cluster was set up with `raft-kv2` engine, this cluster should
// enable `evict-slow-trend` scheduler as default.
if c.GetCluster().GetStoreConfig().IsRaftKV2() {
typ := schedulers.EvictSlowTrendType
typ := types.EvictSlowTrendScheduler

Check warning on line 191 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L191

Added line #L191 was not covered by tests
args := []string{}

s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
Expand Down Expand Up @@ -275,7 +276,7 @@
typ := schedulers.FindSchedulerTypeByName(name)
var cfg sc.SchedulerConfig
for _, c := range scheduleCfg.Schedulers {
if c.Type == typ {
if c.Type == types.SchedulerTypeCompatibleMap[typ] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming to ConvertNewType2OldStr

cfg = c
break
}
Expand All @@ -288,20 +289,23 @@
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
s, err := schedulers.CreateScheduler(types.ConvertOldStr2Type[cfg.Type], c.opController,
c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else {
log.Info("create scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
log.Error("can not add scheduler handler with independent configuration",
zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))

Check warning on line 308 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L307-L308

Added lines #L307 - L308 were not covered by tests
}
}
}
Expand All @@ -316,16 +320,22 @@
continue
}

s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.schedulers.RemoveScheduler)
tp := types.ConvertOldStr2Type[schedulerCfg.Type]
s, err := schedulers.CreateScheduler(tp, c.opController,
c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(tp, schedulerCfg.Args), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("type", tp), zap.String("scheduler-type", schedulerCfg.Type),
zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
continue
}

if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()),
zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil &&
!errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name",
s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))

Check warning on line 338 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L337-L338

Added lines #L337 - L338 were not covered by tests
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
Expand Down Expand Up @@ -362,7 +372,7 @@
log.Error("GetFunction SchedulerType error", errs.ZapError(err))
return
}
schedulerType := SchedulerType.(func() string)
schedulerType := SchedulerType.(func() types.CheckerSchedulerType)

Check warning on line 375 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L375

Added line #L375 was not covered by tests
// get func: SchedulerArgs from plugin
SchedulerArgs, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerArgs")
if err != nil {
Expand All @@ -373,7 +383,7 @@
// create and add user scheduler
s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
log.Error("can not create scheduler", zap.Stringer("scheduler-type", schedulerType()), errs.ZapError(err))

Check warning on line 386 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L386

Added line #L386 was not covered by tests
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/schedulers/balance_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func BenchmarkPlacementRule(b *testing.B) {
re := assert.New(b)
cancel, tc, oc := newBenchCluster(true, true, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
var ops []*operator.Operator
var plans []plan.Plan
Expand All @@ -171,7 +171,7 @@ func BenchmarkPlacementRule(b *testing.B) {
func BenchmarkLabel(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, true, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -181,7 +181,7 @@ func BenchmarkLabel(b *testing.B) {
func BenchmarkNoLabel(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -191,7 +191,7 @@ func BenchmarkNoLabel(b *testing.B) {
func BenchmarkDiagnosticNoLabel1(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, false)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, true)
Expand All @@ -201,7 +201,7 @@ func BenchmarkDiagnosticNoLabel1(b *testing.B) {
func BenchmarkDiagnosticNoLabel2(b *testing.B) {
cancel, tc, oc := newBenchBigCluster(100, 100)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, true)
Expand All @@ -211,7 +211,7 @@ func BenchmarkDiagnosticNoLabel2(b *testing.B) {
func BenchmarkNoLabel2(b *testing.B) {
cancel, tc, oc := newBenchBigCluster(100, 100)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand All @@ -221,7 +221,7 @@ func BenchmarkNoLabel2(b *testing.B) {
func BenchmarkTombStore(b *testing.B) {
cancel, tc, oc := newBenchCluster(false, false, true)
defer cancel()
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
Expand Down
4 changes: 1 addition & 3 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ import (
const (
// BalanceLeaderName is balance leader scheduler name.
BalanceLeaderName = "balance-leader-scheduler"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we will unify it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be in the next pr

// BalanceLeaderType is balance leader scheduler type.
BalanceLeaderType = "balance-leader"
// BalanceLeaderBatchSize is the default number of operators to transfer leaders by one scheduling.
// Default value is 4 which is subjected by scheduler-max-waiting-operator and leader-schedule-limit
// If you want to increase balance speed more, please increase above-mentioned param.
Expand Down Expand Up @@ -536,7 +534,7 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
}
solver.Step++
defer func() { solver.Step-- }()
op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
op, err := operator.CreateTransferLeaderOperator(l.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
if err != nil {
log.Debug("fail to create balance leader operator", errs.ZapError(err))
if collector != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
const (
// BalanceRegionName is balance region scheduler name.
BalanceRegionName = "balance-region-scheduler"
// BalanceRegionType is balance region scheduler type.
BalanceRegionType = "balance-region"
)

type balanceRegionSchedulerConfig struct {
Expand Down Expand Up @@ -245,7 +243,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
oldPeer := solver.Region.GetStorePeer(sourceID)
newPeer := &metapb.Peer{StoreId: solver.Target.GetID(), Role: oldPeer.Role}
solver.Step++
op, err := operator.CreateMovePeerOperator(BalanceRegionType, solver, solver.Region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
op, err := operator.CreateMovePeerOperator(s.GetName(), solver, solver.Region, operator.OpRegion, oldPeer.GetStoreId(), newPeer)
if err != nil {
balanceRegionCreateOpFailCounter.Inc()
if collector != nil {
Expand Down
Loading