enable async generators as resources #905

Merged: 27 commits, Jan 30, 2024
Changes from 1 commit
Commits
07fd59c  temp  (sh-rp, Jan 22, 2024)
8c8a94f  enable nested generator and add tests  (sh-rp, Jan 23, 2024)
a07e37f  remove temp files  (sh-rp, Jan 23, 2024)
e4ca5c3  convert async iterable to list of awaitables  (sh-rp, Jan 23, 2024)
b8396a6  temp  (sh-rp, Jan 24, 2024)
79a42ed  update evaluation of round robin and fifo  (sh-rp, Jan 24, 2024)
dbea27f  change limit behavior  (sh-rp, Jan 24, 2024)
28a6a12  small fixes  (sh-rp, Jan 24, 2024)
21c6db3  adds experiment for parallelizing regular resources  (sh-rp, Jan 24, 2024)
a151428  fix linter  (sh-rp, Jan 24, 2024)
0211d3d  test is creating pool before iteration solves ci test problems  (sh-rp, Jan 24, 2024)
c087ebf  make one test more predictable  (sh-rp, Jan 24, 2024)
a7bf8e0  remove async pool fix  (sh-rp, Jan 24, 2024)
3c047a2  remove locks from generator wrappers  (sh-rp, Jan 24, 2024)
8d81d99  make test even more predictable  (sh-rp, Jan 24, 2024)
435239d  pr fixes  (sh-rp, Jan 26, 2024)
3787c63  fix async error test  (sh-rp, Jan 26, 2024)
05d0c55  update evaluation order tests  (sh-rp, Jan 26, 2024)
614b80b  adds sources at the end of pipe, closes generators before futures so …  (rudolfix, Jan 26, 2024)
fb9c564  allows async generator items to be evaluated in add_limit  (rudolfix, Jan 26, 2024)
4a61e60  fixes tests  (rudolfix, Jan 26, 2024)
9446e29  update performance docs  (sh-rp, Jan 29, 2024)
d830086  unrelated formatting fixes  (sh-rp, Jan 29, 2024)
b844231  fix one test  (sh-rp, Jan 29, 2024)
568a2ce  small change to resource page  (sh-rp, Jan 29, 2024)
2a168e5  fixes tests  (sh-rp, Jan 30, 2024)
8cf3c3c  change generator exit test  (sh-rp, Jan 30, 2024)
7 changes: 4 additions & 3 deletions dlt/extract/utils.py
@@ -143,13 +143,14 @@ def wrap_async_iterator(
     async def run() -> TDataItems:
         nonlocal exhausted
         try:
-            item = await gen.__anext__()
             # if marked exhausted by the main thread and we are wrapping a generator
             # we can close it here
-            if exhausted and isinstance(gen, AsyncGenerator):
-                await gen.aclose()
+            if exhausted:
+                raise StopAsyncIteration()
+            item = await gen.__anext__()
             return item
         # on stop iteration mark as exhausted
+        # also called when futures are cancelled
         except StopAsyncIteration:
             exhausted = True
             raise
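For readers following the change above, here is a minimal standalone sketch of the same pattern: check an `exhausted` flag before touching the wrapped iterator, and set the flag whenever `StopAsyncIteration` passes through. It is not dlt's `wrap_async_iterator`; the class `ExhaustibleAsyncIterator`, the `anext_calls` counter, and the helper functions are hypothetical names introduced only for this illustration.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple


class ExhaustibleAsyncIterator:
    """Hypothetical wrapper: hands out the next item and remembers exhaustion."""

    def __init__(self, gen: AsyncIterator[Any]) -> None:
        self._gen = gen
        self.exhausted = False
        self.anext_calls = 0  # counts how often the wrapped iterator is actually advanced

    async def run(self) -> Any:
        try:
            # once marked exhausted, stop before touching the wrapped iterator again
            if self.exhausted:
                raise StopAsyncIteration()
            self.anext_calls += 1
            return await self._gen.__anext__()
        except StopAsyncIteration:
            # reached both on natural exhaustion and on the early exit above
            self.exhausted = True
            raise


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def collect() -> Tuple[List[int], bool, bool]:
    wrapper = ExhaustibleAsyncIterator(numbers(3))
    items: List[int] = []
    while True:
        try:
            items.append(await wrapper.run())
        except StopAsyncIteration:
            break
    # one more call after exhaustion must not advance the wrapped generator
    calls_before = wrapper.anext_calls
    try:
        await wrapper.run()
    except StopAsyncIteration:
        pass
    return items, wrapper.exhausted, wrapper.anext_calls == calls_before


RESULT = asyncio.run(collect())
```

In this sketch the flag check happens before `__anext__()`, so a wrapper that was marked exhausted from outside never awaits the underlying generator again.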
21 changes: 12 additions & 9 deletions tests/extract/test_extract_pipe.py
@@ -42,23 +42,25 @@ def get_pipes():
     # items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation
     assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]

-    # force strict mode, no rotation at all when crossing the initial source count
+    # force fifo, no rotation at all when crossing the initial source count
     _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1))
-    # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
-    assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
+    # order the same as above - same rules apply
+    assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15]

     # round robin eval
     _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin"))
     # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
-    assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7]
+    assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 10, 4, 11, 5, 6, 8, 9, 7]

     # round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen,
     # items appear in order as sources are processed strictly from front)
     _l = list(
         PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1)
     )
     # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation
-    assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11]
+    # NOTE: 4, 10, 5 - after 4 there's NONE in fifo so we do next element (round robin style)
+    # NOTE: 6, 8, 7 - after 6 there's NONE - same thing
+    assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 10, 5, 6, 8, 7, 9, 11]


 def test_rotation_on_none() -> None:
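As background for the two `next_item_mode` values exercised in the test above, the following is a deliberately simplified sketch of the difference between draining sources in order and rotating between them. It is an assumption-laden illustration, not `PipeIterator`'s algorithm: it ignores nested iterators, `None` items that trigger rotation, and `max_parallel_items`, all of which shape the orderings asserted in the test. The functions `fifo` and `round_robin` are hypothetical helpers.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def fifo(sources: Iterable[Iterable[T]]) -> Iterator[T]:
    """Drain each source completely before moving on to the next one."""
    for source in sources:
        yield from source


def round_robin(sources: Iterable[Iterable[T]]) -> Iterator[T]:
    """Take one item from each source in turn, dropping sources as they run out."""
    queue: Deque[Iterator[T]] = deque(iter(s) for s in sources)
    while queue:
        current = queue.popleft()
        try:
            item = next(current)
        except StopIteration:
            # exhausted source is not put back in the queue
            continue
        yield item
        queue.append(current)


SOURCES = [[1, 2, 3], [10, 11], [20]]
FIFO_ORDER = list(fifo(SOURCES))
ROUND_ROBIN_ORDER = list(round_robin(SOURCES))
```

With these inputs, `FIFO_ORDER` keeps each source's items together, while `ROUND_ROBIN_ORDER` interleaves them one item per source per turn.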
@@ -715,13 +717,14 @@ async def long_gen():
                 yield i
             close_pipe_yielding = False
         # we have a different exception here
-        except asyncio.CancelledError:
+        except GeneratorExit:
Collaborator Author

I don't think this works. The test fails now. The error on cancelling async gens is a CancelledError, not a GeneratorExit.

Collaborator

Huh, this was working for me. I was not cancelling the async gen directly but the wrapped gen, and propagating the exception. I'll check it out.

             close_pipe_got_exit = True

-    def raise_gen(item: int):
+    # execute in a thread
+    async def raise_gen(item: int):
         if item == 10:
-            raise RuntimeError("we fail")
-        yield item
+            raise RuntimeError("we fail async")
+        return item

     assert_pipes_closed(raise_gen, long_gen)
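The review discussion in this hunk turns on which exception an async generator sees when it is shut down. The sketch below uses plain `asyncio` only and shows the two standard cases: calling `aclose()` on a generator suspended at a `yield` raises `GeneratorExit` inside it, while cancelling a task that is awaiting the generator's next item raises `asyncio.CancelledError` at the pending `await`. Which of these paths dlt's pipe takes is not shown here; `ticker`, `next_item`, and the two driver coroutines are hypothetical names for this illustration.

```python
import asyncio
from typing import AsyncIterator, List


async def ticker(log: List[str]) -> AsyncIterator[int]:
    try:
        while True:
            await asyncio.sleep(0.01)
            yield 1
    except asyncio.CancelledError:
        log.append("CancelledError")
        raise
    except GeneratorExit:
        log.append("GeneratorExit")
        raise


async def next_item(gen: AsyncIterator[int]) -> int:
    return await gen.__anext__()


async def close_directly() -> List[str]:
    log: List[str] = []
    gen = ticker(log)
    await gen.__anext__()  # generator is now suspended at its yield
    await gen.aclose()  # throws GeneratorExit into the generator at that yield
    return log


async def cancel_pending_task() -> List[str]:
    log: List[str] = []
    gen = ticker(log)
    await gen.__anext__()
    task = asyncio.create_task(next_item(gen))
    await asyncio.sleep(0)  # let the task start and suspend inside asyncio.sleep
    task.cancel()  # CancelledError surfaces at the await inside the generator
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


CLOSE_LOG = asyncio.run(close_directly())
CANCEL_LOG = asyncio.run(cancel_pending_task())
```

A test that expects one of the two exceptions therefore depends on whether the generator is closed while suspended at a `yield` or cancelled while awaiting.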
2 changes: 1 addition & 1 deletion tests/pipeline/test_resources_evaluation.py
@@ -17,7 +17,7 @@ def __aiter__(self):
     async def __anext__(self):
         # check for no further items
         if self.counter >= 5:
-            raise StopAsyncIteration
+            raise StopAsyncIteration()
         # increment the counter
         self.counter += 1
         # simulate work
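For context on the last hunk, this is a minimal sketch of a class-based async iterator in the shape the test helper appears to have. Only the `counter` attribute, the limit of 5, and the comments come from the diff; the class name `AsyncCounter`, the `asyncio.sleep(0)` stand-in for work, and the returned value are assumptions. Raising the class `StopAsyncIteration` and raising an instance `StopAsyncIteration()` behave the same, since Python instantiates an exception class when it is raised.

```python
import asyncio
from typing import List


class AsyncCounter:
    """Hypothetical async iterator that produces the numbers 1 to 5."""

    def __init__(self) -> None:
        self.counter = 0

    def __aiter__(self) -> "AsyncCounter":
        return self

    async def __anext__(self) -> int:
        # check for no further items
        if self.counter >= 5:
            raise StopAsyncIteration()
        # increment the counter
        self.counter += 1
        # simulate work (assumed stand-in; the original body is not shown in the diff)
        await asyncio.sleep(0)
        return self.counter


async def drain() -> List[int]:
    return [item async for item in AsyncCounter()]


ITEMS = asyncio.run(drain())
```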