-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable async generators as resources #905
Changes from 3 commits
07fd59c
8c8a94f
a07e37f
e4ca5c3
b8396a6
79a42ed
dbea27f
28a6a12
21c6db3
a151428
0211d3d
c087ebf
a7bf8e0
3c047a2
8d81d99
435239d
3787c63
05d0c55
614b80b
fb9c564
4a61e60
9446e29
d830086
b844231
568a2ce
2a168e5
8cf3c3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ | |
simulate_func_call, | ||
wrap_compat_transformer, | ||
wrap_resource_gen, | ||
wrap_async_generator, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -321,6 +322,10 @@ def evaluate_gen(self) -> None: | |
# verify if transformer can be called | ||
self._ensure_transform_step(self._gen_idx, gen) | ||
|
||
# wrap async generator | ||
if inspect.isasyncgen(self.gen): | ||
self.replace_gen(wrap_async_generator(self.gen)) | ||
|
||
# evaluate transforms | ||
for step_no, step in enumerate(self._steps): | ||
# print(f"pipe {self.name} step no {step_no} step({step})") | ||
|
@@ -366,9 +371,9 @@ def _wrap_gen(self, *args: Any, **kwargs: Any) -> Any: | |
|
||
def _verify_head_step(self, step: TPipeStep) -> None: | ||
# first element must be Iterable, Iterator or Callable in resource pipe | ||
if not isinstance(step, (Iterable, Iterator)) and not callable(step): | ||
if not isinstance(step, (Iterable, Iterator, AsyncIterator)) and not callable(step): | ||
raise CreatePipeException( | ||
self.name, "A head of a resource pipe must be Iterable, Iterator or a Callable" | ||
self.name, "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable" | ||
) | ||
|
||
def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep: | ||
|
@@ -619,6 +624,16 @@ def __next__(self) -> PipeItem: | |
pipe_item = None | ||
continue | ||
|
||
# handle async iterator items as new source | ||
if inspect.isasyncgen(item): | ||
self._sources.append( | ||
SourcePipeItem( | ||
wrap_async_generator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta | ||
) | ||
) | ||
pipe_item = None | ||
continue | ||
|
||
if isinstance(item, Awaitable) or callable(item): | ||
# do we have a free slot or one of the slots is done? | ||
if len(self._futures) < self.max_parallel_items or self._next_future() >= 0: | ||
|
@@ -791,6 +806,27 @@ def _get_source_item(self) -> ResolvablePipeItem: | |
elif self._next_item_mode == "round_robin": | ||
return self._get_source_item_round_robin() | ||
|
||
def _get_next_item_from_generator( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here there only is refactoring to get rid of non-dry code. |
||
self, | ||
gen: Any, | ||
step: int, | ||
pipe: Pipe, | ||
meta: Any, | ||
) -> ResolvablePipeItem: | ||
item: ResolvablePipeItem = next(gen) | ||
if item is None: | ||
return item | ||
# full pipe item may be returned, this is used by ForkPipe step | ||
# to redirect execution of an item to another pipe | ||
if isinstance(item, ResolvablePipeItem): | ||
return item | ||
else: | ||
# keep the item assigned step and pipe when creating resolvable item | ||
if isinstance(item, DataItemWithMeta): | ||
return ResolvablePipeItem(item.data, step, pipe, item.meta) | ||
else: | ||
return ResolvablePipeItem(item, step, pipe, meta) | ||
|
||
def _get_source_item_current(self) -> ResolvablePipeItem: | ||
# no more sources to iterate | ||
if len(self._sources) == 0: | ||
|
@@ -803,17 +839,8 @@ def _get_source_item_current(self) -> ResolvablePipeItem: | |
set_current_pipe_name(pipe.name) | ||
item = None | ||
while item is None: | ||
item = next(gen) | ||
# full pipe item may be returned, this is used by ForkPipe step | ||
# to redirect execution of an item to another pipe | ||
if isinstance(item, ResolvablePipeItem): | ||
return item | ||
else: | ||
# keep the item assigned step and pipe when creating resolvable item | ||
if isinstance(item, DataItemWithMeta): | ||
return ResolvablePipeItem(item.data, step, pipe, item.meta) | ||
else: | ||
return ResolvablePipeItem(item, step, pipe, meta) | ||
item = self._get_next_item_from_generator(gen, step, pipe, meta) | ||
return item | ||
except StopIteration: | ||
# remove empty iterator and try another source | ||
self._sources.pop() | ||
|
@@ -839,17 +866,8 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem: | |
self._round_robin_index = (self._round_robin_index + 1) % sources_count | ||
gen, step, pipe, meta = self._sources[self._round_robin_index] | ||
set_current_pipe_name(pipe.name) | ||
item = next(gen) | ||
# full pipe item may be returned, this is used by ForkPipe step | ||
# to redirect execution of an item to another pipe | ||
if isinstance(item, ResolvablePipeItem): | ||
return item | ||
else: | ||
# keep the item assigned step and pipe when creating resolvable item | ||
if isinstance(item, DataItemWithMeta): | ||
return ResolvablePipeItem(item.data, step, pipe, item.meta) | ||
else: | ||
return ResolvablePipeItem(item, step, pipe, meta) | ||
item = self._get_next_item_from_generator(gen, step, pipe, meta) | ||
return item | ||
except StopIteration: | ||
# remove empty iterator and try another source | ||
self._sources.pop(self._round_robin_index) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import inspect | ||
import makefun | ||
from typing import Optional, Tuple, Union, List, Any, Sequence, cast | ||
from typing import Optional, Tuple, Union, List, Any, Sequence, cast, Iterator | ||
from collections.abc import Mapping as C_Mapping | ||
|
||
from dlt.common.exceptions import MissingDependencyException | ||
|
@@ -119,6 +119,25 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in | |
return meta_arg | ||
|
||
|
||
def wrap_async_generator(f: Any) -> Any: | ||
"""Wraps an async generator into a list with one awaitable""" | ||
|
||
async def run() -> List[TDataItem]: | ||
result: List[TDataItem] = [] | ||
try: | ||
item: TDataItems = None | ||
while item := await f.__anext__(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the problem with this thing is that it will consume the whole generator in memory and then return results once. so ie. when there's a transformer it will receive a full dataset. right? maybe we need to send each f.next() to a future pool and wrap this in a function that returns both a next result (after await) and the async iterator itself? so you put it into futures pool again and return the current value |
||
if isinstance(item, Iterator): | ||
result.extend(item) | ||
else: | ||
result.append(item) | ||
except StopAsyncIteration: | ||
pass | ||
return result | ||
|
||
yield run() | ||
|
||
|
||
def wrap_compat_transformer( | ||
name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any | ||
) -> AnyFun: | ||
|
@@ -142,8 +161,12 @@ def wrap_resource_gen( | |
name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any | ||
) -> AnyFun: | ||
"""Wraps a generator or generator function so it is evaluated on extraction""" | ||
if inspect.isgeneratorfunction(inspect.unwrap(f)) or inspect.isgenerator(f): | ||
# always wrap generators and generator functions. evaluate only at runtime! | ||
|
||
if ( | ||
inspect.isgeneratorfunction(inspect.unwrap(f)) | ||
or inspect.isgenerator(f) | ||
or inspect.isasyncgenfunction(f) | ||
): | ||
|
||
def _partial() -> Any: | ||
# print(f"_PARTIAL: {args} {kwargs}") | ||
sultaniman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm maybe we should check for AsyncIterator? not all iterators are generators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah you are totally right, this is a leftover from something else, I updated it in the other places too.