diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index ef5dc65ba3..2670017c00 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -805,6 +805,9 @@ def _get_source_item(self) -> ResolvablePipeItem: if sources_count == 0: return None try: + # reset to beginning of resource list for fifo mode + if self._next_item_mode == "fifo": + self._current_source_index = -1 first_evaluated_index = -1 while True: print(self._current_source_index) @@ -826,11 +829,9 @@ def _get_source_item(self) -> ResolvablePipeItem: return ResolvablePipeItem(item.data, step, pipe, item.meta) else: return ResolvablePipeItem(item, step, pipe, meta) - first_evaluated_index = ( - self._current_source_index - if first_evaluated_index == -1 - else first_evaluated_index - ) + # remember the first evaluated index + if first_evaluated_index == -1: + first_evaluated_index = self._current_source_index except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) @@ -842,11 +843,6 @@ def _get_source_item(self) -> ResolvablePipeItem: raise except Exception as ex: raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex - finally: - # reset to beginning of resource list for fifo mode - if self._next_item_mode == "fifo": - self._current_source_index = -1 - print("tmp") @staticmethod def clone_pipes( diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 20d67c2470..f6c1684b94 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -312,15 +312,14 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: gen = gen() # wrap async gen already here - # max items will be handled by the wrapper if inspect.isasyncgen(gen): - gen = wrap_async_generator(gen, max_items) - max_items = -1 + gen = wrap_async_generator(gen) try: for i in gen: # type: ignore # TODO: help me fix this later yield i - count += 1 + if i is not None: + count += 1 if count == max_items: return finally: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 7743f103a5..a4d993674a 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -132,8 +132,7 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in def wrap_async_generator( - gen: AsyncGenerator[TDataItems, None], - max_items: Optional[int] = -1, + gen: AsyncGenerator[TDataItems, None] ) -> Generator[Awaitable[TDataItems], None, None]: """Wraps an async generator into a list of awaitables""" exhausted = False @@ -141,13 +140,8 @@ def wrap_async_generator( # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: - nonlocal max_items async with lock: try: - if max_items == 0: - await gen.aclose() - raise StopAsyncIteration() - max_items -= 1 return await gen.__anext__() # on stop iteration mark as exhausted except StopAsyncIteration: @@ -156,10 +150,14 @@ async def run() -> TDataItems: raise # this generator yields None while the async generator is not exhauste - while not exhausted: - while lock.locked(): - yield None - yield run() + try: + while not exhausted: + while lock.locked(): + yield None + yield run() + except GeneratorExit: + # clean up async generator + asyncio.ensure_future(gen.aclose()) def wrap_compat_transformer(