-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable async generators as resources #905
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
6971b8c
to
07fd59c
Compare
dff8622
to
7575d53
Compare
dlt/extract/pipe.py
Outdated
@@ -791,6 +806,27 @@ def _get_source_item(self) -> ResolvablePipeItem: | |||
elif self._next_item_mode == "round_robin": | |||
return self._get_source_item_round_robin() | |||
|
|||
def _get_next_item_from_generator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here there only is refactoring to get rid of non-dry code.
7575d53
to
a07e37f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrapping is very clever! but we need to yield item by item. we cannot collect all the items and yield at once...
dlt/extract/utils.py
Outdated
result: List[TDataItem] = [] | ||
try: | ||
item: TDataItems = None | ||
while item := await f.__anext__(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem with this thing is that it will consume the whole generator in memory and then return results once. so ie. when there's a transformer it will receive a full dataset. right?
maybe we need to send each f.next() to a future pool and wrap this in a function that returns both a next result (after await) and the async iterator itself? so you put it into futures pool again and return the current value
@rudolfix yes you are right, i was already thinking about that after submitting the PR. I have changed it now that the wrapped async generator returns an iterator of awaitables. There is some magic inside which essentially emulates this "yield none" behavior we implemented last summer to make round_robin extract work (I can point you to the commit I think if you don't remember). My remaining questions are:
|
If we follow the approach of adding a follow up future, which i think is doable, then parallelism I think will also work for the fifo mode, I somehow like my approach better though, because it does not modify the pipe code much. |
dlt/extract/utils.py
Outdated
is_running = False | ||
|
||
# it is best to use the round robin strategy here if multiple async generators are used in resources | ||
while not exhausted: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should use the same strategy that user wants? anyway. IMO if you have two async generators they will be executed in parallel in futures, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not if we emit a list of awaitables, then one list will be processed first, then the other (you can see that in the test)
dlt/extract/utils.py
Outdated
# it is best to use the round robin strategy here if multiple async generators are used in resources | ||
while not exhausted: | ||
while is_running: | ||
yield None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% cpu usage here? this will be executed in tight loop right? if there's None we go for the next resource and we get None again so we go again etc.
btw. I think we are close and the idea with yielding None is good.
dlt/extract/utils.py
Outdated
|
||
# it is best to use the round robin strategy here if multiple async generators are used in resources | ||
while not exhausted: | ||
while is_running: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm maybe asyncio critical sections are better? https://docs.python.org/3.8/library/asyncio-sync.html (if they work here because this code is sync)
I'd say no. the default behavior is to evaluate resources one by another fully. I think it is good.
single item can be a list/batch. regular generators work the same way, no?
I do not see this modification. do you mean this?:
this is handling this new async iterator and LGTM but yeah we should test if limiting async iterator still works. could you add a test with does also please add a test where we have async and sync generator to see if we are not starving the async. |
@rudolfix do you remember what the reasoning of these lines was?
This makes the round robin fall back to fifo if more pipes were added during extraction. I remember that I put this in, but i have no recollection why. I will slow down parallel execution as soon as dynamically creates pipes are present in async generator scenarios and I am wondering if we can remove it. |
add more tests add support for limit in asynciterator
try: | ||
for i in gen: # type: ignore # TODO: help me fix this later | ||
yield i | ||
count += 1 | ||
if i is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line is needed for the async generator to work properly, it changes the behavior of the limit, but probably that is ok, i am not sure.
sources_count = len(self._sources) | ||
# no more sources to iterate | ||
if sources_count == 0: | ||
return None | ||
# if there are currently more sources than added initially, we need to process the new ones first | ||
if sources_count > self._initial_sources_count: | ||
return self._get_source_item_current() | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function now nicely combines fifo and round_robin. in fifo mode it says on the first source and only ventures into the next ones if that returns none. It would be quite easy to switch it back to the old behavior though. I removed this part that switches from round robin to fifo in some cases as it does not really make sense anymore imho if fifo also can switch the source index.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be very easy to keep it by adding this condition to line 820. my worry here is that if let's say we have a resource that feeds item to a transformer and it is itself a generator, we generate million items, and this will produce million of source slots. my take is that we switch to FIFO mode when sources_count - self._initial_sources_count > self.max_parallel_items to exhaust new generators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I implemented it this way (and corrected the decreasing of initial sources counter along the way). Now there is a diferentiation between fifo and strict_fifo, I think this is necessary to prevent a scenario as you describe it.
dlt/extract/utils.py
Outdated
yield None | ||
yield run() | ||
except GeneratorExit: | ||
# clean up async generator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think I need to clean up here, this can happen at the end of the loop in the pipe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add_limit
will close the generator before it is exhausted. should we set the exhausted
flag here? and do not call anext in line 144?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem is, that the "parent" generator gets closed before the last item is evaulated that we want to evaluate. so it has to stay this way. do you know what I mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But i just had a nice idea of how to close the generator in the right moment this way, which is a good improvement :)
|
||
|
||
@pytest.mark.parametrize("parallelized", [True, False]) | ||
def test_async_decorator_experiment(parallelized) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this for now is just an experiment, but this could be the extension for the defer decorator. if you put it on a resource function, it will make every iteration run as a future, so quite cool i think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PS: i know that i was not supposed to do this yet ;)
) -> Generator[Awaitable[TDataItems], None, None]: | ||
"""Wraps an async generator into a list of awaitables""" | ||
exhausted = False | ||
busy = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using an async.lock here will fail on python 3.8, and 3.9 because there is no current loop on the main thread. i am not sure why it is allowed in python 3.10. threading locks should not be used with coroutines as far as I understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx for this! I think we fixed many flaws with parallelism, not only async gens
- we need a test where exception happens during async generation. I have a set of such tests somewhere, please add this one
- we need a test where large amount of nested iterators is created, see review
- we should move defer decorator (we can alias it!) to separate PR. nevertheless I reviewed it :)
dlt/extract/pipe.py
Outdated
continue | ||
|
||
# handle async iterator items as new source | ||
if inspect.isasyncgen(item): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm maybe we should check for AsyncIterator? not all iterators are generators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah you are totally right, this is a leftover from something else, I updated it in the other places too.
self._async_pool.shutdown_asyncgens(), self._ensure_async_pool() | ||
) | ||
while not future.done(): | ||
sleep(self.futures_poll_interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I'm just worried that our current add_limit
works only for generators and will not work for async see below:
def add_limit(self, max_items: int) -> "DltResource": # noqa: A003
"""Adds a limit `max_items` to the resource pipe
This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. It is
a no-op for transformers. Those should be limited by their input data.
Args:
max_items (int): The maximum number of items to yield
Returns:
"DltResource": returns self
"""
def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""
nonlocal max_items
count = 0
if inspect.isfunction(gen):
gen = gen()
try:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
count += 1
if count == max_items:
return
finally:
if inspect.isgenerator(gen):
gen.close()
return
# transformers should be limited by their input, so we only limit non-transformers
if not self.is_transformer:
self._pipe.replace_gen(_gen_wrap(self._pipe.gen))
return self
sources_count = len(self._sources) | ||
# no more sources to iterate | ||
if sources_count == 0: | ||
return None | ||
# if there are currently more sources than added initially, we need to process the new ones first | ||
if sources_count > self._initial_sources_count: | ||
return self._get_source_item_current() | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be very easy to keep it by adding this condition to line 820. my worry here is that if let's say we have a resource that feeds item to a transformer and it is itself a generator, we generate million items, and this will produce million of source slots. my take is that we switch to FIFO mode when sources_count - self._initial_sources_count > self.max_parallel_items to exhaust new generators
dlt/extract/utils.py
Outdated
yield None | ||
yield run() | ||
except GeneratorExit: | ||
# clean up async generator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add_limit
will close the generator before it is exhausted. should we set the exhausted
flag here? and do not call anext in line 144?
tests/extract/test_extract_pipe.py
Outdated
# items will be round robin, nested iterators are fully iterated and appear inline as soon as they are encountered | ||
assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 55, 56, 77, 88, 89, 13, 3, 14, 4, 15] | ||
# items will be round robin, nested iterators are integrated into the round robin | ||
assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please test a case when we have too many nested iterators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a couple more tests would be not so bad here, especially since it is a bit hard to read this test (ie to verify that it is correct) :)
threads = set() | ||
|
||
def parallelize(f) -> Any: | ||
"""converts regular itarable to generator of functions that can be run in parallel in the pipe""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO parallelize should just set a parallel
flag on a resource and the code below should be part of extract pipe - where you have the async wrapper already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's move this stuff to a new PR as you suggested, I have marked the test as skipped and we can discuss there.
if inspect.isfunction(gen): | ||
gen = gen() | ||
# if we have an async gen, no further action is needed | ||
if inspect.isasyncgen(gen): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls note that you cannot execute generators in the thread pool. you may execute only generator functions. so both isgen and isasyncgen will not work (for different reasons - we need good exception messages here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next pr!
gen = f(*args, **kwargs) | ||
# unpack generator | ||
if inspect.isfunction(gen): | ||
gen = gen() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this I assume happens in the thread pool, not in main thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
next pr!
pipeline_1.run(async_resource1().add_limit(13)) | ||
|
||
with pipeline_1.sql_client() as c: | ||
with c.execute_query("SELECT * FROM table1") as cur: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you do not need to go through duckdb and pipeline here. you can just call list(resource()) and it will evaluate same way as with run method
# we need to decrease the index to keep the round robin order | ||
self._round_robin_index -= 1 | ||
# since in this case we have popped an initial source, we need to decrease the initial sources count | ||
self._initial_sources_count -= 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is actually a bug i would say..
@@ -603,6 +603,10 @@ def pass_gen(item, meta): | |||
|
|||
|
|||
def test_close_on_async_exception() -> None: | |||
global close_pipe_got_exit, close_pipe_yielding | |||
close_pipe_got_exit = False | |||
close_pipe_yielding = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
globals need to be reset in each test, they will not recover between tests in one file!
test async iterator
yield i | ||
close_pipe_yielding = False | ||
# we have a different exception here | ||
except asyncio.CancelledError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i expected to also get a generatorexit here, but it seems to not be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW @sh-rp according to python docs, StopIteration and StopAsyncIteration are not propagated out of asynchronous generators, and are replaced with a RuntimeError.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@z3z1ma right! but we wrap async generator in regular generator so I expect GeneratorExit here. if we get both it means that we are leaking some edge cases ie. when pool is closed and we cancel async generators using asyncio module. I'll investigate in separate branch
@@ -715,13 +717,14 @@ async def long_gen(): | |||
yield i | |||
close_pipe_yielding = False | |||
# we have a different exception here | |||
except asyncio.CancelledError: | |||
except GeneratorExit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this works. the test fails now. The error on cancelling async gens is a cancellederror, not a generatorexit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huh this was working for me. I was not cancelling async gen directly but the wrapped gen and propagating the exception. I'll check it out
3b34190
to
d830086
Compare
self._futures: List[FuturePipeItem] = [] | ||
self._next_item_mode = next_item_mode | ||
self._next_item_mode: TPipeNextItemMode = next_item_mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since it is annotated in the argument should we also remove the annotation here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch! but somehow mypy sees this as string here so I forced the type (or maybe it is VSCode language server)
# get next item from the current source | ||
gen, step, pipe, meta = self._sources[self._current_source_index] | ||
set_current_pipe_name(pipe.name) | ||
if (item := next(gen)) is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo this already will check for not None as item := next(gen)
if first_evaluated_index is None: | ||
first_evaluated_index = self._current_source_index | ||
# always go round robin if None was returned | ||
self._current_source_index = (self._current_source_index - 1) % sources_count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also check if self._current_source_index > 0
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no! check how the modulo operator works for negative numbers :)
) -> Generator[Awaitable[TDataItems], None, None]: | ||
"""Wraps an async generator into a list of awaitables""" | ||
exhausted = False | ||
busy = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it better to pass some object instance to run
function which will contain the state the variables?
exhausted = False
busy = False
52c4e8c
to
8cf3c3c
Compare
nonlocal busy | ||
busy = False | ||
|
||
# this generator yields None while the async generator is not exhausted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean it will idle for the result of the async generator while yielding None?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in a sense. most of this PR was to handle exactly this case. it is done upstream, if we see that all generators want to idle and we have no more data we sleep and yield control to other threads
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! thanks @sh-rp and everyone else that participated and gave us ideas to implement that!
A simple implementation to support async generators in our resources and transformers. This implementation basically converts an async generator to an awaitable list, so it can be run in the futures pool.
Implementation details: The way it is implemented now, the async generator will block one futures slot in the extraction until it is completed. The question is, should we aim at implementing something that will finish a future regularly after a certain batch size and yield control back to the main thread, but have some kind of info in the FuturePipeItem indicating that there still is an open async generator that it can be rescheduled? Somehow a follow-up future of sorts?