diff --git a/pkg/core/region.go b/pkg/core/region.go index c306de73e97..719d7cd0a57 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -730,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -743,8 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) { - priority = constant.Medium + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -774,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -787,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -798,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache, priority = true, true, constant.High + saveKV, saveCache, retained = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -809,7 +807,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync, priority = true, true, constant.High + saveCache, needSync = true, true } if len(region.GetPeers()) != len(origin.GetPeers()) { saveKV, saveCache = true, true diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 34483e7523d..4c137272c05 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -14,7 +14,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/ratelimit" @@ -603,7 +602,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -617,7 +616,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c cluster.Collect(c, region, hasRegionStats) } }, - ratelimit.WithPriority(priority), ) } // region is not updated to the subtree. @@ -628,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(constant.High), + ratelimit.WithRetained(true), ) } return nil @@ -652,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(priority), + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( @@ -661,7 +659,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.HandleOverlaps(c, overlaps) }, - ratelimit.WithPriority(priority), ) } tracer.OnSaveCacheFinished() @@ -672,7 +669,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { cluster.Collect(c, region, hasRegionStats) }, - ratelimit.WithPriority(priority), ) tracer.OnCollectRegionStatsFinished() return nil diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 8cb69fcc2fa..c5510e66b26 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -19,9 +19,8 @@ import ( ) const ( - nameStr = "runner_name" - taskStr = "task_type" - priorityStr = "priority" + nameStr = "runner_name" + taskStr = "task_type" ) var ( @@ -38,7 +37,7 @@ var ( Subsystem: "ratelimit", Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr, priorityStr, taskStr}) + }, []string{nameStr, taskStr}) RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 0ce3a77df22..5ac4d408a03 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" - "github.com/tikv/pd/pkg/core/constant" "go.uber.org/zap" ) @@ -37,8 +36,8 @@ const ( ) const ( - initialCapacity = 10000 - maxHighPriorityTaskNum = 20000000 + initialCapacity = 10000 + maxPendingTaskNum = 20000000 ) // Runner is the interface for running tasks. @@ -54,7 +53,7 @@ type Task struct { submittedAt time.Time f func(context.Context) name string - priority constant.PriorityLevel + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. @@ -62,30 +61,28 @@ var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") // ConcurrentRunner is a simple task runner that limits the number of concurrent tasks. type ConcurrentRunner struct { - name string - limiter *ConcurrencyLimiter - maxPendingDuration time.Duration - taskChan chan *Task - pendingNormalPriorityTasks []*Task - pendingHighPriorityTasks []*Task - pendingMu sync.Mutex - stopChan chan struct{} - wg sync.WaitGroup - pendingTaskCount map[string]map[string]int64 - maxWaitingDuration prometheus.Gauge + name string + limiter *ConcurrencyLimiter + maxPendingDuration time.Duration + taskChan chan *Task + pendingTasks []*Task + pendingMu sync.Mutex + stopChan chan struct{} + wg sync.WaitGroup + pendingTaskCount map[string]int64 + maxWaitingDuration prometheus.Gauge } // NewConcurrentRunner creates a new ConcurrentRunner. func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner { s := &ConcurrentRunner{ - name: name, - limiter: limiter, - maxPendingDuration: maxPendingDuration, - taskChan: make(chan *Task), - pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity), - pendingHighPriorityTasks: make([]*Task, 0, initialCapacity), - pendingTaskCount: make(map[string]map[string]int64), - maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), + name: name, + limiter: limiter, + maxPendingDuration: maxPendingDuration, + taskChan: make(chan *Task), + pendingTasks: make([]*Task, 0, initialCapacity), + pendingTaskCount: make(map[string]int64), + maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -93,9 +90,9 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur // TaskOption configures TaskOp type TaskOption func(opts *Task) -// WithPriority sets the priority of the task. -func WithPriority(priority constant.PriorityLevel) TaskOption { - return func(opts *Task) { opts.priority = priority } +// WithRetained sets whether the task should be retained. +func WithRetained(retained bool) TaskOption { + return func(opts *Task) { opts.retained = retained } } // Start starts the runner. @@ -119,21 +116,18 @@ func (cr *ConcurrentRunner) Start() { } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingNormalPriorityTasks = make([]*Task, 0, initialCapacity) - cr.pendingHighPriorityTasks = make([]*Task, 0, initialCapacity) + cr.pendingTasks = make([]*Task, 0, initialCapacity) cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if len(cr.pendingNormalPriorityTasks) > 0 { - maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) + if len(cr.pendingTasks) > 0 { + maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - for name, priorityMap := range cr.pendingTaskCount { - for priority, cnt := range priorityMap { - RunnerPendingTasks.WithLabelValues(cr.name, priority, name).Set(float64(cnt)) - } + for taskName, cnt := range cr.pendingTaskCount { + RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -156,22 +150,12 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if len(cr.pendingHighPriorityTasks) > 0 { - task := cr.pendingHighPriorityTasks[0] + if len(cr.pendingTasks) > 0 { + task := cr.pendingTasks[0] select { case cr.taskChan <- task: - cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:] - cr.pendingTaskCount[task.priority.String()][task.name]-- - default: - } - return - } - if len(cr.pendingNormalPriorityTasks) > 0 { - task := cr.pendingNormalPriorityTasks[0] - select { - case cr.taskChan <- task: - cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:] - cr.pendingTaskCount[task.priority.String()][task.name]-- + cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingTaskCount[task.name]-- default: } return @@ -187,10 +171,9 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { task := &Task{ - ctx: ctx, - name: name, - f: f, - priority: constant.Medium, + ctx: ctx, + name: name, + f: f, } for _, opt := range opts { opt(task) @@ -201,35 +184,26 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con cr.pendingMu.Unlock() cr.processPendingTasks() }() - if task.priority >= constant.High { + + pendingTaskNum := len(cr.pendingTasks) + if pendingTaskNum > 0 { + if !task.retained { + maxWait := time.Since(cr.pendingTasks[0].submittedAt) + if maxWait > cr.maxPendingDuration { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } + } // We use the max task number to limit the memory usage. // It occupies around 1.5GB memory when there is 20000000 pending task. - if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() - return ErrMaxWaitingTasksExceeded - } - task.submittedAt = time.Now() - cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task) - if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { - cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) - } - cr.pendingTaskCount[task.priority.String()][task.name]++ - return nil - } - - if len(cr.pendingNormalPriorityTasks) > 0 { - maxWait := time.Since(cr.pendingNormalPriorityTasks[0].submittedAt) - if maxWait > cr.maxPendingDuration { + if len(cr.pendingTasks) > maxPendingTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } task.submittedAt = time.Now() - cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task) - if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok { - cr.pendingTaskCount[task.priority.String()] = make(map[string]int64) - } - cr.pendingTaskCount[task.priority.String()][task.name]++ + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 39f1fc82bdd..5a0e78f6609 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/gc" @@ -1036,7 +1035,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1063,7 +1062,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(constant.High), + ratelimit.WithRetained(true), ) } return nil @@ -1091,7 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, - ratelimit.WithPriority(priority), + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() @@ -1147,7 +1146,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio } regionUpdateKVEventCounter.Inc() }, - ratelimit.WithPriority(priority), ) } }