mcs: update store stats (tikv#7097)
ref tikv#5839

Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Sep 15, 2023
1 parent 4eb9aea commit d7d4756
Showing 4 changed files with 59 additions and 12 deletions.
12 changes: 12 additions & 0 deletions pkg/core/basic_cluster.go
@@ -246,6 +246,18 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key
     return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate)
 }
 
+// UpdateAllStoreStatus updates the information of all stores.
+func (bc *BasicCluster) UpdateAllStoreStatus() {
+    // Update related stores.
+    stores := bc.GetStores()
+    for _, store := range stores {
+        if store.IsRemoved() {
+            continue
+        }
+        bc.UpdateStoreStatus(store.GetID())
+    }
+}
+
 // RegionSetInformer provides access to a shared informer of regions.
 type RegionSetInformer interface {
     GetTotalRegionCount() int
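For illustration only (not part of this commit): a self-contained Go sketch of the pattern the new UpdateAllStoreStatus helper encapsulates — walk every store, skip removed (tombstoned) ones, refresh the rest. All types and names below are stand-ins, not PD's real API.

package main

import "fmt"

// store stands in for core.StoreInfo.
type store struct {
    id      uint64
    removed bool
}

func (s *store) IsRemoved() bool { return s.removed }
func (s *store) GetID() uint64   { return s.id }

// cluster stands in for core.BasicCluster.
type cluster struct {
    stores []*store
}

// UpdateAllStoreStatus mirrors the helper above: removed stores are
// skipped, every live store has its status refreshed.
func (c *cluster) UpdateAllStoreStatus() {
    for _, s := range c.stores {
        if s.IsRemoved() {
            continue
        }
        c.updateStoreStatus(s.GetID())
    }
}

func (c *cluster) updateStoreStatus(id uint64) {
    fmt.Printf("refreshing status of store %d\n", id)
}

func main() {
    c := &cluster{stores: []*store{{id: 1}, {id: 2, removed: true}, {id: 3}}}
    c.UpdateAllStoreStatus() // prints stores 1 and 3 only
}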
48 changes: 44 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
@@ -2,6 +2,7 @@ package server
 
 import (
     "context"
+    "sync"
     "sync/atomic"
     "time"
 
@@ -29,7 +30,9 @@ import (
 
 // Cluster is used to manage all information for scheduling purpose.
 type Cluster struct {
-    ctx context.Context
+    ctx    context.Context
+    cancel context.CancelFunc
+    wg     sync.WaitGroup
     *core.BasicCluster
     persistConfig  *config.PersistConfig
     ruleManager    *placement.RuleManager
@@ -47,14 +50,17 @@ type Cluster struct {
 const regionLabelGCInterval = time.Hour
 
 // NewCluster creates a new cluster.
-func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
+func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) {
+    ctx, cancel := context.WithCancel(parentCtx)
     labelerManager, err := labeler.NewRegionLabeler(ctx, storage, regionLabelGCInterval)
     if err != nil {
+        cancel()
         return nil, err
     }
     ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig)
     c := &Cluster{
         ctx:            ctx,
+        cancel:         cancel,
         BasicCluster:   basicCluster,
         ruleManager:    ruleManager,
         labelerManager: labelerManager,
@@ -69,6 +75,7 @@ func NewCluster(ctx context.Context, persistConfig *config.PersistConfig, storag
     c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
     err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels())
     if err != nil {
+        cancel()
        return nil, err
     }
     return c, nil
@@ -179,9 +186,10 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
     return c.apiServerLeader.CompareAndSwap(old, new)
 }
 
-// UpdateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion.
-func (c *Cluster) UpdateScheduler() {
+// updateScheduler listens on the schedulers updating notifier and manages scheduler creation and deletion.
+func (c *Cluster) updateScheduler() {
     defer logutil.LogPanic()
+    defer c.wg.Done()
 
     // Make sure the coordinator has initialized all the existing schedulers.
     c.waitSchedulersInitialized()
@@ -348,3 +356,35 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
     c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
     return nil
 }
+
+// runUpdateStoreStats updates store stats periodically.
+func (c *Cluster) runUpdateStoreStats() {
+    defer logutil.LogPanic()
+    defer c.wg.Done()
+
+    ticker := time.NewTicker(9 * time.Millisecond)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-c.ctx.Done():
+            log.Info("update store stats background jobs has been stopped")
+            return
+        case <-ticker.C:
+            c.UpdateAllStoreStatus()
+        }
+    }
+}
+
+// StartBackgroundJobs starts background jobs.
+func (c *Cluster) StartBackgroundJobs() {
+    c.wg.Add(2)
+    go c.updateScheduler()
+    go c.runUpdateStoreStats()
+}
+
+// StopBackgroundJobs stops background jobs.
+func (c *Cluster) StopBackgroundJobs() {
+    c.cancel()
+    c.wg.Wait()
+}
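The lifecycle added above follows a common Go pattern: each background goroutine is counted on a WaitGroup and watches a shared cancelable context, so StopBackgroundJobs cancels the context and then blocks in wg.Wait() until every loop drains. A minimal, self-contained sketch of that pattern (stand-in names, not the scheduling server's code):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type jobs struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
}

func newJobs(parent context.Context) *jobs {
    ctx, cancel := context.WithCancel(parent)
    return &jobs{ctx: ctx, cancel: cancel}
}

// Start launches the background loops; each must call wg.Done on exit.
func (j *jobs) Start() {
    j.wg.Add(1)
    go j.runUpdateStoreStats()
}

// Stop cancels the shared context, then blocks until every loop returns.
func (j *jobs) Stop() {
    j.cancel()
    j.wg.Wait()
}

func (j *jobs) runUpdateStoreStats() {
    defer j.wg.Done()
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-j.ctx.Done():
            fmt.Println("background job stopped")
            return
        case <-ticker.C:
            fmt.Println("updating store stats")
        }
    }
}

func main() {
    j := newJobs(context.Background())
    j.Start()
    time.Sleep(35 * time.Millisecond)
    j.Stop() // returns only after the loop has observed ctx.Done()
}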
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/server.go
@@ -443,13 +443,14 @@ func (s *Server) startCluster(context.Context) error {
         return err
     }
     s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
-    go s.cluster.UpdateScheduler()
+    s.cluster.StartBackgroundJobs()
     go s.GetCoordinator().RunUntilStop()
     return nil
 }
 
 func (s *Server) stopCluster() {
     s.GetCoordinator().Stop()
+    s.cluster.StopBackgroundJobs()
     s.ruleWatcher.Close()
     s.configWatcher.Close()
     s.metaWatcher.Close()
8 changes: 1 addition & 7 deletions server/cluster/cluster.go
@@ -684,13 +684,7 @@ func (c *RaftCluster) runUpdateStoreStats() {
         case <-ticker.C:
             // Update related stores.
             start := time.Now()
-            stores := c.GetStores()
-            for _, store := range stores {
-                if store.IsRemoved() {
-                    continue
-                }
-                c.core.UpdateStoreStatus(store.GetID())
-            }
+            c.core.UpdateAllStoreStatus()
             updateStoreStatsGauge.Set(time.Since(start).Seconds())
         }
     }
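The surviving lines still time the refresh and export it via updateStoreStatsGauge. For reference, a minimal sketch of that measure-and-Set pattern using Prometheus's client_golang; the gauge name and options below are made up, not PD's actual metric definition:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// Hypothetical gauge; PD's real updateStoreStatsGauge is defined
// elsewhere with its own namespace and name.
var updateDurationGauge = prometheus.NewGauge(prometheus.GaugeOpts{
    Namespace: "example",
    Name:      "update_store_stats_duration_seconds",
    Help:      "Wall-clock time of the last store-stats refresh.",
})

func init() {
    prometheus.MustRegister(updateDurationGauge)
}

// timed runs fn and records how long it took, mirroring the
// start := time.Now() / Set(time.Since(start)) pattern in the hunk above.
func timed(fn func()) {
    start := time.Now()
    fn()
    updateDurationGauge.Set(time.Since(start).Seconds())
}

func main() {
    timed(func() { time.Sleep(5 * time.Millisecond) }) // stand-in for UpdateAllStoreStatus
}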
