Skip to content

Commit

Permalink
update store stats
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 15, 2023
1 parent e295e62 commit 430dba6
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
12 changes: 12 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,18 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key
return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate)
}

// UpdateAllStoreStatus updates the information of all stores.
func (bc *BasicCluster) UpdateAllStoreStatus() {
// Update related stores.
stores := bc.GetStores()
for _, store := range stores {
if store.IsRemoved() {
continue
}
bc.UpdateStoreStatus(store.GetID())
}
}

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
GetTotalRegionCount() int
Expand Down
18 changes: 18 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,21 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
return nil
}

// RunUpdateStoreStats updates store stats periodically.
func (c *Cluster) RunUpdateStoreStats() {
defer logutil.LogPanic()

ticker := time.NewTicker(9 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("update store stats background jobs has been stopped")
return
case <-ticker.C:
c.UpdateAllStoreStatus()
}
}
}
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
go s.cluster.UpdateScheduler()
go s.cluster.RunUpdateStoreStats()
go s.GetCoordinator().RunUntilStop()
return nil
}
Expand Down
8 changes: 1 addition & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,13 +684,7 @@ func (c *RaftCluster) runUpdateStoreStats() {
case <-ticker.C:
// Update related stores.
start := time.Now()
stores := c.GetStores()
for _, store := range stores {
if store.IsRemoved() {
continue
}
c.core.UpdateStoreStatus(store.GetID())
}
c.core.UpdateAllStoreStatus()
updateStoreStatsGauge.Set(time.Since(start).Seconds())
}
}
Expand Down

0 comments on commit 430dba6

Please sign in to comment.