Skip to content

Commit

Permalink
reduce allocs in favor of channels vs custom atoms
Browse files Browse the repository at this point in the history
  • Loading branch information
saantiaguilera committed May 17, 2022
1 parent a49eeb2 commit f42436c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 41 deletions.
67 changes: 28 additions & 39 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package pipeline
import (
"context"
"errors"
"sync"
"sync/atomic"
)

type (
Expand All @@ -15,17 +13,21 @@ type (

reducer[O any] func(context.Context, O, O) (O, error)

// concurrentSlice that allows appending values of type T concurrently
concurrentSlice[T any] struct {
sync.RWMutex
v []T
// concurrentResult is a discriminated union of a result or error.
concurrentResult[T any] struct {
Ret T
Err error
}
)

// NewConcurrentStep creates a step that will run each of the inner steps concurrently.
// The step will wait for all of the steps to finish before returning.
//
// If one of them fails, the step will wait until everyone finishes and after that return the first encountered error.
//
// This step (as all the others) doesn't handle panics. Be careful since this step creates goroutines and the panics
// not necessarilly will be signaled in the same goroutine as the origin call.
// Make sure to handle panics on your own if your code is unsafe (through decorations / deferrals in steps / etc)
func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O] {
return ConcurrentStep[I, O]{
steps: steps,
Expand Down Expand Up @@ -67,60 +69,47 @@ func (c ConcurrentStep[I, O]) Run(ctx context.Context, in I) (O, error) {
// Run a number of workers concurrently, waiting for all of them to finish.
// After they're all done, if one of them failed the error is returned.
// If more than one fails, the last error is returned
//
// Note: this method doesn't recover from panics in goroutines. To be cohesive across the whole
// API none of the steps handle panics to let the client handle them on their own
// (through decorations / same steps with deferrals / whatever he wants to)
func (c ConcurrentStep[I, O]) runConcurrently(
ctx context.Context,
workers []Step[I, O],
in I,
) ([]O, error) {

var wg sync.WaitGroup
var errResult atomic.Value
var mergedRes concurrentSlice[O]

wg.Add(len(workers))
ch := make(chan concurrentResult[O], len(workers))
if len(workers) > 1 {
for i := 0; i < len(workers); i++ {
go c.runStep(&wg, &errResult, &mergedRes, ctx, in, workers[i])
go c.runStep(ctx, in, workers[i], ch)
}
wg.Wait()
} else { // avoid concurrency, no need to spawn and wait just use current
c.runStep(&wg, &errResult, &mergedRes, ctx, in, workers[0])
c.runStep(ctx, in, workers[0], ch)
}

if err, ok := errResult.Load().(error); ok && err != nil {
return nil, err
ret := make([]O, len(workers))
var err error
for i := 0; i < len(workers); i++ {
v := <-ch
if v.Err != nil && err == nil {
err = v.Err // don't break here. we want to make sure all steps finish before returning.
}
ret[i] = v.Ret
}
return mergedRes.Get(), nil
return ret, err
}

func (c ConcurrentStep[I, O]) runStep(
wg *sync.WaitGroup,
errResult *atomic.Value,
mergedRes *concurrentSlice[O],
ctx context.Context,
in I,
step Step[I, O],
ch chan<- concurrentResult[O],
) {

res, err := step.Run(ctx, in)
if err != nil {
errResult.CompareAndSwap(nil, err)
ch <- concurrentResult[O]{
Ret: res,
Err: err,
}

mergedRes.Append(res)
wg.Done()
}

// Read slice T stored inside
func (s *concurrentSlice[T]) Get() []T {
s.RLock()
defer s.RUnlock()
return s.v
}

// Append safely inside T slice
func (s *concurrentSlice[T]) Append(t T) {
s.Lock()
defer s.Unlock()
s.v = append(s.v, t)
}
4 changes: 2 additions & 2 deletions pipeline_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func Test_GraphRendering(t *testing.T) {
// goos: darwin
// goarch: amd64
// cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
// BenchmarkPipeline_Run-8 45620 23057 ns/op 5476 B/op 80 allocs/op
// Given this graph magnitude, the cost of traversing it is negligible in comparison to a step operation.
// BenchmarkPipeline_Run-8 45620 25057 ns/op 5476 B/op 56 allocs/op
// Given this graph magnitude, the cost of traversing it is negligible (~0.025ms) in comparison to a step operation.
func BenchmarkPipeline_Run(b *testing.B) {
var err error
graph := NewImmenseGraph()
Expand Down

0 comments on commit f42436c

Please sign in to comment.