remove high priority task queue
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed May 17, 2024
1 parent 0f349a0 commit 2004b2d
Showing 5 changed files with 63 additions and 98 deletions.
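In short: the runner used to keep two pending queues, one for normal- and one for high-priority tasks, and tagged every task with a constant.PriorityLevel. This commit collapses them into a single FIFO queue and replaces the priority with a boolean retained flag, which only decides whether a task may outlive the max-pending-duration cutoff. A minimal usage sketch of the new API follows; the concurrency-limiter constructor and its argument are assumptions, while NewConcurrentRunner, Start, Stop, RunTask, and WithRetained all appear in the diff below:

```go
package main

import (
	"context"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// NewConcurrencyLimiter(256) is an assumed constructor/limit for this sketch.
	limiter := ratelimit.NewConcurrencyLimiter(256)
	runner := ratelimit.NewConcurrentRunner("heartbeat-runner", limiter, 5*time.Second)
	runner.Start()
	defer runner.Stop()

	// A retained task is never shed for waiting too long; it is only
	// subject to the hard maxPendingTaskNum cap.
	_ = runner.RunTask(context.Background(), "UpdateSubTree",
		func(context.Context) { /* follow-up work */ },
		ratelimit.WithRetained(true))
}
```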
14 changes: 6 additions & 8 deletions pkg/core/region.go
@@ -35,7 +35,6 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
@@ -730,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
- type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel)
+ type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
@@ -743,8 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
- return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) {
- priority = constant.Medium
+ return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
@@ -774,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
- saveKV, saveCache, priority = true, true, constant.High
+ saveKV, saveCache, retained = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -787,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
- saveKV, saveCache, priority = true, true, constant.High
+ saveKV, saveCache, retained = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
@@ -798,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
- saveKV, saveCache, priority = true, true, constant.High
+ saveKV, saveCache, retained = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
@@ -809,7 +807,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
- saveCache, needSync, priority = true, true, constant.High
+ saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
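The guide function above is the single place the retained bit is decided: it is set exactly where the old code escalated to constant.High, i.e. for a brand-new region and for region-epoch (version or conf-ver) growth, but not for a bare leader change. A condensed sketch of the resulting decision logic, with logging omitted and the helper name being illustrative:

```go
// Condensed from the hunks above; not the full implementation.
func guideOutputs(region, origin *core.RegionInfo) (saveKV, saveCache, needSync, retained bool) {
	if origin == nil {
		// Previously unknown region: persist, cache, and retain follow-up work.
		return true, true, false, true
	}
	r, o := region.GetRegionEpoch(), origin.GetRegionEpoch()
	if r.GetVersion() > o.GetVersion() || r.GetConfVer() > o.GetConfVer() {
		saveKV, saveCache, retained = true, true, true // epoch moved forward
	}
	if region.GetLeader().GetId() != origin.GetLeader().GetId() {
		saveCache, needSync = true, true // leader change syncs but is not retained
	}
	if len(region.GetPeers()) != len(origin.GetPeers()) {
		saveKV, saveCache = true, true
	}
	return
}
```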
10 changes: 3 additions & 7 deletions pkg/mcs/scheduling/server/cluster.go
@@ -14,7 +14,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/ratelimit"
@@ -603,7 +602,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
- _, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
+ _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
@@ -617,7 +616,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
cluster.Collect(c, region, hasRegionStats)
}
},
- ratelimit.WithPriority(priority),
)
}
// region is not updated to the subtree.
@@ -628,7 +626,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
- ratelimit.WithPriority(constant.High),
+ ratelimit.WithRetained(true),
)
}
return nil
@@ -652,7 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
- ratelimit.WithPriority(priority),
+ ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
Expand All @@ -661,7 +659,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
- ratelimit.WithPriority(priority),
)
}
tracer.OnSaveCacheFinished()
Expand All @@ -672,7 +669,6 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithPriority(priority),
)
tracer.OnCollectRegionStatsFinished()
return nil
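On the scheduling-microservice path, the guide's retained bit now travels only with the subtree update; overlap handling and stats collection lose their option entirely and run as ordinary, sheddable tasks. A simplified sketch of the dispatch; the task-name string and the ctx.Context field are assumptions inferred from the call shapes above:

```go
// Sketch only, not the exact PD wiring.
func dispatchHeartbeatFollowUps(c *Cluster, ctx *core.MetaProcessContext, region, origin *core.RegionInfo) {
	_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
	if !saveCache {
		return
	}
	// Subtree consistency matters most, so this task carries the retained bit.
	_ = ctx.TaskRunner.RunTask(ctx.Context, "UpdateSubTree",
		func(_ context.Context) { c.CheckAndPutSubTree(region) },
		ratelimit.WithRetained(retained))
	// Overlap handling and stats collection are submitted with no options,
	// so they are shed first when the runner falls behind.
}
```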
7 changes: 3 additions & 4 deletions pkg/ratelimit/metrics.go
@@ -19,9 +19,8 @@ import (
)

const (
nameStr = "runner_name"
taskStr = "task_type"
priorityStr = "priority"
nameStr = "runner_name"
taskStr = "task_type"
)

var (
@@ -38,7 +37,7 @@
Subsystem: "ratelimit",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
- }, []string{nameStr, priorityStr, taskStr})
+ }, []string{nameStr, taskStr})
RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
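Dropping the priority label shrinks the gauge's cardinality from runners × priorities × task types to runners × task types, which is what lets the runner track pending counts in a flat map[string]int64 instead of a nested map. A one-line usage sketch with illustrative label values:

```go
// Label values here are examples only; the gauge itself is defined above.
RunnerPendingTasks.WithLabelValues("heartbeat-runner", "UpdateSubTree").Set(float64(42))
```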
122 changes: 48 additions & 74 deletions pkg/ratelimit/runner.go
@@ -22,7 +22,6 @@ import (

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core/constant"
"go.uber.org/zap"
)

@@ -37,8 +36,8 @@
)

const (
- initialCapacity = 10000
- maxHighPriorityTaskNum = 20000000
+ initialCapacity = 10000
+ maxPendingTaskNum = 20000000
)

// Runner is the interface for running tasks.
@@ -54,48 +53,46 @@ type Task struct {
submittedAt time.Time
f func(context.Context)
name string
- priority constant.PriorityLevel
+ retained bool
}

// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
type ConcurrentRunner struct {
- name string
- limiter *ConcurrencyLimiter
- maxPendingDuration time.Duration
- taskChan chan *Task
- pendingNormalPriorityTasks []*Task
- pendingHighPriorityTasks []*Task
- pendingMu sync.Mutex
- stopChan chan struct{}
- wg sync.WaitGroup
- pendingTaskCount map[string]map[string]int64
- maxWaitingDuration prometheus.Gauge
+ name string
+ limiter *ConcurrencyLimiter
+ maxPendingDuration time.Duration
+ taskChan chan *Task
+ pendingTasks []*Task
+ pendingMu sync.Mutex
+ stopChan chan struct{}
+ wg sync.WaitGroup
+ pendingTaskCount map[string]int64
+ maxWaitingDuration prometheus.Gauge
}

// NewConcurrentRunner creates a new ConcurrentRunner.
func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
s := &ConcurrentRunner{
- name: name,
- limiter: limiter,
- maxPendingDuration: maxPendingDuration,
- taskChan: make(chan *Task),
- pendingNormalPriorityTasks: make([]*Task, 0, initialCapacity),
- pendingHighPriorityTasks: make([]*Task, 0, initialCapacity),
- pendingTaskCount: make(map[string]map[string]int64),
- maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
+ name: name,
+ limiter: limiter,
+ maxPendingDuration: maxPendingDuration,
+ taskChan: make(chan *Task),
+ pendingTasks: make([]*Task, 0, initialCapacity),
+ pendingTaskCount: make(map[string]int64),
+ maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
}

// TaskOption configures TaskOp
type TaskOption func(opts *Task)

- // WithPriority sets the priority of the task.
- func WithPriority(priority constant.PriorityLevel) TaskOption {
- return func(opts *Task) { opts.priority = priority }
+ // WithRetained sets whether the task should be retained.
+ func WithRetained(retained bool) TaskOption {
+ return func(opts *Task) { opts.retained = retained }
}

// Start starts the runner.
@@ -119,21 +116,18 @@ func (cr *ConcurrentRunner) Start() {
}
case <-cr.stopChan:
cr.pendingMu.Lock()
- cr.pendingNormalPriorityTasks = make([]*Task, 0, initialCapacity)
- cr.pendingHighPriorityTasks = make([]*Task, 0, initialCapacity)
+ cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
log.Info("stopping async task runner", zap.String("name", cr.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
cr.pendingMu.Lock()
- if len(cr.pendingNormalPriorityTasks) > 0 {
- maxDuration = time.Since(cr.pendingNormalPriorityTasks[0].submittedAt)
+ if len(cr.pendingTasks) > 0 {
+ maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
}
- for name, priorityMap := range cr.pendingTaskCount {
- for priority, cnt := range priorityMap {
- RunnerPendingTasks.WithLabelValues(cr.name, priority, name).Set(float64(cnt))
- }
+ for taskName, cnt := range cr.pendingTaskCount {
+ RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt))
}
cr.pendingMu.Unlock()
cr.maxWaitingDuration.Set(maxDuration.Seconds())
@@ -156,22 +150,12 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
func (cr *ConcurrentRunner) processPendingTasks() {
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
- if len(cr.pendingHighPriorityTasks) > 0 {
- task := cr.pendingHighPriorityTasks[0]
+ if len(cr.pendingTasks) > 0 {
+ task := cr.pendingTasks[0]
select {
case cr.taskChan <- task:
- cr.pendingHighPriorityTasks = cr.pendingHighPriorityTasks[1:]
- cr.pendingTaskCount[task.priority.String()][task.name]--
- default:
- }
- return
- }
- if len(cr.pendingNormalPriorityTasks) > 0 {
- task := cr.pendingNormalPriorityTasks[0]
- select {
- case cr.taskChan <- task:
- cr.pendingNormalPriorityTasks = cr.pendingNormalPriorityTasks[1:]
- cr.pendingTaskCount[task.priority.String()][task.name]--
+ cr.pendingTasks = cr.pendingTasks[1:]
+ cr.pendingTaskCount[task.name]--
default:
}
return
@@ -187,10 +171,9 @@ func (cr *ConcurrentRunner) Stop() {
// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
task := &Task{
- ctx: ctx,
- name: name,
- f: f,
- priority: constant.Medium,
+ ctx: ctx,
+ name: name,
+ f: f,
}
for _, opt := range opts {
opt(task)
@@ -201,35 +184,26 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
cr.pendingMu.Unlock()
cr.processPendingTasks()
}()
- if task.priority >= constant.High {
+ pendingTaskNum := len(cr.pendingTasks)
+ if pendingTaskNum > 0 {
+ if !task.retained {
+ maxWait := time.Since(cr.pendingTasks[0].submittedAt)
+ if maxWait > cr.maxPendingDuration {
+ RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
+ return ErrMaxWaitingTasksExceeded
+ }
+ }
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
- if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum {
+ if len(cr.pendingTasks) > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
+ }
task.submittedAt = time.Now()
- cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task)
- if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok {
- cr.pendingTaskCount[task.priority.String()] = make(map[string]int64)
- }
- cr.pendingTaskCount[task.priority.String()][task.name]++
- return nil
- }
- if len(cr.pendingNormalPriorityTasks) > 0 {
- maxWait := time.Since(cr.pendingNormalPriorityTasks[0].submittedAt)
- if maxWait > cr.maxPendingDuration {
- RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
- return ErrMaxWaitingTasksExceeded
- }
- }
- task.submittedAt = time.Now()
- cr.pendingNormalPriorityTasks = append(cr.pendingNormalPriorityTasks, task)
- if _, ok := cr.pendingTaskCount[task.priority.String()]; !ok {
- cr.pendingTaskCount[task.priority.String()] = make(map[string]int64)
- }
- cr.pendingTaskCount[task.priority.String()][task.name]++
+ cr.pendingTasks = append(cr.pendingTasks, task)
+ cr.pendingTaskCount[task.name]++
return nil
}

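After the rewrite, admission to the single queue follows two rules: a non-retained task is rejected once the task at the head of the queue has been waiting longer than maxPendingDuration (the queue is evidently backed up), and any task, retained or not, is rejected beyond maxPendingTaskNum. The 20,000,000 cap lines up with the ~1.5 GB estimate in the code comment, i.e. roughly 75-80 bytes per queued task. A condensed sketch of the two checks:

```go
// Condensed from RunTask above; not the full function.
func admit(cr *ConcurrentRunner, task *Task) error {
	if n := len(cr.pendingTasks); n > 0 {
		// Rule 1: shed sheddable work once the queue head has waited too long.
		if !task.retained && time.Since(cr.pendingTasks[0].submittedAt) > cr.maxPendingDuration {
			return ErrMaxWaitingTasksExceeded
		}
		// Rule 2: hard cap on queue length (~1.5 GB at 20M pending tasks).
		if n > maxPendingTaskNum {
			return ErrMaxWaitingTasksExceeded
		}
	}
	return nil
}
```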
8 changes: 3 additions & 5 deletions server/cluster/cluster.go
@@ -35,7 +35,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/gc"
@@ -1036,7 +1035,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
- saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin)
+ saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
Expand All @@ -1063,7 +1062,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
- ratelimit.WithPriority(constant.High),
+ ratelimit.WithRetained(true),
)
}
return nil
@@ -1091,7 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
- ratelimit.WithPriority(priority),
+ ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()

@@ -1147,7 +1146,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
}
regionUpdateKVEventCounter.Inc()
},
- ratelimit.WithPriority(priority),
)
}
}
