From c0f2c94acb5c1db890a058b0226c723bb8efdf20 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 9 May 2024 11:26:01 +0800 Subject: [PATCH 01/11] distinguish the task priority Signed-off-by: Ryan Leung --- pkg/core/region.go | 12 +-- pkg/core/region_test.go | 6 +- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/ratelimit/metrics.go | 32 ++++++-- pkg/ratelimit/runner.go | 108 ++++++++++++++++----------- pkg/syncer/client.go | 2 +- server/cluster/cluster.go | 16 ++-- 7 files changed, 109 insertions(+), 69 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index be8fcddc179..da591436ea0 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache = true, true + saveKV, saveCache, metaUpdated = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, metaUpdated = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, metaUpdated = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -807,7 +807,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync = true, true + saveCache, needSync, metaUpdated = true, true, true } if len(region.GetPeers()) != len(origin.GetPeers()) { saveKV, saveCache = true, true diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 43629fccda0..dfdc9ab082f 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) 
- _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB) + _, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -980,7 +980,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA) re.Equal(int32(2), regionPendingItemA.GetRef()) // check new item - saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA) re.True(needSync) re.True(saveCache) re.False(saveKV) @@ -1009,7 +1009,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { re.Equal(int32(1), regionPendingItemB.GetRef()) // heartbeat again, no need updates root tree - saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB) re.False(needSync) re.False(saveCache) re.False(saveKV) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d3691516868..3de7d2d00e3 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -603,7 +603,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 5d4443a1cc4..c5510e66b26 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -31,25 +31,41 @@ var ( Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - - RunnerTaskPendingTasks = prometheus.NewGaugeVec( + RunnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_pending_tasks", + Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskFailedTasks = prometheus.NewCounterVec( + RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_failed_tasks_total", + Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) + RunnerSucceededTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_success_tasks_total", + Help: "The number of tasks in the runner.", + }, []string{nameStr, taskStr}) + RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_execution_duration_seconds", + Help: "Bucketed histogram of processing time (s) of finished tasks.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{nameStr, taskStr}) ) func init() { prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerTaskPendingTasks) - prometheus.MustRegister(RunnerTaskFailedTasks) + prometheus.MustRegister(RunnerPendingTasks) + 
prometheus.MustRegister(RunnerFailedTasks) + prometheus.MustRegister(RunnerTaskExecutionDuration) + prometheus.MustRegister(RunnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 07233af238b..1529cf736b1 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -58,40 +58,48 @@ var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") // ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. type ConcurrentRunner struct { - name string - limiter *ConcurrencyLimiter - maxPendingDuration time.Duration - taskChan chan *Task - pendingTasks []*Task - pendingMu sync.Mutex - stopChan chan struct{} - wg sync.WaitGroup - pendingTaskCount map[string]int64 - failedTaskCount prometheus.Counter - maxWaitingDuration prometheus.Gauge + name string + limiter *ConcurrencyLimiter + maxPendingDuration time.Duration + taskChan chan *Task + pendingNormalPriorityTasks []*Task + pendingHighPriorityTasks []*Task + pendingMu sync.Mutex + stopChan chan struct{} + wg sync.WaitGroup + pendingTaskCount map[string]int64 + maxWaitingDuration prometheus.Gauge } // NewConcurrentRunner creates a new ConcurrentRunner. func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner { s := &ConcurrentRunner{ - name: name, - limiter: limiter, - maxPendingDuration: maxPendingDuration, - taskChan: make(chan *Task), - pendingTasks: make([]*Task, 0, initialCapacity), - failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), - pendingTaskCount: make(map[string]int64), - maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), + name: name, + limiter: limiter, + maxPendingDuration: maxPendingDuration, + taskChan: make(chan *Task), + pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity), + pendingHighPriorityTasks: make([]*Task, 0, initialCapacity), + pendingTaskCount: make(map[string]int64), + maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } // TaskOpts is the options for RunTask. -type TaskOpts struct{} +type TaskOpts struct { + // IsMetaUpdated indicates whether the meta is updated. + MetaUpdated bool +} // TaskOption configures TaskOp type TaskOption func(opts *TaskOpts) +// WithMetaUpdated specify whether the meta is updated. +func WithMetaUpdated(metaUpdated bool) TaskOption { + return func(opts *TaskOpts) { opts.MetaUpdated = metaUpdated } +} + // Start starts the runner. 
func (cr *ConcurrentRunner) Start() { cr.stopChan = make(chan struct{}) @@ -113,18 +121,19 @@ func (cr *ConcurrentRunner) Start() { } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingTasks = make([]*Task, 0, initialCapacity) + cr.pendingNormalPriorityTasks = make([]*Task, 0, initialCapacity) + cr.pendingHighPriorityTasks = make([]*Task, 0, initialCapacity) cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if len(cr.pendingTasks) > 0 { - maxDuration = time.Since(cr.pendingTasks[0].submittedAt) + if len(cr.pendingNormalPriorityTasks) > 0 { + maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) } for name, cnt := range cr.pendingTaskCount { - RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + RunnerPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -134,26 +143,38 @@ func (cr *ConcurrentRunner) Start() { } func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { + start := time.Now() task.f(task.ctx) if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() } + RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - for len(cr.pendingTasks) > 0 { - task := cr.pendingTasks[0] + if len(cr.pendingHighPriorityTasks) > 0 { + task := cr.pendingHighPriorityTasks[0] + select { + case cr.taskChan <- task: + cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:] + cr.pendingTaskCount[task.name]-- + default: + } + return + } + if len(cr.pendingNormalPriorityTasks) > 0 { + task := cr.pendingNormalPriorityTasks[0] select { case cr.taskChan <- task: - cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:] cr.pendingTaskCount[task.name]-- - return default: - return } + return } } @@ -177,22 +198,25 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con } cr.processPendingTasks() - select { - case cr.taskChan <- task: - default: - cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { - maxWait := time.Since(cr.pendingTasks[0].submittedAt) - if maxWait > cr.maxPendingDuration { - cr.failedTaskCount.Inc() - return ErrMaxWaitingTasksExceeded - } - } + cr.pendingMu.Lock() + defer cr.pendingMu.Unlock() + if task.opts.MetaUpdated { task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingHighPriorityTasks = append(cr.pendingNormalPriorityTasks, task) cr.pendingTaskCount[task.name]++ + return nil + } + + if len(cr.pendingNormalPriorityTasks) > 0 { + maxWait := time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } } + task.submittedAt = time.Now() + cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 8a2e757d5cd..00fa8dc389b 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { Tracer: core.NewNoopHeartbeatProcessTracer(), // no limit 
for followers. } - saveKV, _, _ := regionGuide(ctx, region, origin) + saveKV, _, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a8558051dfa..0aa580230c4 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1024,19 +1024,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( - ctx.Context, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, metaUpdated := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1053,6 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio cluster.Collect(c, region, hasRegionStats) } }, + ratelimit.WithMetaUpdated(metaUpdated), ) } // region is not updated to the subtree. @@ -1063,6 +1058,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithMetaUpdated(true), ) } return nil @@ -1090,6 +1086,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithMetaUpdated(metaUpdated), ) tracer.OnUpdateSubTreeFinished() @@ -1100,6 +1097,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, + ratelimit.WithMetaUpdated(metaUpdated), ) } regionUpdateCacheEventCounter.Inc() @@ -1116,6 +1114,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. 
cluster.Collect(c, region, hasRegionStats) }, + ratelimit.WithMetaUpdated(metaUpdated), ) tracer.OnCollectRegionStatsFinished() @@ -1145,6 +1144,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, + ratelimit.WithMetaUpdated(metaUpdated), ) } } From 43c60cb3432a6783d79bbef9fab1296b1dc126c2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 13 May 2024 12:18:17 +0800 Subject: [PATCH 02/11] prevent oom Signed-off-by: Ryan Leung --- pkg/core/region.go | 14 ++++++++------ pkg/mcs/scheduling/server/cluster.go | 17 ++++++++--------- pkg/ratelimit/runner.go | 23 +++++++++++++++-------- server/cluster/cluster.go | 15 ++++++++------- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index da591436ea0..c306de73e97 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -729,7 +730,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +743,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) { + priority = constant.Medium logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +774,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +787,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +798,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -807,7 +809,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync, metaUpdated = true, true, true + saveCache, needSync, priority = true, true, constant.High } if len(region.GetPeers()) != len(origin.GetPeers()) { saveKV, saveCache = true, true diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 3de7d2d00e3..b13ff4c710d 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/ratelimit" @@ -591,19 +592,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - - ctx.TaskRunner.RunTask( - ctx, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -617,6 +611,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c cluster.Collect(c, region, hasRegionStats) } }, + ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. 
@@ -627,6 +622,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithPriority(constant.High), ) } return nil @@ -650,6 +646,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithPriority(priority), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( @@ -658,6 +655,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, + ratelimit.WithPriority(priority), ) } tracer.OnSaveCacheFinished() @@ -668,6 +666,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, + ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 1529cf736b1..9062aa2788a 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/pkg/core/constant" "go.uber.org/zap" ) @@ -35,7 +36,7 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 +const initialCapacity = 10000 // Runner is the interface for running tasks. type Runner interface { @@ -88,16 +89,15 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur // TaskOpts is the options for RunTask. type TaskOpts struct { - // IsMetaUpdated indicates whether the meta is updated. - MetaUpdated bool + priority constant.PriorityLevel } // TaskOption configures TaskOp type TaskOption func(opts *TaskOpts) -// WithMetaUpdated specify whether the meta is updated. -func WithMetaUpdated(metaUpdated bool) TaskOption { - return func(opts *TaskOpts) { opts.MetaUpdated = metaUpdated } +// WithPriority sets the priority of the task. +func WithPriority(priority constant.PriorityLevel) TaskOption { + return func(opts *TaskOpts) { opts.priority = priority } } // Start starts the runner. 
@@ -200,9 +200,16 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.processPendingTasks() cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if task.opts.MetaUpdated { + if task.opts.priority >= constant.High { + if len(cr.pendingHighPriorityTasks) > 0 { + maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } + } task.submittedAt = time.Now() - cr.pendingHighPriorityTasks = append(cr.pendingNormalPriorityTasks, task) + cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) cr.pendingTaskCount[task.name]++ return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0aa580230c4..a1107fce48b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/gc" @@ -1030,7 +1031,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync, metaUpdated := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1047,7 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. @@ -1058,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithMetaUpdated(true), + ratelimit.WithPriority(constant.High), ) } return nil @@ -1086,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) tracer.OnUpdateSubTreeFinished() @@ -1097,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } regionUpdateCacheEventCounter.Inc() @@ -1114,7 +1115,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. 
cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() @@ -1144,7 +1145,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } } From d458613ef5ea3cca5d47a0a7582a3190b8c7ed9c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 13 May 2024 14:32:54 +0800 Subject: [PATCH 03/11] address comments Signed-off-by: Ryan Leung --- metrics/grafana/pd.json | 4 ++-- pkg/ratelimit/metrics.go | 7 +++--- pkg/ratelimit/runner.go | 52 +++++++++++++++++++++------------------- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index e6d314c2e00..871ae5b075e 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -11651,7 +11651,7 @@ "targets": [ { "exemplar": true, - "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "format": "time_series", "hide": false, "interval": "", @@ -11768,7 +11768,7 @@ "targets": [ { "exemplar": true, - "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", "format": "time_series", "hide": false, "interval": "", diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index c5510e66b26..8cb69fcc2fa 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -19,8 +19,9 @@ import ( ) const ( - nameStr = "runner_name" - taskStr = "task_type" + nameStr = "runner_name" + taskStr = "task_type" + priorityStr = "priority" ) var ( @@ -37,7 +38,7 @@ var ( Subsystem: "ratelimit", Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr, taskStr}) + }, []string{nameStr, priorityStr, taskStr}) RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 9062aa2788a..80f979236ce 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -49,9 +49,9 @@ type Runner interface { type Task struct { ctx context.Context submittedAt time.Time - opts *TaskOpts f func(context.Context) name string + priority constant.PriorityLevel } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. @@ -68,7 +68,7 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup - pendingTaskCount map[string]int64 + pendingTaskCount map[string]map[string]int64 maxWaitingDuration prometheus.Gauge } @@ -81,23 +81,18 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur taskChan: make(chan *Task), pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity), pendingHighPriorityTasks: make([]*Task, 0, initialCapacity), - pendingTaskCount: make(map[string]int64), + pendingTaskCount: make(map[string]map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } -// TaskOpts is the options for RunTask. 
-type TaskOpts struct { - priority constant.PriorityLevel -} - // TaskOption configures TaskOp -type TaskOption func(opts *TaskOpts) +type TaskOption func(opts *Task) // WithPriority sets the priority of the task. func WithPriority(priority constant.PriorityLevel) TaskOption { - return func(opts *TaskOpts) { opts.priority = priority } + return func(opts *Task) { opts.priority = priority } } // Start starts the runner. @@ -132,8 +127,10 @@ func (cr *ConcurrentRunner) Start() { if len(cr.pendingNormalPriorityTasks) > 0 { maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) } - for name, cnt := range cr.pendingTaskCount { - RunnerPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + for name, priorityMap := range cr.pendingTaskCount { + for priority, cnt := range priorityMap { + RunnerPendingTasks.WithLabelValues(cr.name, priority, name).Set(float64(cnt)) + } } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -161,7 +158,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { select { case cr.taskChan <- task: cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:] - cr.pendingTaskCount[task.name]-- + cr.pendingTaskCount[task.priority.String()][task.name]-- default: } return @@ -171,7 +168,7 @@ func (cr *ConcurrentRunner) processPendingTasks() { select { case cr.taskChan <- task: cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:] - cr.pendingTaskCount[task.name]-- + cr.pendingTaskCount[task.priority.String()][task.name]-- default: } return @@ -186,21 +183,20 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { - taskOpts := &TaskOpts{} - for _, opt := range opts { - opt(taskOpts) - } task := &Task{ - ctx: ctx, - name: name, - f: f, - opts: taskOpts, + ctx: ctx, + name: name, + f: f, + priority: constant.Medium, + } + for _, opt := range opts { + opt(task) } cr.processPendingTasks() cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if task.opts.priority >= constant.High { + if task.priority >= constant.High { if len(cr.pendingHighPriorityTasks) > 0 { maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { @@ -210,7 +206,10 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con } task.submittedAt = time.Now() cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) - cr.pendingTaskCount[task.name]++ + if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { + cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) + } + cr.pendingTaskCount[task.priority.String()][task.name]++ return nil } @@ -223,7 +222,10 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con } task.submittedAt = time.Now() cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task) - cr.pendingTaskCount[task.name]++ + if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { + cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) + } + cr.pendingTaskCount[task.priority.String()][task.name]++ return nil } From 835257a8011255493d92bd39999c59b2911979bf Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 13 May 2024 17:09:25 +0800 Subject: [PATCH 04/11] limit the count of high priority queue Signed-off-by: Ryan Leung --- pkg/ratelimit/runner.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git 
a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 80f979236ce..0245a5a1d0f 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -36,7 +36,10 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 10000 +const ( + initialCapacity = 10000 + maxHighPriorityTaskNum = 20000000 +) // Runner is the interface for running tasks. type Runner interface { @@ -197,12 +200,11 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.pendingMu.Lock() defer cr.pendingMu.Unlock() if task.priority >= constant.High { - if len(cr.pendingHighPriorityTasks) > 0 { - maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt) - if maxWait > cr.maxPendingDuration { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() - return ErrMaxWaitingTasksExceeded - } + // We use the max task number to prevent the OOM issue. + // It occupies around 1.5GB memory when there is 20000000 pending task. + if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded } task.submittedAt = time.Now() cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) From 1c4336a935776c722d9a860bc46b5547cfd04c89 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 14 May 2024 14:53:57 +0800 Subject: [PATCH 05/11] address comments Signed-off-by: Ryan Leung --- pkg/ratelimit/runner.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 0245a5a1d0f..a3f6a5e4a17 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -196,11 +196,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con opt(task) } - cr.processPendingTasks() cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() + defer func() { + cr.pendingMu.Unlock() + cr.processPendingTasks() + }() if task.priority >= constant.High { - // We use the max task number to prevent the OOM issue. + // We use the max task number to limit the memory usage. // It occupies around 1.5GB memory when there is 20000000 pending task. 
if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() From 1b64ad0e066fbfb8b57a94f47a08feb325062cb7 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 14 May 2024 15:35:45 +0800 Subject: [PATCH 06/11] address comments Signed-off-by: Ryan Leung --- pkg/ratelimit/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index a3f6a5e4a17..0ce3a77df22 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -195,7 +195,7 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con for _, opt := range opts { opt(task) } - + cr.processPendingTasks() cr.pendingMu.Lock() defer func() { cr.pendingMu.Unlock() From 0f349a0eb838c66366adef90302c8ba1f8db3f88 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 15 May 2024 18:10:14 +0800 Subject: [PATCH 07/11] split statistics runner Signed-off-by: Ryan Leung --- pkg/core/context.go | 16 ++++++++------ pkg/mcs/scheduling/server/cluster.go | 32 +++++++++++++++++----------- server/cluster/cluster.go | 28 +++++++++++++----------- server/cluster/cluster_worker.go | 16 ++++++++------ 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/pkg/core/context.go b/pkg/core/context.go index a0f51e55680..5ed38d90eca 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -23,19 +23,21 @@ import ( // MetaProcessContext is a context for meta process. type MetaProcessContext struct { context.Context - Tracer RegionHeartbeatProcessTracer - TaskRunner ratelimit.Runner - LogRunner ratelimit.Runner + Tracer RegionHeartbeatProcessTracer + TaskRunner ratelimit.Runner + StatisticsRunner ratelimit.Runner + LogRunner ratelimit.Runner } // NewMetaProcessContext creates a new MetaProcessContext. // used in tests, can be changed if no need to test concurrency. 
func ContextTODO() *MetaProcessContext { return &MetaProcessContext{ - Context: context.TODO(), - Tracer: NewNoopHeartbeatProcessTracer(), - TaskRunner: ratelimit.NewSyncRunner(), - LogRunner: ratelimit.NewSyncRunner(), + Context: context.TODO(), + Tracer: NewNoopHeartbeatProcessTracer(), + TaskRunner: ratelimit.NewSyncRunner(), + StatisticsRunner: ratelimit.NewSyncRunner(), + LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b13ff4c710d..34483e7523d 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -55,7 +55,8 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunnner ratelimit.Runner + heartbeatRunner ratelimit.Runner + statisticsRunner ratelimit.Runner logRunner ratelimit.Runner } @@ -65,8 +66,9 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - heartbeatTaskRunner = "heartbeat-task-runner" - logTaskRunner = "log-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + statisticsTaskRunner = "statistics-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -94,7 +96,8 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) @@ -532,7 +535,8 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.statisticsRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -544,7 +548,8 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.statisticsRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -561,17 +566,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, statisticsRunner, logRunner ratelimit.Runner + taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + StatisticsRunner: statisticsRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a1107fce48b..39f1fc82bdd 100644 
--- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -108,8 +108,9 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async" - logTaskRunner = "log-async" + heartbeatTaskRunner = "heartbeat-async" + statisticsTaskRunner = "statistics-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster. @@ -174,7 +175,8 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunnner ratelimit.Runner + heartbeatRunner ratelimit.Runner + statisticsRunner ratelimit.Runner logRunner ratelimit.Runner } @@ -199,7 +201,8 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba etcdClient: etcdClient, core: basicCluster, storage: storage, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -358,7 +361,8 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.statisticsRunner.Start() c.logRunner.Start() return nil } @@ -753,7 +757,8 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.statisticsRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1040,7 +1045,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.TaskRunner.RunTask( + ctx.StatisticsRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1048,7 +1053,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. @@ -1092,13 +1096,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( + ctx.StatisticsRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithPriority(priority), ) } regionUpdateCacheEventCounter.Inc() @@ -1106,7 +1109,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.TaskRunner.RunTask( + ctx.StatisticsRunner.RunTask( ctx.Context, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1115,13 +1118,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. 
cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.TaskRunner.RunTask( + ctx.StatisticsRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 43602dbb68d..a8d44dbec3e 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,18 +40,20 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, statisticsRunner, logRunner ratelimit.Runner + taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + statisticsRunner = c.statisticsRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + StatisticsRunner: statisticsRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil { From 2004b2d40924c8eaac684e1c9586c0fc77548deb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 16 May 2024 11:50:15 +0800 Subject: [PATCH 08/11] remove high priority task queue Signed-off-by: Ryan Leung --- pkg/core/region.go | 14 ++- pkg/mcs/scheduling/server/cluster.go | 10 +-- pkg/ratelimit/metrics.go | 7 +- pkg/ratelimit/runner.go | 122 +++++++++++---------------- server/cluster/cluster.go | 8 +- 5 files changed, 63 insertions(+), 98 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index c306de73e97..719d7cd0a57 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -730,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -743,8 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) { - priority = constant.Medium + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -774,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -787,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -798,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -809,7 +807,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync, priority = true, true, constant.High + saveCache, needSync = true, true } if len(region.GetPeers()) != len(origin.GetPeers()) { saveKV, saveCache = true, true diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 34483e7523d..4c137272c05 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -14,7 +14,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/ratelimit" @@ -603,7 +602,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -617,7 +616,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. 
@@ -628,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(constant.High), + ratelimit.WithRetained(true), ) } return nil @@ -652,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(priority), + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( @@ -661,7 +659,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithPriority(priority), ) } tracer.OnSaveCacheFinished() @@ -672,7 +669,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 8cb69fcc2fa..c5510e66b26 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -19,9 +19,8 @@ import ( ) const ( - nameStr = "runner_name" - taskStr = "task_type" - priorityStr = "priority" + nameStr = "runner_name" + taskStr = "task_type" ) var ( @@ -38,7 +37,7 @@ var ( Subsystem: "ratelimit", Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr, priorityStr, taskStr}) + }, []string{nameStr, taskStr}) RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 0ce3a77df22..5ac4d408a03 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/pd/pkg/core/constant" "go.uber.org/zap" ) @@ -37,8 +36,8 @@ const ( ) const ( - initialCapacity = 10000 - maxHighPriorityTaskNum = 20000000 + initialCapacity = 10000 + maxPendingTaskNum = 20000000 ) // Runner is the interface for running tasks. @@ -54,7 +53,7 @@ type Task struct { submittedAt time.Time f func(context.Context) name string - priority constant.PriorityLevel + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. @@ -62,30 +61,28 @@ var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") // ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. type ConcurrentRunner struct { - name string - limiter *ConcurrencyLimiter - maxPendingDuration time.Duration - taskChan chan *Task - pendingNormalPriorityTasks []*Task - pendingHighPriorityTasks []*Task - pendingMu sync.Mutex - stopChan chan struct{} - wg sync.WaitGroup - pendingTaskCount map[string]map[string]int64 - maxWaitingDuration prometheus.Gauge + name string + limiter *ConcurrencyLimiter + maxPendingDuration time.Duration + taskChan chan *Task + pendingTasks []*Task + pendingMu sync.Mutex + stopChan chan struct{} + wg sync.WaitGroup + pendingTaskCount map[string]int64 + maxWaitingDuration prometheus.Gauge } // NewConcurrentRunner creates a new ConcurrentRunner. 
func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner { s := &ConcurrentRunner{ - name: name, - limiter: limiter, - maxPendingDuration: maxPendingDuration, - taskChan: make(chan *Task), - pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity), - pendingHighPriorityTasks: make([]*Task, 0, initialCapacity), - pendingTaskCount: make(map[string]map[string]int64), - maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), + name: name, + limiter: limiter, + maxPendingDuration: maxPendingDuration, + taskChan: make(chan *Task), + pendingTasks: make([]*Task, 0, initialCapacity), + pendingTaskCount: make(map[string]int64), + maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -93,9 +90,9 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur // TaskOption configures TaskOp type TaskOption func(opts *Task) -// WithPriority sets the priority of the task. -func WithPriority(priority constant.PriorityLevel) TaskOption { - return func(opts *Task) { opts.priority = priority } +// WithRetained sets whether the task should be retained. +func WithRetained(retained bool) TaskOption { + return func(opts *Task) { opts.retained = retained } } // Start starts the runner. @@ -119,21 +116,18 @@ func (cr *ConcurrentRunner) Start() { } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingNormalPriorityTasks = make([]*Task, 0, initialCapacity) - cr.pendingHighPriorityTasks = make([]*Task, 0, initialCapacity) + cr.pendingTasks = make([]*Task, 0, initialCapacity) cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if len(cr.pendingNormalPriorityTasks) > 0 { - maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) + if len(cr.pendingTasks) > 0 { + maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - for name, priorityMap := range cr.pendingTaskCount { - for priority, cnt := range priorityMap { - RunnerPendingTasks.WithLabelValues(cr.name, priority, name).Set(float64(cnt)) - } + for taskName, cnt := range cr.pendingTaskCount { + RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -156,22 +150,12 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if len(cr.pendingHighPriorityTasks) > 0 { - task := cr.pendingHighPriorityTasks[0] + if len(cr.pendingTasks) > 0 { + task := cr.pendingTasks[0] select { case cr.taskChan <- task: - cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:] - cr.pendingTaskCount[task.priority.String()][task.name]-- - default: - } - return - } - if len(cr.pendingNormalPriorityTasks) > 0 { - task := cr.pendingNormalPriorityTasks[0] - select { - case cr.taskChan <- task: - cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:] - cr.pendingTaskCount[task.priority.String()][task.name]-- + cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingTaskCount[task.name]-- default: } return @@ -187,10 +171,9 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. 
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { task := &Task{ - ctx: ctx, - name: name, - f: f, - priority: constant.Medium, + ctx: ctx, + name: name, + f: f, } for _, opt := range opts { opt(task) @@ -201,35 +184,26 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.pendingMu.Unlock() cr.processPendingTasks() }() - if task.priority >= constant.High { + + pendingTaskNum := len(cr.pendingTasks) + if pendingTaskNum > 0 { + if !task.retained { + maxWait := time.Since(cr.pendingTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } + } // We use the max task number to limit the memory usage. // It occupies around 1.5GB memory when there is 20000000 pending task. - if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() - return ErrMaxWaitingTasksExceeded - } - task.submittedAt = time.Now() - cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) - if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { - cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) - } - cr.pendingTaskCount[task.priority.String()][task.name]++ - return nil - } - - if len(cr.pendingNormalPriorityTasks) > 0 { - maxWait := time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) - if maxWait > cr.maxPendingDuration { + if len(cr.pendingTasks) > maxPendingTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } task.submittedAt = time.Now() - cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task) - if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { - cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) - } - cr.pendingTaskCount[task.priority.String()][task.name]++ + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 39f1fc82bdd..5a0e78f6609 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/gc" @@ -1036,7 +1035,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1063,7 +1062,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(constant.High), + ratelimit.WithRetained(true), ) } return nil @@ -1091,7 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(priority), + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() @@ -1147,7 +1146,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, - ratelimit.WithPriority(priority), ) } } From 61951f21fcec5082fac028e694ec10195b8857f1 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 16 May 2024 11:52:58 +0800 Subject: [PATCH 09/11] tiny change Signed-off-by: Ryan Leung --- metrics/grafana/pd.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 871ae5b075e..54a047e612e 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -11656,7 +11656,7 @@ "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{task_type}}_({{runner_name}})", + "legendFormat": "{{task_type}}_{{runner_name}}", "refId": "A", "step": 4 } @@ -11773,7 +11773,7 @@ "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "failed-tasks-({{runner_name}})", + "legendFormat": "failed-tasks-{{runner_name}}", "refId": "A", "step": 4 }, @@ -11782,7 +11782,7 @@ "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "hide": false, "interval": "", - "legendFormat": "max-wait-duration-({{runner_name}})", + "legendFormat": "max-wait-duration-{{runner_name}}", "refId": "B" } ], From 62d98f4dff589dc6b53f053ec3a576780e691799 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 17 May 2024 14:11:10 +0800 Subject: [PATCH 10/11] tiny fix Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 4c137272c05..f5220adfe2c 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -569,6 +569,7 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { taskRunner = c.heartbeatRunner + statisticsRunner = c.statisticsRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ From ed084151bc714bddf9863acf73092c9a8edc8867 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 11:29:29 +0800 Subject: [PATCH 11/11] add more comments and rename Signed-off-by: Ryan Leung --- pkg/core/context.go | 18 ++++++------ pkg/mcs/scheduling/server/cluster.go | 35 +++++++++++++----------- pkg/ratelimit/runner.go | 3 +- server/cluster/cluster.go | 41 +++++++++++++++------------- server/cluster/cluster_worker.go | 16 +++++------ 5 files changed, 60 insertions(+), 53 deletions(-) diff --git a/pkg/core/context.go b/pkg/core/context.go index 
5ed38d90eca..7410f8394c2 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -23,21 +23,21 @@ import ( // MetaProcessContext is a context for meta process. type MetaProcessContext struct { context.Context - Tracer RegionHeartbeatProcessTracer - TaskRunner ratelimit.Runner - StatisticsRunner ratelimit.Runner - LogRunner ratelimit.Runner + Tracer RegionHeartbeatProcessTracer + TaskRunner ratelimit.Runner + MiscRunner ratelimit.Runner + LogRunner ratelimit.Runner } // NewMetaProcessContext creates a new MetaProcessContext. // used in tests, can be changed if no need to test concurrency. func ContextTODO() *MetaProcessContext { return &MetaProcessContext{ - Context: context.TODO(), - Tracer: NewNoopHeartbeatProcessTracer(), - TaskRunner: ratelimit.NewSyncRunner(), - StatisticsRunner: ratelimit.NewSyncRunner(), - LogRunner: ratelimit.NewSyncRunner(), + Context: context.TODO(), + Tracer: NewNoopHeartbeatProcessTracer(), + TaskRunner: ratelimit.NewSyncRunner(), + MiscRunner: ratelimit.NewSyncRunner(), + LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f5220adfe2c..c6c365b03ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,9 +54,12 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunner ratelimit.Runner - statisticsRunner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } const ( @@ -95,9 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -535,7 +538,7 @@ func (c *Cluster) StartBackgroundJobs() { go c.runCoordinator() go c.runMetricsCollectionJob() c.heartbeatRunner.Start() - c.statisticsRunner.Start() + c.miscRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -548,7 +551,7 @@ func (c *Cluster) StopBackgroundJobs() { c.running.Store(false) c.coordinator.Stop() c.heartbeatRunner.Stop() - c.statisticsRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -565,19 +568,19 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) 
error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, statisticsRunner, logRunner ratelimit.Runner - taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { taskRunner = c.heartbeatRunner - statisticsRunner = c.statisticsRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - StatisticsRunner: statisticsRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil { diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 5ac4d408a03..17a45067f3d 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -53,7 +53,8 @@ type Task struct { submittedAt time.Time f func(context.Context) name string - retained bool + // retained indicates that the task should not be dropped even if the oldest pending task has waited longer than maxPendingDuration. + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5a0e78f6609..148b43541a2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -174,9 +174,12 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunner ratelimit.Runner - statisticsRunner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } // Status saves some state information.
@@ -193,16 +196,16 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - statisticsRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -361,7 +364,7 @@ func (c *RaftCluster) Start(s Server) error { c.running = true c.heartbeatRunner.Start() - c.statisticsRunner.Start() + c.miscRunner.Start() c.logRunner.Start() return nil } @@ -757,7 +760,7 @@ func (c *RaftCluster) Stop() { c.stopSchedulingJobs() } c.heartbeatRunner.Stop() - c.statisticsRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1044,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. 
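 // The statistics collection and KV persistence tasks below are submitted to the MiscRunner
 // so that they are processed asynchronously instead of blocking heartbeat handling.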
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1095,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { @@ -1108,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1122,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.StatisticsRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index a8d44dbec3e..39720e7d765 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,20 +40,20 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, statisticsRunner, logRunner ratelimit.Runner - taskRunner, statisticsRunner, logRunner = syncRunner, syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { taskRunner = c.heartbeatRunner - statisticsRunner = c.statisticsRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ - Context: c.ctx, - Tracer: tracer, - TaskRunner: taskRunner, - StatisticsRunner: statisticsRunner, - LogRunner: logRunner, + Context: c.ctx, + Tracer: tracer, + TaskRunner: taskRunner, + MiscRunner: miscRunner, + LogRunner: logRunner, } tracer.Begin() if err := c.processRegionHeartbeat(ctx, region); err != nil {
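
For reference, a minimal sketch (not part of the patch) of how a caller could drive the runner API introduced above; the runner name, task name, and limiter size are illustrative placeholders:

package main

import (
	"context"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// A runner with a small concurrency limit and a one-minute pending-duration budget.
	runner := ratelimit.NewConcurrentRunner("example-runner", ratelimit.NewConcurrencyLimiter(4), time.Minute)
	runner.Start()
	defer runner.Stop()

	// A retained task stays queued even when the oldest pending task has waited longer than
	// maxPendingDuration; a non-retained task would be rejected with ErrMaxWaitingTasksExceeded.
	_ = runner.RunTask(context.Background(), "example-task", func(_ context.Context) {
		// work goes here
	}, ratelimit.WithRetained(true))
}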