From c9aa483d247ce1d97ff9faa4749ec22089130f70 Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Tue, 1 Oct 2024 15:44:29 +0300
Subject: [PATCH] HW6 is completed
Signed-off-by: Pavel Pogodaev
---
hw05_parallel_execution/.sync | 0
hw06_pipeline_execution/pipeline.go | 28 +++++++++++++++++++++--
hw06_pipeline_execution/pipeline_test.go | 29 ++++++++++++++++++++++++
3 files changed, 55 insertions(+), 2 deletions(-)
delete mode 100644 hw05_parallel_execution/.sync
diff --git a/hw05_parallel_execution/.sync b/hw05_parallel_execution/.sync
deleted file mode 100644
index e69de29..0000000
diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go
index 9044486..e5aaafb 100644
--- a/hw06_pipeline_execution/pipeline.go
+++ b/hw06_pipeline_execution/pipeline.go
@@ -9,6 +9,30 @@ type (
type Stage func(in In) (out Out)
func ExecutePipeline(in In, done In, stages ...Stage) Out {
- // Place your code here.
- return nil
+ if in == nil {
+ return nil
+ }
+
+ for _, st := range stages {
+ tmpChan := make(chan interface{})
+ go func(ch In) {
+ defer close(tmpChan)
+
+ for {
+ select {
+ case item, ok := <-ch:
+ if !ok {
+ return
+ }
+ tmpChan <- item
+ case <-done:
+ return
+ }
+ }
+ }(in)
+
+ in = st(tmpChan)
+ }
+
+ return in
}
diff --git a/hw06_pipeline_execution/pipeline_test.go b/hw06_pipeline_execution/pipeline_test.go
index b638ed8..8668dfe 100644
--- a/hw06_pipeline_execution/pipeline_test.go
+++ b/hw06_pipeline_execution/pipeline_test.go
@@ -36,6 +36,35 @@ func TestPipeline(t *testing.T) {
g("Stringifier", func(v interface{}) interface{} { return strconv.Itoa(v.(int)) }),
}
+ t.Run("empty stages list", func(t *testing.T) {
+ in := make(Bi)
+ data := []string{"a", "b", "c", "d", "e"}
+
+ go func() {
+ for _, v := range data {
+ in <- v
+ }
+ close(in)
+ }()
+
+ result := make([]string, 0, len(data))
+ start := time.Now()
+ for s := range ExecutePipeline(in, nil, []Stage{}...) {
+ result = append(result, s.(string))
+ }
+ elapsed := time.Since(start)
+
+ require.Equal(t, data, result)
+ require.Less(t,
+ int64(elapsed),
+ int64(sleepPerStage)+int64(fault))
+ })
+
+ t.Run("nil check", func(t *testing.T) {
+ res := ExecutePipeline(nil, nil, stages...)
+ require.Nil(t, res)
+ })
+
t.Run("simple case", func(t *testing.T) {
in := make(Bi)
data := []int{1, 2, 3, 4, 5}