Commit

remove old duplicated task
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed May 31, 2024
1 parent c498063 commit 75a0028
Showing 5 changed files with 37 additions and 8 deletions.
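
The change threads a region ID through the task runner so that pending tasks can be deduplicated per region: RunTask gains a regionID parameter, Task records it, and ConcurrentRunner keeps a pendingRegionTasks map keyed by the string "<regionID>-<name>". When a task with the same key is already pending, the new submission is recorded in place of the waiting entry rather than being queued again. Below is a minimal standalone sketch of that coalescing idea; the type and function names are simplified stand-ins for illustration, not the PD implementation.

package main

import "fmt"

// task stands in for ratelimit.Task: work identified by a region ID and a name.
type task struct {
	regionID uint64
	name     string
	run      func()
}

// queue keeps pending tasks plus an index keyed by "<regionID>-<name>".
type queue struct {
	pending      []*task
	pendingByKey map[string]*task
}

// submit enqueues t unless a task with the same region ID and name is already
// pending, in which case the newer submission simply replaces the indexed entry.
func (q *queue) submit(t *task) {
	key := fmt.Sprintf("%d-%s", t.regionID, t.name)
	if _, ok := q.pendingByKey[key]; ok {
		q.pendingByKey[key] = t // coalesce: keep only the latest submission
		return
	}
	q.pending = append(q.pending, t)
	q.pendingByKey[key] = t
}

func main() {
	q := &queue{pendingByKey: make(map[string]*task)}
	q.submit(&task{regionID: 1, name: "UpdateSubTree"})
	q.submit(&task{regionID: 1, name: "UpdateSubTree"}) // same region and name: coalesced
	q.submit(&task{regionID: 2, name: "UpdateSubTree"})
	fmt.Println(len(q.pending)) // prints 2
}
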
2 changes: 2 additions & 0 deletions pkg/core/region.go
@@ -755,6 +755,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"DebugLog",
func(_ context.Context) {
d(msg, fields...)
@@ -764,6 +765,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"InfoLog",
func(_ context.Context) {
i(msg, fields...)
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -607,13 +607,14 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -626,6 +627,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -650,6 +652,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -659,6 +662,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
@@ -669,6 +673,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
27 changes: 20 additions & 7 deletions pkg/ratelimit/runner.go
@@ -17,6 +17,7 @@ package ratelimit
import (
"context"
"errors"
"fmt"
"sync"
"time"

@@ -42,14 +43,15 @@

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
ctx context.Context
regionID uint64
submittedAt time.Time
f func(context.Context)
name string
@@ -71,6 +73,7 @@ type ConcurrentRunner struct {
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingRegionTasks map[string]*Task
maxWaitingDuration prometheus.Gauge
}

@@ -83,6 +86,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
pendingRegionTasks: make(map[string]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +105,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -157,6 +162,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name))
default:
}
return
@@ -170,11 +176,13 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
ctx: ctx,
regionID: regionID,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -187,7 +195,12 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}()

pendingTaskNum := len(cr.pendingTasks)
taskID := fmt.Sprintf("%d-%s", regionID, name)
if pendingTaskNum > 0 {
if _, ok := cr.pendingRegionTasks[taskID]; ok {
cr.pendingRegionTasks[taskID] = task
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
@@ -202,9 +215,9 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingTaskCount[task.name]++
cr.pendingRegionTasks[taskID] = task
return nil
}

@@ -217,7 +230,7 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
func (*SyncRunner) RunTask(ctx context.Context, _ uint64, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
return nil
}
2 changes: 2 additions & 0 deletions pkg/ratelimit/runner_test.go
@@ -35,6 +35,7 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test1",
func(context.Context) {
defer wg.Done()
@@ -55,6 +56,7 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test2",
func(context.Context) {
defer wg.Done()
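
As the updated test shows, callers now pass the region ID as the second argument to RunTask. The following is a small usage sketch, assuming the package import path github.com/tikv/pd/pkg/ratelimit; it uses SyncRunner for brevity, which runs the task inline, whereas ConcurrentRunner would queue it and coalesce pending duplicates that share a region ID and name.

package main

import (
	"context"
	"fmt"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// SyncRunner executes the task immediately and ignores the region ID and name.
	runner := ratelimit.NewSyncRunner()
	_ = runner.RunTask(
		context.Background(),
		uint64(42), // new argument: the region this task belongs to
		"InfoLog",  // task name; ConcurrentRunner keys its dedup map by "<regionID>-<name>"
		func(_ context.Context) {
			fmt.Println("task executed")
		},
	)
}
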
7 changes: 7 additions & 0 deletions server/cluster/cluster.go
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1047,6 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1059,6 +1061,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -1087,6 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -1098,6 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
@@ -1111,6 +1116,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
@@ -1125,6 +1131,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
