Skip to content

Commit

Permalink
fixes tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Jan 26, 2024
1 parent fb9c564 commit 4a61e60
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
7 changes: 4 additions & 3 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,14 @@ def wrap_async_iterator(
async def run() -> TDataItems:
nonlocal exhausted
try:
item = await gen.__anext__()
# if marked exhausted by the main thread and we are wrapping a generator
# we can close it here
if exhausted and isinstance(gen, AsyncGenerator):
await gen.aclose()
if exhausted:
raise StopAsyncIteration()
item = await gen.__anext__()
return item
# on stop iteration mark as exhausted
# also called when futures are cancelled
except StopAsyncIteration:
exhausted = True
raise
Expand Down
21 changes: 12 additions & 9 deletions tests/extract/test_extract_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,25 @@ def get_pipes():
# items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation
assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]

# force strict mode, no rotation at all when crossing the initial source count
# force fifo, no rotation at all when crossing the initial source count
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1))
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
# order the same as above - same rules apply
assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]

# round robin eval
_l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin"))
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7]
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 10, 4, 11, 5, 6, 8, 9, 7]

# round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen,
# items appear in order as sources are processed strictly from front)
_l = list(
PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1)
)
# items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# NOTE: 4, 10, 5 - after 4 there's NONE in fifo so we do next element (round robin style)
# NOTE: 6, 8, 7 - after 6 there's NONE - same thing
assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 10, 5, 6, 8, 7, 9, 11]


def test_rotation_on_none() -> None:
Expand Down Expand Up @@ -715,13 +717,14 @@ async def long_gen():
yield i
close_pipe_yielding = False
# we have a different exception here
except asyncio.CancelledError:
except GeneratorExit:
close_pipe_got_exit = True

def raise_gen(item: int):
# execute in a thread
async def raise_gen(item: int):
if item == 10:
raise RuntimeError("we fail")
yield item
raise RuntimeError("we fail async")
return item

assert_pipes_closed(raise_gen, long_gen)

Expand Down
2 changes: 1 addition & 1 deletion tests/pipeline/test_resources_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __aiter__(self):
async def __anext__(self):
# check for no further items
if self.counter >= 5:
raise StopAsyncIteration
raise StopAsyncIteration()
# increment the counter
self.counter += 1
# simulate work
Expand Down

0 comments on commit 4a61e60

Please sign in to comment.