Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use context to manage runner #8394

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(ctx, heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(ctx, miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(ctx, logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
Expand Down
24 changes: 16 additions & 8 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@
}

type ConcurrentRunner struct {
ctx context.Context
cancel context.CancelFunc
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int
pendingTasks []*Task
Expand All @@ -80,8 +81,11 @@
}

// NewConcurrentRunner creates a new ConcurrentRunner.
func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
func NewConcurrentRunner(ctx context.Context, name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
ctx, cancel := context.WithCancel(ctx)
s := &ConcurrentRunner{
ctx: ctx,
cancel: cancel,
name: name,
limiter: limiter,
maxPendingDuration: maxPendingDuration,
Expand All @@ -104,7 +108,6 @@

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand All @@ -118,11 +121,11 @@
if err != nil {
continue
}
go cr.run(task, token)
go cr.run(cr.ctx, task, token)
} else {
go cr.run(task, nil)
go cr.run(cr.ctx, task, nil)

Check warning on line 126 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L126

Added line #L126 was not covered by tests
}
case <-cr.stopChan:
case <-cr.ctx.Done():
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingMu.Unlock()
Expand All @@ -144,8 +147,13 @@
}()
}

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToken) {
start := time.Now()
select {
case <-ctx.Done():
return
Copy link
Contributor

@nolouch nolouch Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main cause of the race problem is that the task accesses the object without protection. That kind of problem still cannot be avoided here; we should make the task safe to run concurrently within the task logic itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed, but with the context, we can exit the goroutine ASAP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it should pass the context to the task, and the task should then exit ASAP when the context is canceled. Here we only prevent new tasks from starting. The initial version had this design :)

default:
}
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
Expand Down Expand Up @@ -173,7 +181,7 @@

// Stop stops the runner.
func (cr *ConcurrentRunner) Stop() {
close(cr.stopChan)
cr.cancel()
cr.wg.Wait()
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -24,7 +25,7 @@ import (

func TestConcurrentRunner(t *testing.T) {
t.Run("RunTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Second)
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), time.Second)
runner.Start()
defer runner.Stop()

Expand All @@ -46,7 +47,7 @@ func TestConcurrentRunner(t *testing.T) {
})

t.Run("MaxPendingDuration", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner.Start()
defer runner.Stop()
var wg sync.WaitGroup
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestConcurrentRunner(t *testing.T) {
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
defer runner.Stop()
for i := 1; i < 11; i++ {
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(ctx, heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(ctx, miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(ctx, logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
}

Expand Down