From 7731ee4df6098257322a339c102b59655055a19f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 17:22:17 +0800 Subject: [PATCH 1/2] pkg: reduce the allocation of observe (#8188) ref tikv/pd#7897 Signed-off-by: Ryan Leung --- pkg/statistics/region_collection.go | 124 +++++++++++++++++----------- 1 file changed, 78 insertions(+), 46 deletions(-) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index e4c159cf22d..30197dd43ea 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -222,61 +222,93 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. - conditions := map[RegionStatisticType]bool{ - MissPeer: len(peers) < desiredReplicas, - ExtraPeer: len(peers) > desiredReplicas, - DownPeer: len(downPeers) > 0, - PendingPeer: len(pendingPeers) > 0, - OfflinePeer: func() bool { - for _, store := range stores { - if store.IsRemoving() { - peer := region.GetStorePeer(store.GetID()) - if peer != nil { - return true - } - } + var conditions RegionStatisticType + if len(peers) < desiredReplicas { + conditions |= MissPeer + } + if len(peers) > desiredReplicas { + conditions |= ExtraPeer + } + if len(downPeers) > 0 { + conditions |= DownPeer + } + if len(pendingPeers) > 0 { + conditions |= PendingPeer + } + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + conditions |= OfflinePeer + break } - return false - }(), - LearnerPeer: len(learners) > 0, - EmptyRegion: regionSize <= core.EmptyRegionApproximateSize, - OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys), - UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys), - WitnessLeader: leaderIsWitness, + } + } + if len(learners) > 0 { + conditions |= LearnerPeer + } + if regionSize <= core.EmptyRegionApproximateSize { + conditions |= EmptyRegion + } + if region.IsOversized(regionMaxSize, regionMaxKeys) { + conditions |= OversizedRegion + } + if region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys) { + conditions |= UndersizedRegion + } + if leaderIsWitness { + conditions |= WitnessLeader } // Check if the region meets any of the conditions and update the corresponding info. 
regionID := region.GetID() - for typ, c := range conditions { - if c { - info := r.stats[typ][regionID] - if typ == DownPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if info.(*RegionInfoWithTS).startDownPeerTS != 0 { - regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) + for i := 0; i < len(regionStatisticTypes); i++ { + condition := RegionStatisticType(1 << i) + if conditions&condition == 0 { + continue + } + info := r.stats[condition][regionID] + // The condition is met + switch condition { + case MissPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if len(voters) < desiredVoters { + if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { + regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) } else { - info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() - logDownPeerWithNoDisconnectedStore(region, stores) - } - } else if typ == MissPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if len(voters) < desiredVoters { - if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { - regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) - } else { - info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() - } + info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() } + } + case DownPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if info.(*RegionInfoWithTS).startDownPeerTS != 0 { + regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) } else { - info = struct{}{} + info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() + logDownPeerWithNoDisconnectedStore(region, stores) } - - r.stats[typ][regionID] = info - peerTypeIndex |= typ + case ExtraPeer: + fallthrough + case PendingPeer: + fallthrough + case OfflinePeer: + fallthrough + case LearnerPeer: + fallthrough + case EmptyRegion: + fallthrough + case OversizedRegion: + fallthrough + case UndersizedRegion: + fallthrough + case WitnessLeader: + info = struct{}{} } + r.stats[condition][regionID] = info + peerTypeIndex |= condition } // Remove the info if any of the conditions are not met any more. 
if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic { From 58e7580209f001248c3d530ef2d315ab3c6fd767 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 18:11:47 +0800 Subject: [PATCH 2/2] *: use a separate runner for updating subtree (#8158) ref tikv/pd#7897 Signed-off-by: Ryan Leung --- metrics/grafana/pd.json | 10 ++-- pkg/core/context.go | 2 + pkg/core/region.go | 10 ++-- pkg/core/region_test.go | 6 +-- pkg/mcs/scheduling/server/cluster.go | 45 ++++++++++-------- pkg/ratelimit/metrics.go | 32 +++++++++---- pkg/ratelimit/runner.go | 70 ++++++++++++++++------------ pkg/syncer/client.go | 2 +- server/cluster/cluster.go | 58 ++++++++++++----------- server/cluster/cluster_worker.go | 8 ++-- 10 files changed, 142 insertions(+), 101 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index e6d314c2e00..54a047e612e 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -11651,12 +11651,12 @@ "targets": [ { "exemplar": true, - "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{task_type}}_({{runner_name}})", + "legendFormat": "{{task_type}}_{{runner_name}}", "refId": "A", "step": 4 } @@ -11768,12 +11768,12 @@ "targets": [ { "exemplar": true, - "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "failed-tasks-({{runner_name}})", + "legendFormat": "failed-tasks-{{runner_name}}", "refId": "A", "step": 4 }, @@ -11782,7 +11782,7 @@ "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "hide": false, "interval": "", - "legendFormat": "max-wait-duration-({{runner_name}})", + "legendFormat": "max-wait-duration-{{runner_name}}", "refId": "B" } ], diff --git a/pkg/core/context.go b/pkg/core/context.go index a0f51e55680..7410f8394c2 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -25,6 +25,7 @@ type MetaProcessContext struct { context.Context Tracer RegionHeartbeatProcessTracer TaskRunner ratelimit.Runner + MiscRunner ratelimit.Runner LogRunner ratelimit.Runner } @@ -35,6 +36,7 @@ func ContextTODO() *MetaProcessContext { Context: context.TODO(), Tracer: NewNoopHeartbeatProcessTracer(), TaskRunner: ratelimit.NewSyncRunner(), + MiscRunner: ratelimit.NewSyncRunner(), LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } diff --git a/pkg/core/region.go b/pkg/core/region.go index c9a8455d4de..a1a61d505a9 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. 
Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1b8f20cf9b2..b09c1dfd601 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB) + _, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -1031,7 +1031,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA) re.Equal(int32(2), regionPendingItemA.GetRef()) // check new item - saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA) re.True(needSync) re.True(saveCache) re.False(saveKV) @@ -1060,7 +1060,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { re.Equal(int32(1), regionPendingItemB.GetRef()) // heartbeat again, no need updates root tree - saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB) re.False(needSync) re.False(saveCache) re.False(saveKV) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d3691516868..c6c365b03ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,8 +54,12 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. 
+ logRunner ratelimit.Runner } const ( @@ -64,8 +68,9 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - heartbeatTaskRunner = "heartbeat-task-runner" - logTaskRunner = "log-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + statisticsTaskRunner = "statistics-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -93,8 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -531,7 +537,8 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -543,7 +550,8 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -560,16 +568,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin() @@ -591,19 +601,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - - ctx.TaskRunner.RunTask( - ctx, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -627,6 +630,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -650,6 +654,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 5d4443a1cc4..c5510e66b26 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -31,25 +31,41 @@ var ( Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - - RunnerTaskPendingTasks = prometheus.NewGaugeVec( + RunnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_pending_tasks", + Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskFailedTasks = prometheus.NewCounterVec( + RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_failed_tasks_total", + Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) + RunnerSucceededTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_success_tasks_total", + Help: "The number of tasks in the runner.", + }, []string{nameStr, taskStr}) + RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_execution_duration_seconds", + Help: "Bucketed histogram of processing time (s) of finished tasks.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{nameStr, taskStr}) ) func init() { prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerTaskPendingTasks) - prometheus.MustRegister(RunnerTaskFailedTasks) + prometheus.MustRegister(RunnerPendingTasks) + prometheus.MustRegister(RunnerFailedTasks) + prometheus.MustRegister(RunnerTaskExecutionDuration) + prometheus.MustRegister(RunnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 07233af238b..17a45067f3d 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -35,7 +35,10 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 +const ( + initialCapacity = 10000 + maxPendingTaskNum = 20000000 +) // Runner is the interface for running tasks. type Runner interface { @@ -48,9 +51,10 @@ type Runner interface { type Task struct { ctx context.Context submittedAt time.Time - opts *TaskOpts f func(context.Context) name string + // retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration. + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. 
@@ -67,7 +71,6 @@ type ConcurrentRunner struct { stopChan chan struct{} wg sync.WaitGroup pendingTaskCount map[string]int64 - failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -79,18 +82,19 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } -// TaskOpts is the options for RunTask. -type TaskOpts struct{} - // TaskOption configures TaskOp -type TaskOption func(opts *TaskOpts) +type TaskOption func(opts *Task) + +// WithRetained sets whether the task should be retained. +func WithRetained(retained bool) TaskOption { + return func(opts *Task) { opts.retained = retained } +} // Start starts the runner. func (cr *ConcurrentRunner) Start() { @@ -123,8 +127,8 @@ func (cr *ConcurrentRunner) Start() { if len(cr.pendingTasks) > 0 { maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - for name, cnt := range cr.pendingTaskCount { - RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + for taskName, cnt := range cr.pendingTaskCount { + RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -134,26 +138,28 @@ func (cr *ConcurrentRunner) Start() { } func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { + start := time.Now() task.f(task.ctx) if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() } + RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - for len(cr.pendingTasks) > 0 { + if len(cr.pendingTasks) > 0 { task := cr.pendingTasks[0] select { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- - return default: - return } + return } } @@ -165,34 +171,40 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { - taskOpts := &TaskOpts{} - for _, opt := range opts { - opt(taskOpts) - } task := &Task{ ctx: ctx, name: name, f: f, - opts: taskOpts, } - + for _, opt := range opts { + opt(task) + } cr.processPendingTasks() - select { - case cr.taskChan <- task: - default: - cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { + cr.pendingMu.Lock() + defer func() { + cr.pendingMu.Unlock() + cr.processPendingTasks() + }() + + pendingTaskNum := len(cr.pendingTasks) + if pendingTaskNum > 0 { + if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { - cr.failedTaskCount.Inc() + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } - task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingTaskCount[task.name]++ + // We use the max task number to limit the memory usage. + // It occupies around 1.5GB memory when there is 20000000 pending task. 
+ if len(cr.pendingTasks) > maxPendingTaskNum { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } } + task.submittedAt = time.Now() + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 8a2e757d5cd..00fa8dc389b 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { Tracer: core.NewNoopHeartbeatProcessTracer(), // no limit for followers. } - saveKV, _, _ := regionGuide(ctx, region, origin) + saveKV, _, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a8558051dfa..148b43541a2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,8 +107,9 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async" - logTaskRunner = "log-async" + heartbeatTaskRunner = "heartbeat-async" + statisticsTaskRunner = "statistics-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster. @@ -173,8 +174,12 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } // Status saves some state information. @@ -191,15 +196,16 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -357,7 +363,8 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() return nil } @@ -752,7 +759,8 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1024,19 
+1032,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( - ctx.Context, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1045,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1063,6 +1065,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -1090,11 +1093,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { @@ -1107,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1121,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 43602dbb68d..39720e7d765 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,10 +40,11 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } @@ -51,6 +52,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin()
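
Notes on the two patches above, with illustrative sketches. These sketches are not part of the patch series; every identifier in them is invented for illustration and is not one of PD's real names.

Patch 1 drops the per-call map[RegionStatisticType]bool (including the OfflinePeer closure) in RegionStatistics.Observe and accumulates the conditions into a single RegionStatisticType bitmask instead, so no map is allocated on every heartbeat. A minimal standalone Go sketch of that bit-flag pattern, assuming simplified condition names and thresholds:

package main

import "fmt"

// ConditionType mirrors the bit-flag style of RegionStatisticType:
// each condition occupies one bit, so the whole set fits in one integer
// instead of a map allocated on every call.
type ConditionType uint32

const (
	MissPeer ConditionType = 1 << iota
	ExtraPeer
	DownPeer
	PendingPeer
)

// evaluate accumulates the matching conditions with bitwise OR.
func evaluate(peerCount, desired, downPeers, pendingPeers int) ConditionType {
	var conditions ConditionType
	if peerCount < desired {
		conditions |= MissPeer
	}
	if peerCount > desired {
		conditions |= ExtraPeer
	}
	if downPeers > 0 {
		conditions |= DownPeer
	}
	if pendingPeers > 0 {
		conditions |= PendingPeer
	}
	return conditions
}

func main() {
	conds := evaluate(2, 3, 1, 0)
	// Test each flag with a mask, as the patched Observe loop does,
	// instead of ranging over a map of booleans.
	for i := 0; i < 4; i++ {
		flag := ConditionType(1 << i)
		if conds&flag != 0 {
			fmt.Printf("condition %04b is set\n", flag)
		}
	}
}

Patch 2 routes subtree updates, statistics/persistence work, and logging through separate runners, and marks the subtree-update tasks as retained via ratelimit.WithRetained so they skip the waiting-time drop applied to ordinary tasks when the pending queue backs up. The reduced sketch below shows the functional-option shape of that API around a toy submit function; Task, submit, and the single capacity check are simplifications, not the real ConcurrentRunner, which also enforces maxPendingDuration and maxPendingTaskNum:

package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a simplified stand-in for the runner's task: a function plus a
// retained flag deciding whether it may be dropped under back-pressure.
type Task struct {
	name     string
	f        func(context.Context)
	retained bool
}

// TaskOption mutates the Task directly, as the patch does after dropping TaskOpts.
type TaskOption func(*Task)

// WithRetained marks a task that should survive queue saturation,
// e.g. the subtree update that keeps the region tree consistent.
func WithRetained(retained bool) TaskOption {
	return func(t *Task) { t.retained = retained }
}

var errQueueFull = errors.New("max waiting tasks exceeded")

// submit rejects non-retained tasks once the queue is full.
func submit(queue []*Task, maxPending int, name string, f func(context.Context), opts ...TaskOption) ([]*Task, error) {
	t := &Task{name: name, f: f}
	for _, opt := range opts {
		opt(t)
	}
	if len(queue) >= maxPending && !t.retained {
		return queue, errQueueFull
	}
	return append(queue, t), nil
}

func main() {
	var queue []*Task
	queue, _ = submit(queue, 1, "collect-stats", func(context.Context) {})

	// The queue is at capacity, so an ordinary task is rejected...
	if _, err := submit(queue, 1, "collect-stats", func(context.Context) {}); err != nil {
		fmt.Println("dropped:", err)
	}
	// ...while a retained task, like the subtree update, is still queued.
	queue, err := submit(queue, 1, "update-subtree", func(context.Context) {}, WithRetained(true))
	fmt.Println("queued:", len(queue), "err:", err)
}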