scheduling: sync schedulers from the API server (tikv#7076)
ref tikv#5839

- Sync the schedulers from the API server.
- Dynamically reload the scheduler config.

Signed-off-by: JmPotato <[email protected]>
JmPotato authored Sep 14, 2023
1 parent 1a0c8fb commit e295e62
Showing 31 changed files with 483 additions and 106 deletions.
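
For context before the per-file diffs: the change wires a notification handshake between the config layer and a new scheduler-updating loop in the scheduling service. The sketch below shows that handshake in isolation; it is a minimal stand-in, not the actual PD API, and the names `config`, `setNotifier`, and `applyNewConfig` are illustrative only.

package main

import (
	"fmt"
	"sync/atomic"
)

// config stands in for PersistConfig: the notifier lives in an atomic.Value
// so the config writer and the updater goroutine never race on it.
type config struct {
	notifier atomic.Value // always holds a chan<- struct{}
}

func (c *config) setNotifier(ch chan<- struct{}) { c.notifier.Store(ch) }

// applyNewConfig signals the updater only when the scheduler set changed.
func (c *config) applyNewConfig(schedulerCountChanged bool) {
	if v := c.notifier.Load(); v != nil && schedulerCountChanged {
		v.(chan<- struct{}) <- struct{}{} // wake the updater loop
	}
}

func main() {
	c := &config{}
	notifier := make(chan struct{}, 1)
	notifier <- struct{}{} // pre-arm one check, as UpdateScheduler does
	c.setNotifier(notifier)

	go c.applyNewConfig(true) // e.g. the watcher just applied a new config

	for i := 0; i < 2; i++ {
		<-notifier
		fmt.Println("reconcile running schedulers against the latest config")
	}
}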
98 changes: 98 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
@@ -10,16 +10,20 @@ import (
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
)

@@ -175,6 +179,100 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

// UpdateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
func (c *Cluster) UpdateScheduler() {
defer logutil.LogPanic()

// Make sure the coordinator has initialized all the existing schedulers.
c.waitSchedulersInitialized()
// Establish a notifier to listen for scheduler updates.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
notifier <- struct{}{}
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
for {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
s, err := schedulers.CreateScheduler(
scheduler.Type,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.String("scheduler-type", scheduler.Type),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
continue
}
if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
}
// Remove the deleted schedulers.
for _, name := range schedulersController.GetSchedulerNames() {
scheduler := schedulersController.GetScheduler(name)
if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
return latestSchedulersConfig[i].Type == scheduler.GetType()
}) {
continue
}
if err := schedulersController.RemoveScheduler(name); err != nil {
log.Error("failed to remove scheduler",
zap.String("scheduler-name", name),
errs.ZapError(err))
continue
}
log.Info("remove scheduler successfully",
zap.String("scheduler-name", name))
}
}
}
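
A note on the capacity-1 notifier channel above: buffering a single token lets the loop pre-arm one initial check, and it coalesces bursts of config updates into at most one queued wake-up, since each iteration re-reads the complete latest config anyway. The commit itself uses a plain blocking send (see SetScheduleConfig in config.go below); the sketch here shows a non-blocking variant of the same coalescing idea, offered only as an alternative for producers that must never stall.

package notifysketch

// notifyNonBlocking delivers a wake-up without ever blocking the sender.
// If a token is already queued in the capacity-1 channel, the new signal
// is dropped, which is safe because the receiver reloads the full config
// on every wake-up.
func notifyNonBlocking(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default: // coalesce: a wake-up is already pending
	}
}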

func (c *Cluster) waitSchedulersInitialized() {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
if c.coordinator.AreSchedulersInitialized() {
return
}
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop waiting the schedulers initialization")
return
case <-ticker.C:
}
}
}
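
The 100ms polling in waitSchedulersInitialized is simple and adequate for a one-shot startup barrier. A poll-free equivalent, sketched here purely as a design alternative and not what the commit does, is to close a channel once so that select broadcasts readiness to every waiter.

package initsketch

import "context"

// initSignal is a hypothetical poll-free variant of the startup barrier.
type initSignal struct {
	done chan struct{} // closed exactly once when initialization finishes
}

func newInitSignal() *initSignal { return &initSignal{done: make(chan struct{})} }

// markInitialized broadcasts readiness to all current and future waiters.
// It must be called exactly once; closing a closed channel panics.
func (s *initSignal) markInitialized() { close(s.done) }

// wait blocks until initialization finishes or ctx is cancelled.
func (s *initSignal) wait(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return false // the cluster is closing; give up waiting
	case <-s.done:
		return true
	}
}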

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
35 changes: 30 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
@@ -203,6 +203,9 @@ type PersistConfig struct {
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// schedulersUpdatingNotifier is used to notify that the schedulers have been updated.
// Store as `chan<- struct{}`.
schedulersUpdatingNotifier atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
@@ -217,6 +220,19 @@ func NewPersistConfig(cfg *Config) *PersistConfig {
return o
}

// SetSchedulersUpdatingNotifier sets the schedulers updating notifier.
func (o *PersistConfig) SetSchedulersUpdatingNotifier(notifier chan<- struct{}) {
o.schedulersUpdatingNotifier.Store(notifier)
}

func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
v := o.schedulersUpdatingNotifier.Load()
if v == nil {
return nil
}
return v.(chan<- struct{})
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
@@ -232,12 +248,19 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
return o.schedule.Load().(*sc.ScheduleConfig)
}

// SetScheduleConfig sets the scheduling configuration.
// SetScheduleConfig sets the scheduling configuration dynamically.
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes; however, it
// should react to scheduler count changes to handle the add/remove scheduler events.
if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
len(old.Schedulers) != len(cfg.Schedulers) {
notifier <- struct{}{}
}
}
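
One subtlety in the notifier plumbing above: atomic.Value panics if successive Store calls use different concrete types, so declaring the setter parameter as chan<- struct{} matters. The caller's bidirectional channel converts to send-only at the call site, keeping every stored value the same concrete type. A small self-contained illustration (all names here are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var v atomic.Value

	// The chan<- struct{} parameter fixes the stored concrete type: the
	// bidirectional channel below converts to send-only at the call site,
	// so every Store uses the same type and never panics.
	store := func(ch chan<- struct{}) { v.Store(ch) }

	ch := make(chan struct{}, 1)
	store(ch)

	// Load type-asserts the same concrete type back out.
	notifier := v.Load().(chan<- struct{})
	notifier <- struct{}{}
	fmt.Println("queued wake-ups:", len(ch)) // prints: queued wake-ups: 1
}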

// AdjustScheduleCfg adjusts the schedule config.
// AdjustScheduleCfg adjusts the schedule config during the initialization.
func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
@@ -616,8 +639,13 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from the API server now.
func (o *PersistConfig) AddSchedulerCfg(string, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from the API server now.
func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}

// CheckLabelProperty checks if the label property is satisfied.
func (o *PersistConfig) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return false
@@ -633,6 +661,3 @@ func (o *PersistConfig) IsTraceRegionFlow() bool {
func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error {
return nil
}

// RemoveSchedulerCfg removes the scheduler configurations.
func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}
39 changes: 34 additions & 5 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -19,10 +19,12 @@ import (
"encoding/json"
"strings"
"sync"
"sync/atomic"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
@@ -50,10 +52,14 @@ type Watcher struct {
configWatcher *etcdutil.LoopWatcher
schedulerConfigWatcher *etcdutil.LoopWatcher

// Some data, like the global schedule config, should be loaded into `PersistConfig`.
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator can access them correctly.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
schedulersController atomic.Value
}

type persistedConfig struct {
@@ -92,6 +98,19 @@ func NewWatcher(
return cw, nil
}

// SetSchedulersController sets the schedulers controller.
func (cw *Watcher) SetSchedulersController(sc *schedulers.Controller) {
cw.schedulersController.Store(sc)
}

func (cw *Watcher) getSchedulersController() *schedulers.Controller {
sc := cw.schedulersController.Load()
if sc == nil {
return nil
}
return sc.(*schedulers.Controller)
}

func (cw *Watcher) initializeConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
cfg := &persistedConfig{}
@@ -126,13 +145,23 @@
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return cw.storage.SaveScheduleConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
kv.Value,
)
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
zap.String("event-kv-key", string(kv.Key)),
zap.String("trimmed-key", name),
zap.Error(err))
return err
}
// Ensure the scheduler config can be updated as soon as possible.
if sc := cw.getSchedulersController(); sc != nil {
return sc.ReloadSchedulerConfig(name)
}
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return cw.storage.RemoveScheduleConfig(
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
}
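
etcdutil.LoopWatcher itself is not part of this diff; conceptually, it feeds etcd watch events into the putFn/deleteFn callbacks registered above. The dispatch loop below is a simplified sketch with stand-in types, not the real etcd client or LoopWatcher API.

package watchsketch

// keyValue stands in for *mvccpb.KeyValue.
type keyValue struct {
	Key, Value []byte
}

// event stands in for a single etcd watch event.
type event struct {
	isPut bool
	kv    keyValue
}

// dispatch mirrors what the watch loop does with the callbacks from
// initializeSchedulerConfigWatcher: puts persist the config and trigger
// a reload, deletes clean it up.
func dispatch(events <-chan event, putFn, deleteFn func(keyValue) error) {
	for ev := range events {
		fn := deleteFn
		if ev.isPut {
			fn = putFn
		}
		if err := fn(ev.kv); err != nil {
			// The real watcher logs the error and keeps watching, so a
			// single bad event cannot stop the config syncing.
			continue
		}
	}
}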
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/server.go
@@ -442,6 +442,8 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
go s.cluster.UpdateScheduler()
go s.GetCoordinator().RunUntilStop()
return nil
}
54 changes: 37 additions & 17 deletions pkg/schedule/coordinator.go
@@ -69,9 +69,12 @@ var (
type Coordinator struct {
syncutil.RWMutex

wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

schedulersInitialized bool

cluster sche.ClusterInformer
prepareChecker *prepareChecker
checkers *checker.Controller
@@ -91,19 +94,34 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
return &Coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()),
}
ctx: ctx,
cancel: cancel,
schedulersInitialized: false,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()),
}
}

// markSchedulersInitialized marks the scheduler initialization as finished.
func (c *Coordinator) markSchedulersInitialized() {
c.Lock()
defer c.Unlock()
c.schedulersInitialized = true
}

// AreSchedulersInitialized returns whether the schedulers have been initialized.
func (c *Coordinator) AreSchedulersInitialized() bool {
c.RLock()
defer c.RUnlock()
return c.schedulersInitialized
}
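
The initialized flag reuses the coordinator's embedded RWMutex, which keeps it consistent with the struct's other guarded fields. For a set-once boolean, Go 1.19's atomic.Bool would also work without taking the lock; a sketch of that alternative, again not what the commit does:

package coordsketch

import "sync/atomic"

// initFlag is a lock-free alternative for the one-way
// "schedulers are initialized" signal (requires Go 1.19+).
type initFlag struct {
	done atomic.Bool
}

func (f *initFlag) markInitialized()    { f.done.Store(true) }
func (f *initFlag) isInitialized() bool { return f.done.Load() }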

// GetWaitingRegions returns the regions in the waiting list.
@@ -399,7 +417,7 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
err error
)
for i := 0; i < maxLoadConfigRetries; i++ {
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig()
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllSchedulerConfigs()
select {
case <-c.ctx.Done():
log.Info("init schedulers has been stopped")
@@ -485,6 +503,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil {
log.Error("cannot persist schedule config", errs.ZapError(err))
}

c.markSchedulersInitialized()
}

// LoadPlugin load user plugin
21 changes: 20 additions & 1 deletion pkg/schedule/schedulers/balance_leader.go
@@ -124,7 +124,7 @@ func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
if err != nil {
return err
}
return conf.storage.SaveScheduleConfig(BalanceLeaderName, data)
return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data)
}

type balanceLeaderHandler struct {
@@ -215,6 +215,25 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(l.conf)
}

func (l *balanceLeaderScheduler) ReloadConfig() error {
l.conf.mu.Lock()
defer l.conf.mu.Unlock()
cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &balanceLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
l.conf.Ranges = newCfg.Ranges
l.conf.Batch = newCfg.Batch
return nil
}
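
Note that ReloadConfig copies only the reloadable fields (Ranges and Batch) onto the live config, leaving the storage handle and the mutex untouched. The same field-whitelist pattern in isolation, with a simplified config type (the real Ranges field is []core.KeyRange, and the names here are illustrative):

package reloadsketch

import "encoding/json"

// liveConfig mirrors the reloadable subset of balanceLeaderSchedulerConfig.
type liveConfig struct {
	Ranges []string `json:"ranges"` // simplified; the real field is []core.KeyRange
	Batch  int      `json:"batch"`
}

// reload decodes the persisted JSON and copies only the whitelisted fields,
// so non-persisted state on the live config survives the reload.
func reload(live *liveConfig, persisted []byte) error {
	if len(persisted) == 0 {
		return nil // nothing persisted yet; keep the current config
	}
	var next liveConfig
	if err := json.Unmarshal(persisted, &next); err != nil {
		return err
	}
	live.Ranges = next.Ranges
	live.Batch = next.Batch
	return nil
}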

func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
if !allowed {
(Diffs for the remaining 25 changed files are not shown.)