Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable async generators as resources #905

Merged
merged 27 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
07fd59c
temp
sh-rp Jan 22, 2024
8c8a94f
enable nested generator and add tests
sh-rp Jan 23, 2024
a07e37f
remove temp files
sh-rp Jan 23, 2024
e4ca5c3
convert async iterable to list of awaitables
sh-rp Jan 23, 2024
b8396a6
temp
sh-rp Jan 24, 2024
79a42ed
update evaluation of round robin and fifo
sh-rp Jan 24, 2024
dbea27f
change limit behavior
sh-rp Jan 24, 2024
28a6a12
small fixes
sh-rp Jan 24, 2024
21c6db3
adds experiment for parallelizing regular resources
sh-rp Jan 24, 2024
a151428
fix linter
sh-rp Jan 24, 2024
0211d3d
test if creating the pool before iteration solves CI test problems
sh-rp Jan 24, 2024
c087ebf
make one test more predictable
sh-rp Jan 24, 2024
a7bf8e0
remove async pool fix
sh-rp Jan 24, 2024
3c047a2
remove locks from generator wrappers
sh-rp Jan 24, 2024
8d81d99
make test even more predictable
sh-rp Jan 24, 2024
435239d
pr fixes
sh-rp Jan 26, 2024
3787c63
fix async error test
sh-rp Jan 26, 2024
05d0c55
update evaluation order tests
sh-rp Jan 26, 2024
614b80b
adds sources at the end of pipe, closes generators before futures so …
rudolfix Jan 26, 2024
fb9c564
allows async generator items to be evaluated in add_limit
rudolfix Jan 26, 2024
4a61e60
fixes tests
rudolfix Jan 26, 2024
9446e29
update performance docs
sh-rp Jan 29, 2024
d830086
unrelated formatting fixes
sh-rp Jan 29, 2024
b844231
fix one test
sh-rp Jan 29, 2024
568a2ce
small change to resource page
sh-rp Jan 29, 2024
2a168e5
fixes tests
sh-rp Jan 30, 2024
8cf3c3c
change generator exit test
sh-rp Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
simulate_func_call,
wrap_compat_transformer,
wrap_resource_gen,
wrap_async_generator,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -321,6 +322,10 @@ def evaluate_gen(self) -> None:
# verify if transformer can be called
self._ensure_transform_step(self._gen_idx, gen)

# wrap async generator
if inspect.isasyncgen(self.gen):
self.replace_gen(wrap_async_generator(self.gen))

# evaluate transforms
for step_no, step in enumerate(self._steps):
# print(f"pipe {self.name} step no {step_no} step({step})")
Expand Down Expand Up @@ -366,9 +371,10 @@ def _wrap_gen(self, *args: Any, **kwargs: Any) -> Any:

def _verify_head_step(self, step: TPipeStep) -> None:
    """Validate that `step` can act as the head of a resource pipe.

    The head must be something the pipe can iterate or invoke: an Iterable,
    an Iterator, an AsyncIterator, or a Callable. Anything else raises
    CreatePipeException.
    """
    is_iterable = isinstance(step, (Iterable, Iterator, AsyncIterator))
    if not (is_iterable or callable(step)):
        raise CreatePipeException(
            self.name,
            "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable",
        )

def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep:
Expand Down Expand Up @@ -619,6 +625,16 @@ def __next__(self) -> PipeItem:
pipe_item = None
continue

# handle async iterator items as new source
if inspect.isasyncgen(item):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm — maybe we should check for AsyncIterator instead? Not all async iterators are async generators.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah you are totally right, this is a leftover from something else, I updated it in the other places too.

self._sources.append(
SourcePipeItem(
wrap_async_generator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta
)
)
pipe_item = None
continue

if isinstance(item, Awaitable) or callable(item):
# do we have a free slot or one of the slots is done?
if len(self._futures) < self.max_parallel_items or self._next_future() >= 0:
Expand Down Expand Up @@ -773,6 +789,8 @@ def _resolve_futures(self) -> ResolvablePipeItem:

if future.exception():
ex = future.exception()
if isinstance(ex, StopAsyncIteration):
return None
if isinstance(
ex, (PipelineException, ExtractorException, DltSourceException, PipeException)
):
Expand All @@ -791,6 +809,27 @@ def _get_source_item(self) -> ResolvablePipeItem:
elif self._next_item_mode == "round_robin":
return self._get_source_item_round_robin()

def _get_next_item_from_generator(
    self,
    gen: Any,
    step: int,
    pipe: Pipe,
    meta: Any,
) -> ResolvablePipeItem:
    """Pull the next value from `gen` and normalize it into a ResolvablePipeItem.

    A None value is returned unchanged so the caller can keep polling. A full
    ResolvablePipeItem passes through untouched — this is how the ForkPipe step
    redirects execution of an item to another pipe. Everything else is wrapped,
    keeping the step and pipe the item was assigned to; DataItemWithMeta
    contributes its own meta.
    """
    item: ResolvablePipeItem = next(gen)
    # pass through the poll marker and already-resolvable items as-is
    if item is None or isinstance(item, ResolvablePipeItem):
        return item
    if isinstance(item, DataItemWithMeta):
        return ResolvablePipeItem(item.data, step, pipe, item.meta)
    return ResolvablePipeItem(item, step, pipe, meta)

