Skip to content

Commit

Permalink
distinguish the task priority
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 9, 2024
1 parent a3c5950 commit 9d375e1
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 56 deletions.
12 changes: 6 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -743,7 +743,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
Expand Down Expand Up @@ -773,7 +773,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -786,7 +786,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -797,7 +797,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
saveKV, saveCache, metaUpdated = true, true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
Expand All @@ -808,7 +808,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
saveCache, needSync = true, true
saveCache, needSync, metaUpdated = true, true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
Expand Down
6 changes: 3 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(ContextTODO(), regionA, regionB)
_, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down Expand Up @@ -980,7 +980,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) {
re.Equal(int32(1), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
_, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
Expand All @@ -617,6 +617,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
)
}
// region is not updated to the subtree.
Expand All @@ -627,6 +628,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
)
}
return nil
Expand All @@ -650,6 +652,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
Expand All @@ -658,6 +661,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
ratelimit.WithMetaUpdated(metaUpdated),
)
}
tracer.OnSaveCacheFinished()
Expand All @@ -668,6 +672,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
33 changes: 25 additions & 8 deletions pkg/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,41 @@ var (
Help: "The duration of tasks waiting in the runner.",
}, []string{nameStr})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
RunnerPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Name: "runner_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
}, []string{nameStr, "type"})
RunnerFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Name: "runner_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{nameStr})
}, []string{nameStr, "type"})
RunnerSucceededTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_success_tasks_total",
Help: "The number of tasks in the runner.",
}, []string{nameStr, "type"})
RunnerTaskExecutionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_execution_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished tasks.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
}, []string{nameStr, "type"})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
prometheus.MustRegister(RunnerPendingTasks)
prometheus.MustRegister(RunnerFailedTasks)
prometheus.MustRegister(RunnerTaskExecutionDuration)
prometheus.MustRegister(RunnerSucceededTasks)
}
81 changes: 52 additions & 29 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type ConcurrentRunner struct {
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingNormalTasks []*Task
pendingMetaTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
failedTaskCount prometheus.Counter
maxWaitingDuration prometheus.Gauge
}

Expand All @@ -76,8 +76,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
limiter: limiter,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name),
pendingNormalTasks: make([]*Task, 0, initialCapacity),
pendingMetaTasks: make([]*Task, 0, initialCapacity),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
Expand All @@ -87,6 +87,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
type TaskOpts struct {
// TaskName is a human-readable name for the operation. TODO: metrics by name.
TaskName string
// IsMetaUpdated indicates whether the meta is updated.
MetaUpdated bool
}

// TaskOption configures TaskOp
Expand All @@ -97,6 +99,11 @@ func WithTaskName(name string) TaskOption {
return func(opts *TaskOpts) { opts.TaskName = name }
}

// WithMetaUpdated specify whether the meta is updated.
func WithMetaUpdated(metaUpdated bool) TaskOption {
return func(opts *TaskOpts) { opts.MetaUpdated = metaUpdated }
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
Expand All @@ -112,21 +119,22 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task.Ctx, task.f, token)
go cr.run(task, token)
} else {
go cr.run(task.Ctx, task.f, nil)
go cr.run(task, nil)

Check warning on line 124 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L124

Added line #L124 was not covered by tests
}
case <-cr.stopChan:
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingNormalTasks = make([]*Task, 0, initialCapacity)
cr.pendingMetaTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
log.Info("stopping async task runner", zap.String("name", cr.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
cr.pendingMu.Lock()
if len(cr.pendingTasks) > 0 {
maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
if len(cr.pendingNormalTasks) > 0 {
maxDuration = time.Since(cr.pendingNormalTasks[0].submittedAt)

Check warning on line 137 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L137

Added line #L137 was not covered by tests
}
cr.pendingMu.Unlock()
cr.maxWaitingDuration.Set(maxDuration.Seconds())
Expand All @@ -135,8 +143,13 @@ func (cr *ConcurrentRunner) Start() {
}()
}

func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
task(ctx)
func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
defer func() {
RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.Opts.TaskName).Observe(time.Since(start).Seconds())
RunnerSucceededTasks.WithLabelValues(cr.name, task.Opts.TaskName).Inc()
}()
task.f(task.Ctx)
if token != nil {
token.Release()
cr.processPendingTasks()
Expand All @@ -146,15 +159,23 @@ func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context),
func (cr *ConcurrentRunner) processPendingTasks() {
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
for len(cr.pendingTasks) > 0 {
task := cr.pendingTasks[0]
if len(cr.pendingMetaTasks) > 0 {
task := cr.pendingMetaTasks[0]
select {
case cr.taskChan <- task:
cr.pendingMetaTasks = cr.pendingMetaTasks[1:]
default:
}
return
}
if len(cr.pendingNormalTasks) > 0 {
task := cr.pendingNormalTasks[0]
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
return
cr.pendingNormalTasks = cr.pendingNormalTasks[1:]
default:
return
}
return
}
}

Expand All @@ -177,21 +198,23 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context)
}

cr.processPendingTasks()
select {
case cr.taskChan <- task:
default:
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if len(cr.pendingTasks) > 0 {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
cr.failedTaskCount.Inc()
return ErrMaxWaitingTasksExceeded
}
}
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if task.Opts.MetaUpdated {
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingMetaTasks = append(cr.pendingMetaTasks, task)
return nil
}

if len(cr.pendingNormalTasks) > 0 {
maxWait := time.Since(cr.pendingNormalTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, taskOpts.TaskName).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingNormalTasks = append(cr.pendingNormalTasks, task)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
Tracer: core.NewNoopHeartbeatProcessTracer(),
// no limit for followers.
}
saveKV, _, _ := regionGuide(ctx, region, origin)
saveKV, _, _, _ := regionGuide(ctx, region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
Expand Down
Loading

0 comments on commit 9d375e1

Please sign in to comment.