Skip to content

Commit

Permalink
ReusableGoroutinesPool: Fix datarace on Close
Browse files Browse the repository at this point in the history
Without the mutex:
```
julienduchesne@triceratops dskit % go test ./concurrency -race
==================
WARNING: DATA RACE
Write at 0x00c00001e240 by goroutine 103:
  runtime.recvDirect()
      /opt/homebrew/opt/go/libexec/src/runtime/chan.go:388 +0x7c
  github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Close()
      /Users/julienduchesne/Repos/dskit/concurrency/worker.go:40 +0x1d4
  github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race()
      /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:89 +0x1c4
  testing.tRunner()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184
  testing.(*T).Run.gowrap1()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x40

Previous read at 0x00c00001e240 by goroutine 106:
  runtime.chansend1()
      /opt/homebrew/opt/go/libexec/src/runtime/chan.go:157 +0x2c
  github.com/grafana/dskit/concurrency.(*ReusableGoroutinesPool).Go()
      /Users/julienduchesne/Repos/dskit/concurrency/worker.go:29 +0x48
  github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race.func1()
      /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:82 +0xfc

Goroutine 103 (running) created at:
  testing.(*T).Run()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x5e0
  testing.runTests.func1()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:2168 +0x80
  testing.tRunner()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184
  testing.runTests()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:2166 +0x6e0
  testing.(*M).Run()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:2034 +0xb74
  go.uber.org/goleak.VerifyTestMain()
      /Users/julienduchesne/go/pkg/mod/go.uber.org/[email protected]/testmain.go:53 +0x44
  github.com/grafana/dskit/concurrency.TestMain()
      /Users/julienduchesne/Repos/dskit/concurrency/limited_concurrency_singleflight_test.go:16 +0x130
  main.main()
      _testmain.go:85 +0x114

Goroutine 106 (running) created at:
  github.com/grafana/dskit/concurrency.TestReusableGoroutinesPool_Race()
      /Users/julienduchesne/Repos/dskit/concurrency/worker_test.go:74 +0x1ac
  testing.tRunner()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1690 +0x184
  testing.(*T).Run.gowrap1()
      /opt/homebrew/opt/go/libexec/src/testing/testing.go:1743 +0x40
==================
--- FAIL: TestReusableGoroutinesPool_Race (0.01s)
    testing.go:1399: race detected during execution of test
FAIL
FAIL    github.com/grafana/dskit/concurrency    3.240s
FAIL
```
  • Loading branch information
julienduchesne committed Oct 10, 2024
1 parent 879ff5a commit 67ef9a1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
18 changes: 15 additions & 3 deletions concurrency/worker.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package concurrency

import (
"sync"
)

// NewReusableGoroutinesPool creates a new worker pool with the given size.
// These workers will run the workloads passed through Go() calls.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
Expand All @@ -18,12 +22,16 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
}

type ReusableGoroutinesPool struct {
jobs chan func()
jobsMu sync.Mutex
jobs chan func()
}

// Go will run the given function in a worker of the pool.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func (p *ReusableGoroutinesPool) Go(f func()) {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()

select {
case p.jobs <- f:
default:
Expand All @@ -32,7 +40,11 @@ func (p *ReusableGoroutinesPool) Go(f func()) {
}

// Close stops the workers of the pool.
// No new Do() calls should be performed after calling Close().
// No new Go() calls should be performed after calling Close().
// Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads.
// Close is intended to be used in tests to ensure that no goroutines are leaked.
func (p *ReusableGoroutinesPool) Close() { close(p.jobs) }
func (p *ReusableGoroutinesPool) Close() {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()
close(p.jobs)
}
35 changes: 35 additions & 0 deletions concurrency/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"regexp"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestReusableGoroutinesPool(t *testing.T) {
Expand Down Expand Up @@ -59,3 +61,36 @@ func TestReusableGoroutinesPool(t *testing.T) {
}
t.Fatalf("expected %d goroutines after closing, got %d", 0, countGoroutines())
}

func TestReusableGoroutinesPool_Race(t *testing.T) {
w := NewReusableGoroutinesPool(2)

var panicked bool
var runCountAtomic atomic.Int32
const maxMsgCount = 10

var testWG sync.WaitGroup
testWG.Add(1)
go func() {
defer testWG.Done()
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()
for i := 0; i < maxMsgCount; i++ {
w.Go(func() {
runCountAtomic.Add(1)
})
time.Sleep(10 * time.Millisecond)
}
}()
time.Sleep(10 * time.Millisecond)
w.Close() // close the pool
testWG.Wait() // wait for the test to finish

runCt := int(runCountAtomic.Load())
require.NotZero(t, runCt, "expected at least one run")
require.Less(t, runCt, 10, "expected less than 10 runs")
require.True(t, panicked, "expected panic")
}

0 comments on commit 67ef9a1

Please sign in to comment.