Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use context to manage runner #8394

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,9 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
c.running.Store(true)
}

Expand Down
26 changes: 16 additions & 10 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// Runner is the interface for running tasks.
type Runner interface {
RunTask(id uint64, name string, f func(), opts ...TaskOption) error
Start()
Start(ctx context.Context)
Stop()
}

Expand All @@ -66,12 +66,13 @@ type taskID struct {
}

type ConcurrentRunner struct {
ctx context.Context
cancel context.CancelFunc
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int
pendingTasks []*Task
Expand Down Expand Up @@ -103,8 +104,8 @@ func WithRetained(retained bool) TaskOption {
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
func (cr *ConcurrentRunner) Start(ctx context.Context) {
cr.ctx, cr.cancel = context.WithCancel(ctx)
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand All @@ -118,11 +119,11 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task, token)
go cr.run(cr.ctx, task, token)
} else {
go cr.run(task, nil)
go cr.run(cr.ctx, task, nil)
}
case <-cr.stopChan:
case <-cr.ctx.Done():
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
Expand All @@ -144,8 +145,13 @@ func (cr *ConcurrentRunner) Start() {
}()
}

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToken) {
start := time.Now()
select {
case <-ctx.Done():
return
Copy link
Contributor

@nolouch nolouch Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the race problem's main reason is the task of accessing the object without protection. here still cannot avoid this kind of problem, we should let the task be safely run concurrency in task logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed, but with the context, we can exit the goroutine ASAP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I think it should pass context to the task, and then the task should consider exit ASAP by context cancel. here only prevent starting the new task. the initial version has this design :)

default:
}
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
Expand Down Expand Up @@ -173,7 +179,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {

// Stop stops the runner.
func (cr *ConcurrentRunner) Stop() {
close(cr.stopChan)
cr.cancel()
cr.wg.Wait()
}

Expand Down Expand Up @@ -238,7 +244,7 @@ func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error
}

// Start starts the runner.
func (*SyncRunner) Start() {}
func (*SyncRunner) Start(context.Context) {}

// Stop stops the runner.
func (*SyncRunner) Stop() {}
7 changes: 4 additions & 3 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -25,7 +26,7 @@ import (
func TestConcurrentRunner(t *testing.T) {
t.Run("RunTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Second)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()

var wg sync.WaitGroup
Expand All @@ -47,7 +48,7 @@ func TestConcurrentRunner(t *testing.T) {

t.Run("MaxPendingDuration", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestConcurrentRunner(t *testing.T) {

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
runner.Start(context.TODO())
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ func (c *RaftCluster) Start(s Server) error {
go c.startGCTuner()

c.running = true
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
return nil
}

Expand Down