Skip to content

Commit

Permalink
mcs: disable PD scheduling when enabling scheduling service (#7039)
Browse files Browse the repository at this point in the history
ref #5839, close #7041

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 6, 2023
1 parent 7d50755 commit 745c942
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 119 deletions.
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -162,6 +163,7 @@ func (s *Server) updateAPIServerMemberLoop() {
members, err := s.GetClient().MemberList(ctx)
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
continue
}
for _, ep := range members.Members {
status, err := s.GetClient().Status(ctx, ep.ClientURLs[0])
Expand Down Expand Up @@ -438,6 +440,7 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server {

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cmd.Flags().Parse(args)
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down
27 changes: 16 additions & 11 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (c *Coordinator) Run() {
}
}
log.Info("Coordinator starts to run schedulers")
c.initSchedulers()
c.InitSchedulers(true)

c.wg.Add(4)
// Starts to patrol regions.
Expand All @@ -391,7 +391,8 @@ func (c *Coordinator) Run() {
go c.driveSlowNodeScheduler()
}

func (c *Coordinator) initSchedulers() {
// InitSchedulers initializes schedulers.
func (c *Coordinator) InitSchedulers(needRun bool) {
var (
scheduleNames []string
configs []string
Expand All @@ -401,7 +402,7 @@ func (c *Coordinator) initSchedulers() {
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig()
select {
case <-c.ctx.Done():
log.Info("Coordinator stops running")
log.Info("init schedulers has been stopped")
return
default:
}
Expand Down Expand Up @@ -439,8 +440,10 @@ func (c *Coordinator) initSchedulers() {
continue
}
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
if needRun {
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}

Expand All @@ -461,12 +464,14 @@ func (c *Coordinator) initSchedulers() {
}

log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
if needRun {
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newStoreStatistics(opt *config.PersistOptions) *storeStatistics {
}
}

func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
func (s *storeStatistics) Observe(store *core.StoreInfo) {
for _, k := range s.opt.GetLocationLabels() {
v := store.GetLabelValue(k)
if v == "" {
Expand Down Expand Up @@ -146,8 +146,12 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_value").Set(slowTrend.ResultValue)
storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_rate").Set(slowTrend.ResultRate)
}
}

func (s *storeStatistics) ObserveHotStat(store *core.StoreInfo, stats *StoresStats) {
// Store flows.
storeAddress := store.GetAddress()
id := strconv.FormatUint(store.GetID(), 10)
storeFlowStats := stats.GetRollingStoreStats(store.GetID())
if storeFlowStats == nil {
return
Expand Down Expand Up @@ -298,8 +302,12 @@ func NewStoreStatisticsMap(opt *config.PersistOptions) *storeStatisticsMap {
}
}

func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats *StoresStats) {
m.stats.Observe(store, stats)
func (m *storeStatisticsMap) Observe(store *core.StoreInfo) {
m.stats.Observe(store)
}

func (m *storeStatisticsMap) ObserveHotStat(store *core.StoreInfo, stats *StoresStats) {
m.stats.ObserveHotStat(store, stats)
}

func (m *storeStatisticsMap) Collect() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/store_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestStoreStatistics(t *testing.T) {
stores[5] = store5
storeStats := NewStoreStatisticsMap(opt)
for _, store := range stores {
storeStats.Observe(store, storesStats)
storeStats.Observe(store)
storeStats.ObserveHotStat(store, storesStats)
}
stats := storeStats.stats

Expand Down
Loading

0 comments on commit 745c942

Please sign in to comment.