-
Notifications
You must be signed in to change notification settings - Fork 2
/
map.go
151 lines (123 loc) · 3.72 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package async
import (
"reflect"
)
/*
Map allows you to manipulate data in a slice in Waterfall mode.
Each Routine will be called with the value and index of the current position
in the slice. When calling the Done function, an error will cause the
mapping to immediately exit. All other arguments are sent back as the
replacement for the current value.
For example, take a look at one of the tests for this function:
func TestMapInt(t *testing.T) {
ints := []int{1, 2, 3, 4, 5}
expects := []int{2, 4, 6, 8, 10}
mapper := func(done async.Done, args ...interface{}) {
Status("Hit int")
Status("Args: %+v\n", args)
done(nil, args[0].(int)*2)
}
final := func(err error, results ...interface{}) {
Status("Hit int end")
Status("Results: %+v\n", results)
for i := 0; i < len(results); i++ {
if results[i] != expects[i] {
t.Errorf("Did not map correctly.")
break
}
}
}
async.Map(ints, mapper, final)
}
*/
func Map(data interface{}, routine Routine, callbacks ...Done) {
var (
routines []Routine
results []interface{}
)
d := reflect.ValueOf(data)
for i := 0; i < d.Len(); i++ {
v := d.Index(i).Interface()
routines = append(routines, func(id int) Routine {
return func(done Done, args ...interface{}) {
done = func(original Done) Done {
return func(err error, args ...interface{}) {
results = append(results, args...)
if id == (d.Len() - 1) {
original(err, results...)
return
}
original(err, args...)
}
}(done)
routine(done, v, id)
}
}(i))
}
Waterfall(routines, callbacks...)
}
/*
MapParallel allows you to manipulate data in a slice in Parallel mode.
Each Routine will be called with the value and index of the current position
in the slice. When calling the Done function, arguments are sent
back as the replacement for the current value.
If there is an error, any further results will be discarded but it will not
immediately exit. It will continue to run all of the other Routine functions
that were passed into it. This is because by the time the error is sent, the
goroutines have already been started. At this current time, there is no way
to cancel a sleep timer in Go.
For example, take a look at one of the tests for this function:
func TestMapStringParallel(t *testing.T) {
str := []string{
"test",
"test2",
"test3",
"test4",
"test5",
}
expects := []string{
"test1",
"test2",
"test3",
"test4",
"test5",
}
mapper := func(done async.Done, args ...interface{}) {
Status("Hit string")
Status("Args: %+v\n", args)
if args[1] == 0 {
done(nil, "test1")
return
}
done(nil, args[0])
}
final := func(err error, results ...interface{}) {
Status("Hit string end")
Status("Results: %+v\n", results)
for i := 0; i < len(results); i++ {
if results[i] != expects[i] {
t.Errorf("Did not map correctly.")
break
}
}
}
async.MapParallel(str, mapper, final)
}
The output of mapping in Parallel mode cannot be guaranteed to stay in the
same order, due to the fact that it may take longer to process some things
in your map routine. If you need the data to stay in the order it is in, use
Map instead to ensure it stays in order.
*/
func MapParallel(data interface{}, routine Routine, callbacks ...Done) {
var routines []Routine
d := reflect.ValueOf(data)
for i := 0; i < d.Len(); i++ {
v := d.Index(i).Interface()
routines = append(routines, func(id int) Routine {
return func(done Done, args ...interface{}) {
routine(done, v, id)
}
}(i))
}
Parallel(routines, callbacks...)
}