*: use a separate runner for updating subtree (#8158)
ref #7897

Signed-off-by: Ryan Leung <[email protected]>
rleungx authored May 21, 2024
1 parent 7731ee4 commit 58e7580
Showing 10 changed files with 142 additions and 101 deletions.
10 changes: 5 additions & 5 deletions metrics/grafana/pd.json
@@ -11651,12 +11651,12 @@
"targets": [
{
"exemplar": true,
- "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
+ "expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
- "legendFormat": "{{task_type}}_({{runner_name}})",
+ "legendFormat": "{{task_type}}_{{runner_name}}",
"refId": "A",
"step": 4
}
@@ -11768,12 +11768,12 @@
"targets": [
{
"exemplar": true,
- "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
+ "expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
- "legendFormat": "failed-tasks-({{runner_name}})",
+ "legendFormat": "failed-tasks-{{runner_name}}",
"refId": "A",
"step": 4
},
@@ -11782,7 +11782,7 @@
"expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"hide": false,
"interval": "",
- "legendFormat": "max-wait-duration-({{runner_name}})",
+ "legendFormat": "max-wait-duration-{{runner_name}}",
"refId": "B"
}
],
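The renamed series come from the updated collectors in pkg/ratelimit/metrics.go below; note that rate(...[1m])*60 converts the per-second failure rate into failures per minute. For reference, a minimal client_golang sketch (a hypothetical standalone program, not PD code) shows how a gauge with runner_name and task_type labels produces the series these panels query:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Stand-in for PD's collector: same metric name and labels as the renamed
// pd_ratelimit_runner_pending_tasks series queried by the dashboard.
var pendingTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "pd",
	Subsystem: "ratelimit",
	Name:      "runner_pending_tasks",
	Help:      "The number of pending tasks in the runner.",
}, []string{"runner_name", "task_type"})

func main() {
	prometheus.MustRegister(pendingTasks)
	// Renders under the new legend "{{task_type}}_{{runner_name}}" as
	// "UpdateSubTree_heartbeat-task-runner" (the task_type value is illustrative).
	pendingTasks.WithLabelValues("heartbeat-task-runner", "UpdateSubTree").Set(3)
	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(":2112", nil)
}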
2 changes: 2 additions & 0 deletions pkg/core/context.go
@@ -25,6 +25,7 @@ type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
+ MiscRunner ratelimit.Runner
LogRunner ratelimit.Runner
}

@@ -35,6 +36,7 @@ func ContextTODO() *MetaProcessContext {
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
+ MiscRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
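The context now carries one runner per class of work. A minimal sketch of the pattern, with stand-in types (not PD's actual interfaces), assuming only that a Runner can execute a named task:

package main

import (
	"context"
	"fmt"
)

// Runner mirrors the ratelimit.Runner idea: something that can run a named
// task. This is an illustrative stand-in, not the PD interface.
type Runner interface {
	RunTask(ctx context.Context, name string, f func(context.Context))
}

// syncRunner runs tasks inline, like ratelimit.NewSyncRunner.
type syncRunner struct{}

func (syncRunner) RunTask(ctx context.Context, _ string, f func(context.Context)) {
	f(ctx)
}

// metaProcessContext mirrors core.MetaProcessContext: one runner per task class,
// so heartbeat processing can route each kind of work independently.
type metaProcessContext struct {
	context.Context
	TaskRunner Runner // subtree updates
	MiscRunner Runner // statistics / persistence
	LogRunner  Runner // logging
}

func main() {
	ctx := metaProcessContext{
		Context:    context.TODO(),
		TaskRunner: syncRunner{},
		MiscRunner: syncRunner{},
		LogRunner:  syncRunner{},
	}
	ctx.MiscRunner.RunTask(ctx, "collect-stats", func(context.Context) {
		fmt.Println("stats task runs on its own runner")
	})
}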
10 changes: 5 additions & 5 deletions pkg/core/region.go
@@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
- type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
+ type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
@@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
- return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
+ return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
@@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
- saveKV, saveCache = true, true
+ saveKV, saveCache, retained = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
- saveKV, saveCache = true, true
+ saveKV, saveCache, retained = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
@@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
- saveKV, saveCache = true, true
+ saveKV, saveCache, retained = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
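RegionGuideFunc gains a fourth flag: retained is set alongside saveKV whenever the region is new or its epoch advanced, marking updates whose subtree task should survive queue pressure. A toy guide under the same contract (types and decision rules are illustrative, not PD's):

package main

import "fmt"

// Toy stand-in for PD's RegionInfo; the real guide compares epochs, peers,
// and leaders on *core.RegionInfo.
type regionMeta struct{ version, confVer, leader uint64 }

// regionGuide mirrors the new four-value contract: epoch advances must be
// persisted, cached, synced, and retained; a leader-only change needs a cache
// update and sync but, in this sketch, may be dropped under load.
func regionGuide(origin, cur regionMeta) (saveKV, saveCache, needSync, retained bool) {
	if cur.version > origin.version || cur.confVer > origin.confVer {
		saveKV, saveCache, needSync, retained = true, true, true, true
	}
	if cur.leader != origin.leader {
		saveCache, needSync = true, true
	}
	return
}

func main() {
	_, saveCache, _, retained := regionGuide(regionMeta{1, 1, 1}, regionMeta{2, 1, 1})
	fmt.Println(saveCache, retained) // true true
}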
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
- _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
+ _, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
@@ -1031,7 +1031,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
- saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
+ saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
@@ -1060,7 +1060,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
re.Equal(int32(1), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
- saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
+ saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)
45 changes: 25 additions & 20 deletions pkg/mcs/scheduling/server/cluster.go
@@ -54,8 +54,12 @@ type Cluster struct {
clusterID uint64
running atomic.Bool

- heartbeatRunnner ratelimit.Runner
- logRunner ratelimit.Runner
+ // heartbeatRunner is used to process the subtree update task asynchronously.
+ heartbeatRunner ratelimit.Runner
+ // miscRunner is used to process the statistics and persistent tasks asynchronously.
+ miscRunner ratelimit.Runner
+ // logRunner is used to process the log asynchronously.
+ logRunner ratelimit.Runner
}

const (
@@ -64,8 +68,9 @@ const (
collectWaitTime = time.Minute

// heartbeat relative const
- heartbeatTaskRunner = "heartbeat-task-runner"
- logTaskRunner = "log-task-runner"
+ heartbeatTaskRunner = "heartbeat-task-runner"
+ statisticsTaskRunner = "statistics-task-runner"
+ logTaskRunner = "log-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
@@ -93,8 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

- heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
- logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
+ heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
+ miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
+ logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
@@ -531,7 +537,8 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
- c.heartbeatRunnner.Start()
+ c.heartbeatRunner.Start()
+ c.miscRunner.Start()
c.logRunner.Start()
c.running.Store(true)
}
@@ -543,7 +550,8 @@ func (c *Cluster) StopBackgroundJobs() {
}
c.running.Store(false)
c.coordinator.Stop()
- c.heartbeatRunnner.Stop()
+ c.heartbeatRunner.Stop()
+ c.miscRunner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
@@ -560,16 +568,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
- var taskRunner, logRunner ratelimit.Runner
- taskRunner, logRunner = syncRunner, syncRunner
+ var taskRunner, miscRunner, logRunner ratelimit.Runner
+ taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
- taskRunner = c.heartbeatRunnner
+ taskRunner = c.heartbeatRunner
+ miscRunner = c.miscRunner
logRunner = c.logRunner
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
+ MiscRunner: miscRunner,
LogRunner: logRunner,
}
tracer.Begin()
@@ -591,19 +601,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
-
- ctx.TaskRunner.RunTask(
- ctx,
- ratelimit.HandleStatsAsync,
- func(_ context.Context) {
- cluster.HandleStatsAsync(c, region)
- },
- )
+ cluster.HandleStatsAsync(c, region)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
- _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
+ _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
@@ -627,6 +630,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
+ ratelimit.WithRetained(true),
)
}
return nil
@@ -650,6 +654,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
+ ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
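Two behavioral changes stand out here: HandleStatsAsync now runs inline on the heartbeat path instead of going through the task runner, and the CheckAndPutSubTree task is submitted with ratelimit.WithRetained, whose name suggests the task is kept rather than dropped when the runner saturates. A toy runner sketching retained-task semantics (the queueing policy is an assumption, not PD's actual implementation):

package main

import (
	"context"
	"errors"
	"fmt"
)

// task and TaskOption mirror the functional-options style suggested by
// ratelimit.WithRetained; the runner below is a toy, not PD's ConcurrentRunner.
type task struct {
	name     string
	retained bool
	f        func(context.Context)
}

type TaskOption func(*task)

func WithRetained(retained bool) TaskOption {
	return func(t *task) { t.retained = retained }
}

type runner struct{ queue chan task }

var errQueueFull = errors.New("task queue is full")

// RunTask enqueues a task; when the queue is saturated, only tasks marked
// retained are kept, so subtree updates survive overload while droppable
// work is shed with an error.
func (r *runner) RunTask(_ context.Context, name string, f func(context.Context), opts ...TaskOption) error {
	t := task{name: name, f: f}
	for _, opt := range opts {
		opt(&t)
	}
	select {
	case r.queue <- t:
		return nil
	default:
		if t.retained {
			r.queue <- t // block rather than drop a retained task
			return nil
		}
		return errQueueFull
	}
}

func main() {
	r := &runner{queue: make(chan task, 1)}
	_ = r.RunTask(context.TODO(), "UpdateSubTree",
		func(context.Context) { fmt.Println("subtree updated") },
		WithRetained(true),
	)
	t := <-r.queue
	t.f(context.TODO())
}

Judging by the call sites above, the intent is that a new region or an epoch change (retained == true) must eventually reach the subtree even under load, while heartbeats that changed nothing structural may be shed.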
32 changes: 24 additions & 8 deletions pkg/ratelimit/metrics.go
@@ -31,25 +31,41 @@
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})

- RunnerTaskPendingTasks = prometheus.NewGaugeVec(
+ RunnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
- Name: "runner_task_pending_tasks",
+ Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr, taskStr})
- RunnerTaskFailedTasks = prometheus.NewCounterVec(
+ RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
- Name: "runner_task_failed_tasks_total",
+ Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
- }, []string{nameStr})
+ }, []string{nameStr, taskStr})
+ RunnerSucceededTasks = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "pd",
+ Subsystem: "ratelimit",
+ Name: "runner_success_tasks_total",
+ Help: "The number of tasks in the runner.",
+ }, []string{nameStr, taskStr})
+ RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "pd",
+ Subsystem: "ratelimit",
+ Name: "runner_task_execution_duration_seconds",
+ Help: "Bucketed histogram of processing time (s) of finished tasks.",
+ Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
+ }, []string{nameStr, taskStr})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
- prometheus.MustRegister(RunnerTaskPendingTasks)
- prometheus.MustRegister(RunnerTaskFailedTasks)
+ prometheus.MustRegister(RunnerPendingTasks)
+ prometheus.MustRegister(RunnerFailedTasks)
+ prometheus.MustRegister(RunnerTaskExecutionDuration)
+ prometheus.MustRegister(RunnerSucceededTasks)
}
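The new RunnerSucceededTasks counter and RunnerTaskExecutionDuration histogram give per-task-type throughput and latency. A minimal sketch of how a runner might drive both collectors (the helper and call sites are assumptions; PD's actual instrumentation lives in pkg/ratelimit):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative stand-ins for the new collectors, with the same names, labels,
// and buckets as the definitions above.
var (
	succeeded = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "pd", Subsystem: "ratelimit",
		Name: "runner_success_tasks_total",
		Help: "The number of tasks in the runner.",
	}, []string{"runner_name", "task_type"})
	execDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "pd", Subsystem: "ratelimit",
		Name:    "runner_task_execution_duration_seconds",
		Help:    "Bucketed histogram of processing time (s) of finished tasks.",
		Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
	}, []string{"runner_name", "task_type"})
)

// runAndObserve is a hypothetical helper: time a task, then record its
// duration and count it as succeeded.
func runAndObserve(runnerName, taskType string, f func()) {
	start := time.Now()
	f()
	execDuration.WithLabelValues(runnerName, taskType).Observe(time.Since(start).Seconds())
	succeeded.WithLabelValues(runnerName, taskType).Inc()
}

func main() {
	prometheus.MustRegister(succeeded, execDuration)
	runAndObserve("heartbeat-task-runner", "UpdateSubTree", func() {
		time.Sleep(2 * time.Millisecond) // simulated work
	})
}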