Skip to content

Commit

Permalink
split statistics runner
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 15, 2024
1 parent 103be36 commit 3f47763
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 40 deletions.
16 changes: 9 additions & 7 deletions pkg/core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ import (
// MetaProcessContext is a context for meta process.
type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
LogRunner ratelimit.Runner
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
StatisticsRunner ratelimit.Runner
LogRunner ratelimit.Runner
}

// ContextTODO creates a new MetaProcessContext.
// It is used in tests and can be changed if there is no need to test concurrency.
func ContextTODO() *MetaProcessContext {
return &MetaProcessContext{
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
StatisticsRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
}
32 changes: 19 additions & 13 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type Cluster struct {
clusterID uint64
running atomic.Bool

heartbeatRunnner ratelimit.Runner
heartbeatRunner ratelimit.Runner
statisticsRunner ratelimit.Runner
logRunner ratelimit.Runner
}

Expand All @@ -65,8 +66,9 @@ const (
collectWaitTime = time.Minute

// heartbeat related constants
heartbeatTaskRunner = "heartbeat-task-runner"
logTaskRunner = "log-task-runner"
heartbeatTaskRunner = "heartbeat-task-runner"
statisticsTaskRunner = "statistics-task-runner"
logTaskRunner = "log-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
Expand Down Expand Up @@ -94,7 +96,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
Expand Down Expand Up @@ -532,7 +535,8 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunnner.Start()
c.heartbeatRunner.Start()
c.statisticsRunner.Start()
c.logRunner.Start()
c.running.Store(true)
}
Expand All @@ -544,7 +548,8 @@ func (c *Cluster) StopBackgroundJobs() {
}
c.running.Store(false)
c.coordinator.Stop()
c.heartbeatRunnner.Stop()
c.heartbeatRunner.Stop()
c.statisticsRunner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
Expand All @@ -561,17 +566,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
var taskRunner, logRunner ratelimit.Runner
taskRunner, logRunner = syncRunner, syncRunner
var taskRunner, statisticsRunner, logRunner ratelimit.Runner
taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunnner
taskRunner = c.heartbeatRunner

Check warning on line 572 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L572

Added line #L572 was not covered by tests
logRunner = c.logRunner
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
LogRunner: logRunner,
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
StatisticsRunner: statisticsRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
Expand Down
28 changes: 15 additions & 13 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ const (
minSnapshotDurationSec = 5

// heartbeat related constants
heartbeatTaskRunner = "heartbeat-async"
logTaskRunner = "log-async"
heartbeatTaskRunner = "heartbeat-async"
statisticsTaskRunner = "statistics-async"
logTaskRunner = "log-async"
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -174,7 +175,8 @@ type RaftCluster struct {
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams

heartbeatRunnner ratelimit.Runner
heartbeatRunner ratelimit.Runner
statisticsRunner ratelimit.Runner
logRunner ratelimit.Runner
}

Expand All @@ -199,7 +201,8 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
etcdClient: etcdClient,
core: basicCluster,
storage: storage,
heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
}
Expand Down Expand Up @@ -358,7 +361,8 @@ func (c *RaftCluster) Start(s Server) error {
go c.startGCTuner()

c.running = true
c.heartbeatRunnner.Start()
c.heartbeatRunner.Start()
c.statisticsRunner.Start()
c.logRunner.Start()
return nil
}
Expand Down Expand Up @@ -753,7 +757,8 @@ func (c *RaftCluster) Stop() {
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.stopSchedulingJobs()
}
c.heartbeatRunnner.Stop()
c.heartbeatRunner.Stop()
c.statisticsRunner.Stop()
c.logRunner.Stop()
c.Unlock()

Expand Down Expand Up @@ -1040,15 +1045,14 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region stats need to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx.StatisticsRunner.RunTask(
ctx.Context,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithPriority(priority),
)
}
// region is not updated to the subtree.
Expand Down Expand Up @@ -1092,21 +1096,20 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnUpdateSubTreeFinished()

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.StatisticsRunner.RunTask(
ctx.Context,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithPriority(priority),
)
}
regionUpdateCacheEventCounter.Inc()
}

tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx.StatisticsRunner.RunTask(
ctx.Context,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
Expand All @@ -1115,13 +1118,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithPriority(priority),
)

tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
if saveKV {
ctx.TaskRunner.RunTask(
ctx.StatisticsRunner.RunTask(
ctx.Context,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
Expand Down
16 changes: 9 additions & 7 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
tracer = core.NewHeartbeatProcessTracer()
}

var taskRunner, logRunner ratelimit.Runner
taskRunner, logRunner = syncRunner, syncRunner
var taskRunner, statisticsRunner, logRunner ratelimit.Runner
taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunnner
taskRunner = c.heartbeatRunner
statisticsRunner = c.statisticsRunner
logRunner = c.logRunner
}

ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
LogRunner: logRunner,
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
StatisticsRunner: statisticsRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
Expand Down

0 comments on commit 3f47763

Please sign in to comment.