ReusableGoroutinesPool: Add protection to Close
There is a data race in Mimir where the pool can be closed while `Go` is still being called on it:
https://github.com/grafana/mimir/blob/0c6070552517bda1ccb97b8fc84ca50c591a71f7/pkg/distributor/distributor.go#L528-L532
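
For illustration, here is a minimal sketch of the racy pattern (hypothetical call sites; the real ones are in the Mimir code linked above, and the import path assumes dskit's `concurrency` package):

```go
package main

import "github.com/grafana/dskit/concurrency"

func main() {
	wp := concurrency.NewReusableGoroutinesPool(4)

	// Shutdown path: closes the pool and its jobs channel.
	go wp.Close()

	// Request path: may still submit work concurrently. Without the new
	// protection, the send inside Go() can race with close(p.jobs) and
	// panic with "send on closed channel".
	wp.Go(func() {})
}
```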

Rather than handling this in Mimir, it can be handled directly in this util.

In Mimir, we'll be able to define this pool as `wp := concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers).WithClosedAction(concurrency.WarnWhenClosed).WithLogger(...)`, and the data race will be fixed.
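
As a rough sketch of what that configuration buys us (a go-kit logfmt logger stands in for Mimir's real logger; `WarnWhenClosed` drops late workloads with a warning instead of panicking):

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/concurrency"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	wp := concurrency.NewReusableGoroutinesPool(4).
		WithClosedAction(concurrency.WarnWhenClosed).
		WithLogger(logger)

	wp.Close()

	// Safe now: this logs "tried to run a workload on a closed
	// ReusableGoroutinesPool, dropping the workload" and returns.
	wp.Go(func() {})
}
```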
julienduchesne committed Oct 10, 2024
1 parent 879ff5a commit fe2e81a
Showing 2 changed files with 122 additions and 4 deletions.
61 changes: 57 additions & 4 deletions concurrency/worker.go
@@ -1,11 +1,27 @@
package concurrency

import (
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

// ClosedAction defines what Go() does when called on a closed pool.
type ClosedAction int

const (
// PanicWhenClosed makes Go() panic when called on a closed pool. This is the default.
PanicWhenClosed ClosedAction = iota
// WarnWhenClosed makes Go() log a warning and drop the workload.
WarnWhenClosed
// SpawnNewGoroutineWhenClosed makes Go() log a warning and run the workload in a new goroutine.
SpawnNewGoroutineWhenClosed
)

// NewReusableGoroutinesPool creates a new worker pool with the given size.
// These workers will run the workloads passed through Go() calls.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
p := &ReusableGoroutinesPool{
logger: log.NewNopLogger(),
jobs: make(chan func()),
}
for i := 0; i < size; i++ {
go func() {
@@ -17,13 +33,45 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
return p
}

// WithLogger sets the logger used to report workloads submitted after Close(). It returns the pool to allow chaining.
func (p *ReusableGoroutinesPool) WithLogger(logger log.Logger) *ReusableGoroutinesPool {
p.logger = logger
return p
}

// WithClosedAction sets what Go() does when called on a closed pool. It returns the pool to allow chaining.
func (p *ReusableGoroutinesPool) WithClosedAction(action ClosedAction) *ReusableGoroutinesPool {
p.closedAction = action
return p
}

type ReusableGoroutinesPool struct {
logger log.Logger

jobsMu sync.Mutex
closed bool
closedAction ClosedAction
jobs chan func()
}

// Go will run the given function in a worker of the pool.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func (p *ReusableGoroutinesPool) Go(f func()) {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()

if p.closed {
switch p.closedAction {
case PanicWhenClosed:
panic("tried to run a workload on a closed ReusableGoroutinesPool. Use a different ClosedAction to avoid this panic.")
case WarnWhenClosed:
level.Warn(p.logger).Log("msg", "tried to run a workload on a closed ReusableGoroutinesPool, dropping the workload")
return
case SpawnNewGoroutineWhenClosed:
level.Warn(p.logger).Log("msg", "tried to run a workload on a closed ReusableGoroutinesPool, spawning a new goroutine to run the workload")
go f()
return
}
}

select {
case p.jobs <- f:
default:
@@ -32,7 +80,12 @@ func (p *ReusableGoroutinesPool) Go(f func()) {
}

// Close stops the workers of the pool.
// No new Go() calls should be performed after calling Close().
// Close does NOT wait for all jobs to finish; it is the caller's responsibility to ensure that in the provided workloads.
// Close is intended to be used in tests to ensure that no goroutines are leaked.
func (p *ReusableGoroutinesPool) Close() {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()
p.closed = true
close(p.jobs)
}
65 changes: 65 additions & 0 deletions concurrency/worker_test.go
@@ -1,12 +1,16 @@
package concurrency

import (
"bytes"
"regexp"
"runtime"
"strings"
"sync"
"sync/atomic"

Check failure on line 9 in concurrency/worker_test.go

View workflow job for this annotation

GitHub Actions / Check

package "sync/atomic" shouldn't be imported, suggested: "go.uber.org/atomic"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)

@@ -59,3 +63,64 @@ func TestReusableGoroutinesPool(t *testing.T) {
}
t.Fatalf("expected %d goroutines after closing, got %d", 0, countGoroutines())
}

func TestReusableGoroutinesPool_ClosedActionPanic(t *testing.T) {
w := NewReusableGoroutinesPool(2)

runCount, panicked := causePoolFailure(t, w, 10)

require.NotZero(t, runCount, "expected at least one run")
require.Less(t, runCount, 10, "expected less than 10 runs")
require.True(t, panicked, "expected panic")
}

func TestReusableGoroutinesPool_ClosedActionWarn(t *testing.T) {
var buf bytes.Buffer
w := NewReusableGoroutinesPool(2).WithClosedAction(WarnWhenClosed).WithLogger(log.NewLogfmtLogger(&buf))

runCount, panicked := causePoolFailure(t, w, 10)

require.NotZero(t, runCount, "expected at least one run")
require.Less(t, runCount, 10, "expected less than 10 runs")
require.False(t, panicked, "expected no panic")
require.Contains(t, buf.String(), "tried to run a workload on a closed ReusableGoroutinesPool, dropping the workload")
}

func TestReusableGoroutinesPool_ClosedActionSpawn(t *testing.T) {
var buf bytes.Buffer
w := NewReusableGoroutinesPool(2).WithClosedAction(SpawnNewGoroutineWhenClosed).WithLogger(log.NewLogfmtLogger(&buf))

runCount, panicked := causePoolFailure(t, w, 10)

require.Equal(t, 10, runCount, "expected all workloads to run")
require.False(t, panicked, "expected no panic")
require.Contains(t, buf.String(), "tried to run a workload on a closed ReusableGoroutinesPool, spawning a new goroutine to run the workload")
}

func causePoolFailure(t *testing.T, w *ReusableGoroutinesPool, maxMsgCount int) (runCount int, panicked bool) {
t.Helper()

var runCountAtomic atomic.Int32

var testWG sync.WaitGroup
testWG.Add(1)
go func() {
defer testWG.Done()
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()
for i := 0; i < maxMsgCount; i++ {
w.Go(func() {
runCountAtomic.Add(1)
})
time.Sleep(10 * time.Millisecond)
}
}()
time.Sleep(10 * time.Millisecond)
w.Close() // close the pool
testWG.Wait() // wait for the test to finish

return int(runCountAtomic.Load()), panicked
}