def _get_source_item_current(self) -> ResolvablePipeItem:
# no more sources to iterate
if len(self._sources) == 0:
Expand All @@ -803,17 +842,8 @@ def _get_source_item_current(self) -> ResolvablePipeItem:
set_current_pipe_name(pipe.name)
item = None
while item is None:
item = next(gen)
# full pipe item may be returned, this is used by ForkPipe step
# to redirect execution of an item to another pipe
if isinstance(item, ResolvablePipeItem):
return item
else:
# keep the item assigned step and pipe when creating resolvable item
if isinstance(item, DataItemWithMeta):
return ResolvablePipeItem(item.data, step, pipe, item.meta)
else:
return ResolvablePipeItem(item, step, pipe, meta)
item = self._get_next_item_from_generator(gen, step, pipe, meta)
return item
except StopIteration:
# remove empty iterator and try another source
self._sources.pop()
Expand All @@ -839,17 +869,8 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem:
self._round_robin_index = (self._round_robin_index + 1) % sources_count
gen, step, pipe, meta = self._sources[self._round_robin_index]
set_current_pipe_name(pipe.name)
item = next(gen)
# full pipe item may be returned, this is used by ForkPipe step
# to redirect execution of an item to another pipe
if isinstance(item, ResolvablePipeItem):
return item
else:
# keep the item assigned step and pipe when creating resolvable item
if isinstance(item, DataItemWithMeta):
return ResolvablePipeItem(item.data, step, pipe, item.meta)
else:
return ResolvablePipeItem(item, step, pipe, meta)
item = self._get_next_item_from_generator(gen, step, pipe, meta)
return item
except StopIteration:
# remove empty iterator and try another source
self._sources.pop(self._round_robin_index)
Expand Down
4 changes: 1 addition & 3 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ def from_data(
data = wrap_additional_type(data)

# several iterable types are not allowed and must be excluded right away
if isinstance(data, (AsyncIterator, AsyncIterable)):
raise InvalidResourceDataTypeAsync(name, data, type(data))
if isinstance(data, (str, dict)):
raise InvalidResourceDataTypeBasic(name, data, type(data))

Expand All @@ -135,7 +133,7 @@ def from_data(
parent_pipe = DltResource._get_parent_pipe(name, data_from)

# create resource from iterator, iterable or generator function
if isinstance(data, (Iterable, Iterator)) or callable(data):
if isinstance(data, (Iterable, Iterator, AsyncIterable)) or callable(data):
pipe = Pipe.from_data(name, data, parent=parent_pipe)
return cls(
pipe,
Expand Down
33 changes: 30 additions & 3 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import inspect
import makefun
from typing import Optional, Tuple, Union, List, Any, Sequence, cast
from typing import Optional, Tuple, Union, List, Any, Sequence, cast, Iterator
from collections.abc import Mapping as C_Mapping

from dlt.common.exceptions import MissingDependencyException
Expand Down Expand Up @@ -119,6 +119,29 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in
return meta_arg


def wrap_async_generator(gen: Any) -> Any:
    """Adapt an async generator into a sync generator of awaitables.

    Each awaitable yielded resolves exactly one item of `gen`. While a
    previously yielded awaitable is still in flight, None is yielded instead
    so the consumer can move on to other sources (this plays well with the
    round robin strategy when several async generators feed resources).
    Iteration ends once `gen` raises StopAsyncIteration.
    """
    is_running = False
    exhausted = False

    async def run() -> TDataItems:
        # resolve a single item; record exhaustion and always free the slot
        nonlocal is_running, exhausted
        try:
            return await gen.__anext__()
        except StopAsyncIteration:
            exhausted = True
            raise
        finally:
            is_running = False

    while not exhausted:
        # only one awaitable may be pending at a time; signal "not ready" with None
        # NOTE(review): this is a busy poll — a tight consumer loop will spin on
        # None until the pending awaitable settles; consider a sync primitive
        while is_running:
            yield None
        is_running = True
        yield run()


def wrap_compat_transformer(
name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any
) -> AnyFun:
Expand All @@ -142,8 +165,12 @@ def wrap_resource_gen(
name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any
) -> AnyFun:
"""Wraps a generator or generator function so it is evaluated on extraction"""
if inspect.isgeneratorfunction(inspect.unwrap(f)) or inspect.isgenerator(f):
# always wrap generators and generator functions. evaluate only at runtime!

if (
inspect.isgeneratorfunction(inspect.unwrap(f))
or inspect.isgenerator(f)
or inspect.isasyncgenfunction(f)
):

def _partial() -> Any:
# print(f"_PARTIAL: {args} {kwargs}")
sultaniman marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
128 changes: 116 additions & 12 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1631,29 +1631,133 @@ def api_fetch(page_num):
assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12


#
# async generators resource tests
#
def test_async_generator_resource() -> None:
    """Async generators run both as raw data and as @dlt.resource."""

    async def async_gen_table():
        for l_ in ["a", "b", "c"]:
            await asyncio.sleep(0.1)
            yield {"letter": l_}

    @dlt.resource
    async def async_gen_resource():
        for l_ in ["d", "e", "f"]:
            await asyncio.sleep(0.1)
            yield {"letter": l_}

    pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)

    # plain async generator passed directly as data
    pipeline_1.run(async_gen_table(), table_name="async")
    with pipeline_1.sql_client() as c:
        with c.execute_query("SELECT * FROM async") as cur:
            rows = list(cur.fetchall())
            assert [r[0] for r in rows] == ["a", "b", "c"]

    # async generator wrapped as a resource appends to the same table
    pipeline_1.run(async_gen_resource(), table_name="async")
    with pipeline_1.sql_client() as c:
        with c.execute_query("SELECT * FROM async") as cur:
            rows = list(cur.fetchall())
            assert [r[0] for r in rows] == ["a", "b", "c", "d", "e", "f"]


def test_async_generator_nested() -> None:
    """A sync generator that yields async generators: each one is evaluated
    as a new source and its items keep the index of the generator that
    produced them."""

    def async_inner_table():
        async def _gen(idx):
            for l_ in ["a", "b", "c"]:
                await asyncio.sleep(0.1)
                yield {"async_gen": idx, "letter": l_}

        # yield one async generator per index
        for idx_ in range(3):
            yield _gen(idx_)

    pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)
    pipeline_1.run(async_inner_table(), table_name="async")
    with pipeline_1.sql_client() as c:
        with c.execute_query("SELECT * FROM async") as cur:
            rows = list(cur.fetchall())
            assert [(r[0], r[1]) for r in rows] == [
                (0, "a"),
                (0, "b"),
                (0, "c"),
                (1, "a"),
                (1, "b"),
                (1, "c"),
                (2, "a"),
                (2, "b"),
                (2, "c"),
            ]


def test_async_generator_transformer() -> None:
    """An async transformer fed by an async resource transforms each item."""

    @dlt.resource
    async def async_resource():
        for l_ in ["a", "b", "c"]:
            await asyncio.sleep(0.1)
            yield {"letter": l_}

    @dlt.transformer(data_from=async_resource)
    async def async_transformer(item):
        await asyncio.sleep(0.1)
        yield {
            "letter": item["letter"] + "t",
        }

    pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)
    pipeline_1.run(async_transformer(), table_name="async")

    with pipeline_1.sql_client() as c:
        with c.execute_query("SELECT * FROM async") as cur:
            rows = list(cur.fetchall())
            assert len(rows) == 3
            assert {r[0] for r in rows} == {"at", "bt", "ct"}


