
enable async generators as resources #905

Merged: 27 commits merged into devel on Jan 30, 2024
Conversation


@sh-rp sh-rp commented Jan 22, 2024

A simple implementation to support async generators in our resources and transformers. This implementation basically converts an async generator into an awaitable list so that it can be run in the futures pool.

Implementation details: as implemented now, the async generator will block one futures slot in the extraction until it is completed. The question is: should we aim at implementing something that finishes a future regularly after a certain batch size and yields control back to the main thread, but carries some kind of info in the FuturePipeItem indicating that there is still an open async generator so it can be rescheduled? Somehow a follow-up future of sorts?
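For illustration, a minimal sketch of this first approach (hypothetical helper name drain_async_gen, not the actual dlt code): the async generator is drained to a list inside a single coroutine, which is why it occupies one futures slot until it completes.

import asyncio
from typing import Any, AsyncGenerator, List

async def drain_async_gen(gen: AsyncGenerator[Any, None]) -> List[Any]:
    # consume the async generator fully; the result is one awaitable list
    results: List[Any] = []
    async for item in gen:
        results.append(item)
    return results

async def numbers() -> AsyncGenerator[int, None]:
    for i in range(3):
        yield i

print(asyncio.run(drain_async_gen(numbers())))  # [0, 1, 2]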

netlify bot commented Jan 22, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 8cf3c3c
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/65b921bc45176300086522f1

@sh-rp sh-rp force-pushed the d#/async_iterators branch from 6971b8c to 07fd59c on January 22, 2024 21:59
@sh-rp sh-rp changed the title from "enable asyn generators as resources" to "enable async generators as resources" on Jan 23, 2024
@sh-rp sh-rp force-pushed the d#/async_iterators branch from dff8622 to 7575d53 on January 23, 2024 12:33
@@ -791,6 +806,27 @@ def _get_source_item(self) -> ResolvablePipeItem:
        elif self._next_item_mode == "round_robin":
            return self._get_source_item_round_robin()

def _get_next_item_from_generator(
Collaborator Author (sh-rp):

This is just refactoring to get rid of non-DRY code.

@sh-rp sh-rp marked this pull request as ready for review January 23, 2024 12:38
@sh-rp sh-rp requested a review from rudolfix January 23, 2024 12:38
@sh-rp sh-rp force-pushed the d#/async_iterators branch from 7575d53 to a07e37f on January 23, 2024 12:44
@sh-rp sh-rp linked an issue Jan 23, 2024 that may be closed by this pull request
@rudolfix rudolfix left a comment

wrapping is very clever! but we need to yield item by item. we cannot collect all the items and yield at once...

result: List[TDataItem] = []
try:
    item: TDataItems = None
    while item := await f.__anext__():
Collaborator:

the problem with this is that it will consume the whole generator into memory and then return the results all at once. so e.g. when there's a transformer, it will receive the full dataset, right?

maybe we need to send each f.__anext__() to the futures pool and wrap it in a function that returns both the next result (after await) and the async iterator itself? so you put it into the futures pool again and return the current value
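A rough sketch of that suggestion (hypothetical name next_with_iterator, not code from this PR): a coroutine awaits a single item and hands the iterator back so the caller can reschedule it in the pool.

from typing import Any, AsyncIterator, Optional, Tuple

async def next_with_iterator(
    it: AsyncIterator[Any],
) -> Tuple[Optional[Any], Optional[AsyncIterator[Any]]]:
    try:
        item = await it.__anext__()
    except StopAsyncIteration:
        return None, None  # exhausted: nothing to reschedule
    # the caller emits `item` and submits `it` to the futures pool again
    return item, it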

@sh-rp sh-rp commented Jan 23, 2024

@rudolfix yes, you are right, I was already thinking about that after submitting the PR. I have changed it so that the wrapped async generator now returns an iterator of awaitables. There is some magic inside which essentially emulates the "yield None" behavior we implemented last summer to make round_robin extract work (I can point you to the commit if you don't remember). My remaining questions are:

  • Should we maybe change the default item mode to round_robin? I don't think a new user will find out about this if they just start yielding stuff.
  • Should we make the chunk size of this generator configurable, or are we happy with yielding single items? I suppose we can keep it like this and it will replicate the behavior of a regular generator.
  • I have modified the function that evaluates the results of futures a bit so that StopIteration exceptions are not treated as errors; we could also have a custom exception there if you think we should still treat this as an error in other cases. wdyt?

@sh-rp sh-rp commented Jan 23, 2024

If we follow the approach of adding a follow-up future, which I think is doable, then parallelism will also work for the fifo mode. I somehow like my approach better though, because it does not modify the pipe code much.

is_running = False

# it is best to use the round robin strategy here if multiple async generators are used in resources
while not exhausted:
Collaborator:

I think we should use the same strategy that the user wants? anyway, IMO if you have two async generators they will be executed in parallel in futures, right?

Collaborator Author (sh-rp):

not if we emit a list of awaitables, then one list will be processed first, then the other (you can see that in the test)

# it is best to use the round robin strategy here if multiple async generators are used in resources
while not exhausted:
    while is_running:
        yield None
Collaborator:

100% cpu usage here? this will be executed in a tight loop, right? if there's None we go to the next resource, and we get None again, so we go again, etc.

btw. I think we are close, and the idea with yielding None is good.


# it is best to use the round robin strategy here if multiple async generators are used in resources
while not exhausted:
    while is_running:
Collaborator:

hmmm maybe asyncio synchronization primitives are better? https://docs.python.org/3.8/library/asyncio-sync.html (if they work here, because this code is sync)

@rudolfix rudolfix commented Jan 23, 2024

@sh-rp

  • Should we maybe change the default item mode to round_robin? I don't think a new user will find out about this if they just start yielding stuff.

I'd say no. the default behavior is to evaluate resources one after another, fully. I think it is good.
we can switch to round robin if there's an async generator in any of the pipes though. otherwise your current implementation will starve the async generators if there are normal ones. btw. there are a lot of nuances due to this hack with yielding None in a loop

  • Should we make the chunk size of this generator configurable, or are we happy with yielding single items? I suppose we can keep it like this and it will replicate the behavior of a regular generator.

single item can be a list/batch. regular generators work the same way, no?

  • I have modified the function that evaluates the results of futures a bit so that StopIteration exceptions are not treated as errors; we could also have a custom exception there if you think we should still treat this as an error in other cases. wdyt?

I do not see this modification. do you mean this?:

if isinstance(ex, StopAsyncIteration):
    return None

this is handling this new async iterator and LGTM

but yeah, we should test if limiting the async iterator still works. could you add a test that does add_limit on a resource that is an async generator, to make sure we close it properly before it is exhausted.

also please add a test where we have an async and a sync generator, to see if we are not starving the async one.
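A sketch of the kind of test requested here, assuming the async-generator resource support this PR adds (the resource name is made up):

import dlt

@dlt.resource
async def async_numbers():
    for i in range(100):
        yield i

def test_add_limit_on_async_generator() -> None:
    # the limit should stop after 10 items and close the async generator early
    assert list(async_numbers().add_limit(10)) == list(range(10))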

@sh-rp sh-rp commented Jan 24, 2024

@rudolfix do you remember what the reasoning behind these lines was?

        # if there are currently more sources than added initially, we need to process the new ones first
        if sources_count > self._initial_sources_count:
            return self._get_source_item_fifo()

This makes the round robin fall back to fifo if more pipes were added during extraction. I remember that I put this in, but I have no recollection why. It will slow down parallel execution as soon as dynamically created pipes are present in async generator scenarios, and I am wondering if we can remove it.

Review thread on dlt/extract/pipe.py (outdated, resolved).
sh-rp added 2 commits January 24, 2024 16:13:
  • add more tests
  • add support for limit in asynciterator
try:
    for i in gen:  # type: ignore # TODO: help me fix this later
        yield i
        count += 1
        if i is not None:
Collaborator Author (sh-rp):

this line is needed for the async generator to work properly. it changes the behavior of the limit, but probably that is ok, i am not sure.

sources_count = len(self._sources)
# no more sources to iterate
if sources_count == 0:
    return None
# if there are currently more sources than added initially, we need to process the new ones first
if sources_count > self._initial_sources_count:
    return self._get_source_item_current()
try:
Collaborator Author (sh-rp):

this function now nicely combines fifo and round_robin. in fifo mode it stays on the first source and only ventures into the next ones if that returns None. It would be quite easy to switch it back to the old behavior though. I removed the part that switches from round robin to fifo in some cases, as it does not really make sense anymore imho if fifo can also switch the source index.

Collaborator:

it would be very easy to keep it by adding this condition to line 820. my worry here is that if, let's say, we have a resource that feeds items to a transformer and is itself a generator, and we generate a million items, this will produce a million source slots. my take is that we switch to FIFO mode when sources_count - self._initial_sources_count > self.max_parallel_items, to exhaust new generators

Collaborator Author (sh-rp):

I implemented it this way (and corrected the decreasing of the initial sources counter along the way). Now there is a differentiation between fifo and strict_fifo; I think this is necessary to prevent a scenario like the one you describe.

yield None
yield run()
except GeneratorExit:
# clean up async generator
Collaborator Author (sh-rp):

i don't think I need to clean up here; this can happen at the end of the loop in the pipe

Collaborator:

add_limit will close the generator before it is exhausted. should we set the exhausted flag here, and not call anext in line 144?

Collaborator Author (sh-rp):

the problem is that the "parent" generator gets closed before the last item that we want to evaluate is evaluated. so it has to stay this way. do you know what I mean?

Collaborator Author (sh-rp):

But I just had a nice idea of how to close the generator at the right moment this way, which is a good improvement :)



@pytest.mark.parametrize("parallelized", [True, False])
def test_async_decorator_experiment(parallelized) -> None:
Collaborator Author (sh-rp):

this for now is just an experiment, but it could be the extension of the defer decorator. if you put it on a resource function, it will make every iteration run as a future, so quite cool i think.

Collaborator Author (sh-rp):

PS: i know that i was not supposed to do this yet ;)

) -> Generator[Awaitable[TDataItems], None, None]:
"""Wraps an async generator into a list of awaitables"""
exhausted = False
busy = False
Collaborator Author (sh-rp):

using an asyncio lock here will fail on Python 3.8 and 3.9 because there is no current loop on the main thread. i am not sure why it is allowed in Python 3.10 (presumably because asyncio primitives there no longer bind to an event loop at construction time). threading locks should not be used with coroutines as far as I understand.
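Putting the snippets in this thread together, a simplified sketch of the wrapper being discussed (flag and function names taken from the fragments above; the merged implementation differs in details such as cleanup on close):

from typing import Any, AsyncGenerator, Awaitable, Generator, Optional

def wrap_async_iterator(
    gen: AsyncGenerator[Any, None],
) -> Generator[Optional[Awaitable[Any]], None, None]:
    exhausted = False
    busy = False

    async def run() -> Any:
        nonlocal busy, exhausted
        try:
            return await gen.__anext__()
        except StopAsyncIteration:
            exhausted = True  # stop handing out awaitables
            raise
        finally:
            busy = False  # the pending awaitable has completed

    # yield None while the previous awaitable is still pending in the pool,
    # so the extractor can move on to other sources instead of blocking
    while not exhausted:
        while busy:
            yield None
        busy = True
        yield run()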

@rudolfix rudolfix left a comment

thx for this! I think we fixed many flaws with parallelism, not only async gens

  1. we need a test where an exception happens during async generation. I have a set of such tests somewhere; please add this one
  2. we need a test where a large number of nested iterators is created, see review
  3. we should move the defer decorator (we can alias it!) to a separate PR. nevertheless I reviewed it :)

continue

# handle async iterator items as new source
if inspect.isasyncgen(item):
Collaborator:

hmmm maybe we should check for AsyncIterator? not all iterators are generators

Collaborator Author (sh-rp):

yeah you are totally right, this is a leftover from something else, I updated it in the other places too.
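For context, the distinction in a standalone example (not code from the PR): every async generator is an AsyncIterator, but not vice versa.

import inspect
from collections.abc import AsyncIterator

class Countdown:
    def __init__(self, n: int) -> None:
        self.n = n

    def __aiter__(self) -> "Countdown":
        return self

    async def __anext__(self) -> int:
        if self.n <= 0:
            raise StopAsyncIteration
        self.n -= 1
        return self.n

c = Countdown(3)
assert isinstance(c, AsyncIterator)  # True: implements __aiter__/__anext__
assert not inspect.isasyncgen(c)     # False: not created by an `async def ... yield`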

self._async_pool.shutdown_asyncgens(), self._ensure_async_pool()
)
while not future.done():
sleep(self.futures_poll_interval)
Collaborator:

LGTM! I'm just worried that our current add_limit works only for generators and will not work for async ones, see below:

def add_limit(self, max_items: int) -> "DltResource":  # noqa: A003
    """Adds a limit `max_items` to the resource pipe

    This mutates the encapsulated generator to stop after `max_items` items are yielded.
    This is useful for testing and debugging. It is a no-op for transformers.
    Those should be limited by their input data.

    Args:
        max_items (int): The maximum number of items to yield
    Returns:
        "DltResource": returns self
    """

    def _gen_wrap(gen: TPipeStep) -> TPipeStep:
        """Wrap a generator to take the first `max_items` records"""
        nonlocal max_items
        count = 0
        if inspect.isfunction(gen):
            gen = gen()
        try:
            for i in gen:  # type: ignore # TODO: help me fix this later
                yield i
                count += 1
                if count == max_items:
                    return
        finally:
            if inspect.isgenerator(gen):
                gen.close()
        return

    # transformers should be limited by their input, so we only limit non-transformers
    if not self.is_transformer:
        self._pipe.replace_gen(_gen_wrap(self._pipe.gen))
    return self
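A minimal sketch of what an async-aware counterpart of _gen_wrap could look like (hypothetical, not the code merged in this PR):

from typing import Any, AsyncGenerator

async def _agen_wrap(gen: AsyncGenerator[Any, None], max_items: int) -> AsyncGenerator[Any, None]:
    count = 0
    try:
        async for i in gen:
            yield i
            count += 1
            if count == max_items:
                return
    finally:
        await gen.aclose()  # close the async generator even when exiting early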


Review thread on dlt/extract/utils.py (resolved).
# items will be round robin, nested iterators are fully iterated and appear inline as soon as they are encountered
assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 55, 56, 77, 88, 89, 13, 3, 14, 4, 15]
# items will be round robin, nested iterators are integrated into the round robin
assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89]
Collaborator:

please test a case where we have too many nested iterators

Collaborator Author (sh-rp):

maybe a couple more tests wouldn't be bad here, especially since it is a bit hard to read this test (i.e. to verify that it is correct) :)

threads = set()

def parallelize(f) -> Any:
    """converts regular iterable to generator of functions that can be run in parallel in the pipe"""
Collaborator:

IMO parallelize should just set a parallel flag on a resource, and the code below should be part of the extract pipe, where you have the async wrapper already.

Collaborator Author (sh-rp):

let's move this stuff to a new PR as you suggested. I have marked the test as skipped and we can discuss it there.

if inspect.isfunction(gen):
    gen = gen()
# if we have an async gen, no further action is needed
if inspect.isasyncgen(gen):
Collaborator:

pls note that you cannot execute generators in the thread pool; you may execute only generator functions. so both isgen and isasyncgen will not work (for different reasons), and we need good exception messages here

Collaborator Author (sh-rp):

next pr!
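The distinction raised above, as a standalone example:

import inspect

def gen_fun():
    yield 1

g = gen_fun()
assert inspect.isgeneratorfunction(gen_fun)  # a callable: can be submitted to a worker
assert inspect.isgenerator(g)  # an already-created generator object: can only be iterated, not called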

gen = f(*args, **kwargs)
# unpack generator
if inspect.isfunction(gen):
    gen = gen()
Collaborator:

this I assume happens in the thread pool, not in the main thread?

Collaborator Author (sh-rp):

next pr!

pipeline_1.run(async_resource1().add_limit(13))

with pipeline_1.sql_client() as c:
    with c.execute_query("SELECT * FROM table1") as cur:
Collaborator:

you do not need to go through duckdb and the pipeline here. you can just call list(resource()) and it will evaluate the same way as with the run method

# we need to decrease the index to keep the round robin order
self._round_robin_index -= 1
# since in this case we have popped an initial source, we need to decrease the initial sources count
self._initial_sources_count -= 1
Collaborator Author (sh-rp):

this is actually a bug, I would say.

@@ -603,6 +603,10 @@ def pass_gen(item, meta):


def test_close_on_async_exception() -> None:
    global close_pipe_got_exit, close_pipe_yielding
    close_pipe_got_exit = False
    close_pipe_yielding = False
Collaborator Author (sh-rp):

globals need to be reset in each test; they will not reset between tests in one file!

yield i
close_pipe_yielding = False
# we have a different exception here
except asyncio.CancelledError:
Collaborator Author (sh-rp):

i expected to also get a GeneratorExit here, but it seems not to be the case.

Collaborator (z3z1ma):

BTW @sh-rp, according to the Python docs, StopIteration and StopAsyncIteration are not propagated out of asynchronous generators and are replaced with a RuntimeError.

Collaborator:

@z3z1ma right! but we wrap the async generator in a regular generator, so I expect a GeneratorExit here. if we get both, it means that we are leaking some edge cases, i.e. when the pool is closed and we cancel async generators using the asyncio module. I'll investigate in a separate branch
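A quick standalone demonstration of the PEP 525 behavior cited above (my own example, assuming the RuntimeError chains the original exception as its cause):

import asyncio

async def bad_agen():
    raise StopAsyncIteration  # raised inside the async generator body
    yield  # unreachable; makes this function an async generator

async def main() -> None:
    try:
        await bad_agen().__anext__()
    except RuntimeError as ex:
        # the original StopAsyncIteration is attached as the cause
        print(type(ex.__cause__).__name__)  # StopAsyncIteration

asyncio.run(main())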

@@ -715,13 +717,14 @@ async def long_gen():
            yield i
        close_pipe_yielding = False
        # we have a different exception here
-       except asyncio.CancelledError:
+       except GeneratorExit:
Collaborator Author (sh-rp):

I don't think this works; the test fails now. The error on cancelling async gens is a CancelledError, not a GeneratorExit.

Collaborator:

huh, this was working for me. I was not cancelling the async gen directly but the wrapped gen, and propagating the exception. I'll check it out

@sh-rp sh-rp force-pushed the d#/async_iterators branch from 3b34190 to d830086 on January 29, 2024 16:52
 self._futures: List[FuturePipeItem] = []
-self._next_item_mode = next_item_mode
+self._next_item_mode: TPipeNextItemMode = next_item_mode
Contributor:

since it is annotated in the argument, should we also remove the annotation here?

Collaborator:

good catch! but somehow mypy sees this as a string here, so I forced the type (or maybe it is the VSCode language server)

# get next item from the current source
gen, step, pipe, meta = self._sources[self._current_source_index]
set_current_pipe_name(pipe.name)
if (item := next(gen)) is not None:
Contributor:

imo this already checks for not None, via item := next(gen)

if first_evaluated_index is None:
    first_evaluated_index = self._current_source_index
# always go round robin if None was returned
self._current_source_index = (self._current_source_index - 1) % sources_count
Contributor:

should we also check if self._current_source_index > 0?

Collaborator:

no! check how the modulo operator works for negative numbers :)
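For reference, Python's % takes the sign of the divisor, so decrementing the index wraps around without an explicit bounds check:

sources_count = 3
index = 0
index = (index - 1) % sources_count
assert index == 2  # -1 % 3 == 2: stepping back from 0 wraps to the last source
assert (-5) % 3 == 1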

) -> Generator[Awaitable[TDataItems], None, None]:
"""Wraps an async generator into a list of awaitables"""
exhausted = False
busy = False
Contributor:

is it better to pass some object instance to the run function which would contain the state variables?

    exhausted = False
    busy = False

nonlocal busy
busy = False

# this generator yields None while the async generator is not exhausted
Contributor:

Does this mean it will idle waiting for the result of the async generator while yielding None?

Collaborator:

in a sense. most of this PR was about handling exactly this case. it is done upstream: if we see that all generators want to idle and we have no more data, we sleep and yield control to other threads
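A much-simplified sketch of that upstream behavior (hypothetical names, not the actual pipe iterator):

from time import sleep
from typing import Any, Generator, Iterator, List

def drain(sources: List[Iterator[Any]], poll_interval: float = 0.01) -> Generator[Any, None, None]:
    while sources:
        got_item = False
        for gen in list(sources):
            try:
                item = next(gen)
            except StopIteration:
                sources.remove(gen)
                continue
            if item is not None:
                got_item = True
                yield item
        if not got_item:
            # every source yielded None: sleep instead of spinning at 100% CPU
            sleep(poll_interval)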

@rudolfix rudolfix left a comment

LGTM! thanks @sh-rp and everyone else who participated and gave us ideas to implement this!

@rudolfix rudolfix merged commit 93ddd19 into devel Jan 30, 2024
98 of 100 checks passed
@rudolfix rudolfix deleted the d#/async_iterators branch January 30, 2024 16:26
Successfully merging this pull request may close this issue: allows async iterators to be evaluated in extract step

4 participants