From d7d475679ea435ac324c3599972a752a36425606 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Fri, 15 Sep 2023 15:58:39 +0800
Subject: [PATCH] mcs: update store stats (#7097)

ref tikv/pd#5839

Signed-off-by: Ryan Leung
---
 pkg/core/basic_cluster.go            | 12 +++++++
 pkg/mcs/scheduling/server/cluster.go | 48 +++++++++++++++++++++++++---
 pkg/mcs/scheduling/server/server.go  |  3 +-
 server/cluster/cluster.go            |  8 +----
 4 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go
index 1c8902ca8cb..2258a816324 100644
--- a/pkg/core/basic_cluster.go
+++ b/pkg/core/basic_cluster.go
@@ -246,6 +246,18 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key
 	return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate)
 }
 
+// UpdateAllStoreStatus updates the information of all stores.
+func (bc *BasicCluster) UpdateAllStoreStatus() {
+	// Update related stores.
+	stores := bc.GetStores()
+	for _, store := range stores {
+		if store.IsRemoved() {
+			continue
+		}
+		bc.UpdateStoreStatus(store.GetID())
+	}
+}
+
 // RegionSetInformer provides access to a shared informer of regions.
 type RegionSetInformer interface {
 	GetTotalRegionCount() int
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 61c85ba6fbd..917831ba9ca 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -2,6 +2,7 @@ package server
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -29,7 +30,9 @@ import (
 // Cluster is used to manage all information for scheduling purpose.
 type Cluster struct {
-	ctx context.Context
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
 	*core.BasicCluster
 	persistConfig  *config.PersistConfig
 	ruleManager    *placement.RuleManager
@@ -47,14 +50,17 @@ type Cluster struct {
 const regionLabelGCInterval = time.Hour
 
 // NewCluster creates a new cluster.
-func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
+func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
+	ctx, cancel := context.WithCancel(parentCtx)
 	labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 	ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig)
 	c := &Cluster{
 		ctx:            ctx,
+		cancel:         cancel,
 		BasicCluster:   basicCluster,
 		ruleManager:    ruleManager,
 		labelerManager: labelerManager,
@@ -69,6 +75,7 @@
 	c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
 	err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels())
 	if err != nil {
+		cancel()
 		return nil, err
 	}
 	return c, nil
@@ -179,9 +186,10 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
 	return c.apiServerLeader.CompareAndSwap(old, new)
 }
 
-// UpdateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion.
-func (c *Cluster) UpdateScheduler() {
+// updateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
+func (c *Cluster) updateScheduler() {
 	defer logutil.LogPanic()
+	defer c.wg.Done()
 
 	// Make sure the coordinator has initialized all the existing schedulers.
 	c.waitSchedulersInitialized()
@@ -348,3 +356,35 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
 	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
 	return nil
 }
+
+// runUpdateStoreStats updates store stats periodically.
+func (c *Cluster) runUpdateStoreStats() {
+	defer logutil.LogPanic()
+	defer c.wg.Done()
+
+	ticker := time.NewTicker(9 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			log.Info("update store stats background job has been stopped")
+			return
+		case <-ticker.C:
+			c.UpdateAllStoreStatus()
+		}
+	}
+}
+
+// StartBackgroundJobs starts background jobs.
+func (c *Cluster) StartBackgroundJobs() {
+	c.wg.Add(2)
+	go c.updateScheduler()
+	go c.runUpdateStoreStats()
+}
+
+// StopBackgroundJobs stops background jobs.
+func (c *Cluster) StopBackgroundJobs() {
+	c.cancel()
+	c.wg.Wait()
+}
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
index 330085e3b82..f4c5c676dd3 100644
--- a/pkg/mcs/scheduling/server/server.go
+++ b/pkg/mcs/scheduling/server/server.go
@@ -443,13 +443,14 @@ func (s *Server) startCluster(context.Context) error {
 		return err
 	}
 	s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
-	go s.cluster.UpdateScheduler()
+	s.cluster.StartBackgroundJobs()
 	go s.GetCoordinator().RunUntilStop()
 	return nil
 }
 
 func (s *Server) stopCluster() {
 	s.GetCoordinator().Stop()
+	s.cluster.StopBackgroundJobs()
 	s.ruleWatcher.Close()
 	s.configWatcher.Close()
 	s.metaWatcher.Close()
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 18bf66b8188..29a8709bdac 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -684,13 +684,7 @@ func (c *RaftCluster) runUpdateStoreStats() {
 		case <-ticker.C:
 			// Update related stores.
 			start := time.Now()
-			stores := c.GetStores()
-			for _, store := range stores {
-				if store.IsRemoved() {
-					continue
-				}
-				c.core.UpdateStoreStatus(store.GetID())
-			}
+			c.core.UpdateAllStoreStatus()
 			updateStoreStatsGauge.Set(time.Since(start).Seconds())
 		}
 	}
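
Note for reviewers: the lifecycle pattern this patch adopts for the scheduling
microservice (a context derived with context.WithCancel plus a sync.WaitGroup
counting every background goroutine) is sketched below in a self-contained
form. The Jobs type, its method names, and the 10ms interval are illustrative
assumptions for the sketch, not code from this patch.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Jobs bundles the same three pieces the scheduling Cluster gains in this
// patch: a derived context, its cancel function, and a WaitGroup.
type Jobs struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func NewJobs(parent context.Context) *Jobs {
	ctx, cancel := context.WithCancel(parent)
	return &Jobs{ctx: ctx, cancel: cancel}
}

// Start mirrors StartBackgroundJobs: increment the WaitGroup before
// launching, one count per goroutine.
func (j *Jobs) Start() {
	j.wg.Add(1)
	go j.runTicker("update-store-stats", 10*time.Millisecond)
}

// runTicker mirrors runUpdateStoreStats: do periodic work until the shared
// context is canceled, then signal the WaitGroup via the deferred Done.
func (j *Jobs) runTicker(name string, every time.Duration) {
	defer j.wg.Done()
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-j.ctx.Done():
			fmt.Println(name, "stopped")
			return
		case <-ticker.C:
			// Periodic work goes here, e.g. refreshing store statistics.
		}
	}
}

// Stop mirrors StopBackgroundJobs: cancel first, then wait. Wait can only
// return once every goroutine has observed ctx.Done() and called Done().
func (j *Jobs) Stop() {
	j.cancel()
	j.wg.Wait()
}

func main() {
	jobs := NewJobs(context.Background())
	jobs.Start()
	time.Sleep(50 * time.Millisecond)
	jobs.Stop() // returns only after every goroutine has exited
}

The ordering in Stop is the load-bearing detail: canceling before waiting is
what keeps StopBackgroundJobs from deadlocking, and deriving the context
inside NewCluster (rather than storing the caller's context) is what lets the
cluster shut down its own goroutines without affecting the parent.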