Skip to content

Commit

Permalink
HW6 is completed
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Pogodaev <[email protected]>
  • Loading branch information
Pavel Pogodaev committed Oct 1, 2024
1 parent 411b524 commit 74609d8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
28 changes: 26 additions & 2 deletions hw06_pipeline_execution/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@ type (
type Stage func(in In) (out Out)

func ExecutePipeline(in In, done In, stages ...Stage) Out {
// Place your code here.
return nil
if in == nil {
return nil
}

for _, st := range stages {
tmpChan := make(chan interface{})
go func(ch In) {
defer close(tmpChan)

for {
select {
case item, ok := <-ch:
if !ok {
return
}
tmpChan <- item
case <-done:
return
}
}
}(in)

in = st(tmpChan)
}

return in
}
29 changes: 29 additions & 0 deletions hw06_pipeline_execution/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,35 @@ func TestPipeline(t *testing.T) {
g("Stringifier", func(v interface{}) interface{} { return strconv.Itoa(v.(int)) }),
}

t.Run("empty stages list", func(t *testing.T) {
in := make(Bi)
data := []string{"a", "b", "c", "d", "e"}

go func() {
for _, v := range data {
in <- v
}
close(in)
}()

result := make([]string, 0, len(data))
start := time.Now()
for s := range ExecutePipeline(in, nil, []Stage{}...) {
result = append(result, s.(string))
}
elapsed := time.Since(start)

require.Equal(t, data, result)
require.Less(t,
int64(elapsed),
int64(sleepPerStage)+int64(fault))
})

t.Run("nil check", func(t *testing.T) {
res := ExecutePipeline(nil, nil, stages...)
require.Nil(t, res)
})

t.Run("simple case", func(t *testing.T) {
in := make(Bi)
data := []int{1, 2, 3, 4, 5}
Expand Down

0 comments on commit 74609d8

Please sign in to comment.