-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransform.go
166 lines (142 loc) · 4.08 KB
/
transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package giter
// Map returns an Iterator that yields f(x), in order, for every value x
// produced by iter. The source iterator is closed when the producer returns,
// whether iter was exhausted or stopChan became readable first.
func Map[T, TP any](f func(T) TP, iter Iterator[T]) Iterator[TP] {
	produce := func(out chan<- TP, stopChan <-chan interface{}) {
		defer iter.Close()
		for item := range iter.Each {
			// f is applied before waiting, exactly as a send expression in a
			// select case would be evaluated on entry to the select.
			mapped := f(item)
			select {
			case <-stopChan:
				return
			case out <- mapped:
			}
		}
	}
	return Make(produce)
}
// Filter returns an Iterator that yields, in order, only those values produced
// by iter for which pred reports true; all other values are dropped. The source
// iterator is closed when the producer returns.
func Filter[T any](pred func(T) bool, iter Iterator[T]) Iterator[T] {
	return Make(func(out chan<- T, stopChan <-chan interface{}) {
		defer iter.Close()
		for candidate := range iter.Each {
			if !pred(candidate) {
				continue
			}
			select {
			case <-stopChan:
				return
			case out <- candidate:
			}
		}
	})
}
// FlatMap returns an Iterator that, for each value produced by iter, calls f
// once and yields every element of the returned slice in order. A nil or empty
// slice from f contributes nothing. The source iterator is closed when the
// producer returns.
func FlatMap[T, R any](f func(T) []R, iter Iterator[T]) Iterator[R] {
	return Make(func(out chan<- R, stopChan <-chan interface{}) {
		defer iter.Close()
		for src := range iter.Each {
			for _, elem := range f(src) {
				select {
				case <-stopChan:
					return
				case out <- elem:
				}
			}
		}
	})
}
// Chunk returns an Iterator emitting slices of consecutive values from iter.
// Every emitted slice has length n except possibly the last, which holds the
// values left over when iter is exhausted; an empty slice is never emitted.
// Each emitted slice is a fresh copy, so receivers may retain or modify it.
//
// Chunk panics if n is not positive. The check happens here, at construction,
// rather than letting the producer (which Make may run on another goroutine)
// fail later with an opaque make/slice-bounds panic that the caller cannot
// recover from.
func Chunk[T any](n int, iter Iterator[T]) Iterator[[]T] {
	if n <= 0 {
		panic("giter: Chunk size n must be positive")
	}
	return Make(
		func(out chan<- []T, stopChan <-chan interface{}) {
			defer iter.Close()
			buf := make([]T, 0, n)
			// flush sends a copy of the buffered values (if any) and resets the
			// buffer. It returns false if stopChan fired before the chunk could
			// be sent, in which case the caller must bail out.
			flush := func() bool {
				if len(buf) == 0 {
					return true
				}
				outs := make([]T, len(buf))
				copy(outs, buf)
				select {
				case out <- outs:
				case <-stopChan:
					return false
				}
				buf = buf[:0]
				return true
			}
			for v := range iter.Each {
				// buf is flushed as soon as it holds n values, so this append
				// never outgrows the capacity allocated above.
				buf = append(buf, v)
				if len(buf) == n && !flush() {
					return
				}
			}
			// Flush any partial final chunk; the result is irrelevant because we
			// return immediately either way.
			_ = flush()
		})
}
// ChunkedFlatMap maps n elements of a given iterator at a time into a new iterator.
// The mapping function receives input and output slices, and is expected to return 0 or more values
// via the output slice.
// The input slice holds between 1 and n values; every chunk except possibly the last holds exactly n.
// The output slice has len 0 and cap >= n.
// The mapping function must not return its input slice (or a subslice of it): the input buffer is
// cleared and reused before the returned values are emitted.
// The resulting iterator produces the elements returned by the mapping function in the order they
// were produced.
// The contents of the input and output slices will be cleared at least as often as every chunk is
// emitted into the output iterator, to avoid retaining excessive heap space.
//
// ChunkedFlatMap panics if n is not positive; the check happens at construction rather than
// inside the producer, where it would surface as an opaque make/slice-bounds panic.
func ChunkedFlatMap[T, R any](n int, f func([]T, []R) []R, iter Iterator[T]) Iterator[R] {
	// XXX maybe this should just be MapChunked? don't know that explicitly calling it FlatMap
	// is necessary, since it's obvious that we're mapping one chunk at a time and we just
	// receive a slice from the mapping function because we have to receive some sort of type
	// capable of holding multiple values.
	if n <= 0 {
		panic("giter: ChunkedFlatMap size n must be positive")
	}
	return Make(
		func(out chan<- R, stopChan <-chan interface{}) {
			defer iter.Close()
			buf := make([]T, 0, n)
			outBuf := make([]R, 0, n)
			// flush maps the buffered inputs and sends every result. It returns
			// false if stopChan fired before all results were sent, in which case
			// the caller must bail out.
			flush := func() bool {
				// once we leave this function we are done with the contents of
				// these buffers and their contents (and anything it points to)
				// should be allowed to go away.
				defer clear(&buf)
				defer clear(&outBuf)
				// outBuf itself always has len 0, so f's appends land only in its
				// backing array. Zero the whole capacity so values f wrote there
				// are released even when f outgrew outBuf and returned a slice
				// that no longer aliases it.
				defer func() {
					var zero R
					full := outBuf[:cap(outBuf)]
					for i := range full {
						full[i] = zero
					}
				}()
				if len(buf) == 0 {
					return true
				}
				outs := f(buf, outBuf)
				// Release the inputs now rather than after every result is sent,
				// since sending may block for a long time.
				clear(&buf)
				var zero R
				for i, x := range outs {
					select {
					case out <- x:
						// Drop our reference to the sent value immediately.
						outs[i] = zero
					case <-stopChan:
						return false
					}
				}
				return true
			}
			for v := range iter.Each {
				// buf is flushed as soon as it holds n values, so this append
				// never outgrows the capacity allocated above.
				buf = append(buf, v)
				if len(buf) == n && !flush() {
					return
				}
			}
			// flush anything remaining; ignore if we flushed everything since we're
			// going to return afterwards regardless.
			_ = flush()
		})
}