Skip to content

Commit

Permalink
scheduler: GetType() returns types.CheckerSchedulerType directly (tik…
Browse files Browse the repository at this point in the history
…v#8440)

ref tikv#8379

- GetType() returns types.CheckerSchedulerType directly
- keep the compatibility in `PersistOptions` and `PersistConfig“
- wrap `operator.OperatorLimitCounter.WithLabelValues().Inc()` to func `IncOperatorLimitCounter`

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] committed Aug 1, 2024
1 parent cb6e6e2 commit 87ec788
Show file tree
Hide file tree
Showing 35 changed files with 175 additions and 321 deletions.
4 changes: 3 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
Expand Down Expand Up @@ -343,8 +344,9 @@ func (c *Cluster) updateScheduler() {
// Remove the deleted schedulers.
for _, name := range schedulersController.GetSchedulerNames() {
scheduler := schedulersController.GetScheduler(name)
oldType := types.SchedulerTypeCompatibleMap[scheduler.GetType()]
if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
return latestSchedulersConfig[i].Type == scheduler.GetType()
return latestSchedulersConfig[i].Type == oldType
}) {
continue
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mcs/utils"
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/configutil"
Expand Down Expand Up @@ -646,10 +647,11 @@ func (o *PersistConfig) SetMaxReplicas(replicas int) {
}

// IsSchedulerDisabled returns if the scheduler is disabled.
func (o *PersistConfig) IsSchedulerDisabled(t string) bool {
func (o *PersistConfig) IsSchedulerDisabled(tp types.CheckerSchedulerType) bool {
oldType := types.SchedulerTypeCompatibleMap[tp]
schedulers := o.GetScheduleConfig().Schedulers
for _, s := range schedulers {
if t == s.Type {
if oldType == s.Type {
return s.Disable
}
}
Expand Down Expand Up @@ -739,11 +741,11 @@ func (o *PersistConfig) IsRaftKV2() bool {

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from API server now.
func (*PersistConfig) AddSchedulerCfg(string, []string) {}
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from API server now.
func (*PersistConfig) RemoveSchedulerCfg(string) {}
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
func (*PersistConfig) CheckLabelProperty(string, []*metapb.StoreLabel) bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
if opController.OperatorCount(operator.OpReplica) < c.conf.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
}
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.Name(), operator.OpReplica.String()).Inc()
operator.IncOperatorLimitCounter(c.ruleChecker.GetType(), operator.OpReplica)
c.pendingProcessedRegions.Put(region.GetID(), nil)
}
}
Expand All @@ -253,7 +253,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
if opController.OperatorCount(operator.OpReplica) < c.conf.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
}
operator.OperatorLimitCounter.WithLabelValues(c.replicaChecker.Name(), operator.OpReplica.String()).Inc()
operator.IncOperatorLimitCounter(c.replicaChecker.GetType(), operator.OpReplica)
c.pendingProcessedRegions.Put(region.GetID(), nil)
}
}
Expand All @@ -270,7 +270,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
if c.mergeChecker != nil {
allowed := opController.OperatorCount(operator.OpMerge) < c.conf.GetMergeScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(c.mergeChecker.GetType(), operator.OpMerge.String()).Inc()
operator.IncOperatorLimitCounter(c.mergeChecker.GetType(), operator.OpMerge)
} else if ops := c.mergeChecker.Check(region); ops != nil {
// It makes sure that two operators can be added successfully altogether.
return ops
Expand Down
5 changes: 3 additions & 2 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/utils/logutil"
)

Expand Down Expand Up @@ -69,8 +70,8 @@ func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf conf
}

// GetType return MergeChecker's type
func (*MergeChecker) GetType() string {
return "merge-checker"
func (*MergeChecker) GetType() types.CheckerSchedulerType {
return types.MergeChecker
}

// RecordRegionSplit put the recently split region into cache. MergeChecker
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (*ReplicaChecker) Name() string {
return types.ReplicaChecker.String()
}

// GetType return ReplicaChecker's type.
func (*ReplicaChecker) GetType() types.CheckerSchedulerType {
return types.ReplicaChecker
}

// Check verifies a region's replicas, creating an operator.Operator if need.
func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
replicaCheckerCounter.Inc()
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (*RuleChecker) Name() string {
return types.RuleChecker.String()
}

// GetType returns RuleChecker's type.
func (*RuleChecker) GetType() types.CheckerSchedulerType {
return types.RuleChecker
}

// Check checks if the region matches placement rules and returns Operator to
// fix it.
func (c *RuleChecker) Check(region *core.RegionInfo) *operator.Operator {
Expand Down
7 changes: 4 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
)

Expand Down Expand Up @@ -49,9 +50,9 @@ type SchedulerConfigProvider interface {
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
AddSchedulerCfg(string, []string)
RemoveSchedulerCfg(string)
IsSchedulerDisabled(types.CheckerSchedulerType) bool
AddSchedulerCfg(types.CheckerSchedulerType, []string)
RemoveSchedulerCfg(types.CheckerSchedulerType)
Persist(endpoint.ConfigStorage) error

GetRegionScheduleLimit() uint64
Expand Down
15 changes: 11 additions & 4 deletions pkg/schedule/operator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package operator

import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
types "github.com/tikv/pd/pkg/schedule/type"
)

var (
operatorStepDuration = prometheus.NewHistogramVec(
Expand All @@ -26,8 +29,7 @@ var (
Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600},
}, []string{"type"})

// OperatorLimitCounter exposes the counter when meeting limit.
OperatorLimitCounter = prometheus.NewCounterVec(
operatorLimitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Expand Down Expand Up @@ -82,10 +84,15 @@ var (

func init() {
prometheus.MustRegister(operatorStepDuration)
prometheus.MustRegister(OperatorLimitCounter)
prometheus.MustRegister(operatorLimitCounter)
prometheus.MustRegister(OperatorExceededStoreLimitCounter)
prometheus.MustRegister(operatorCounter)
prometheus.MustRegister(operatorDuration)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(storeLimitCostCounter)
}

// IncOperatorLimitCounter increases the counter of operator meeting limit.
func IncOperatorLimitCounter(typ types.CheckerSchedulerType, kind OpKind) {
operatorLimitCounter.WithLabelValues(typ.String(), kind.String()).Inc()
}
24 changes: 3 additions & 21 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.R
type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
name string
conf *balanceLeaderSchedulerConfig
handler http.Handler
filters []filter.Filter
Expand All @@ -176,14 +175,11 @@ type balanceLeaderScheduler struct {
// newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
base := NewBaseScheduler(opController)
s := &balanceLeaderScheduler{
BaseScheduler: base,
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler),
retryQuota: newRetryQuota(),
name: BalanceLeaderName,
conf: conf,
handler: newBalanceLeaderHandler(conf),
filterCounter: filter.NewCounter(types.BalanceLeaderScheduler.String()),
}
for _, option := range options {
option(s)
Expand All @@ -192,6 +188,7 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL
&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.High},
filter.NewSpecialUseFilter(s.GetName()),
}
s.filterCounter = filter.NewCounter(s.GetName())
return s
}

Expand All @@ -202,28 +199,13 @@ func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Reques
// BalanceLeaderCreateOption is used to create a scheduler with an option.
type BalanceLeaderCreateOption func(s *balanceLeaderScheduler)

// WithBalanceLeaderFilterCounterName sets the filter counter name for the scheduler.
func WithBalanceLeaderFilterCounterName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
s.filterCounter.SetScope(name)
}
}

// WithBalanceLeaderName sets the name for the scheduler.
func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
s.name = name
}
}

func (l *balanceLeaderScheduler) GetName() string {
return l.name
}

func (*balanceLeaderScheduler) GetType() string {
return BalanceLeaderType
}

func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
l.conf.RLock()
defer l.conf.RUnlock()
Expand Down Expand Up @@ -252,7 +234,7 @@ func (l *balanceLeaderScheduler) ReloadConfig() error {
func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(l.GetType(), operator.OpLeader.String()).Inc()
operator.IncOperatorLimitCounter(l.GetType(), operator.OpLeader)
}
return allowed
}
Expand Down
27 changes: 6 additions & 21 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ const (
)

type balanceRegionSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
}

type balanceRegionScheduler struct {
*BaseScheduler
*retryQuota
name string
conf *balanceRegionSchedulerConfig
filters []filter.Filter
filterCounter *filter.Counter
Expand All @@ -54,12 +54,11 @@ type balanceRegionScheduler struct {
// newBalanceRegionScheduler creates a scheduler that tends to keep regions on
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler),
retryQuota: newRetryQuota(),
name: types.BalanceRegionScheduler.String(),
conf: conf,
filterCounter: filter.NewCounter(types.BalanceRegionScheduler.String()),
}
for _, setOption := range opts {
setOption(scheduler)
Expand All @@ -68,6 +67,7 @@ func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceR
&filter.StoreStateFilter{ActionScope: scheduler.GetName(), MoveRegion: true, OperatorLevel: constant.Medium},
filter.NewSpecialUseFilter(scheduler.GetName()),
}
scheduler.filterCounter = filter.NewCounter(scheduler.GetName())
return scheduler
}

Expand All @@ -77,33 +77,18 @@ type BalanceRegionCreateOption func(s *balanceRegionScheduler)
// WithBalanceRegionName sets the name for the scheduler.
func WithBalanceRegionName(name string) BalanceRegionCreateOption {
return func(s *balanceRegionScheduler) {
s.conf.Name = name
s.name = name
}
}

// WithBalanceRegionFilterCounterName sets the filter counter name for the scheduler.
func WithBalanceRegionFilterCounterName(name string) BalanceRegionCreateOption {
return func(s *balanceRegionScheduler) {
s.filterCounter.SetScope(name)
}
}

func (s *balanceRegionScheduler) GetName() string {
return s.conf.Name
}

func (*balanceRegionScheduler) GetType() string {
return BalanceRegionType
}

func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpRegion.String()).Inc()
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRegion)
}
return allowed
}
Expand Down
22 changes: 2 additions & 20 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func (handler *balanceWitnessHandler) ListConfig(w http.ResponseWriter, _ *http.
type balanceWitnessScheduler struct {
*BaseScheduler
*retryQuota
name string
conf *balanceWitnessSchedulerConfig
handler http.Handler
filters []filter.Filter
Expand All @@ -174,11 +173,9 @@ type balanceWitnessScheduler struct {
// newBalanceWitnessScheduler creates a scheduler that tends to keep witnesses on
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
base := NewBaseScheduler(opController)
s := &balanceWitnessScheduler{
BaseScheduler: base,
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler),
retryQuota: newRetryQuota(),
name: BalanceWitnessName,
conf: conf,
handler: newBalanceWitnessHandler(conf),
counter: balanceWitnessCounter,
Expand Down Expand Up @@ -208,21 +205,6 @@ func WithBalanceWitnessCounter(counter *prometheus.CounterVec) BalanceWitnessCre
}
}

// WithBalanceWitnessName sets the name for the scheduler.
func WithBalanceWitnessName(name string) BalanceWitnessCreateOption {
return func(s *balanceWitnessScheduler) {
s.name = name
}
}

func (b *balanceWitnessScheduler) GetName() string {
return b.name
}

func (*balanceWitnessScheduler) GetType() string {
return BalanceWitnessType
}

func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
b.conf.RLock()
defer b.conf.RUnlock()
Expand Down Expand Up @@ -251,7 +233,7 @@ func (b *balanceWitnessScheduler) ReloadConfig() error {
func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := b.OpController.OperatorCount(operator.OpWitness) < cluster.GetSchedulerConfig().GetWitnessScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(b.GetType(), operator.OpWitness.String()).Inc()
operator.IncOperatorLimitCounter(b.GetType(), operator.OpWitness)
}
return allowed
}
Expand Down
Loading

0 comments on commit 87ec788

Please sign in to comment.