From a04aadc838280b5a1089fe4159967ca4844a112a Mon Sep 17 00:00:00 2001 From: Santiago Aguilera Date: Mon, 10 Feb 2020 00:13:56 -0300 Subject: [PATCH] Remove runSync since it speeds-up ~30% --- pipeline_benchmark_test.go | 4 ++-- run.go | 12 ------------ sequential_group.go | 11 ++++++++--- sequential_stage.go | 11 ++++++++--- 4 files changed, 18 insertions(+), 20 deletions(-) diff --git a/pipeline_benchmark_test.go b/pipeline_benchmark_test.go index 6ac6f27..a6e6cc6 100644 --- a/pipeline_benchmark_test.go +++ b/pipeline_benchmark_test.go @@ -400,12 +400,12 @@ func BenchmarkPipeline_Run(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { + b.StartTimer() err := pipe.Run(graph, ctx) - b.StopTimer() + if err != nil { b.Fail() } - b.StartTimer() } } diff --git a/run.go b/run.go index 51dcb38..a106f7e 100644 --- a/run.go +++ b/run.go @@ -47,15 +47,3 @@ func spawnAsync(workers int, run func(index int) error) error { wg.Wait() return errResult.Get() } - -// Run synchronously a number of workers. If one of them fails, the operation is aborted and the error returned. -func runSync(workers int, run func(index int) error) error { - for i := 0; i < workers; i++ { - err := run(i) - - if err != nil { - return err - } - } - return nil -} diff --git a/sequential_group.go b/sequential_group.go index d8e9185..91bb9e5 100644 --- a/sequential_group.go +++ b/sequential_group.go @@ -3,9 +3,14 @@ package pipeline type sequentialGroup []Stage func (s sequentialGroup) Run(executor Executor, ctx Context) error { - return runSync(len(s), func(index int) error { - return s[index].Run(executor, ctx) - }) + for _, stage := range s { + err := stage.Run(executor, ctx) + + if err != nil { + return err + } + } + return nil } func (s sequentialGroup) Draw(graph GraphDiagram) { diff --git a/sequential_stage.go b/sequential_stage.go index 34c0967..7d697dd 100644 --- a/sequential_stage.go +++ b/sequential_stage.go @@ -3,9 +3,14 @@ package pipeline type sequentialStage []Step func (s sequentialStage) Run(executor Executor, ctx Context) error { - return runSync(len(s), func(index int) error { - return executor.Run(s[index], ctx) - }) + for _, step := range s { + err := executor.Run(step, ctx) + + if err != nil { + return err + } + } + return nil } func (s sequentialStage) Draw(graph GraphDiagram) {