@@ -178,7 +178,7 @@ type AsyncFilterFunc func(context.Context, *Node) *Node
178178func AsyncFilter (it Iterator , check AsyncFilterFunc , workers int ) Iterator {
179179 f := & asyncFilterIter {
180180 it : ensureSourceIter (it ),
181- slots : make (chan struct {}, workers + 1 ),
181+ slots : make (chan struct {}, workers + 1 ), // extra 1 slot to make sure all the goroutines can be completed
182182 passed : make (chan iteratorItem ),
183183 }
184184 for range cap (f .slots ) {
@@ -193,6 +193,9 @@ func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator {
193193 return
194194 case <- f .slots :
195195 }
196+ defer func () {
197+ f .slots <- struct {}{} // the iterator has ended
198+ }()
196199 // read from the iterator and start checking nodes in parallel
197200 // when a node is checked, it will be sent to the passed channel
198201 // and the slot will be released
@@ -201,7 +204,11 @@ func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator {
201204 nodeSource := f .it .NodeSource ()
202205
203206 // check the node async, in a separate goroutine
204- <- f .slots
207+ select {
208+ case <- ctx .Done ():
209+ return
210+ case <- f .slots :
211+ }
205212 go func () {
206213 if nn := check (ctx , node ); nn != nil {
207214 item := iteratorItem {nn , nodeSource }
@@ -213,8 +220,6 @@ func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator {
213220 f .slots <- struct {}{}
214221 }()
215222 }
216- // the iterator has ended
217- f .slots <- struct {}{}
218223 }()
219224
220225 return f
0 commit comments