*: use a separate runner for updating subtree #8158

Merged: 12 commits, May 21, 2024
10 changes: 5 additions & 5 deletions metrics/grafana/pd.json
@@ -11651,12 +11651,12 @@
"targets": [
{
"exemplar": true,
"expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{task_type}}_({{runner_name}})",
"legendFormat": "{{task_type}}_{{runner_name}}",
"refId": "A",
"step": 4
}
@@ -11768,12 +11768,12 @@
"targets": [
{
"exemplar": true,
"expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
"expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "failed-tasks-({{runner_name}})",
"legendFormat": "failed-tasks-{{runner_name}}",
"refId": "A",
"step": 4
},
@@ -11782,7 +11782,7 @@
"expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}",
"hide": false,
"interval": "",
"legendFormat": "max-wait-duration-({{runner_name}})",
"legendFormat": "max-wait-duration-{{runner_name}}",
"refId": "B"
}
],
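The renamed expressions drop the redundant task_ segment (pd_ratelimit_runner_pending_tasks, pd_ratelimit_runner_failed_tasks_total), and the legends key off the metric labels. As orientation only, a hedged Go sketch of how a sample with those labels could be emitted; it assumes the nameStr/taskStr label keys defined in pkg/ratelimit/metrics.go resolve to runner_name and task_type, which this diff does not show:

    // Hypothetical label values: "heartbeat-task-runner" is a constant introduced in this PR,
    // while the task-type string and the label keys are assumptions for illustration.
    ratelimit.RunnerPendingTasks.WithLabelValues("heartbeat-task-runner", "HandleStatsAsync").Set(3)
    ratelimit.RunnerFailedTasks.WithLabelValues("heartbeat-task-runner", "HandleStatsAsync").Inc()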
16 changes: 9 additions & 7 deletions pkg/core/context.go
@@ -23,19 +23,21 @@ import (
// MetaProcessContext is a context for meta process.
type MetaProcessContext struct {
context.Context
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
LogRunner ratelimit.Runner
Tracer RegionHeartbeatProcessTracer
TaskRunner ratelimit.Runner
StatisticsRunner ratelimit.Runner
LogRunner ratelimit.Runner
Member: Could you please help add some comments here to explain the different responsibilities of these runners?

}

// NewMetaProcessContext creates a new MetaProcessContext.
// used in tests, can be changed if no need to test concurrency.
func ContextTODO() *MetaProcessContext {
return &MetaProcessContext{
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
Context: context.TODO(),
Tracer: NewNoopHeartbeatProcessTracer(),
TaskRunner: ratelimit.NewSyncRunner(),
StatisticsRunner: ratelimit.NewSyncRunner(),
LogRunner: ratelimit.NewSyncRunner(),
// Limit default is nil
}
}
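In response to the review question above, one possible way the fields could be documented. The per-runner descriptions are assumptions inferred from how the runners are used elsewhere in this PR, not text from the final commit:

    // MetaProcessContext is a context for meta process.
    type MetaProcessContext struct {
    	context.Context
    	// Tracer records the per-stage breakdown of handling one region heartbeat.
    	Tracer RegionHeartbeatProcessTracer
    	// TaskRunner runs heartbeat follow-up tasks such as updating the region tree and subtree.
    	TaskRunner ratelimit.Runner
    	// StatisticsRunner runs statistics-related tasks (inferred from its name; its exact duties are not shown in this section).
    	StatisticsRunner ratelimit.Runner
    	// LogRunner prints heartbeat logs asynchronously.
    	LogRunner ratelimit.Runner
    }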
10 changes: 5 additions & 5 deletions pkg/core/region.go
@@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
@@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
@@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, retained = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
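The new fourth return value, retained, is set whenever the region meta or epoch changed and the cache must be rewritten. A minimal sketch of a caller consuming it, mirroring the scheduling-server change later in this diff; the task-name constant is a hypothetical placeholder because the real one is not visible in these hunks:

    _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
    if saveCache {
    	ctx.TaskRunner.RunTask(
    		ctx,
    		updateSubTreeTaskName, // hypothetical placeholder for the real task-name constant
    		func(_ context.Context) { c.CheckAndPutSubTree(region) },
    		ratelimit.WithRetained(retained), // assumption: retained tasks are kept when the runner sheds load
    	)
    }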
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
_, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
@@ -980,7 +980,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
@@ -1009,7 +1009,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
re.Equal(int32(1), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)
46 changes: 24 additions & 22 deletions pkg/mcs/scheduling/server/cluster.go
@@ -54,7 +54,8 @@
clusterID uint64
running atomic.Bool

heartbeatRunnner ratelimit.Runner
heartbeatRunner ratelimit.Runner
statisticsRunner ratelimit.Runner
logRunner ratelimit.Runner
}

@@ -64,8 +65,9 @@
collectWaitTime = time.Minute

// heartbeat relative const
heartbeatTaskRunner = "heartbeat-task-runner"
logTaskRunner = "log-task-runner"
heartbeatTaskRunner = "heartbeat-task-runner"
statisticsTaskRunner = "statistics-task-runner"
logTaskRunner = "log-task-runner"
)

var syncRunner = ratelimit.NewSyncRunner()
@@ -93,7 +95,8 @@
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
@@ -531,7 +534,8 @@
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunnner.Start()
c.heartbeatRunner.Start()
c.statisticsRunner.Start()
c.logRunner.Start()
c.running.Store(true)
}
@@ -543,7 +547,8 @@
}
c.running.Store(false)
c.coordinator.Stop()
c.heartbeatRunnner.Stop()
c.heartbeatRunner.Stop()
c.statisticsRunner.Stop()
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
@@ -560,17 +565,19 @@
if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics {
tracer = core.NewHeartbeatProcessTracer()
}
var taskRunner, logRunner ratelimit.Runner
taskRunner, logRunner = syncRunner, syncRunner
var taskRunner, statisticsRunner, logRunner ratelimit.Runner
taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner
if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner {
taskRunner = c.heartbeatRunnner
taskRunner = c.heartbeatRunner
Contributor: need to add in here?

    statisticsRunner = c.statisticsRunner

Member Author: nice catch

statisticsRunner = c.statisticsRunner

Codecov / codecov/patch check warning: added lines pkg/mcs/scheduling/server/cluster.go#L571-L572 were not covered by tests.
logRunner = c.logRunner
}
ctx := &core.MetaProcessContext{
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
LogRunner: logRunner,
Context: c.ctx,
Tracer: tracer,
TaskRunner: taskRunner,
StatisticsRunner: statisticsRunner,
LogRunner: logRunner,
}
tracer.Begin()
if err := c.processRegionHeartbeat(ctx, region); err != nil {
@@ -591,19 +598,12 @@
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
)
cluster.HandleStatsAsync(c, region)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
@@ -627,6 +627,7 @@
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
)
}
return nil
@@ -650,6 +651,7 @@
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
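ratelimit.WithRetained is introduced by this PR, but its definition is not part of the hunks shown here. A hedged guess at its shape, assuming the usual functional-option pattern; treat this as a sketch of the idea, not the actual pkg/ratelimit code:

    // TaskOpts and Option are assumed names for illustration only.
    type TaskOpts struct {
    	// Retained marks a task that should not be dropped when the runner is saturated,
    	// e.g. the subtree updates triggered by the meta/epoch changes above (assumption).
    	Retained bool
    }

    type Option func(*TaskOpts)

    func WithRetained(retained bool) Option {
    	return func(o *TaskOpts) { o.Retained = retained }
    }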
32 changes: 24 additions & 8 deletions pkg/ratelimit/metrics.go
@@ -31,25 +31,41 @@ var (
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
RunnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{nameStr})
}, []string{nameStr, taskStr})
RunnerSucceededTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_success_tasks_total",
Help: "The number of tasks in the runner.",
}, []string{nameStr, taskStr})
RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_execution_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished tasks.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{nameStr, taskStr})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
prometheus.MustRegister(RunnerPendingTasks)
prometheus.MustRegister(RunnerFailedTasks)
prometheus.MustRegister(RunnerTaskExecutionDuration)
prometheus.MustRegister(RunnerSucceededTasks)
Member: Should we also add metrics to tell the task number of different priorities?

}
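The new histogram and the succeeded/failed counters are all keyed by runner name and task type. A rough sketch, under assumptions, of how a runner could record them when a task finishes; the actual wiring in pkg/ratelimit is not part of this diff:

    start := time.Now()
    task(ctx) // task signature as used throughout this PR: func(context.Context)
    RunnerTaskExecutionDuration.WithLabelValues(runnerName, taskType).Observe(time.Since(start).Seconds())
    RunnerSucceededTasks.WithLabelValues(runnerName, taskType).Inc()
    // A "failed" task most plausibly means one dropped before it ran (for example, a full queue);
    // that is an assumption about the runner's semantics, not something shown in this diff:
    // RunnerFailedTasks.WithLabelValues(runnerName, taskType).Inc()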