Skip to content

Commit

Permalink
*: use context to manage runner (#8394)
Browse files Browse the repository at this point in the history
ref #8386

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Jul 22, 2024
1 parent 0985e2e commit 7136d69
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,9 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
c.running.Store(true)
}

Expand Down
26 changes: 16 additions & 10 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// Runner is the interface for running tasks.
type Runner interface {
RunTask(id uint64, name string, f func(), opts ...TaskOption) error
Start()
Start(ctx context.Context)
Stop()
}

Expand All @@ -66,12 +66,13 @@ type taskID struct {
}

type ConcurrentRunner struct {
ctx context.Context
cancel context.CancelFunc
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int
pendingTasks []*Task
Expand Down Expand Up @@ -103,8 +104,8 @@ func WithRetained(retained bool) TaskOption {
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
func (cr *ConcurrentRunner) Start(ctx context.Context) {
cr.ctx, cr.cancel = context.WithCancel(ctx)
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand All @@ -118,11 +119,11 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task, token)
go cr.run(cr.ctx, task, token)
} else {
go cr.run(task, nil)
go cr.run(cr.ctx, task, nil)
}
case <-cr.stopChan:
case <-cr.ctx.Done():
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
Expand All @@ -144,8 +145,13 @@ func (cr *ConcurrentRunner) Start() {
}()
}

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToken) {
start := time.Now()
select {
case <-ctx.Done():
return
default:
}
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
Expand Down Expand Up @@ -173,7 +179,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {

// Stop stops the runner.
func (cr *ConcurrentRunner) Stop() {
close(cr.stopChan)
cr.cancel()
cr.wg.Wait()
}

Expand Down Expand Up @@ -238,7 +244,7 @@ func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error
}

// Start starts the runner.
func (*SyncRunner) Start() {}
func (*SyncRunner) Start(context.Context) {}

// Stop stops the runner.
func (*SyncRunner) Stop() {}
7 changes: 4 additions & 3 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -25,7 +26,7 @@ import (
func TestConcurrentRunner(t *testing.T) {
t.Run("RunTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Second)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()

var wg sync.WaitGroup
Expand All @@ -47,7 +48,7 @@ func TestConcurrentRunner(t *testing.T) {

t.Run("MaxPendingDuration", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestConcurrentRunner(t *testing.T) {

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ func (c *RaftCluster) Start(s Server) error {
go c.startGCTuner()

c.running = true
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
return nil
}

Expand Down

0 comments on commit 7136d69

Please sign in to comment.