From 67ef9a17eaff67a2d6b49ff88dee8f898cb0efde Mon Sep 17 00:00:00 2001 From: Julien Duchesne Date: Thu, 10 Oct 2024 13:38:15 -0400 Subject: [PATCH] `ReusableGoroutinesPool`: Fix data race on `Close` Without the mutex: ``` julienduchesne@triceratops dskit % go test ./concurrency -race ================== WARNING: DATA RACE Write at 0x00c00001e240 by goroutine 103: runtime.recvDirect() /opt/homebrew/opt/go/libexec/src/runtime/chan.go:388 +0x7c github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Close() /Users/julienduchesne/Repos/dskit/concurrency/worker.go:40 +0x1d4 github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race() /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:89 +0x1c4 testing.tRunner() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184 testing.(*T).Run.gowrap1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x40 Previous read at 0x00c00001e240 by goroutine 106: runtime.chansend1() /opt/homebrew/opt/go/libexec/src/runtime/chan.go:157 +0x2c github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Go() /Users/julienduchesne/Repos/dskit/concurrency/worker.go:29 +0x48 github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race.func1() /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:82 +0xfc Goroutine 103 (running) created at: testing.(*T).Run() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x5e0 testing.runTests.func1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:2168 +0x80 testing.tRunner() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184 testing.runTests() /opt/homebrew/opt/go/libexec/src/testing/testing.go:2166 +0x6e0 testing.(*M).Run() /opt/homebrew/opt/go/libexec/src/testing/testing.go:2034 +0xb74 go.uber.org/goleak.VerifyTestMain() /Users/julienduchesne/go/pkg/mod/go.uber.org/goleak@v1.2.0/testmain.go:53 +0x44 github.com/grafana/dskit/concurrency.TestMain() 
/Users/julienduchesne/Repos/dskit/concurrency/limited_concurrency_singleflight_test.go:16 +0x130 main.main() _testmain.go:85 +0x114 Goroutine 106 (running) created at: github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race() /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:74 +0x1ac testing.tRunner() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184 testing.(*T).Run.gowrap1() /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x40 ================== --- FAIL: TestReusableGoroutinesPool_Race (0.01s) testing.go:1399: race detected during execution of test FAIL FAIL github.com/grafana/dskit/concurrency 3.240s FAIL ``` --- concurrency/worker.go | 18 +++++++++++++++--- concurrency/worker_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/concurrency/worker.go b/concurrency/worker.go index f40f03348..179791efe 100644 --- a/concurrency/worker.go +++ b/concurrency/worker.go @@ -1,5 +1,9 @@ package concurrency +import ( + "sync" +) + // NewReusableGoroutinesPool creates a new worker pool with the given size. // These workers will run the workloads passed through Go() calls. // If all workers are busy, Go() will spawn a new goroutine to run the workload. @@ -18,12 +22,16 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { } type ReusableGoroutinesPool struct { - jobs chan func() + jobsMu sync.Mutex + jobs chan func() } // Go will run the given function in a worker of the pool. // If all workers are busy, Go() will spawn a new goroutine to run the workload. func (p *ReusableGoroutinesPool) Go(f func()) { + p.jobsMu.Lock() + defer p.jobsMu.Unlock() + select { case p.jobs <- f: default: @@ -32,7 +40,11 @@ func (p *ReusableGoroutinesPool) Go(f func()) { } // Close stops the workers of the pool. -// No new Do() calls should be performed after calling Close(). +// No new Go() calls should be performed after calling Close(). 
// Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. // Close is intended to be used in tests to ensure that no goroutines are leaked. -func (p *ReusableGoroutinesPool) Close() { close(p.jobs) } +func (p *ReusableGoroutinesPool) Close() { + p.jobsMu.Lock() + defer p.jobsMu.Unlock() + close(p.jobs) +} diff --git a/concurrency/worker_test.go b/concurrency/worker_test.go index 338062055..1dab55928 100644 --- a/concurrency/worker_test.go +++ b/concurrency/worker_test.go @@ -4,10 +4,12 @@ import ( "regexp" "runtime" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestReusableGoroutinesPool(t *testing.T) { @@ -59,3 +61,36 @@ func TestReusableGoroutinesPool(t *testing.T) { } t.Fatalf("expected %d goroutines after closing, got %d", 0, countGoroutines()) } + +func TestReusableGoroutinesPool_Race(t *testing.T) { + w := NewReusableGoroutinesPool(2) + + var panicked bool + var runCountAtomic atomic.Int32 + const maxMsgCount = 10 + + var testWG sync.WaitGroup + testWG.Add(1) + go func() { + defer testWG.Done() + defer func() { + if r := recover(); r != nil { + panicked = true + } + }() + for i := 0; i < maxMsgCount; i++ { + w.Go(func() { + runCountAtomic.Add(1) + }) + time.Sleep(10 * time.Millisecond) + } + }() + time.Sleep(10 * time.Millisecond) + w.Close() // close the pool + testWG.Wait() // wait for the test to finish + + runCt := int(runCountAtomic.Load()) + require.NotZero(t, runCt, "expected at least one run") + require.Less(t, runCt, 10, "expected less than 10 runs") + require.True(t, panicked, "expected panic") +}