*: use a separate runner for updating subtree #8158

Merged: 12 commits, May 21, 2024
10 changes: 5 additions & 5 deletions metrics/grafana/pd.json
@@ -11651,12 +11651,12 @@
"targets": [
{
"exemplar": true,
"expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{task_type}}_({{runner_name}})",
"legendFormat": "{{task_type}}_{{runner_name}}",
"refId": "A",
"step": 4
}
@@ -11768,12 +11768,12 @@
"targets": [
{
"exemplar": true,
"expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
"expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "failed-tasks-({{runner_name}})",
"legendFormat": "failed-tasks-{{runner_name}}",
"refId": "A",
"step": 4
},
@@ -11782,7 +11782,7 @@
"expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"hide": false,
"interval": "",
"legendFormat": "max-wait-duration-({{runner_name}})",
"legendFormat": "max-wait-duration-{{runner_name}}",
"refId": "B"
}
],
2 changes: 2 additions & 0 deletions pkg/core/context.go
@@ -25,6 +25,7 @@ type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
MiscRunner ratelimit.Runner
LogRunner ratelimit.Runner
}

@@ -35,6 +36,7 @@ func ContextTODO() *MetaProcessContext {
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
MiscRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
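For context, a minimal sketch of how a caller could hand a statistics task to the new MiscRunner carried by MetaProcessContext. This is not code from the PR: the function and its callback body are illustrative, and only the types, the RunTask call shape, and the ratelimit.HandleStatsAsync constant come from this diff.

package example

import (
	"context"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/ratelimit"
)

// dispatchStats is a sketch: non-critical statistics work goes to the MiscRunner
// instead of the heartbeat (subtree) runner, so a slow statistics task cannot
// delay subtree updates.
func dispatchStats(ctx *core.MetaProcessContext, region *core.RegionInfo) {
	ctx.MiscRunner.RunTask(
		ctx,
		ratelimit.HandleStatsAsync,
		func(_ context.Context) {
			// placeholder body: collect flow/hot statistics for the region
			_ = region
		},
	)
}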
10 changes: 5 additions & 5 deletions pkg/core/region.go
@@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
@@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
@@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
@@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
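To illustrate the new retained return value, a hedged sketch of the caller side: retained marks heartbeats whose meta or epoch changed, so the subtree-update task is kept even when the runner's pending queue is saturated. The surrounding function is hypothetical and the ratelimit.UpdateSubTree task name is an assumption not visible in this hunk; only RegionGuideFunc, WithRetained, and the call shape come from the PR.

package example

import (
	"context"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/ratelimit"
)

var regionGuide = core.GenerateRegionGuideFunc(true)

// handleHeartbeat is hypothetical; it mirrors how cluster.go consumes the
// fourth return value of the guide function.
func handleHeartbeat(ctx *core.MetaProcessContext, region, origin *core.RegionInfo) {
	_, saveCache, _, retained := regionGuide(ctx, region, origin)
	if !saveCache {
		return
	}
	ctx.TaskRunner.RunTask(
		ctx,
		ratelimit.UpdateSubTree, // assumed task-name constant
		func(_ context.Context) {
			// placeholder body: update the region subtree in the cluster cache
		},
		ratelimit.WithRetained(retained),
	)
}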
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
_, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
@@ -1031,7 +1031,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
@@ -1060,7 +1060,7 @@
re.Equal(int32(1), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)
45 changes: 25 additions & 20 deletions pkg/mcs/scheduling/server/cluster.go
@@ -54,8 +54,12 @@
clusterID uint64
running atomic.Bool

heartbeatRunnner ratelimit.Runner
logRunner ratelimit.Runner
// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
// miscRunner is used to process the statistics and persistent tasks asynchronously.
miscRunner ratelimit.Runner
// logRunner is used to process the log asynchronously.
logRunner ratelimit.Runner
}

const (
@@ -64,8 +68,9 @@
collectWaitTime = time.Minute

// heartbeat relative const
heartbeatTaskRunner = "heartbeat-task-runner"
logTaskRunner = "log-task-runner"
heartbeatTaskRunner = "heartbeat-task-runner"
statisticsTaskRunner = "statistics-task-runner"
logTaskRunner = "log-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
@@ -93,8 +98,9 @@
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
@@ -531,7 +537,8 @@
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunnner.Start()
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.running.Store(true)
}
@@ -543,7 +550,8 @@
}
c.running.Store(false)
c.coordinator.Stop()
c.heartbeatRunnner.Stop()
c.heartbeatRunner.Stop()
c.miscRunner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
@@ -560,16 +568,18 @@
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
var taskRunner, logRunner ratelimit.Runner
taskRunner, logRunner = syncRunner, syncRunner
var taskRunner, miscRunner, logRunner ratelimit.Runner
taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunnner
taskRunner = c.heartbeatRunner
Review comment (Contributor): need to add in here?
    statisticsRunner = c.statisticsRunner
Reply (Member Author): nice catch

miscRunner = c.miscRunner
logRunner = c.logRunner

Codecov / codecov/patch warning on pkg/mcs/scheduling/server/cluster.go#L574-L575: added lines were not covered by tests.
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
MiscRunner: miscRunner,
LogRunner: logRunner,
}
tracer.Begin()
@@ -591,19 +601,12 @@
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
)
cluster.HandleStatsAsync(c, region)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
@@ -627,6 +630,7 @@
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
)
}
return nil
@@ -650,6 +654,7 @@
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
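For reference, a hedged sketch of the runner lifecycle this file now implements: three independent concurrent runners (heartbeat, statistics/misc, log) that are started and stopped with the cluster. The runner names, limiter size, and TTL mirror the diff; the wrapper struct and functions are illustrative, not the PR's code.

package example

import (
	"runtime"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

// runnerSet is illustrative; in the PR these fields live directly on the
// scheduling server's Cluster struct.
type runnerSet struct {
	heartbeatRunner ratelimit.Runner // subtree updates from region heartbeats
	miscRunner      ratelimit.Runner // statistics and persistence tasks
	logRunner       ratelimit.Runner // asynchronous heartbeat logging
}

func newRunnerSet() *runnerSet {
	return &runnerSet{
		heartbeatRunner: ratelimit.NewConcurrentRunner("heartbeat-task-runner", ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
		miscRunner:      ratelimit.NewConcurrentRunner("statistics-task-runner", ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
		logRunner:       ratelimit.NewConcurrentRunner("log-task-runner", ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
	}
}

// start and stop follow the same ordering the cluster's start/stop paths use in this diff.
func (s *runnerSet) start() {
	s.heartbeatRunner.Start()
	s.miscRunner.Start()
	s.logRunner.Start()
}

func (s *runnerSet) stop() {
	s.heartbeatRunner.Stop()
	s.miscRunner.Stop()
	s.logRunner.Stop()
}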
32 changes: 24 additions & 8 deletions pkg/ratelimit/metrics.go
@@ -31,25 +31,41 @@ var (
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
RunnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{nameStr})
}, []string{nameStr, taskStr})
RunnerSucceededTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_success_tasks_total",
Help: "The number of tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_execution_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished tasks.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{nameStr, taskStr})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
prometheus.MustRegister(RunnerPendingTasks)
prometheus.MustRegister(RunnerFailedTasks)
prometheus.MustRegister(RunnerTaskExecutionDuration)
prometheus.MustRegister(RunnerSucceededTasks)
Review comment (Member): Should we also add metrics to tell the task number of different priorities?

}
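A hedged sketch of the read/write pattern these collectors imply, now keyed by runner name and task type. Only the metric variables come from this file; the wrapper function, its failure condition, and the label values are illustrative of how a runner might drive them.

package example

import (
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

// observeTask is illustrative: it shows one plausible way a runner updates the
// renamed and newly added collectors around a task's lifetime.
func observeTask(runnerName, taskType string, run func() error) {
	ratelimit.RunnerPendingTasks.WithLabelValues(runnerName, taskType).Inc()
	defer ratelimit.RunnerPendingTasks.WithLabelValues(runnerName, taskType).Dec()

	start := time.Now()
	if err := run(); err != nil {
		// whatever the runner counts as a failure, e.g. a task dropped from a full queue
		ratelimit.RunnerFailedTasks.WithLabelValues(runnerName, taskType).Inc()
		return
	}
	ratelimit.RunnerSucceededTasks.WithLabelValues(runnerName, taskType).Inc()
	ratelimit.RunnerTaskExecutionDuration.WithLabelValues(runnerName, taskType).Observe(time.Since(start).Seconds())
}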