Skip to content

Commit

Permalink
change limit behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jan 24, 2024
1 parent 79a42ed commit dbea27f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 25 deletions.
16 changes: 6 additions & 10 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,9 @@ def _get_source_item(self) -> ResolvablePipeItem:
if sources_count == 0:
return None
try:
# reset to beginning of resource list for fifo mode
if self._next_item_mode == "fifo":
self._current_source_index = -1
first_evaluated_index = -1
while True:
print(self._current_source_index)
Expand All @@ -826,11 +829,9 @@ def _get_source_item(self) -> ResolvablePipeItem:
return ResolvablePipeItem(item.data, step, pipe, item.meta)
else:
return ResolvablePipeItem(item, step, pipe, meta)
first_evaluated_index = (
self._current_source_index
if first_evaluated_index == -1
else first_evaluated_index
)
# remember the first evaluated index
if first_evaluated_index == -1:
first_evaluated_index = self._current_source_index
except StopIteration:
# remove empty iterator and try another source
self._sources.pop(self._current_source_index)
Expand All @@ -842,11 +843,6 @@ def _get_source_item(self) -> ResolvablePipeItem:
raise
except Exception as ex:
raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex
finally:
# reset to beginning of resource list for fifo mode
if self._next_item_mode == "fifo":
self._current_source_index = -1
print("tmp")

@staticmethod
def clone_pipes(
Expand Down
7 changes: 3 additions & 4 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,14 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:
gen = gen()

# wrap async gen already here
# max items will be handled by the wrapper
if inspect.isasyncgen(gen):
gen = wrap_async_generator(gen, max_items)
max_items = -1
gen = wrap_async_generator(gen)

try:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
count += 1
if i is not None:
count += 1
if count == max_items:
return
finally:
Expand Down
20 changes: 9 additions & 11 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,16 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in


def wrap_async_generator(
gen: AsyncGenerator[TDataItems, None],
max_items: Optional[int] = -1,
gen: AsyncGenerator[TDataItems, None]
) -> Generator[Awaitable[TDataItems], None, None]:
"""Wraps an async generator into a generator of awaitables"""
exhausted = False
lock = asyncio.Lock()

# creates an awaitable that will return the next item from the async generator
async def run() -> TDataItems:
nonlocal max_items
async with lock:
try:
if max_items == 0:
await gen.aclose()
raise StopAsyncIteration()
max_items -= 1
return await gen.__anext__()
# on stop iteration mark as exhausted
except StopAsyncIteration:
Expand All @@ -156,10 +150,14 @@ async def run() -> TDataItems:
raise

# this generator yields None while the async generator is not exhausted
while not exhausted:
while lock.locked():
yield None
yield run()
try:
while not exhausted:
while lock.locked():
yield None
yield run()
except GeneratorExit:
# clean up async generator
asyncio.ensure_future(gen.aclose())


def wrap_compat_transformer(
Expand Down

0 comments on commit dbea27f

Please sign in to comment.