Skip to content

Commit

Permalink
Replace WaitGroup using semphore instead
Browse files Browse the repository at this point in the history
  • Loading branch information
godoylucase authored and Lucas Godoy committed Jul 5, 2021
1 parent 9ec8790 commit 88cda2a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 20 deletions.
56 changes: 45 additions & 11 deletions wpool/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package wpool
import (
"context"
"fmt"
"sync"
)

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
defer wg.Done()
func worker(ctx context.Context, jobs <-chan Job, results chan<- Result, sem semaphore) {
defer sem.release()
for {
select {
case job, ok := <-jobs:
Expand All @@ -26,35 +25,70 @@ func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results ch
}
}

type semaphore interface {
acquire()
release()
wait()
close()
}

type slot struct{}
type slots chan slot

type execution struct {
slots slots
}

func newExecutionSlots(capacity int) execution {
slots := make(chan slot, capacity)
return execution{slots: slots}
}

func (e execution) acquire() {
e.slots <- slot{}
}

func (e execution) release() {
<-e.slots
}

func (e execution) wait() {
for i := 0; i < cap(e.slots); i++ {
e.slots <- slot{}
}
}

func (e execution) close() {
close(e.slots)
}

type WorkerPool struct {
workersCount int
jobs chan Job
results chan Result
Done chan struct{}
}

func New(wcount int) WorkerPool {
return WorkerPool{
workersCount: wcount,
jobs: make(chan Job, wcount),
results: make(chan Result, wcount),
Done: make(chan struct{}),
}
}

func (wp WorkerPool) Run(ctx context.Context) {
var wg sync.WaitGroup
eSlots := newExecutionSlots(wp.workersCount)
defer eSlots.close()

for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
eSlots.acquire()
// fan out worker goroutines
//reading from jobs channel and
//pushing calcs into results channel
go worker(ctx, &wg, wp.jobs, wp.results)
go worker(ctx, wp.jobs, wp.results, eSlots)
}

wg.Wait()
close(wp.Done)
eSlots.wait()
close(wp.results)
}

Expand All @@ -63,7 +97,7 @@ func (wp WorkerPool) Results() <-chan Result {
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
for i, _ := range jobsBulk {
for i := range jobsBulk {
wp.jobs <- jobsBulk[i]
}
close(wp.jobs)
Expand Down
20 changes: 11 additions & 9 deletions wpool/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWorkerPool(t *testing.T) {
select {
case r, ok := <-wp.Results():
if !ok {
continue
return
}

i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
Expand All @@ -39,8 +39,6 @@ func TestWorkerPool(t *testing.T) {
if val != int(i)*2 {
t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
}
case <-wp.Done:
return
default:
}
}
Expand All @@ -56,12 +54,14 @@ func TestWorkerPool_TimeOut(t *testing.T) {

for {
select {
case r := <-wp.Results():
case r, ok := <-wp.Results():
if !ok {
return
}

if r.Err != nil && r.Err != context.DeadlineExceeded {
t.Fatalf("expected error: %v; got: %v", context.DeadlineExceeded, r.Err)
}
case <-wp.Done:
return
default:
}
}
Expand All @@ -77,12 +77,14 @@ func TestWorkerPool_Cancel(t *testing.T) {

for {
select {
case r := <-wp.Results():
case r, ok := <-wp.Results():
if !ok {
return
}

if r.Err != nil && r.Err != context.Canceled {
t.Fatalf("expected error: %v; got: %v", context.Canceled, r.Err)
}
case <-wp.Done:
return
default:
}
}
Expand Down

0 comments on commit 88cda2a

Please sign in to comment.