Skip to content

Commit

Permalink
Use RWMutex instead of mutex + do not panic
Browse files Browse the repository at this point in the history
  • Loading branch information
julienduchesne committed Oct 11, 2024
1 parent 67ef9a1 commit 2e255e5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
14 changes: 11 additions & 3 deletions concurrency/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
}

type ReusableGoroutinesPool struct {
jobsMu sync.Mutex
jobsMu sync.RWMutex
closed bool
jobs chan func()
}

// Go will run the given function in a worker of the pool.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func (p *ReusableGoroutinesPool) Go(f func()) {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()
p.jobsMu.RLock()
defer p.jobsMu.RUnlock()

// If the pool is closed, run the function in a new goroutine.
if p.closed {
go f()
return
}

select {
case p.jobs <- f:
Expand All @@ -46,5 +53,6 @@ func (p *ReusableGoroutinesPool) Go(f func()) {
func (p *ReusableGoroutinesPool) Close() {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()
p.closed = true
close(p.jobs)
}
11 changes: 2 additions & 9 deletions concurrency/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,17 @@ func TestReusableGoroutinesPool(t *testing.T) {
t.Fatalf("expected %d goroutines after closing, got %d", 0, countGoroutines())
}

// TestReusableGoroutinesPool_Race tests that Close() and Go() can be called concurrently.
func TestReusableGoroutinesPool_Race(t *testing.T) {
w := NewReusableGoroutinesPool(2)

var panicked bool
var runCountAtomic atomic.Int32
const maxMsgCount = 10

var testWG sync.WaitGroup
testWG.Add(1)
go func() {
defer testWG.Done()
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()
for i := 0; i < maxMsgCount; i++ {
w.Go(func() {
runCountAtomic.Add(1)
Expand All @@ -90,7 +85,5 @@ func TestReusableGoroutinesPool_Race(t *testing.T) {
testWG.Wait() // wait for the test to finish

runCt := int(runCountAtomic.Load())
require.NotZero(t, runCt, "expected at least one run")
require.Less(t, runCt, 10, "expected less than 10 runs")
require.True(t, panicked, "expected panic")
require.Equal(t, runCt, 10, "expected all functions to run")
}

0 comments on commit 2e255e5

Please sign in to comment.