diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 3d1c04208c..b4f1816e3f 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -589,6 +589,7 @@ def _fork_pipeline(pipe: Pipe) -> None: return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode) def __next__(self) -> PipeItem: + self._ensure_async_pool() pipe_item: Union[ResolvablePipeItem, SourcePipeItem] = None # __next__ should call itself to remove the `while` loop and continue clauses but that may lead to stack overflows: there's no tail recursion opt in python # https://stackoverflow.com/questions/13591970/does-python-optimize-tail-recursion (see Y combinator on how it could be emulated)