diff --git a/pkg/core/region.go b/pkg/core/region.go index da591436ea02..c306de73e974 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -729,7 +730,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +743,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) { + priority = constant.Medium logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +774,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +787,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +798,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache, metaUpdated = true, true, true + saveKV, saveCache, priority = true, true, constant.High } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -807,7 +809,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync, metaUpdated = true, true, true + saveCache, needSync, priority = true, true, constant.High } if len(region.GetPeers()) != len(origin.GetPeers()) { saveKV, saveCache = true, true diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c306c642b33c..b13ff4c710d4 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/ratelimit" @@ -591,19 +592,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - - ctx.TaskRunner.RunTask( - ctx, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -617,11 +611,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c cluster.Collect(c, region, hasRegionStats) } }, -<<<<<<< HEAD -======= - ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync), - ratelimit.WithMetaUpdated(metaUpdated), ->>>>>>> distinguish the task priority + ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. @@ -632,11 +622,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, -<<<<<<< HEAD -======= - ratelimit.WithTaskName(ratelimit.UpdateSubTree), - ratelimit.WithMetaUpdated(metaUpdated), ->>>>>>> distinguish the task priority + ratelimit.WithPriority(constant.High), ) } return nil @@ -660,11 +646,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, -<<<<<<< HEAD -======= - ratelimit.WithTaskName(ratelimit.UpdateSubTree), - ratelimit.WithMetaUpdated(metaUpdated), ->>>>>>> distinguish the task priority + ratelimit.WithPriority(priority), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( @@ -673,11 +655,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, -<<<<<<< HEAD -======= - ratelimit.WithTaskName(ratelimit.HandleOverlaps), - ratelimit.WithMetaUpdated(metaUpdated), ->>>>>>> distinguish the task priority + ratelimit.WithPriority(priority), ) } tracer.OnSaveCacheFinished() @@ -688,11 +666,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, -<<<<<<< HEAD -======= - ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync), - ratelimit.WithMetaUpdated(metaUpdated), ->>>>>>> distinguish the task priority + ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 2e5786d2ab2c..3ec552fb840d 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/pkg/core/constant" "go.uber.org/zap" ) @@ -35,7 +36,7 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 +const initialCapacity = 10000 // Runner is the interface for running tasks. type Runner interface { @@ -88,16 +89,15 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur // TaskOpts is the options for RunTask. type TaskOpts struct { - // IsMetaUpdated indicates whether the meta is updated. - MetaUpdated bool + priority constant.PriorityLevel } // TaskOption configures TaskOp type TaskOption func(opts *TaskOpts) -// WithMetaUpdated specify whether the meta is updated. -func WithMetaUpdated(metaUpdated bool) TaskOption { - return func(opts *TaskOpts) { opts.MetaUpdated = metaUpdated } +// WithPriority sets the priority of the task. +func WithPriority(priority constant.PriorityLevel) TaskOption { + return func(opts *TaskOpts) { opts.priority = priority } } // Start starts the runner. @@ -202,9 +202,16 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.processPendingTasks() cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if task.opts.MetaUpdated { + if task.opts.priority >= constant.High { + if len(cr.pendingHighPriorityTasks) > 0 { + maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } + } task.submittedAt = time.Now() - cr.pendingHighPriorityTasks = append(cr.pendingNormalPriorityTasks, task) + cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) cr.pendingTaskCount[task.name]++ return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0aa580230c44..a1107fce48b6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/gc" @@ -1030,7 +1031,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync, metaUpdated := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1047,7 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. @@ -1058,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithMetaUpdated(true), + ratelimit.WithPriority(constant.High), ) } return nil @@ -1086,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) tracer.OnUpdateSubTreeFinished() @@ -1097,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } regionUpdateCacheEventCounter.Inc() @@ -1114,7 +1115,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // We need to think of a better way to reduce this part of the cost in the future. cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() @@ -1144,7 +1145,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, - ratelimit.WithMetaUpdated(metaUpdated), + ratelimit.WithPriority(priority), ) } }