scheduling: sync schedulers from the API server #7076

Merged
merged 5 commits on Sep 14, 2023
98 changes: 98 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
@@ -10,16 +10,20 @@ import (
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
)

@@ -175,6 +179,100 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

// UpdateScheduler listens to the schedulers updating notifier and manages the scheduler creation and deletion.
func (c *Cluster) UpdateScheduler() {
defer logutil.LogPanic()

// Make sure the coordinator has initialized all the existing schedulers.
c.waitSchedulersInitialized()
// Establish a notifier to listen for the schedulers updating.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
notifier <- struct{}{}
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
for {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
s, err := schedulers.CreateScheduler(
scheduler.Type,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(scheduler.Type, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.String("scheduler-type", scheduler.Type),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
continue
}
if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
}
// Remove the deleted schedulers.
for _, name := range schedulersController.GetSchedulerNames() {
scheduler := schedulersController.GetScheduler(name)
if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
return latestSchedulersConfig[i].Type == scheduler.GetType()
}) {
continue
}
if err := schedulersController.RemoveScheduler(name); err != nil {
log.Error("failed to remove scheduler",
zap.String("scheduler-name", name),
errs.ZapError(err))
continue
}
log.Info("remove scheduler successfully",
zap.String("scheduler-name", name))
}
}
}

func (c *Cluster) waitSchedulersInitialized() {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
if c.coordinator.AreSchedulersInitialized() {
return
}
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop waiting the schedulers initialization")
return
case <-ticker.C:
}
}
}

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
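The reconciliation in `UpdateScheduler` above boils down to a capacity-1 notification channel plus a diff of the desired scheduler list against the running set. The standalone sketch below is not PD code — `reconcile`, `desired`, and the scheduler names are illustrative stand-ins — but it shows the same channel-and-reconcile pattern in isolation.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// reconcile compares the desired scheduler names against the running set and
// reports what would be added or removed; in the real code this is where
// CreateScheduler/AddScheduler/RemoveScheduler are invoked.
func reconcile(desired []string, running map[string]bool) {
	want := make(map[string]bool, len(desired))
	for _, name := range desired {
		want[name] = true
		if !running[name] {
			running[name] = true
			fmt.Println("add scheduler:", name)
		}
	}
	for name := range running {
		if !want[name] {
			delete(running, name)
			fmt.Println("remove scheduler:", name)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// The latest desired scheduler list, standing in for
	// persistConfig.GetScheduleConfig().Schedulers.
	var desired atomic.Value
	desired.Store([]string{"balance-leader", "balance-region"})

	// Capacity-1 channel: one pending signal is enough because the loop
	// always reloads the full desired list when it wakes up.
	notifier := make(chan struct{}, 1)
	notifier <- struct{}{} // trigger one initial reconciliation

	running := map[string]bool{}

	go func() {
		// Simulate a config sync from the API server that drops a scheduler.
		time.Sleep(100 * time.Millisecond)
		desired.Store([]string{"balance-leader"})
		notifier <- struct{}{}
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-notifier:
			reconcile(desired.Load().([]string), running)
		}
	}
}
```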
35 changes: 30 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
@@ -203,6 +203,9 @@ type PersistConfig struct {
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// schedulersUpdatingNotifier is used to notify that the schedulers have been updated.
// Store as `chan<- struct{}`.
schedulersUpdatingNotifier atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
@@ -217,6 +220,19 @@ func NewPersistConfig(cfg *Config) *PersistConfig {
return o
}

// SetSchedulersUpdatingNotifier sets the schedulers updating notifier.
func (o *PersistConfig) SetSchedulersUpdatingNotifier(notifier chan<- struct{}) {
o.schedulersUpdatingNotifier.Store(notifier)
}

func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
v := o.schedulersUpdatingNotifier.Load()
if v == nil {
return nil
}
return v.(chan<- struct{})
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
@@ -232,12 +248,19 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {
return o.schedule.Load().(*sc.ScheduleConfig)
}

// SetScheduleConfig sets the scheduling configuration.
// SetScheduleConfig sets the scheduling configuration dynamically.
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes, however, it
// should react to the scheduler number changes to handle the add/remove scheduler events.
if notifier := o.getSchedulersUpdatingNotifier(); notifier != nil &&
len(old.Schedulers) != len(cfg.Schedulers) {
notifier <- struct{}{}
}
}

// AdjustScheduleCfg adjusts the schedule config.
// AdjustScheduleCfg adjusts the schedule config during the initialization.
func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
@@ -616,8 +639,13 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from the API server now.
func (o *PersistConfig) AddSchedulerCfg(string, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from the API server now.
func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}

// CheckLabelProperty checks if the label property is satisfied.
func (o *PersistConfig) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return false
@@ -633,6 +661,3 @@ func (o *PersistConfig) IsTraceRegionFlow() bool {
func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error {
return nil
}

// RemoveSchedulerCfg removes the scheduler configurations.
func (o *PersistConfig) RemoveSchedulerCfg(tp string) {}
39 changes: 34 additions & 5 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -19,10 +19,12 @@ import (
"encoding/json"
"strings"
"sync"
"sync/atomic"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
@@ -50,10 +52,14 @@ type Watcher struct {
configWatcher *etcdutil.LoopWatcher
schedulerConfigWatcher *etcdutil.LoopWatcher

// Some data, like the global schedule config, should be loaded into `PersistConfig`.
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
schedulersController atomic.Value
}

type persistedConfig struct {
@@ -92,6 +98,19 @@ func NewWatcher(
return cw, nil
}

// SetSchedulersController sets the schedulers controller.
func (cw *Watcher) SetSchedulersController(sc *schedulers.Controller) {
cw.schedulersController.Store(sc)
}

func (cw *Watcher) getSchedulersController() *schedulers.Controller {
sc := cw.schedulersController.Load()
if sc == nil {
return nil
}
return sc.(*schedulers.Controller)
}

func (cw *Watcher) initializeConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
cfg := &persistedConfig{}
@@ -126,13 +145,23 @@ func (cw *Watcher) initializeConfigWatcher() error {
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return cw.storage.SaveScheduleConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
kv.Value,
)
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
zap.String("event-kv-key", string(kv.Key)),
zap.String("trimmed-key", name),
zap.Error(err))
return err
}
// Ensure the scheduler config could be updated as soon as possible.
if sc := cw.getSchedulersController(); sc != nil {
return sc.ReloadSchedulerConfig(name)
}
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return cw.storage.RemoveScheduleConfig(
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
}
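The watcher change has two moving parts: persist the scheduler config bytes under the trimmed key, then ask the late-registered controller to reload that scheduler's config so the change takes effect immediately. The sketch below imitates that flow with stand-in types — `kv`, `reloader`, `fakeController`, and the `/scheduler_config/` prefix are all assumptions — since the real handler receives etcd `mvccpb.KeyValue` events and calls `storage.SaveSchedulerConfig`.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// kv stands in for the etcd mvccpb.KeyValue the real watcher receives.
type kv struct {
	Key   string
	Value []byte
}

// reloader is the slice of the schedulers controller the watcher needs:
// something that can re-read one scheduler's config after it was persisted.
type reloader interface {
	ReloadSchedulerConfig(name string) error
}

type fakeController struct{}

func (fakeController) ReloadSchedulerConfig(name string) error {
	fmt.Println("reloaded scheduler config:", name)
	return nil
}

// watcher sketches the relevant part of config.Watcher: the controller is
// registered after construction, hence the atomic.Value.
type watcher struct {
	prefix     string
	store      map[string][]byte // stands in for storage.SaveSchedulerConfig
	controller atomic.Value      // stores reloader
}

func (w *watcher) setController(r reloader) { w.controller.Store(r) }

func (w *watcher) putFn(e kv) error {
	name := strings.TrimPrefix(e.Key, w.prefix)
	// Persist first so the coordinator can always read the latest bytes.
	w.store[name] = e.Value
	// Then reload eagerly if a controller has been registered, so a config
	// change does not wait for the next scheduling round.
	if c, ok := w.controller.Load().(reloader); ok && c != nil {
		return c.ReloadSchedulerConfig(name)
	}
	return nil
}

func main() {
	w := &watcher{prefix: "/scheduler_config/", store: map[string][]byte{}}
	_ = w.putFn(kv{Key: "/scheduler_config/balance-leader", Value: []byte(`{"batch":4}`)}) // persisted only
	w.setController(fakeController{})
	_ = w.putFn(kv{Key: "/scheduler_config/balance-leader", Value: []byte(`{"batch":8}`)}) // persisted and reloaded
}
```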
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/server.go
@@ -442,6 +442,8 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
go s.cluster.UpdateScheduler()
go s.GetCoordinator().RunUntilStop()
return nil
}
54 changes: 37 additions & 17 deletions pkg/schedule/coordinator.go
@@ -69,9 +69,12 @@ var (
type Coordinator struct {
syncutil.RWMutex

wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

schedulersInitialized bool

cluster sche.ClusterInformer
prepareChecker *prepareChecker
checkers *checker.Controller
@@ -91,19 +94,34 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
return &Coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()),
}
ctx: ctx,
cancel: cancel,
schedulersInitialized: false,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddSuspectRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddSuspectRegions),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
diagnosticManager: diagnostic.NewManager(schedulers, cluster.GetSchedulerConfig()),
}
}

// markSchedulersInitialized marks that the scheduler initialization is finished.
func (c *Coordinator) markSchedulersInitialized() {
c.Lock()
defer c.Unlock()
c.schedulersInitialized = true
}

// AreSchedulersInitialized returns whether the schedulers have been initialized.
func (c *Coordinator) AreSchedulersInitialized() bool {
c.RLock()
defer c.RUnlock()
return c.schedulersInitialized
}

// GetWaitingRegions returns the regions in the waiting list.
@@ -399,7 +417,7 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
err error
)
for i := 0; i < maxLoadConfigRetries; i++ {
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig()
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllSchedulerConfigs()
select {
case <-c.ctx.Done():
log.Info("init schedulers has been stopped")
@@ -485,6 +503,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil {
log.Error("cannot persist schedule config", errs.ZapError(err))
}

c.markSchedulersInitialized()
}

// LoadPlugin load user plugin
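The coordinator change adds a `schedulersInitialized` flag guarded by the embedded `RWMutex`, set once at the end of `InitSchedulers` and polled by `Cluster.waitSchedulersInitialized` before the update loop starts listening. A self-contained sketch of that handshake, with assumed names (`initFlag`, `waitInitialized`):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// initFlag sketches the schedulersInitialized flag: a plain bool guarded by
// an RWMutex, flipped exactly once when initialization completes.
type initFlag struct {
	mu   sync.RWMutex
	done bool
}

func (f *initFlag) mark() {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.done = true
}

func (f *initFlag) initialized() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.done
}

// waitInitialized mirrors the waiting side: poll on a ticker until the flag
// flips, but give up as soon as the context is cancelled.
func waitInitialized(ctx context.Context, f *initFlag) bool {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		if f.initialized() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}

func main() {
	f := &initFlag{}
	go func() {
		time.Sleep(300 * time.Millisecond) // pretend scheduler initialization is running
		f.mark()
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("schedulers initialized:", waitInitialized(ctx, f))
}
```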
21 changes: 20 additions & 1 deletion pkg/schedule/schedulers/balance_leader.go
@@ -124,7 +124,7 @@ func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
if err != nil {
return err
}
return conf.storage.SaveScheduleConfig(BalanceLeaderName, data)
return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data)
}

type balanceLeaderHandler struct {
@@ -215,6 +215,25 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(l.conf)
}

func (l *balanceLeaderScheduler) ReloadConfig() error {
l.conf.mu.Lock()
defer l.conf.mu.Unlock()
cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &balanceLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
l.conf.Ranges = newCfg.Ranges
l.conf.Batch = newCfg.Batch
return nil
}

func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
if !allowed {
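`ReloadConfig` follows a pattern each reloadable scheduler implements: load the persisted bytes, treat an empty value as "nothing stored yet", decode into a fresh config, and copy only the reloadable fields while holding the config lock. Below is a hedged, standalone sketch; the `balanceLeaderConfig` type, the `load` callback, and the JSON encoding are assumptions for illustration, not the actual scheduler code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// balanceLeaderConfig stands in for the scheduler's config struct; only the
// fields the reload copies are shown.
type balanceLeaderConfig struct {
	mu     sync.RWMutex
	Ranges []string `json:"ranges"`
	Batch  int      `json:"batch"`
}

// reload mirrors the ReloadConfig flow: read the persisted bytes, keep the
// in-memory config when nothing is stored, otherwise decode into a fresh
// struct and copy the fields over under the lock.
func (c *balanceLeaderConfig) reload(load func(name string) (string, error), name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	data, err := load(name)
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return nil // nothing persisted yet
	}
	newCfg := &balanceLeaderConfig{}
	if err := json.Unmarshal([]byte(data), newCfg); err != nil {
		return err
	}
	c.Ranges = newCfg.Ranges
	c.Batch = newCfg.Batch
	return nil
}

func main() {
	// A map stands in for the scheduler config storage.
	store := map[string]string{
		"balance-leader-scheduler": `{"ranges":["a","b"],"batch":8}`,
	}
	load := func(name string) (string, error) { return store[name], nil }

	cfg := &balanceLeaderConfig{Batch: 4}
	if err := cfg.reload(load, "balance-leader-scheduler"); err != nil {
		fmt.Println("reload failed:", err)
		return
	}
	fmt.Printf("batch=%d ranges=%v\n", cfg.Batch, cfg.Ranges)
}
```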