@pytest.mark.parametrize("next_item_mode", ["fifo", "round_robin"])
def test_parallel_async_generators(next_item_mode: str) -> None:
    """Two async resources extracted together: rows from both must land, and
    the interleaving of item production must match the configured item mode.

    BUG FIX: the original final check was written as
    ``assert order == X if mode == "round_robin" else Y`` which Python parses
    as ``assert ((order == X) if cond else Y)`` — in the fifo branch it
    asserted a non-empty list literal, which is always truthy, so the fifo
    ordering was never actually verified. The expectation is now computed
    first and compared explicitly.
    """
    os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode
    execution_order = []

    @dlt.resource(table_name="async1")
    async def async_resource1():
        for l_ in ["a", "b", "c"]:
            await asyncio.sleep(1)
            nonlocal execution_order
            execution_order.append("one")
            yield {"letter": l_}

    @dlt.resource(table_name="async2")
    async def async_resource2():
        # offset by half a step so the two generators interleave deterministically
        await asyncio.sleep(0.5)
        for l_ in ["e", "f", "g"]:
            await asyncio.sleep(1)
            nonlocal execution_order
            execution_order.append("two")
            yield {"letter": l_}

    @dlt.source
    def source():
        return [async_resource1(), async_resource2()]

    pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True)
    pipeline_1.run(source())

    with pipeline_1.sql_client() as c:
        with c.execute_query("SELECT * FROM async1") as cur:
            rows = list(cur.fetchall())
            assert len(rows) == 3
            assert {r[0] for r in rows} == {"a", "b", "c"}

        with c.execute_query("SELECT * FROM async2") as cur:
            rows = list(cur.fetchall())
            assert len(rows) == 3
            assert {r[0] for r in rows} == {"e", "f", "g"}

    expected = (
        ["one", "two", "one", "two", "one", "two"]
        if next_item_mode == "round_robin"
        else ["one", "one", "one", "two", "two", "two"]
    )
    assert execution_order == expected