-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathparallel.go
111 lines (93 loc) · 3.09 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package process
import (
"context"
"sync"
)
// Function shapes shared by the combinators in this file.
type (
	// singleErrorFunc is a context-aware operation that reports its outcome
	// through a single returned error value.
	singleErrorFunc func(ctx context.Context) error

	// streamErrorFunc is a context-aware operation that reports its errors
	// over the receive-only channel it returns.
	streamErrorFunc func(ctx context.Context) <-chan error
)
// toStreamErrorFunc adapts fn to the streamErrorFunc shape. Each call of the
// returned function runs fn via withErrors; a non-nil result is sent on the
// returned channel, while a nil result sends nothing, so the channel is
// simply closed without emitting a value.
func toStreamErrorFunc(fn singleErrorFunc) streamErrorFunc {
	// emit runs fn and forwards its error, if any, to out.
	emit := func(ctx context.Context, out chan<- error) {
		err := fn(ctx)
		if err == nil {
			return
		}
		out <- err
	}

	return func(ctx context.Context) <-chan error {
		return withErrors(func(out chan<- error) {
			emit(ctx, out)
		})
	}
}
// chain composes fns into a single streamErrorFunc that runs them one after
// another, in order. Every error emitted by a source function is forwarded
// unchanged. Once a source function has emitted at least one error, its
// channel is still drained, but the functions after it are never invoked.
func chain(fns ...streamErrorFunc) streamErrorFunc {
	const stopOnError = true
	return combineInternal(stopOnError, fns...)
}
// sequence composes fns into a single streamErrorFunc that runs them one
// after another, in order. Every error emitted by a source function is
// forwarded unchanged. Unlike chain, an error from an earlier function does
// not prevent the later functions from being invoked.
func sequence(fns ...streamErrorFunc) streamErrorFunc {
	const stopOnError = false
	return combineInternal(stopOnError, fns...)
}
// combineInternal is the shared implementation behind chain and sequence. It
// returns a streamErrorFunc that invokes fns sequentially, draining each
// function's channel completely and forwarding every error before moving on
// to the next function. When stopOnError is true, no further functions are
// invoked after one of them has emitted an error.
//
// When exactly one function is supplied it is returned as-is, without any
// wrapping.
func combineInternal(stopOnError bool, fns ...streamErrorFunc) streamErrorFunc {
	if len(fns) == 1 {
		return fns[0]
	}

	return func(ctx context.Context) <-chan error {
		return withErrors(func(out chan<- error) {
			for _, source := range fns {
				failed := false
				for err := range source(ctx) {
					failed = true
					out <- err
				}

				// The failing source has been fully drained at this point;
				// only the remaining sources are skipped.
				if failed && stopOnError {
					return
				}
			}
		})
	}
}
// parallel returns a streamErrorFunc that starts every given function in its
// own goroutine. Errors are forwarded to the returned channel as soon as any
// source emits them, so no ordering between sources is implied. The returned
// channel is closed only after every source channel has been drained.
func parallel(funcs ...streamErrorFunc) streamErrorFunc {
	return func(ctx context.Context) <-chan error {
		return withErrors(func(out chan<- error) {
			var pending sync.WaitGroup

			// forward drains a single source into the shared output channel.
			forward := func(source streamErrorFunc) {
				defer pending.Done()
				for err := range source(ctx) {
					out <- err
				}
			}

			pending.Add(len(funcs))
			for _, source := range funcs {
				go forward(source)
			}

			// Hold the output channel open until every forwarder is done.
			pending.Wait()
		})
	}
}
// withErrors allocates an unbuffered error channel, runs fn in a new
// goroutine with the send side of that channel, and returns the receive side
// right away. The channel is closed when fn returns, so callers can range
// over the result to collect every error fn sends.
func withErrors(fn func(chan<- error)) <-chan error {
	ch := make(chan error)

	worker := func() {
		// Deferred so the channel is closed however fn leaves the goroutine.
		defer close(ch)
		fn(ch)
	}
	go worker()

	return ch
}
// runAsync invokes fn with ctx in a separate goroutine (via withErrors) and
// returns a channel carrying its outcome. The value returned by fn is always
// sent on the channel, even when it is nil, and the channel is closed
// afterwards.
func runAsync(ctx context.Context, fn func(ctx context.Context) error) <-chan error {
	// deliver publishes fn's result unconditionally.
	deliver := func(out chan<- error) {
		out <- fn(ctx)
	}
	return withErrors(deliver)
}