From 07fd59c1569758e36f30dec352da8d52557a191d Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 22 Jan 2024 18:21:57 +0100 Subject: [PATCH 01/27] temp --- dlt/extract/pipe.py | 53 +++++++++++++++++++-------------- dlt/extract/resource.py | 4 +-- dlt/extract/utils.py | 32 ++++++++++++++++++-- pipeline.py | 22 ++++++++++++++ test.py | 24 +++++++++++++++ tests/pipeline/test_pipeline.py | 4 +-- 6 files changed, 108 insertions(+), 31 deletions(-) create mode 100644 pipeline.py create mode 100644 test.py diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 6f02f882bc..91a091d76e 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -55,6 +55,7 @@ simulate_func_call, wrap_compat_transformer, wrap_resource_gen, + wrap_async_generator, ) if TYPE_CHECKING: @@ -321,6 +322,9 @@ def evaluate_gen(self) -> None: # verify if transformer can be called self._ensure_transform_step(self._gen_idx, gen) + # ensure that asyn gens are wrapped + self.replace_gen(wrap_async_generator(self.gen)) + # evaluate transforms for step_no, step in enumerate(self._steps): # print(f"pipe {self.name} step no {step_no} step({step})") @@ -366,7 +370,7 @@ def _wrap_gen(self, *args: Any, **kwargs: Any) -> Any: def _verify_head_step(self, step: TPipeStep) -> None: # first element must be Iterable, Iterator or Callable in resource pipe - if not isinstance(step, (Iterable, Iterator)) and not callable(step): + if not isinstance(step, (Iterable, Iterator, AsyncIterator)) and not callable(step): raise CreatePipeException( self.name, "A head of a resource pipe must be Iterable, Iterator or a Callable" ) @@ -791,6 +795,27 @@ def _get_source_item(self) -> ResolvablePipeItem: elif self._next_item_mode == "round_robin": return self._get_source_item_round_robin() + def _get_next_item_from_generator( + self, + gen: Any, + step: int, + pipe: Pipe, + meta: Any, + ) -> ResolvablePipeItem: + item: ResolvablePipeItem = next(gen) + if not item: + return item + # full pipe item may be returned, this is used by ForkPipe step + # to redirect execution of an item to another pipe + if isinstance(item, ResolvablePipeItem): + return item + else: + # keep the item assigned step and pipe when creating resolvable item + if isinstance(item, DataItemWithMeta): + return ResolvablePipeItem(item.data, step, pipe, item.meta) + else: + return ResolvablePipeItem(item, step, pipe, meta) + def _get_source_item_current(self) -> ResolvablePipeItem: # no more sources to iterate if len(self._sources) == 0: @@ -803,17 +828,8 @@ def _get_source_item_current(self) -> ResolvablePipeItem: set_current_pipe_name(pipe.name) item = None while item is None: - item = next(gen) - # full pipe item may be returned, this is used by ForkPipe step - # to redirect execution of an item to another pipe - if isinstance(item, ResolvablePipeItem): - return item - else: - # keep the item assigned step and pipe when creating resolvable item - if isinstance(item, DataItemWithMeta): - return ResolvablePipeItem(item.data, step, pipe, item.meta) - else: - return ResolvablePipeItem(item, step, pipe, meta) + item = self._get_next_item_from_generator(gen, step, pipe, meta) + return item except StopIteration: # remove empty iterator and try another source self._sources.pop() @@ -839,17 +855,8 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem: self._round_robin_index = (self._round_robin_index + 1) % sources_count gen, step, pipe, meta = self._sources[self._round_robin_index] set_current_pipe_name(pipe.name) - item = next(gen) - # full pipe item may be returned, this is used by 
ForkPipe step - # to redirect execution of an item to another pipe - if isinstance(item, ResolvablePipeItem): - return item - else: - # keep the item assigned step and pipe when creating resolvable item - if isinstance(item, DataItemWithMeta): - return ResolvablePipeItem(item.data, step, pipe, item.meta) - else: - return ResolvablePipeItem(item, step, pipe, meta) + item = self._get_next_item_from_generator(gen, step, pipe, meta) + return item except StopIteration: # remove empty iterator and try another source self._sources.pop(self._round_robin_index) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 93c23e05a8..68014f3181 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -123,8 +123,6 @@ def from_data( data = wrap_additional_type(data) # several iterable types are not allowed and must be excluded right away - if isinstance(data, (AsyncIterator, AsyncIterable)): - raise InvalidResourceDataTypeAsync(name, data, type(data)) if isinstance(data, (str, dict)): raise InvalidResourceDataTypeBasic(name, data, type(data)) @@ -135,7 +133,7 @@ def from_data( parent_pipe = DltResource._get_parent_pipe(name, data_from) # create resource from iterator, iterable or generator function - if isinstance(data, (Iterable, Iterator)) or callable(data): + if isinstance(data, (Iterable, Iterator, AsyncIterable)) or callable(data): pipe = Pipe.from_data(name, data, parent=parent_pipe) return cls( pipe, diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 1db18ff47e..8b4fb339a7 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -1,6 +1,6 @@ import inspect import makefun -from typing import Optional, Tuple, Union, List, Any, Sequence, cast +from typing import Optional, Tuple, Union, List, Any, Sequence, cast, Iterator from collections.abc import Mapping as C_Mapping from dlt.common.exceptions import MissingDependencyException @@ -119,6 +119,28 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg +def wrap_async_generator(wrapped) -> Any: + """Wraps an async generator into a list with one awaitable""" + if inspect.isasyncgen(wrapped): + + async def run() -> List[TDataItem]: + result: List[TDataItem] = [] + try: + item: TDataItems = None + while item := await wrapped.__anext__(): + if isinstance(item, Iterator): + result.extend(item) + else: + result.append(item) + except StopAsyncIteration: + pass + return result + + yield run() + else: + return wrapped + + def wrap_compat_transformer( name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any ) -> AnyFun: @@ -142,8 +164,12 @@ def wrap_resource_gen( name: str, f: AnyFun, sig: inspect.Signature, *args: Any, **kwargs: Any ) -> AnyFun: """Wraps a generator or generator function so it is evaluated on extraction""" - if inspect.isgeneratorfunction(inspect.unwrap(f)) or inspect.isgenerator(f): - # always wrap generators and generator functions. evaluate only at runtime! 
+ + if ( + inspect.isgeneratorfunction(inspect.unwrap(f)) + or inspect.isgenerator(f) + or inspect.isasyncgenfunction(f) + ): def _partial() -> Any: # print(f"_PARTIAL: {args} {kwargs}") diff --git a/pipeline.py b/pipeline.py new file mode 100644 index 0000000000..09d4415575 --- /dev/null +++ b/pipeline.py @@ -0,0 +1,22 @@ + +import dlt, asyncio + +# @dlt.resource(table_name="hello") +async def async_gen_resource(idx): + for l in ["a", "b", "c"] * 3: + await asyncio.sleep(0.1) + yield {"async_gen": idx, "letter": l} + +pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) +pipeline_1.run( + async_gen_resource(10), table_name="hello" +) +with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM hello") as cur: + rows = list(cur.fetchall()) + for r in rows: + print(r) + +# pipeline_1.run( +# async_gen_resource(11) +# ) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000000..643595cd5e --- /dev/null +++ b/test.py @@ -0,0 +1,24 @@ + +import asyncio, inspect +from typing import Awaitable +from asyncio import Future + +async def async_gen_resource(): + for l in ["a", "b", "c"]: + # await asyncio.sleep(0.1) + yield {"async_gen": 1, "letter": l} + + +async def run() -> None: + gen = async_gen_resource() + result = [] + try: + while item := await gen.__anext__(): + result.append(item)# + except StopAsyncIteration: + return [result] + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + print(loop.run_until_complete(run())) \ No newline at end of file diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index f4f248261b..b49d759943 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1631,7 +1631,7 @@ def api_fetch(page_num): assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 -@pytest.mark.skip("skipped until async generators are implemented") +# @pytest.mark.skip("skipped until async generators are implemented") def test_async_generator() -> None: def async_inner_table(): async def _gen(idx): @@ -1656,4 +1656,4 @@ async def async_gen_resource(idx): pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) pipeline_1.run(async_gen_resource(10)) - pipeline_1.run(async_gen_table(11)) + pipeline_1.run(async_gen_table(11), table_name="async_gen_table") From 8c8a94f0b930b9833b324ebf6c8903dc03bc99fa Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 23 Jan 2024 13:26:42 +0100 Subject: [PATCH 02/27] enable nested generator and add tests --- dlt/extract/pipe.py | 17 +++++-- dlt/extract/utils.py | 35 +++++++------- pipeline.py | 2 +- tests/pipeline/test_pipeline.py | 85 +++++++++++++++++++++++++++------ 4 files changed, 102 insertions(+), 37 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 91a091d76e..4765b989fc 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -322,8 +322,9 @@ def evaluate_gen(self) -> None: # verify if transformer can be called self._ensure_transform_step(self._gen_idx, gen) - # ensure that asyn gens are wrapped - self.replace_gen(wrap_async_generator(self.gen)) + # wrap async generator + if inspect.isasyncgen(self.gen): + self.replace_gen(wrap_async_generator(self.gen)) # evaluate transforms for step_no, step in enumerate(self._steps): @@ -623,6 +624,16 @@ def __next__(self) -> PipeItem: pipe_item = None continue + # handle async iterator items as new source + if inspect.isasyncgen(item): + self._sources.append( + SourcePipeItem( + 
wrap_async_generator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta + ) + ) + pipe_item = None + continue + if isinstance(item, Awaitable) or callable(item): # do we have a free slot or one of the slots is done? if len(self._futures) < self.max_parallel_items or self._next_future() >= 0: @@ -803,7 +814,7 @@ def _get_next_item_from_generator( meta: Any, ) -> ResolvablePipeItem: item: ResolvablePipeItem = next(gen) - if not item: + if item is None: return item # full pipe item may be returned, this is used by ForkPipe step # to redirect execution of an item to another pipe diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 8b4fb339a7..6c5b230637 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -119,26 +119,23 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg -def wrap_async_generator(wrapped) -> Any: +def wrap_async_generator(f: Any) -> Any: """Wraps an async generator into a list with one awaitable""" - if inspect.isasyncgen(wrapped): - - async def run() -> List[TDataItem]: - result: List[TDataItem] = [] - try: - item: TDataItems = None - while item := await wrapped.__anext__(): - if isinstance(item, Iterator): - result.extend(item) - else: - result.append(item) - except StopAsyncIteration: - pass - return result - - yield run() - else: - return wrapped + + async def run() -> List[TDataItem]: + result: List[TDataItem] = [] + try: + item: TDataItems = None + while item := await f.__anext__(): + if isinstance(item, Iterator): + result.extend(item) + else: + result.append(item) + except StopAsyncIteration: + pass + return result + + yield run() def wrap_compat_transformer( diff --git a/pipeline.py b/pipeline.py index 09d4415575..5ac8ca2da3 100644 --- a/pipeline.py +++ b/pipeline.py @@ -1,7 +1,7 @@ import dlt, asyncio -# @dlt.resource(table_name="hello") +@dlt.resource(table_name="hello") async def async_gen_resource(idx): for l in ["a", "b", "c"] * 3: await asyncio.sleep(0.1) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index b49d759943..56596f19b9 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1631,29 +1631,86 @@ def api_fetch(page_num): assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 -# @pytest.mark.skip("skipped until async generators are implemented") -def test_async_generator() -> None: +# +# async generators resource tests +# +def test_async_generator_resource() -> None: + async def async_gen_table(): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(0.1) + yield {"letter": l_} + + @dlt.resource + async def async_gen_resource(): + for l_ in ["d", "e", "f"]: + await asyncio.sleep(0.1) + yield {"letter": l_} + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + + # pure async function + pipeline_1.run(async_gen_table(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == ["a", "b", "c"] + + # async resource + pipeline_1.run(async_gen_resource(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == ["a", "b", "c", "d", "e", "f"] + + +def test_async_generator_nested() -> None: def async_inner_table(): async def _gen(idx): for l_ in ["a", "b", "c"]: - await asyncio.sleep(1) + await asyncio.sleep(0.1) yield {"async_gen": idx, "letter": l_} 
# just yield futures in a loop - for idx_ in range(10): + for idx_ in range(3): yield _gen(idx_) - async def async_gen_table(idx): - for l_ in ["a", "b", "c"]: - await asyncio.sleep(1) - yield {"async_gen": idx, "letter": l_} - + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(async_inner_table(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [(r[0], r[1]) for r in rows] == [ + (0, "a"), + (0, "b"), + (0, "c"), + (1, "a"), + (1, "b"), + (1, "c"), + (2, "a"), + (2, "b"), + (2, "c"), + ] + + +def test_async_generator_transformer() -> None: @dlt.resource - async def async_gen_resource(idx): + async def async_resource(): for l_ in ["a", "b", "c"]: - await asyncio.sleep(1) - yield {"async_gen": idx, "letter": l_} + await asyncio.sleep(0.1) + yield {"letter": l_} + + @dlt.transformer(data_from=async_resource) + async def async_transformer(items): + for item in items: + await asyncio.sleep(0.1) + yield { + "letter": item["letter"] + "t", + } pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_1.run(async_gen_resource(10)) - pipeline_1.run(async_gen_table(11), table_name="async_gen_table") + pipeline_1.run(async_transformer(), table_name="async") + + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == ["at", "bt", "ct"] From a07e37f74a77d5f853d411733969c8f837fc355c Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 23 Jan 2024 13:27:04 +0100 Subject: [PATCH 03/27] remove temp files --- dlt/extract/pipe.py | 2 +- pipeline.py | 22 ---------------------- test.py | 24 ------------------------ 3 files changed, 1 insertion(+), 47 deletions(-) delete mode 100644 pipeline.py delete mode 100644 test.py diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 4765b989fc..011ef5df9c 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -373,7 +373,7 @@ def _verify_head_step(self, step: TPipeStep) -> None: # first element must be Iterable, Iterator or Callable in resource pipe if not isinstance(step, (Iterable, Iterator, AsyncIterator)) and not callable(step): raise CreatePipeException( - self.name, "A head of a resource pipe must be Iterable, Iterator or a Callable" + self.name, "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable" ) def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep: diff --git a/pipeline.py b/pipeline.py deleted file mode 100644 index 5ac8ca2da3..0000000000 --- a/pipeline.py +++ /dev/null @@ -1,22 +0,0 @@ - -import dlt, asyncio - -@dlt.resource(table_name="hello") -async def async_gen_resource(idx): - for l in ["a", "b", "c"] * 3: - await asyncio.sleep(0.1) - yield {"async_gen": idx, "letter": l} - -pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) -pipeline_1.run( - async_gen_resource(10), table_name="hello" -) -with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM hello") as cur: - rows = list(cur.fetchall()) - for r in rows: - print(r) - -# pipeline_1.run( -# async_gen_resource(11) -# ) \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index 643595cd5e..0000000000 --- a/test.py +++ /dev/null @@ -1,24 +0,0 @@ - -import asyncio, inspect -from typing import Awaitable -from asyncio import Future - -async def async_gen_resource(): - for l in ["a", "b", 
"c"]: - # await asyncio.sleep(0.1) - yield {"async_gen": 1, "letter": l} - - -async def run() -> None: - gen = async_gen_resource() - result = [] - try: - while item := await gen.__anext__(): - result.append(item)# - except StopAsyncIteration: - return [result] - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - print(loop.run_until_complete(run())) \ No newline at end of file From e4ca5c37df38e13f4521b31132c7fd02eacb051a Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 23 Jan 2024 17:24:21 +0100 Subject: [PATCH 04/27] convert async iterable to list of awaitables --- dlt/extract/pipe.py | 5 ++- dlt/extract/utils.py | 32 +++++++++-------- tests/pipeline/test_pipeline.py | 61 +++++++++++++++++++++++++++++---- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 011ef5df9c..5edc52f836 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -373,7 +373,8 @@ def _verify_head_step(self, step: TPipeStep) -> None: # first element must be Iterable, Iterator or Callable in resource pipe if not isinstance(step, (Iterable, Iterator, AsyncIterator)) and not callable(step): raise CreatePipeException( - self.name, "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable" + self.name, + "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable", ) def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep: @@ -788,6 +789,8 @@ def _resolve_futures(self) -> ResolvablePipeItem: if future.exception(): ex = future.exception() + if isinstance(ex, StopAsyncIteration): + return None if isinstance( ex, (PipelineException, ExtractorException, DltSourceException, PipeException) ): diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 6c5b230637..5720ee239e 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -119,23 +119,27 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg -def wrap_async_generator(f: Any) -> Any: - """Wraps an async generator into a list with one awaitable""" +def wrap_async_generator(gen: Any) -> Any: + """Wraps an async generator into a list of awaitables""" + is_running = False + exhausted = False - async def run() -> List[TDataItem]: - result: List[TDataItem] = [] + async def run() -> TDataItems: + nonlocal is_running, exhausted try: - item: TDataItems = None - while item := await f.__anext__(): - if isinstance(item, Iterator): - result.extend(item) - else: - result.append(item) + return await gen.__anext__() except StopAsyncIteration: - pass - return result - - yield run() + exhausted = True + raise + finally: + is_running = False + + # it is best to use the round robin strategy here if multiple async generators are used in resources + while not exhausted: + while is_running: + yield None + is_running = True + yield run() def wrap_compat_transformer( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 56596f19b9..ce3ef1b8d4 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1700,12 +1700,11 @@ async def async_resource(): yield {"letter": l_} @dlt.transformer(data_from=async_resource) - async def async_transformer(items): - for item in items: - await asyncio.sleep(0.1) - yield { - "letter": item["letter"] + "t", - } + async def async_transformer(item): + await asyncio.sleep(0.1) + yield { + "letter": item["letter"] + "t", + } pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) 
pipeline_1.run(async_transformer(), table_name="async") @@ -1713,4 +1712,52 @@ async def async_transformer(items): with pipeline_1.sql_client() as c: with c.execute_query("SELECT * FROM async") as cur: rows = list(cur.fetchall()) - assert [r[0] for r in rows] == ["at", "bt", "ct"] + assert len(rows) == 3 + assert {r[0] for r in rows} == {"at", "bt", "ct"} + + +@pytest.mark.parametrize("next_item_mode", ["fifo", "round_robin"]) +def test_parallel_async_generators(next_item_mode: str) -> None: + os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode + execution_order = [] + + @dlt.resource(table_name="async1") + async def async_resource1(): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(1) + nonlocal execution_order + execution_order.append("one") + yield {"letter": l_} + + @dlt.resource(table_name="async2") + async def async_resource2(): + await asyncio.sleep(0.5) + for l_ in ["e", "f", "g"]: + await asyncio.sleep(1) + nonlocal execution_order + execution_order.append("two") + yield {"letter": l_} + + @dlt.source + def source(): + return [async_resource1(), async_resource2()] + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(source()) + + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async1") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"a", "b", "c"} + + with c.execute_query("SELECT * FROM async2") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"e", "f", "g"} + + assert ( + execution_order == ["one", "two", "one", "two", "one", "two"] + if next_item_mode == "round_robin" + else ["one", "one", "one", "two", "two", "two"] + ) From b8396a69459fd2d07b29e3e80de2f2cbc5a29e8f Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 12:17:45 +0100 Subject: [PATCH 05/27] temp --- dlt/extract/pipe.py | 54 +++++++++++++++++++-------------- dlt/extract/utils.py | 45 +++++++++++++++++---------- tests/pipeline/test_pipeline.py | 5 +-- 3 files changed, 64 insertions(+), 40 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 5edc52f836..1deea591a5 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -109,7 +109,7 @@ class SourcePipeItem(NamedTuple): Callable[[TDataItems], Iterator[ResolvablePipeItem]], ] -TPipeNextItemMode = Union[Literal["fifo"], Literal["round_robin"]] +TPipeNextItemMode = Union[Literal["auto"], Literal["fifo"], Literal["round_robin"]] class ForkPipe: @@ -144,6 +144,7 @@ def __init__(self, name: str, steps: List[TPipeStep] = None, parent: "Pipe" = No self._gen_idx = 0 self._steps: List[TPipeStep] = [] self.parent = parent + self.generates_awaitables = False # add the steps, this will check and mod transformations if steps: for step in steps: @@ -325,6 +326,7 @@ def evaluate_gen(self) -> None: # wrap async generator if inspect.isasyncgen(self.gen): self.replace_gen(wrap_async_generator(self.gen)) + self.generates_awaitables = True # evaluate transforms for step_no, step in enumerate(self._steps): @@ -495,7 +497,7 @@ class PipeIteratorConfiguration(BaseConfiguration): workers: int = 5 futures_poll_interval: float = 0.01 copy_on_fork: bool = False - next_item_mode: str = "fifo" + next_item_mode: str = "auto" __section__ = "extract" @@ -504,6 +506,7 @@ def __init__( max_parallel_items: int, workers: int, futures_poll_interval: float, + sources: List[SourcePipeItem], next_item_mode: TPipeNextItemMode, ) -> None: self.max_parallel_items = max_parallel_items @@ -515,8 +518,20 @@ 
def __init__( self._async_pool: asyncio.AbstractEventLoop = None self._async_pool_thread: Thread = None self._thread_pool: ThreadPoolExecutor = None - self._sources: List[SourcePipeItem] = [] + self._sources = sources + self._initial_sources_count = len(sources) self._futures: List[FuturePipeItem] = [] + + # evaluate next item mode, switch to round robin if we have any async generators + if next_item_mode == "auto": + next_item_mode = ( + "round_robin" if any(s.pipe.generates_awaitables for s in self._sources) else "fifo" + ) + + # we process fifo backwards + if next_item_mode == "fifo": + self._sources.reverse() + self._next_item_mode = next_item_mode @classmethod @@ -528,7 +543,7 @@ def from_pipe( max_parallel_items: int = 20, workers: int = 5, futures_poll_interval: float = 0.01, - next_item_mode: TPipeNextItemMode = "fifo", + next_item_mode: TPipeNextItemMode = "auto", ) -> "PipeIterator": # join all dependent pipes if pipe.parent: @@ -539,12 +554,10 @@ def from_pipe( pipe.evaluate_gen() if not isinstance(pipe.gen, Iterator): raise PipeGenInvalid(pipe.name, pipe.gen) + # create extractor - extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode) - # add as first source - extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None)) - cls._initial_sources_count = 1 - return extract + sources = [SourcePipeItem(pipe.gen, 0, pipe, None)] + return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode) @classmethod @with_config(spec=PipeIteratorConfiguration) @@ -557,10 +570,11 @@ def from_pipes( workers: int = 5, futures_poll_interval: float = 0.01, copy_on_fork: bool = False, - next_item_mode: TPipeNextItemMode = "fifo", + next_item_mode: TPipeNextItemMode = "auto", ) -> "PipeIterator": # print(f"max_parallel_items: {max_parallel_items} workers: {workers}") - extract = cls(max_parallel_items, workers, futures_poll_interval, next_item_mode) + sources: List[SourcePipeItem] = [] + # clone all pipes before iterating (recursively) as we will fork them (this add steps) and evaluate gens pipes, _ = PipeIterator.clone_pipes(pipes) @@ -580,18 +594,14 @@ def _fork_pipeline(pipe: Pipe) -> None: if not isinstance(pipe.gen, Iterator): raise PipeGenInvalid(pipe.name, pipe.gen) # add every head as source only once - if not any(i.pipe == pipe for i in extract._sources): - extract._sources.append(SourcePipeItem(pipe.gen, 0, pipe, None)) + if not any(i.pipe == pipe for i in sources): + sources.append(SourcePipeItem(pipe.gen, 0, pipe, None)) - # reverse pipes for current mode, as we start processing from the back - if next_item_mode == "fifo": - pipes.reverse() for pipe in pipes: _fork_pipeline(pipe) - extract._initial_sources_count = len(extract._sources) - - return extract + # create extractor + return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode) def __next__(self) -> PipeItem: pipe_item: Union[ResolvablePipeItem, SourcePipeItem] = None @@ -805,7 +815,7 @@ def _resolve_futures(self) -> ResolvablePipeItem: def _get_source_item(self) -> ResolvablePipeItem: if self._next_item_mode == "fifo": - return self._get_source_item_current() + return self._get_source_item_fifo() elif self._next_item_mode == "round_robin": return self._get_source_item_round_robin() @@ -830,7 +840,7 @@ def _get_next_item_from_generator( else: return ResolvablePipeItem(item, step, pipe, meta) - def _get_source_item_current(self) -> ResolvablePipeItem: + def _get_source_item_fifo(self) -> ResolvablePipeItem: # no more sources to iterate if 
len(self._sources) == 0: return None @@ -860,7 +870,7 @@ def _get_source_item_round_robin(self) -> ResolvablePipeItem: return None # if there are currently more sources than added initially, we need to process the new ones first if sources_count > self._initial_sources_count: - return self._get_source_item_current() + return self._get_source_item_fifo() try: # print(f"got {pipe.name}") # register current pipe name during the execution of gen diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 5720ee239e..b1b0180d3c 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -1,6 +1,18 @@ import inspect import makefun -from typing import Optional, Tuple, Union, List, Any, Sequence, cast, Iterator +import asyncio +from typing import ( + Optional, + Tuple, + Union, + List, + Any, + Sequence, + cast, + AsyncGenerator, + Awaitable, + Generator, +) from collections.abc import Mapping as C_Mapping from dlt.common.exceptions import MissingDependencyException @@ -119,26 +131,27 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg -def wrap_async_generator(gen: Any) -> Any: - """Wraps an async generator into a list of awaitables""" - is_running = False +def wrap_async_generator( + gen: AsyncGenerator[TDataItems, None] +) -> Generator[Awaitable[TDataItems], None, None]: + """Wraps an async generatqor into a list of awaitables""" exhausted = False + lock = asyncio.Lock() + # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: - nonlocal is_running, exhausted - try: - return await gen.__anext__() - except StopAsyncIteration: - exhausted = True - raise - finally: - is_running = False - - # it is best to use the round robin strategy here if multiple async generators are used in resources + async with lock: + try: + return await gen.__anext__() + except StopAsyncIteration: + nonlocal exhausted + exhausted = True + raise + + # this generator yields None while the async generator is not exhauste while not exhausted: - while is_running: + while lock.locked(): yield None - is_running = True yield run() diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index ce3ef1b8d4..aecad7e037 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1716,7 +1716,7 @@ async def async_transformer(item): assert {r[0] for r in rows} == {"at", "bt", "ct"} -@pytest.mark.parametrize("next_item_mode", ["fifo", "round_robin"]) +@pytest.mark.parametrize("next_item_mode", ["auto", "fifo", "round_robin"]) def test_parallel_async_generators(next_item_mode: str) -> None: os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode execution_order = [] @@ -1756,8 +1756,9 @@ def source(): assert len(rows) == 3 assert {r[0] for r in rows} == {"e", "f", "g"} + # auto mode will switch to round robin if we have awaitables assert ( execution_order == ["one", "two", "one", "two", "one", "two"] - if next_item_mode == "round_robin" + if next_item_mode in ["auto", "round_robin"] else ["one", "one", "one", "two", "two", "two"] ) From 79a42eda4ebd970a4568020facbd68977f131c0a Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 16:13:27 +0100 Subject: [PATCH 06/27] update evaluation of round robin and fifo add more tests add support for limit in asynciterator --- dlt/extract/pipe.py | 134 +++++--------- dlt/extract/resource.py | 9 + dlt/extract/utils.py | 11 +- tests/extract/test_extract_pipe.py | 4 +- tests/pipeline/test_pipeline.py | 133 -------------- 
tests/pipeline/test_resources_evaluation.py | 185 ++++++++++++++++++++ 6 files changed, 249 insertions(+), 227 deletions(-) create mode 100644 tests/pipeline/test_resources_evaluation.py diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 1deea591a5..ef5dc65ba3 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -109,7 +109,7 @@ class SourcePipeItem(NamedTuple): Callable[[TDataItems], Iterator[ResolvablePipeItem]], ] -TPipeNextItemMode = Union[Literal["auto"], Literal["fifo"], Literal["round_robin"]] +TPipeNextItemMode = Union[Literal["fifo"], Literal["round_robin"]] class ForkPipe: @@ -144,7 +144,6 @@ def __init__(self, name: str, steps: List[TPipeStep] = None, parent: "Pipe" = No self._gen_idx = 0 self._steps: List[TPipeStep] = [] self.parent = parent - self.generates_awaitables = False # add the steps, this will check and mod transformations if steps: for step in steps: @@ -326,7 +325,6 @@ def evaluate_gen(self) -> None: # wrap async generator if inspect.isasyncgen(self.gen): self.replace_gen(wrap_async_generator(self.gen)) - self.generates_awaitables = True # evaluate transforms for step_no, step in enumerate(self._steps): @@ -497,7 +495,7 @@ class PipeIteratorConfiguration(BaseConfiguration): workers: int = 5 futures_poll_interval: float = 0.01 copy_on_fork: bool = False - next_item_mode: str = "auto" + next_item_mode: str = "fifo" __section__ = "extract" @@ -513,25 +511,12 @@ def __init__( self.workers = workers self.futures_poll_interval = futures_poll_interval - self._round_robin_index: int = -1 - self._initial_sources_count: int = 0 + self._current_source_index: int = -1 self._async_pool: asyncio.AbstractEventLoop = None self._async_pool_thread: Thread = None self._thread_pool: ThreadPoolExecutor = None self._sources = sources - self._initial_sources_count = len(sources) self._futures: List[FuturePipeItem] = [] - - # evaluate next item mode, switch to round robin if we have any async generators - if next_item_mode == "auto": - next_item_mode = ( - "round_robin" if any(s.pipe.generates_awaitables for s in self._sources) else "fifo" - ) - - # we process fifo backwards - if next_item_mode == "fifo": - self._sources.reverse() - self._next_item_mode = next_item_mode @classmethod @@ -543,7 +528,7 @@ def from_pipe( max_parallel_items: int = 20, workers: int = 5, futures_poll_interval: float = 0.01, - next_item_mode: TPipeNextItemMode = "auto", + next_item_mode: TPipeNextItemMode = "fifo", ) -> "PipeIterator": # join all dependent pipes if pipe.parent: @@ -570,7 +555,7 @@ def from_pipes( workers: int = 5, futures_poll_interval: float = 0.01, copy_on_fork: bool = False, - next_item_mode: TPipeNextItemMode = "auto", + next_item_mode: TPipeNextItemMode = "fifo", ) -> "PipeIterator": # print(f"max_parallel_items: {max_parallel_items} workers: {workers}") sources: List[SourcePipeItem] = [] @@ -629,18 +614,19 @@ def __next__(self) -> PipeItem: # if item is iterator, then add it as a new source if isinstance(item, Iterator): # print(f"adding iterable {item}") - self._sources.append( - SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) + self._sources.insert( + 0, SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) ) pipe_item = None continue # handle async iterator items as new source if inspect.isasyncgen(item): - self._sources.append( + self._sources.insert( + 0, SourcePipeItem( wrap_async_generator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta - ) + ), ) pipe_item = None continue @@ -814,85 +800,53 @@ def _resolve_futures(self) -> 
ResolvablePipeItem: return ResolvablePipeItem(item, step, pipe, meta) def _get_source_item(self) -> ResolvablePipeItem: - if self._next_item_mode == "fifo": - return self._get_source_item_fifo() - elif self._next_item_mode == "round_robin": - return self._get_source_item_round_robin() - - def _get_next_item_from_generator( - self, - gen: Any, - step: int, - pipe: Pipe, - meta: Any, - ) -> ResolvablePipeItem: - item: ResolvablePipeItem = next(gen) - if item is None: - return item - # full pipe item may be returned, this is used by ForkPipe step - # to redirect execution of an item to another pipe - if isinstance(item, ResolvablePipeItem): - return item - else: - # keep the item assigned step and pipe when creating resolvable item - if isinstance(item, DataItemWithMeta): - return ResolvablePipeItem(item.data, step, pipe, item.meta) - else: - return ResolvablePipeItem(item, step, pipe, meta) - - def _get_source_item_fifo(self) -> ResolvablePipeItem: - # no more sources to iterate - if len(self._sources) == 0: - return None - try: - # get items from last added iterator, this makes the overall Pipe as close to FIFO as possible - gen, step, pipe, meta = self._sources[-1] - # print(f"got {pipe.name}") - # register current pipe name during the execution of gen - set_current_pipe_name(pipe.name) - item = None - while item is None: - item = self._get_next_item_from_generator(gen, step, pipe, meta) - return item - except StopIteration: - # remove empty iterator and try another source - self._sources.pop() - return self._get_source_item() - except (PipelineException, ExtractorException, DltSourceException, PipeException): - raise - except Exception as ex: - raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex - - def _get_source_item_round_robin(self) -> ResolvablePipeItem: sources_count = len(self._sources) # no more sources to iterate if sources_count == 0: return None - # if there are currently more sources than added initially, we need to process the new ones first - if sources_count > self._initial_sources_count: - return self._get_source_item_fifo() try: - # print(f"got {pipe.name}") - # register current pipe name during the execution of gen - item = None - while item is None: - self._round_robin_index = (self._round_robin_index + 1) % sources_count - gen, step, pipe, meta = self._sources[self._round_robin_index] + first_evaluated_index = -1 + while True: + print(self._current_source_index) + self._current_source_index = (self._current_source_index + 1) % sources_count + # if we have checked all sources once and all returned None, then we can sleep a bit + if self._current_source_index == first_evaluated_index: + sleep(self.futures_poll_interval) + # get next item from the current source + gen, step, pipe, meta = self._sources[self._current_source_index] set_current_pipe_name(pipe.name) - item = self._get_next_item_from_generator(gen, step, pipe, meta) - return item + if (item := next(gen)) is not None: + # full pipe item may be returned, this is used by ForkPipe step + # to redirect execution of an item to another pipe + if isinstance(item, ResolvablePipeItem): + return item + else: + # keep the item assigned step and pipe when creating resolvable item + if isinstance(item, DataItemWithMeta): + return ResolvablePipeItem(item.data, step, pipe, item.meta) + else: + return ResolvablePipeItem(item, step, pipe, meta) + first_evaluated_index = ( + self._current_source_index + if first_evaluated_index == -1 + else first_evaluated_index + ) except StopIteration: # remove empty 
iterator and try another source - self._sources.pop(self._round_robin_index) + self._sources.pop(self._current_source_index) # we need to decrease the index to keep the round robin order - self._round_robin_index -= 1 - # since in this case we have popped an initial source, we need to decrease the initial sources count - self._initial_sources_count -= 1 - return self._get_source_item_round_robin() + if self._next_item_mode == "round_robin": + self._current_source_index -= 1 + return self._get_source_item() except (PipelineException, ExtractorException, DltSourceException, PipeException): raise except Exception as ex: raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex + finally: + # reset to beginning of resource list for fifo mode + if self._next_item_mode == "fifo": + self._current_source_index = -1 + print("tmp") @staticmethod def clone_pipes( diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 68014f3181..20d67c2470 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -1,5 +1,6 @@ from copy import deepcopy import inspect +import asyncio from typing import ( AsyncIterable, AsyncIterator, @@ -24,6 +25,7 @@ pipeline_state, ) from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id +from dlt.extract.utils import wrap_async_generator from dlt.extract.typing import ( DataItemWithMeta, @@ -308,6 +310,13 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: count = 0 if inspect.isfunction(gen): gen = gen() + + # wrap async gen already here + # max items will be handled by the wrapper + if inspect.isasyncgen(gen): + gen = wrap_async_generator(gen, max_items) + max_items = -1 + try: for i in gen: # type: ignore # TODO: help me fix this later yield i diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index b1b0180d3c..7743f103a5 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -132,17 +132,24 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in def wrap_async_generator( - gen: AsyncGenerator[TDataItems, None] + gen: AsyncGenerator[TDataItems, None], + max_items: Optional[int] = -1, ) -> Generator[Awaitable[TDataItems], None, None]: - """Wraps an async generatqor into a list of awaitables""" + """Wraps an async generator into a list of awaitables""" exhausted = False lock = asyncio.Lock() # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: + nonlocal max_items async with lock: try: + if max_items == 0: + await gen.aclose() + raise StopAsyncIteration() + max_items -= 1 return await gen.__anext__() + # on stop iteration mark as exhausted except StopAsyncIteration: nonlocal exhausted exhausted = True diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 4ad6cb6f72..7b5177c695 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -44,8 +44,8 @@ def get_pipes(): # round robin mode _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin")) - # items will be round robin, nested iterators are fully iterated and appear inline as soon as they are encountered - assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 55, 56, 77, 88, 89, 13, 3, 14, 4, 15] + # items will be round robin, nested iterators are integrated into the round robin + assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89] def test_rotation_on_none() -> None: diff --git a/tests/pipeline/test_pipeline.py 
b/tests/pipeline/test_pipeline.py index aecad7e037..7e99027e08 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1629,136 +1629,3 @@ def api_fetch(page_num): load_info = pipeline.run(product()) assert_load_info(load_info) assert pipeline.last_trace.last_normalize_info.row_counts["product"] == 12 - - -# -# async generators resource tests -# -def test_async_generator_resource() -> None: - async def async_gen_table(): - for l_ in ["a", "b", "c"]: - await asyncio.sleep(0.1) - yield {"letter": l_} - - @dlt.resource - async def async_gen_resource(): - for l_ in ["d", "e", "f"]: - await asyncio.sleep(0.1) - yield {"letter": l_} - - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - - # pure async function - pipeline_1.run(async_gen_table(), table_name="async") - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM async") as cur: - rows = list(cur.fetchall()) - assert [r[0] for r in rows] == ["a", "b", "c"] - - # async resource - pipeline_1.run(async_gen_resource(), table_name="async") - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM async") as cur: - rows = list(cur.fetchall()) - assert [r[0] for r in rows] == ["a", "b", "c", "d", "e", "f"] - - -def test_async_generator_nested() -> None: - def async_inner_table(): - async def _gen(idx): - for l_ in ["a", "b", "c"]: - await asyncio.sleep(0.1) - yield {"async_gen": idx, "letter": l_} - - # just yield futures in a loop - for idx_ in range(3): - yield _gen(idx_) - - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_1.run(async_inner_table(), table_name="async") - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM async") as cur: - rows = list(cur.fetchall()) - assert [(r[0], r[1]) for r in rows] == [ - (0, "a"), - (0, "b"), - (0, "c"), - (1, "a"), - (1, "b"), - (1, "c"), - (2, "a"), - (2, "b"), - (2, "c"), - ] - - -def test_async_generator_transformer() -> None: - @dlt.resource - async def async_resource(): - for l_ in ["a", "b", "c"]: - await asyncio.sleep(0.1) - yield {"letter": l_} - - @dlt.transformer(data_from=async_resource) - async def async_transformer(item): - await asyncio.sleep(0.1) - yield { - "letter": item["letter"] + "t", - } - - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_1.run(async_transformer(), table_name="async") - - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM async") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 3 - assert {r[0] for r in rows} == {"at", "bt", "ct"} - - -@pytest.mark.parametrize("next_item_mode", ["auto", "fifo", "round_robin"]) -def test_parallel_async_generators(next_item_mode: str) -> None: - os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode - execution_order = [] - - @dlt.resource(table_name="async1") - async def async_resource1(): - for l_ in ["a", "b", "c"]: - await asyncio.sleep(1) - nonlocal execution_order - execution_order.append("one") - yield {"letter": l_} - - @dlt.resource(table_name="async2") - async def async_resource2(): - await asyncio.sleep(0.5) - for l_ in ["e", "f", "g"]: - await asyncio.sleep(1) - nonlocal execution_order - execution_order.append("two") - yield {"letter": l_} - - @dlt.source - def source(): - return [async_resource1(), async_resource2()] - - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_1.run(source()) - - with pipeline_1.sql_client() as c: - with 
c.execute_query("SELECT * FROM async1") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 3 - assert {r[0] for r in rows} == {"a", "b", "c"} - - with c.execute_query("SELECT * FROM async2") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 3 - assert {r[0] for r in rows} == {"e", "f", "g"} - - # auto mode will switch to round robin if we have awaitables - assert ( - execution_order == ["one", "two", "one", "two", "one", "two"] - if next_item_mode in ["auto", "round_robin"] - else ["one", "one", "one", "two", "two", "two"] - ) diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py new file mode 100644 index 0000000000..6373ef9615 --- /dev/null +++ b/tests/pipeline/test_resources_evaluation.py @@ -0,0 +1,185 @@ +import dlt, asyncio, pytest, os + + +# +# async generators resource tests +# +def test_async_generator_resource() -> None: + async def async_gen_table(): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(0.1) + yield {"letter": l_} + + @dlt.resource + async def async_gen_resource(): + for l_ in ["d", "e", "f"]: + await asyncio.sleep(0.1) + yield {"letter": l_} + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + + # pure async function + pipeline_1.run(async_gen_table(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == ["a", "b", "c"] + + # async resource + pipeline_1.run(async_gen_resource(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == ["a", "b", "c", "d", "e", "f"] + + +def test_async_generator_nested() -> None: + def async_inner_table(): + async def _gen(idx): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(0.1) + yield {"async_gen": idx, "letter": l_} + + # just yield futures in a loop + for idx_ in range(3): + yield _gen(idx_) + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(async_inner_table(), table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 9 + assert {(r[0], r[1]) for r in rows} == { + (0, "a"), + (0, "b"), + (0, "c"), + (1, "a"), + (1, "b"), + (1, "c"), + (2, "a"), + (2, "b"), + (2, "c"), + } + + +def test_async_generator_transformer() -> None: + @dlt.resource + async def async_resource(): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(0.1) + yield {"letter": l_} + + @dlt.transformer(data_from=async_resource) + async def async_transformer(item): + await asyncio.sleep(0.1) + yield { + "letter": item["letter"] + "t", + } + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(async_transformer(), table_name="async") + + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"at", "bt", "ct"} + + +@pytest.mark.parametrize("next_item_mode", ["fifo", "round_robin"]) +@pytest.mark.parametrize( + "resource_mode", ["both_sync", "both_async", "first_async", "second_async"] +) +def test_parallel_async_generators(next_item_mode: str, resource_mode: str) -> None: + os.environ["EXTRACT__NEXT_ITEM_MODE"] = next_item_mode + execution_order = [] + + @dlt.resource(table_name="table1") + async def 
sync_resource1(): + for l_ in ["a", "b", "c"]: + nonlocal execution_order + execution_order.append("one") + yield {"letter": l_} + + @dlt.resource(table_name="table2") + async def sync_resource2(): + for l_ in ["e", "f", "g"]: + nonlocal execution_order + execution_order.append("two") + yield {"letter": l_} + + @dlt.resource(table_name="table1") + async def async_resource1(): + for l_ in ["a", "b", "c"]: + await asyncio.sleep(1) + nonlocal execution_order + execution_order.append("one") + yield {"letter": l_} + + @dlt.resource(table_name="table2") + async def async_resource2(): + await asyncio.sleep(0.5) + for l_ in ["e", "f", "g"]: + await asyncio.sleep(1) + nonlocal execution_order + execution_order.append("two") + yield {"letter": l_} + + @dlt.source + def source(): + if resource_mode == "both_sync": + return [sync_resource1(), sync_resource2()] + elif resource_mode == "both_async": + return [async_resource1(), async_resource2()] + elif resource_mode == "first_async": + return [async_resource1(), sync_resource2()] + elif resource_mode == "second_async": + return [sync_resource1(), async_resource2()] + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(source()) + + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM table1") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"a", "b", "c"} + + with c.execute_query("SELECT * FROM table2") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"e", "f", "g"} + + # in both item modes there will be parallel execution + if resource_mode in ["both_async"]: + assert execution_order == ["one", "two", "one", "two", "one", "two"] + # first the first resouce is exhausted, then the second + elif resource_mode in ["both_sync"] and next_item_mode == "fifo": + assert execution_order == ["one", "one", "one", "two", "two", "two"] + # round robin is executed in sync + elif resource_mode in ["both_sync"] and next_item_mode == "round_robin": + assert execution_order == ["one", "two", "one", "two", "one", "two"] + elif resource_mode in ["first_async"]: + assert execution_order == ["two", "two", "two", "one", "one", "one"] + elif resource_mode in ["second_async"]: + assert execution_order == ["one", "one", "one", "two", "two", "two"] + else: + assert False, "Should not reach here" + + +def test_limit_async_resource() -> None: + @dlt.resource(table_name="table1") + async def async_resource1(): + for l_ in range(20): + print(l_) + await asyncio.sleep(0.1) + yield {"index": l_} + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(async_resource1().add_limit(13)) + + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM table1") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 13 From dbea27fcb046a7c9909c305599715fcbfbf249ff Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 16:33:07 +0100 Subject: [PATCH 07/27] change limit behavior --- dlt/extract/pipe.py | 16 ++++++---------- dlt/extract/resource.py | 7 +++---- dlt/extract/utils.py | 20 +++++++++----------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index ef5dc65ba3..2670017c00 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -805,6 +805,9 @@ def _get_source_item(self) -> ResolvablePipeItem: if sources_count == 0: return None try: + # reset to beginning of resource list for fifo mode + if 
self._next_item_mode == "fifo": + self._current_source_index = -1 first_evaluated_index = -1 while True: print(self._current_source_index) @@ -826,11 +829,9 @@ def _get_source_item(self) -> ResolvablePipeItem: return ResolvablePipeItem(item.data, step, pipe, item.meta) else: return ResolvablePipeItem(item, step, pipe, meta) - first_evaluated_index = ( - self._current_source_index - if first_evaluated_index == -1 - else first_evaluated_index - ) + # remember the first evaluated index + if first_evaluated_index == -1: + first_evaluated_index = self._current_source_index except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) @@ -842,11 +843,6 @@ def _get_source_item(self) -> ResolvablePipeItem: raise except Exception as ex: raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex - finally: - # reset to beginning of resource list for fifo mode - if self._next_item_mode == "fifo": - self._current_source_index = -1 - print("tmp") @staticmethod def clone_pipes( diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 20d67c2470..f6c1684b94 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -312,15 +312,14 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: gen = gen() # wrap async gen already here - # max items will be handled by the wrapper if inspect.isasyncgen(gen): - gen = wrap_async_generator(gen, max_items) - max_items = -1 + gen = wrap_async_generator(gen) try: for i in gen: # type: ignore # TODO: help me fix this later yield i - count += 1 + if i is not None: + count += 1 if count == max_items: return finally: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 7743f103a5..a4d993674a 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -132,8 +132,7 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in def wrap_async_generator( - gen: AsyncGenerator[TDataItems, None], - max_items: Optional[int] = -1, + gen: AsyncGenerator[TDataItems, None] ) -> Generator[Awaitable[TDataItems], None, None]: """Wraps an async generator into a list of awaitables""" exhausted = False @@ -141,13 +140,8 @@ def wrap_async_generator( # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: - nonlocal max_items async with lock: try: - if max_items == 0: - await gen.aclose() - raise StopAsyncIteration() - max_items -= 1 return await gen.__anext__() # on stop iteration mark as exhausted except StopAsyncIteration: @@ -156,10 +150,14 @@ async def run() -> TDataItems: raise # this generator yields None while the async generator is not exhauste - while not exhausted: - while lock.locked(): - yield None - yield run() + try: + while not exhausted: + while lock.locked(): + yield None + yield run() + except GeneratorExit: + # clean up async generator + asyncio.ensure_future(gen.aclose()) def wrap_compat_transformer( From 28a6a12e41e591eab04a96fb3bfe22d4a93931cd Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 17:11:00 +0100 Subject: [PATCH 08/27] small fixes --- dlt/extract/pipe.py | 6 ++++++ dlt/extract/utils.py | 2 +- tests/extract/test_extract_pipe.py | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 2670017c00..2ff568c331 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -715,6 +715,12 @@ def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None: # print("stopping loop") if self._async_pool: + # wait for all async 
generators to be closed + future = asyncio.run_coroutine_threadsafe( + self._async_pool.shutdown_asyncgens(), self._ensure_async_pool() + ) + while not future.done(): + sleep(self.futures_poll_interval) self._async_pool.call_soon_threadsafe(stop_background_loop, self._async_pool) # print("joining thread") self._async_pool_thread.join() diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index a4d993674a..f77e1f6c06 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -157,7 +157,7 @@ async def run() -> TDataItems: yield run() except GeneratorExit: # clean up async generator - asyncio.ensure_future(gen.aclose()) + pass def wrap_compat_transformer( diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 7b5177c695..e60d533f1f 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -39,8 +39,8 @@ def get_pipes(): # default mode is "fifo" _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo")) - # items will be in order of the pipes, nested iterator items appear inline - assert [pi.item for pi in _l] == [1, 2, 55, 56, 77, 88, 89, 3, 4, 11, 12, 13, 14, 15, 20, 21] + # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation + assert [pi.item for pi in _l] == [1, 2, 55, 56, 3, 77, 88, 89, 4, 11, 12, 13, 14, 15, 20, 21] # round robin mode _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin")) From 21c6db3d261562d63bb4960794232681cac630ca Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 18:16:43 +0100 Subject: [PATCH 09/27] adds experiment for parallelizing regular resources --- dlt/extract/pipe.py | 8 +- dlt/extract/utils.py | 2 +- tests/pipeline/test_resources_evaluation.py | 101 +++++++++++++++++++- 3 files changed, 105 insertions(+), 6 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 2ff568c331..3d1c04208c 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -636,6 +636,7 @@ def __next__(self) -> PipeItem: if len(self._futures) < self.max_parallel_items or self._next_future() >= 0: # check if Awaitable first - awaitable can also be a callable if isinstance(item, Awaitable): + print("schedule") future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool()) elif callable(item): future = self._ensure_thread_pool().submit(item) @@ -800,7 +801,11 @@ def _resolve_futures(self) -> ResolvablePipeItem: raise ResourceExtractionError(pipe.name, future, str(ex), "future") from ex item = future.result() - if isinstance(item, DataItemWithMeta): + + # we also interpret future items that are None to not be value to be consumed + if item is None: + return None + elif isinstance(item, DataItemWithMeta): return ResolvablePipeItem(item.data, step, pipe, item.meta) else: return ResolvablePipeItem(item, step, pipe, meta) @@ -816,7 +821,6 @@ def _get_source_item(self) -> ResolvablePipeItem: self._current_source_index = -1 first_evaluated_index = -1 while True: - print(self._current_source_index) self._current_source_index = (self._current_source_index + 1) % sources_count # if we have checked all sources once and all returned None, then we can sleep a bit if self._current_source_index == first_evaluated_index: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index f77e1f6c06..a11e081510 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -149,7 +149,7 @@ async def run() -> TDataItems: exhausted = True raise - # this generator yields None while the async generator is not exhauste 
+ # this generator yields None while the async generator is not exhausted try: while not exhausted: while lock.locked(): diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 6373ef9615..4062897334 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -1,4 +1,7 @@ -import dlt, asyncio, pytest, os +from typing import Any + +import dlt, asyncio, pytest, os, threading, inspect, time +from functools import wraps # @@ -96,14 +99,14 @@ def test_parallel_async_generators(next_item_mode: str, resource_mode: str) -> N execution_order = [] @dlt.resource(table_name="table1") - async def sync_resource1(): + def sync_resource1(): for l_ in ["a", "b", "c"]: nonlocal execution_order execution_order.append("one") yield {"letter": l_} @dlt.resource(table_name="table2") - async def sync_resource2(): + def sync_resource2(): for l_ in ["e", "f", "g"]: nonlocal execution_order execution_order.append("two") @@ -183,3 +186,95 @@ async def async_resource1(): with c.execute_query("SELECT * FROM table1") as cur: rows = list(cur.fetchall()) assert len(rows) == 13 + + +@pytest.mark.parametrize("parallelized", [True, False]) +def test_async_decorator_experiment(parallelized) -> None: + os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo" + execution_order = [] + threads = set() + + def parallelize(f) -> Any: + exhausted = False + lock = threading.Lock() + + """converts regular itarable to generator of functions that can be run in parallel in the pipe""" + @wraps(f) + def _wrap(*args: Any, **kwargs: Any) -> Any: + gen = f(*args, **kwargs) + # unpack generator + if inspect.isfunction(gen): + gen = gen() + # if we have an async gen, no further action is needed + if inspect.isasyncgen(gen): + raise Exception("Already async gen") + + # get next item from generator + def _gen(): + nonlocal exhausted + with lock: + # await asyncio.sleep(0.1) + try: + return next(gen) + # on stop iteration mark as exhausted + except StopIteration: + exhausted = True + return None + try: + while not exhausted: + while lock.locked(): + yield None + yield _gen + except GeneratorExit: + # clean up inner generator + gen.close() + + return _wrap + + @parallelize + def resource1(): + for l_ in ["a", "b", "c"]: + time.sleep(0.1) + nonlocal execution_order + execution_order.append("one") + threads.add(threading.get_ident()) + yield {"letter": l_} + + @parallelize + def resource2(): + time.sleep(0.05) + for l_ in ["e", "f", "g"]: + time.sleep(0.1) + nonlocal execution_order + execution_order.append("two") + threads.add(threading.get_ident()) + yield {"letter": l_} + + @dlt.source + def source(): + if parallelized: + return [resource1(), resource2()] + else: # return unwrapped resources + return [resource1.__wrapped__(), resource2.__wrapped__()] + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(source()) + + # all records should be here + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM resource1") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"a", "b", "c"} + + with c.execute_query("SELECT * FROM resource2") as cur: + rows = list(cur.fetchall()) + assert len(rows) == 3 + assert {r[0] for r in rows} == {"e", "f", "g"} + + if parallelized: + assert len(threads) > 1 + assert execution_order == ["one", "two", "one", "two", "one", "two"] + else: + assert execution_order == ["one", "one", "one", "two", "two", "two"] + assert 
len(threads) == 1 From a151428394394465be60629e86a3ef8c4ea98acc Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 18:23:25 +0100 Subject: [PATCH 10/27] fix linter --- tests/pipeline/test_resources_evaluation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 4062897334..c938498426 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -168,7 +168,7 @@ def source(): elif resource_mode in ["second_async"]: assert execution_order == ["one", "one", "one", "two", "two", "two"] else: - assert False, "Should not reach here" + raise AssertionError("Unknown combination") def test_limit_async_resource() -> None: From 0211d3dc82b9c2c557cf826c73c2b7b339b04b18 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 20:13:08 +0100 Subject: [PATCH 11/27] test is creating pool before iteration solves ci test problems --- dlt/extract/pipe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 3d1c04208c..b4f1816e3f 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -589,6 +589,7 @@ def _fork_pipeline(pipe: Pipe) -> None: return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode) def __next__(self) -> PipeItem: + self._ensure_async_pool() pipe_item: Union[ResolvablePipeItem, SourcePipeItem] = None # __next__ should call itself to remove the `while` loop and continue clauses but that may lead to stack overflows: there's no tail recursion opt in python # https://stackoverflow.com/questions/13591970/does-python-optimize-tail-recursion (see Y combinator on how it could be emulated) From c087ebf6a323027061ee9d798009ec41829b187e Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 20:49:34 +0100 Subject: [PATCH 12/27] make one test more predictable --- tests/pipeline/test_resources_evaluation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index c938498426..d908624b38 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -234,7 +234,7 @@ def _gen(): @parallelize def resource1(): for l_ in ["a", "b", "c"]: - time.sleep(0.1) + time.sleep(0.3) nonlocal execution_order execution_order.append("one") threads.add(threading.get_ident()) @@ -242,9 +242,9 @@ def resource1(): @parallelize def resource2(): - time.sleep(0.05) + time.sleep(0.1) for l_ in ["e", "f", "g"]: - time.sleep(0.1) + time.sleep(0.3) nonlocal execution_order execution_order.append("two") threads.add(threading.get_ident()) From a7bf8e076e7917b89538a5468f36273ad6671f51 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 21:18:03 +0100 Subject: [PATCH 13/27] remove async pool fix --- dlt/extract/pipe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index b4f1816e3f..3d1c04208c 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -589,7 +589,6 @@ def _fork_pipeline(pipe: Pipe) -> None: return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode) def __next__(self) -> PipeItem: - self._ensure_async_pool() pipe_item: Union[ResolvablePipeItem, SourcePipeItem] = None # __next__ should call itself to remove the `while` loop and continue clauses but that may lead to stack overflows: there's no tail recursion opt in python # 
https://stackoverflow.com/questions/13591970/does-python-optimize-tail-recursion (see Y combinator on how it could be emulated) From 3c047a2ffddbf97a201e3d7d8fc6dbf9fbadbb84 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 21:53:41 +0100 Subject: [PATCH 14/27] remove locks from generator wrappers --- dlt/extract/utils.py | 23 +++++++++------- tests/pipeline/test_resources_evaluation.py | 29 ++++++++++++--------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index a11e081510..7c3ed1cc9d 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -136,24 +136,27 @@ def wrap_async_generator( ) -> Generator[Awaitable[TDataItems], None, None]: """Wraps an async generator into a list of awaitables""" exhausted = False - lock = asyncio.Lock() + busy = False # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: - async with lock: - try: - return await gen.__anext__() - # on stop iteration mark as exhausted - except StopAsyncIteration: - nonlocal exhausted - exhausted = True - raise + try: + return await gen.__anext__() + # on stop iteration mark as exhausted + except StopAsyncIteration: + nonlocal exhausted + exhausted = True + raise + finally: + nonlocal busy + busy = False # this generator yields None while the async generator is not exhausted try: while not exhausted: - while lock.locked(): + while busy: yield None + busy = True yield run() except GeneratorExit: # clean up async generator diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index d908624b38..cd5e80a316 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -195,12 +195,13 @@ def test_async_decorator_experiment(parallelized) -> None: threads = set() def parallelize(f) -> Any: - exhausted = False - lock = threading.Lock() - """converts regular itarable to generator of functions that can be run in parallel in the pipe""" + @wraps(f) def _wrap(*args: Any, **kwargs: Any) -> Any: + exhausted = False + busy = False + gen = f(*args, **kwargs) # unpack generator if inspect.isfunction(gen): @@ -212,18 +213,22 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: # get next item from generator def _gen(): nonlocal exhausted - with lock: - # await asyncio.sleep(0.1) - try: - return next(gen) - # on stop iteration mark as exhausted - except StopIteration: - exhausted = True - return None + # await asyncio.sleep(0.1) + try: + return next(gen) + # on stop iteration mark as exhausted + except StopIteration: + exhausted = True + return None + finally: + nonlocal busy + busy = False + try: while not exhausted: - while lock.locked(): + while busy: yield None + busy = True yield _gen except GeneratorExit: # clean up inner generator From 8d81d992639c0eda3d3044e96958416b81cdb46b Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 24 Jan 2024 22:30:15 +0100 Subject: [PATCH 15/27] make test even more predictable --- tests/pipeline/test_resources_evaluation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index cd5e80a316..71e1fa42ff 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -239,7 +239,7 @@ def _gen(): @parallelize def resource1(): for l_ in ["a", "b", "c"]: - time.sleep(0.3) + time.sleep(0.5) nonlocal execution_order execution_order.append("one") 
threads.add(threading.get_ident()) @@ -247,9 +247,9 @@ def resource1(): @parallelize def resource2(): - time.sleep(0.1) + time.sleep(0.25) for l_ in ["e", "f", "g"]: - time.sleep(0.3) + time.sleep(0.5) nonlocal execution_order execution_order.append("two") threads.add(threading.get_ident()) From 435239d54140ab534f80717e239fb08e0d3bd944 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 26 Jan 2024 11:24:25 +0100 Subject: [PATCH 16/27] pr fixes --- dlt/extract/pipe.py | 30 ++- dlt/extract/resource.py | 6 +- dlt/extract/utils.py | 19 +- tests/extract/test_extract_pipe.py | 38 ++++ tests/pipeline/test_resources_evaluation.py | 204 ++++++++++---------- 5 files changed, 175 insertions(+), 122 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 3d1c04208c..8a1a29e172 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -55,7 +55,7 @@ simulate_func_call, wrap_compat_transformer, wrap_resource_gen, - wrap_async_generator, + wrap_async_iterator, ) if TYPE_CHECKING: @@ -109,7 +109,7 @@ class SourcePipeItem(NamedTuple): Callable[[TDataItems], Iterator[ResolvablePipeItem]], ] -TPipeNextItemMode = Union[Literal["fifo"], Literal["round_robin"]] +TPipeNextItemMode = Literal["fifo", "round_robin"] class ForkPipe: @@ -324,7 +324,7 @@ def evaluate_gen(self) -> None: # wrap async generator if inspect.isasyncgen(self.gen): - self.replace_gen(wrap_async_generator(self.gen)) + self.replace_gen(wrap_async_iterator(self.gen)) # evaluate transforms for step_no, step in enumerate(self._steps): @@ -510,14 +510,14 @@ def __init__( self.max_parallel_items = max_parallel_items self.workers = workers self.futures_poll_interval = futures_poll_interval - - self._current_source_index: int = -1 self._async_pool: asyncio.AbstractEventLoop = None self._async_pool_thread: Thread = None self._thread_pool: ThreadPoolExecutor = None self._sources = sources self._futures: List[FuturePipeItem] = [] self._next_item_mode = next_item_mode + self._initial_sources_count = len(sources) + self._current_source_index: int = -1 @classmethod @with_config(spec=PipeIteratorConfiguration) @@ -621,11 +621,11 @@ def __next__(self) -> PipeItem: continue # handle async iterator items as new source - if inspect.isasyncgen(item): + if isinstance(item, AsyncIterator): self._sources.insert( 0, SourcePipeItem( - wrap_async_generator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta + wrap_async_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta ), ) pipe_item = None @@ -815,13 +815,21 @@ def _get_source_item(self) -> ResolvablePipeItem: # no more sources to iterate if sources_count == 0: return None + # while we have more new sources than available future slots, we do strict fifo where we + # only ever check the first source, this is to prevent an uncontrolled number of sources + # being created in certain scenarios + force_strict_fifo = (sources_count - self._initial_sources_count) > self.max_parallel_items try: - # reset to beginning of resource list for fifo mode + # always reset to start of list for fifo mode if self._next_item_mode == "fifo": self._current_source_index = -1 first_evaluated_index = -1 while True: - self._current_source_index = (self._current_source_index + 1) % sources_count + # in strict fifo mode we never check more than the top most source + if force_strict_fifo: + self._current_source_index = 0 + else: + self._current_source_index = (self._current_source_index + 1) % sources_count # if we have checked all sources once and all returned None, then we can sleep a bit if 
self._current_source_index == first_evaluated_index: sleep(self.futures_poll_interval) @@ -845,9 +853,13 @@ def _get_source_item(self) -> ResolvablePipeItem: except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) + # decrease initial source count if we popped an initial source + if self._current_source_index >= abs(self._initial_sources_count - sources_count): + self._initial_sources_count -= 1 # we need to decrease the index to keep the round robin order if self._next_item_mode == "round_robin": self._current_source_index -= 1 + return self._get_source_item() except (PipelineException, ExtractorException, DltSourceException, PipeException): raise diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index f6c1684b94..e0fa40bc21 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -25,7 +25,7 @@ pipeline_state, ) from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id -from dlt.extract.utils import wrap_async_generator +from dlt.extract.utils import wrap_async_iterator from dlt.extract.typing import ( DataItemWithMeta, @@ -312,8 +312,8 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: gen = gen() # wrap async gen already here - if inspect.isasyncgen(gen): - gen = wrap_async_generator(gen) + if isinstance(gen, AsyncIterator): + gen = wrap_async_iterator(gen) try: for i in gen: # type: ignore # TODO: help me fix this later diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 7c3ed1cc9d..6d78d0b659 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -9,6 +9,7 @@ Any, Sequence, cast, + AsyncIterator, AsyncGenerator, Awaitable, Generator, @@ -131,8 +132,8 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg -def wrap_async_generator( - gen: AsyncGenerator[TDataItems, None] +def wrap_async_iterator( + gen: AsyncIterator[TDataItems], ) -> Generator[Awaitable[TDataItems], None, None]: """Wraps an async generator into a list of awaitables""" exhausted = False @@ -140,11 +141,16 @@ def wrap_async_generator( # creates an awaitable that will return the next item from the async generator async def run() -> TDataItems: + nonlocal exhausted try: - return await gen.__anext__() + item = await gen.__anext__() + # if marked exhausted by the main thread and we are wrapping a generator + # we can close it here + if exhausted and isinstance(gen, AsyncGenerator): + await gen.aclose() + return item # on stop iteration mark as exhausted except StopAsyncIteration: - nonlocal exhausted exhausted = True raise finally: @@ -158,9 +164,10 @@ async def run() -> TDataItems: yield None busy = True yield run() + # this gets called from the main thread when the wrapping generater is closed except GeneratorExit: - # clean up async generator - pass + # mark as exhausted + exhausted = True def wrap_compat_transformer( diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index e60d533f1f..760ae678d0 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -603,6 +603,10 @@ def pass_gen(item, meta): def test_close_on_async_exception() -> None: + global close_pipe_got_exit, close_pipe_yielding + close_pipe_got_exit = False + close_pipe_yielding = False + def long_gen(): global close_pipe_got_exit, close_pipe_yielding @@ -628,6 +632,10 @@ async def raise_gen(item: int): def test_close_on_thread_pool_exception() -> None: + global close_pipe_got_exit, close_pipe_yielding + close_pipe_got_exit = 
False + close_pipe_yielding = False + def long_gen(): global close_pipe_got_exit, close_pipe_yielding @@ -655,6 +663,10 @@ def raise_gen(item: int): def test_close_on_sync_exception() -> None: + global close_pipe_got_exit, close_pipe_yielding + close_pipe_got_exit = False + close_pipe_yielding = False + def long_gen(): global close_pipe_got_exit, close_pipe_yielding @@ -674,6 +686,32 @@ def raise_gen(item: int): assert_pipes_closed(raise_gen, long_gen) +def test_close_on_async_generator() -> None: + global close_pipe_got_exit, close_pipe_yielding + close_pipe_got_exit = False + close_pipe_yielding = False + + async def long_gen(): + global close_pipe_got_exit, close_pipe_yielding + + # will be closed by PipeIterator + try: + close_pipe_yielding = True + for i in range(0, 10000): + asyncio.sleep(0.01) + yield i + close_pipe_yielding = False + except GeneratorExit: + close_pipe_got_exit = True + + def raise_gen(item: int): + if item == 10: + raise RuntimeError("we fail") + yield item + + assert_pipes_closed(raise_gen, long_gen) + + def assert_pipes_closed(raise_gen, long_gen) -> None: global close_pipe_got_exit, close_pipe_yielding diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 71e1fa42ff..516deb404d 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -179,107 +179,103 @@ async def async_resource1(): await asyncio.sleep(0.1) yield {"index": l_} - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - pipeline_1.run(async_resource1().add_limit(13)) - - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM table1") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 13 - - -@pytest.mark.parametrize("parallelized", [True, False]) -def test_async_decorator_experiment(parallelized) -> None: - os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo" - execution_order = [] - threads = set() - - def parallelize(f) -> Any: - """converts regular itarable to generator of functions that can be run in parallel in the pipe""" - - @wraps(f) - def _wrap(*args: Any, **kwargs: Any) -> Any: - exhausted = False - busy = False - - gen = f(*args, **kwargs) - # unpack generator - if inspect.isfunction(gen): - gen = gen() - # if we have an async gen, no further action is needed - if inspect.isasyncgen(gen): - raise Exception("Already async gen") - - # get next item from generator - def _gen(): - nonlocal exhausted - # await asyncio.sleep(0.1) - try: - return next(gen) - # on stop iteration mark as exhausted - except StopIteration: - exhausted = True - return None - finally: - nonlocal busy - busy = False - - try: - while not exhausted: - while busy: - yield None - busy = True - yield _gen - except GeneratorExit: - # clean up inner generator - gen.close() - - return _wrap - - @parallelize - def resource1(): - for l_ in ["a", "b", "c"]: - time.sleep(0.5) - nonlocal execution_order - execution_order.append("one") - threads.add(threading.get_ident()) - yield {"letter": l_} - - @parallelize - def resource2(): - time.sleep(0.25) - for l_ in ["e", "f", "g"]: - time.sleep(0.5) - nonlocal execution_order - execution_order.append("two") - threads.add(threading.get_ident()) - yield {"letter": l_} - - @dlt.source - def source(): - if parallelized: - return [resource1(), resource2()] - else: # return unwrapped resources - return [resource1.__wrapped__(), resource2.__wrapped__()] - - pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) - 
pipeline_1.run(source()) - - # all records should be here - with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM resource1") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 3 - assert {r[0] for r in rows} == {"a", "b", "c"} - - with c.execute_query("SELECT * FROM resource2") as cur: - rows = list(cur.fetchall()) - assert len(rows) == 3 - assert {r[0] for r in rows} == {"e", "f", "g"} - - if parallelized: - assert len(threads) > 1 - assert execution_order == ["one", "two", "one", "two", "one", "two"] - else: - assert execution_order == ["one", "one", "one", "two", "two", "two"] - assert len(threads) == 1 + result = list(async_resource1().add_limit(13)) + assert len(result) == 13 + + +# @pytest.mark.skip(reason="To be properly implemented in an upcoming PR") +# @pytest.mark.parametrize("parallelized", [True, False]) +# def test_async_decorator_experiment(parallelized) -> None: +# os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo" +# execution_order = [] +# threads = set() + +# def parallelize(f) -> Any: +# """converts regular itarable to generator of functions that can be run in parallel in the pipe""" + +# @wraps(f) +# def _wrap(*args: Any, **kwargs: Any) -> Any: +# exhausted = False +# busy = False + +# gen = f(*args, **kwargs) +# # unpack generator +# if inspect.isfunction(gen): +# gen = gen() +# # if we have an async gen, no further action is needed +# if inspect.isasyncgen(gen): +# raise Exception("Already async gen") + +# # get next item from generator +# def _gen(): +# nonlocal exhausted +# # await asyncio.sleep(0.1) +# try: +# return next(gen) +# # on stop iteration mark as exhausted +# except StopIteration: +# exhausted = True +# return None +# finally: +# nonlocal busy +# busy = False + +# try: +# while not exhausted: +# while busy: +# yield None +# busy = True +# yield _gen +# except GeneratorExit: +# # clean up inner generator +# gen.close() + +# return _wrap + +# @parallelize +# def resource1(): +# for l_ in ["a", "b", "c"]: +# time.sleep(0.5) +# nonlocal execution_order +# execution_order.append("one") +# threads.add(threading.get_ident()) +# yield {"letter": l_} + +# @parallelize +# def resource2(): +# time.sleep(0.25) +# for l_ in ["e", "f", "g"]: +# time.sleep(0.5) +# nonlocal execution_order +# execution_order.append("two") +# threads.add(threading.get_ident()) +# yield {"letter": l_} + +# @dlt.source +# def source(): +# if parallelized: +# return [resource1(), resource2()] +# else: # return unwrapped resources +# return [resource1.__wrapped__(), resource2.__wrapped__()] + +# pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) +# pipeline_1.run(source()) + +# # all records should be here +# with pipeline_1.sql_client() as c: +# with c.execute_query("SELECT * FROM resource1") as cur: +# rows = list(cur.fetchall()) +# assert len(rows) == 3 +# assert {r[0] for r in rows} == {"a", "b", "c"} + +# with c.execute_query("SELECT * FROM resource2") as cur: +# rows = list(cur.fetchall()) +# assert len(rows) == 3 +# assert {r[0] for r in rows} == {"e", "f", "g"} + +# if parallelized: +# assert len(threads) > 1 +# assert execution_order == ["one", "two", "one", "two", "one", "two"] +# else: +# assert execution_order == ["one", "one", "one", "two", "two", "two"] +# assert len(threads) == 1 From 3787c637d32aee25c78a3320ac5d4d23701a7a65 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 26 Jan 2024 11:47:28 +0100 Subject: [PATCH 17/27] fix async error test test async iterator --- dlt/extract/pipe.py | 3 +-- 
tests/extract/test_extract_pipe.py | 5 ++-- tests/pipeline/test_resources_evaluation.py | 29 +++++++++++++++++++++ 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 8a1a29e172..2a1666564b 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -323,7 +323,7 @@ def evaluate_gen(self) -> None: self._ensure_transform_step(self._gen_idx, gen) # wrap async generator - if inspect.isasyncgen(self.gen): + if isinstance(self.gen, AsyncIterator): self.replace_gen(wrap_async_iterator(self.gen)) # evaluate transforms @@ -636,7 +636,6 @@ def __next__(self) -> PipeItem: if len(self._futures) < self.max_parallel_items or self._next_future() >= 0: # check if Awaitable first - awaitable can also be a callable if isinstance(item, Awaitable): - print("schedule") future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool()) elif callable(item): future = self._ensure_thread_pool().submit(item) diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 760ae678d0..2dab094ebf 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -698,10 +698,11 @@ async def long_gen(): try: close_pipe_yielding = True for i in range(0, 10000): - asyncio.sleep(0.01) + await asyncio.sleep(0.01) yield i close_pipe_yielding = False - except GeneratorExit: + # we have a different exception here + except asyncio.CancelledError: close_pipe_got_exit = True def raise_gen(item: int): diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 516deb404d..2731686e5a 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -4,6 +4,35 @@ from functools import wraps +def test_async_iterator_resource() -> None: + # define an asynchronous iterator + class AsyncIterator: + def __init__(self): + self.counter = 0 + + def __aiter__(self): + return self + + # return the next awaitable + async def __anext__(self): + # check for no further items + if self.counter >= 5: + raise StopAsyncIteration + # increment the counter + self.counter += 1 + # simulate work + await asyncio.sleep(0.1) + # return the counter value + return {"i": self.counter} + + pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) + pipeline_1.run(AsyncIterator, table_name="async") + with pipeline_1.sql_client() as c: + with c.execute_query("SELECT * FROM async") as cur: + rows = list(cur.fetchall()) + assert [r[0] for r in rows] == [1, 2, 3, 4, 5] + + # # async generators resource tests # From 05d0c55a89f7c3136375996ea519804e0c6311c3 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 26 Jan 2024 12:22:51 +0100 Subject: [PATCH 18/27] update evaluation order tests --- dlt/extract/pipe.py | 5 ++-- tests/extract/test_extract_pipe.py | 31 +++++++++++++++------ tests/pipeline/test_resources_evaluation.py | 4 +-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 2a1666564b..0835ee41ad 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -817,7 +817,7 @@ def _get_source_item(self) -> ResolvablePipeItem: # while we have more new sources than available future slots, we do strict fifo where we # only ever check the first source, this is to prevent an uncontrolled number of sources # being created in certain scenarios - force_strict_fifo = (sources_count - self._initial_sources_count) > self.max_parallel_items + force_strict_fifo = (sources_count - 
self._initial_sources_count) >= self.max_parallel_items try: # always reset to start of list for fifo mode if self._next_item_mode == "fifo": @@ -826,9 +826,11 @@ def _get_source_item(self) -> ResolvablePipeItem: while True: # in strict fifo mode we never check more than the top most source if force_strict_fifo: + print("strict") self._current_source_index = 0 else: self._current_source_index = (self._current_source_index + 1) % sources_count + print(self._current_source_index) # if we have checked all sources once and all returned None, then we can sleep a bit if self._current_source_index == first_evaluated_index: sleep(self.futures_poll_interval) @@ -858,7 +860,6 @@ def _get_source_item(self) -> ResolvablePipeItem: # we need to decrease the index to keep the round robin order if self._next_item_mode == "round_robin": self._current_source_index -= 1 - return self._get_source_item() except (PipelineException, ExtractorException, DltSourceException, PipeException): raise diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 2dab094ebf..f7869bc78b 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -16,19 +16,19 @@ def test_next_item_mode() -> None: def nested_gen_level_2(): - yield from [88, None, 89] + yield from [6, None, 7] def nested_gen(): - yield from [55, 56, None, 77, nested_gen_level_2()] + yield from [3, 4, None, 5, nested_gen_level_2(), 8, 9] def source_gen1(): - yield from [1, 2, nested_gen(), 3, 4] + yield from [1, 2, nested_gen(), 10, 11] def source_gen2(): - yield from range(11, 16) + yield from [12, 13] def source_gen3(): - yield from range(20, 22) + yield from [14, 15] def get_pipes(): return [ @@ -39,13 +39,26 @@ def get_pipes(): # default mode is "fifo" _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo")) + # items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation + assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15] + + # force strict mode, no rotation at all when crossing the initial source count + _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1)) # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation - assert [pi.item for pi in _l] == [1, 2, 55, 56, 3, 77, 88, 89, 4, 11, 12, 13, 14, 15, 20, 21] + assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] - # round robin mode + # round robin eval _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin")) - # items will be round robin, nested iterators are integrated into the round robin - assert [pi.item for pi in _l] == [1, 11, 20, 2, 12, 21, 3, 13, 55, 4, 14, 56, 15, 77, 88, 89] + # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7] + + # round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen, + # items appear in order as sources are processed strictly from front) + _l = list( + PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1) + ) + # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11] def test_rotation_on_none() -> None: diff 
--git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index 2731686e5a..f61b4b374a 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -1,11 +1,11 @@ from typing import Any -import dlt, asyncio, pytest, os, threading, inspect, time -from functools import wraps +import dlt, asyncio, pytest, os def test_async_iterator_resource() -> None: # define an asynchronous iterator + @dlt.resource() class AsyncIterator: def __init__(self): self.counter = 0 From 614b80bbd55309fc5bcc429bd09b4bce510df8e9 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 26 Jan 2024 23:17:17 +0100 Subject: [PATCH 19/27] adds sources at the end of pipe, closes generators before futures so wrapped generators are notified --- dlt/extract/pipe.py | 62 +++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 0835ee41ad..3062ed083d 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -515,9 +515,9 @@ def __init__( self._thread_pool: ThreadPoolExecutor = None self._sources = sources self._futures: List[FuturePipeItem] = [] - self._next_item_mode = next_item_mode + self._next_item_mode: TPipeNextItemMode = next_item_mode self._initial_sources_count = len(sources) - self._current_source_index: int = -1 + self._current_source_index: int = 0 @classmethod @with_config(spec=PipeIteratorConfiguration) @@ -582,6 +582,8 @@ def _fork_pipeline(pipe: Pipe) -> None: if not any(i.pipe == pipe for i in sources): sources.append(SourcePipeItem(pipe.gen, 0, pipe, None)) + # reverse pipes for current mode, as we start processing from the back + pipes.reverse() for pipe in pipes: _fork_pipeline(pipe) @@ -614,16 +616,15 @@ def __next__(self) -> PipeItem: # if item is iterator, then add it as a new source if isinstance(item, Iterator): # print(f"adding iterable {item}") - self._sources.insert( - 0, SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) + self._sources.append( + SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) ) pipe_item = None continue # handle async iterator items as new source if isinstance(item, AsyncIterator): - self._sources.insert( - 0, + self._sources.append( SourcePipeItem( wrap_async_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta ), @@ -701,19 +702,18 @@ def close(self) -> None: def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None: loop.stop() - # stop all futures - for f, _, _, _ in self._futures: - if not f.done(): - f.cancel() - self._futures.clear() - # close all generators for gen, _, _, _ in self._sources: if inspect.isgenerator(gen): gen.close() self._sources.clear() - # print("stopping loop") + # stop all futures + for f, _, _, _ in self._futures: + if not f.done(): + f.cancel() + + # let tasks cancel if self._async_pool: # wait for all async generators to be closed future = asyncio.run_coroutine_threadsafe( @@ -730,6 +730,8 @@ def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None: self._thread_pool.shutdown(wait=True) self._thread_pool = None + self._futures.clear() + def _ensure_async_pool(self) -> asyncio.AbstractEventLoop: # lazily create async pool is separate thread if self._async_pool: @@ -814,23 +816,18 @@ def _get_source_item(self) -> ResolvablePipeItem: # no more sources to iterate if sources_count == 0: return None - # while we have more new sources than available future slots, we do strict fifo where we - # only ever 
check the first source, this is to prevent an uncontrolled number of sources - # being created in certain scenarios - force_strict_fifo = (sources_count - self._initial_sources_count) >= self.max_parallel_items try: - # always reset to start of list for fifo mode - if self._next_item_mode == "fifo": - self._current_source_index = -1 - first_evaluated_index = -1 + first_evaluated_index: int = None + # always reset to end of list for fifo mode, also take into account that new sources can be added + # if too many new sources is added we switch to fifo not to exhaust them + if ( + self._next_item_mode == "fifo" + or (sources_count - self._initial_sources_count) >= self.max_parallel_items + ): + self._current_source_index = sources_count - 1 + else: + self._current_source_index = (self._current_source_index - 1) % sources_count while True: - # in strict fifo mode we never check more than the top most source - if force_strict_fifo: - print("strict") - self._current_source_index = 0 - else: - self._current_source_index = (self._current_source_index + 1) % sources_count - print(self._current_source_index) # if we have checked all sources once and all returned None, then we can sleep a bit if self._current_source_index == first_evaluated_index: sleep(self.futures_poll_interval) @@ -849,17 +846,16 @@ def _get_source_item(self) -> ResolvablePipeItem: else: return ResolvablePipeItem(item, step, pipe, meta) # remember the first evaluated index - if first_evaluated_index == -1: + if first_evaluated_index is None: first_evaluated_index = self._current_source_index + # always go round robin if None was returned + self._current_source_index = (self._current_source_index - 1) % sources_count except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) # decrease initial source count if we popped an initial source - if self._current_source_index >= abs(self._initial_sources_count - sources_count): + if self._current_source_index < self._initial_sources_count: self._initial_sources_count -= 1 - # we need to decrease the index to keep the round robin order - if self._next_item_mode == "round_robin": - self._current_source_index -= 1 return self._get_source_item() except (PipelineException, ExtractorException, DltSourceException, PipeException): raise From fb9c5649489b4c85a8228187efead5f0785df09e Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 26 Jan 2024 23:17:49 +0100 Subject: [PATCH 20/27] allows async generator items to be evaluated in add_limit --- dlt/extract/resource.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index e0fa40bc21..ac7339ec7c 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -306,22 +306,26 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003 def _gen_wrap(gen: TPipeStep) -> TPipeStep: """Wrap a generator to take the first `max_items` records""" - nonlocal max_items count = 0 + is_async_gen = False if inspect.isfunction(gen): gen = gen() # wrap async gen already here if isinstance(gen, AsyncIterator): gen = wrap_async_iterator(gen) + is_async_gen = True try: for i in gen: # type: ignore # TODO: help me fix this later yield i if i is not None: count += 1 - if count == max_items: - return + # async gen yields awaitable so we must count one awaitable more + # so the previous one is evaluated and yielded. 
+ # new awaitable will be cancelled + if count == max_items + int(is_async_gen): + return finally: if inspect.isgenerator(gen): gen.close() From 4a61e608e334d5093224eddf29f901a88400e51a Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 26 Jan 2024 23:18:05 +0100 Subject: [PATCH 21/27] fixes tests --- dlt/extract/utils.py | 7 ++++--- tests/extract/test_extract_pipe.py | 21 ++++++++++++--------- tests/pipeline/test_resources_evaluation.py | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 6d78d0b659..0e86994eb4 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -143,13 +143,14 @@ def wrap_async_iterator( async def run() -> TDataItems: nonlocal exhausted try: - item = await gen.__anext__() # if marked exhausted by the main thread and we are wrapping a generator # we can close it here - if exhausted and isinstance(gen, AsyncGenerator): - await gen.aclose() + if exhausted: + raise StopAsyncIteration() + item = await gen.__anext__() return item # on stop iteration mark as exhausted + # also called when futures are cancelled except StopAsyncIteration: exhausted = True raise diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index f7869bc78b..95652de386 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -42,15 +42,15 @@ def get_pipes(): # items will be in order of the pipes, nested iterator items appear inline, None triggers a bit of rotation assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15] - # force strict mode, no rotation at all when crossing the initial source count + # force fifo, no rotation at all when crossing the initial source count _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="fifo", max_parallel_items=1)) - # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation - assert [pi.item for pi in _l] == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + # order the same as above - same rules apply + assert [pi.item for pi in _l] == [1, 2, 3, 4, 10, 5, 6, 8, 7, 9, 11, 12, 13, 14, 15] # round robin eval _l = list(PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin")) # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation - assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 10, 3, 11, 4, 5, 8, 6, 9, 7] + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 10, 4, 11, 5, 6, 8, 9, 7] # round robin with max parallel items triggers strict fifo in some cases (after gen2 and 3 are exhausted we already have the first yielded gen, # items appear in order as sources are processed strictly from front) @@ -58,7 +58,9 @@ def get_pipes(): PipeIterator.from_pipes(get_pipes(), next_item_mode="round_robin", max_parallel_items=1) ) # items will be in order of the pipes, nested iterator items appear inline, None triggers rotation - assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 5, 6, 7, 8, 9, 10, 11] + # NOTE: 4, 10, 5 - after 4 there's NONE in fifo so we do next element (round robin style) + # NOTE: 6, 8, 7 - after 6 there's NONE - same thing + assert [pi.item for pi in _l] == [1, 12, 14, 2, 13, 15, 3, 4, 10, 5, 6, 8, 7, 9, 11] def test_rotation_on_none() -> None: @@ -715,13 +717,14 @@ async def long_gen(): yield i close_pipe_yielding = False # we have a different exception here - except asyncio.CancelledError: + except GeneratorExit: close_pipe_got_exit = True - def 
raise_gen(item: int): + # execute in a thread + async def raise_gen(item: int): if item == 10: - raise RuntimeError("we fail") - yield item + raise RuntimeError("we fail async") + return item assert_pipes_closed(raise_gen, long_gen) diff --git a/tests/pipeline/test_resources_evaluation.py b/tests/pipeline/test_resources_evaluation.py index f61b4b374a..7f0a7890a7 100644 --- a/tests/pipeline/test_resources_evaluation.py +++ b/tests/pipeline/test_resources_evaluation.py @@ -17,7 +17,7 @@ def __aiter__(self): async def __anext__(self): # check for no further items if self.counter >= 5: - raise StopAsyncIteration + raise StopAsyncIteration() # increment the counter self.counter += 1 # simulate work From 9446e297e751fb489e555b89f2e5657f04145ea9 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 29 Jan 2024 17:51:37 +0100 Subject: [PATCH 22/27] update performance docs --- docs/website/docs/reference/performance.md | 17 +++++++++++++---- .../performance-snippets.py | 11 ++++++++++- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index f1a405684f..af2d791324 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -139,9 +139,9 @@ PROGRESS=log python pipeline_script.py You can create pipelines that extract, normalize and load data in parallel. ### Extract -You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively. +You can extract data concurrently if you write your pipelines to yield callables or awaitables or use async generators for your resources that can be then evaluated in a thread or futures pool respectively. -Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool. +The example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool. 
```py import dlt @@ -185,11 +185,20 @@ workers=4 -The example below does the same but using an async/await and futures pool: +The example below does the same but using an async generator as the main resource and async/await and futures pool for the transformer: ```py import asyncio +@dlt.resource +async def a_list_items(start, limit): + # simulate a slow REST API where you wait 0.3 sec for each item + index = start + while index < start + limit: + await asyncio.sleep(0.3) + yield index + index += 1 + @dlt.transformer async def a_get_details(item_id): # simulate a slow REST API where you wait 0.3 sec for each item @@ -198,7 +207,7 @@ async def a_get_details(item_id): # just return the results, if you yield, generator will be evaluated in main thread return {"row": item_id} -print(list(list_items(0, 10) | a_get_details)) +print(list(a_list_items(0, 10) | a_get_details)) ``` diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py index a6ad2f2618..a2ebd102a6 100644 --- a/docs/website/docs/reference/performance_snippets/performance-snippets.py +++ b/docs/website/docs/reference/performance_snippets/performance-snippets.py @@ -68,6 +68,15 @@ def get_details(item_id): # @@@DLT_SNIPPET_START parallel_extract_awaitables import asyncio + @dlt.resource + async def a_list_items(start, limit): + # simulate a slow REST API where you wait 0.3 sec for each item + index = start + while index < start + limit: + await asyncio.sleep(0.3) + yield index + index += 1 + @dlt.transformer async def a_get_details(item_id): # simulate a slow REST API where you wait 0.3 sec for each item @@ -76,7 +85,7 @@ async def a_get_details(item_id): # just return the results, if you yield, generator will be evaluated in main thread return {"row": item_id} - print(list(list_items(0, 10) | a_get_details)) + print(list(a_list_items(0, 10) | a_get_details)) # @@@DLT_SNIPPET_END parallel_extract_awaitables From d830086470f8d3e68e3196b98eae5528bda558d2 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 29 Jan 2024 17:51:46 +0100 Subject: [PATCH 23/27] unrelated formatting fixes --- docs/examples/chess_production/chess.py | 24 ++++++++++--------- docs/examples/connector_x_arrow/load_arrow.py | 10 ++++---- docs/examples/google_sheets/google_sheets.py | 5 +--- docs/examples/incremental_loading/zendesk.py | 8 +++---- docs/examples/nested_data/nested_data.py | 2 -- .../pdf_to_weaviate/pdf_to_weaviate.py | 11 +++++---- docs/examples/qdrant_zendesk/qdrant.py | 9 ++++--- docs/examples/transformers/pokemon.py | 4 +--- .../docs/examples/chess_production/index.md | 12 +++++----- .../docs/examples/connector_x_arrow/index.md | 8 +++---- .../docs/examples/pdf_to_weaviate/index.md | 13 +++------- .../website/docs/general-usage/destination.md | 12 ++++++++-- docs/website/docs/intro.md | 13 +++++----- .../docs/tutorial/load-data-from-an-api.md | 24 +++++++++---------- docs/website/package-lock.json | 5 +--- 15 files changed, 76 insertions(+), 84 deletions(-) diff --git a/docs/examples/chess_production/chess.py b/docs/examples/chess_production/chess.py index 5b767f0eb6..2e85805781 100644 --- a/docs/examples/chess_production/chess.py +++ b/docs/examples/chess_production/chess.py @@ -6,7 +6,6 @@ from dlt.common.typing import StrAny, TDataItems from dlt.sources.helpers.requests import client - @dlt.source def chess( chess_url: str = dlt.config.value, @@ -60,7 +59,6 @@ def players_games(username: Any) -> Iterator[TDataItems]: MAX_PLAYERS = 5 - def 
load_data_with_retry(pipeline, data): try: for attempt in Retrying( @@ -70,16 +68,18 @@ def load_data_with_retry(pipeline, data): reraise=True, ): with attempt: - logger.info(f"Running the pipeline, attempt={attempt.retry_state.attempt_number}") + logger.info( + f"Running the pipeline, attempt={attempt.retry_state.attempt_number}" + ) load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!" - ) + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!" + ) except Exception: # we get here after all the failed retries # send notification @@ -92,7 +92,9 @@ def load_data_with_retry(pipeline, data): # print the information on the first load package and all jobs inside logger.info(f"First load package info: {load_info.load_packages[0]}") # print the information on the first completed job in first load package - logger.info(f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}") + logger.info( + f"First completed job info: {load_info.load_packages[0].jobs['completed_jobs'][0]}" + ) # check for schema updates: schema_updates = [p.schema_update for p in load_info.load_packages] @@ -150,4 +152,4 @@ def load_data_with_retry(pipeline, data): ) # get data for a few famous players data = chess(chess_url="https://api.chess.com/pub/", max_players=MAX_PLAYERS) - load_data_with_retry(pipeline, data) + load_data_with_retry(pipeline, data) \ No newline at end of file diff --git a/docs/examples/connector_x_arrow/load_arrow.py b/docs/examples/connector_x_arrow/load_arrow.py index b3c654cef9..24ba2acb0e 100644 --- a/docs/examples/connector_x_arrow/load_arrow.py +++ b/docs/examples/connector_x_arrow/load_arrow.py @@ -3,7 +3,6 @@ import dlt from dlt.sources.credentials import ConnectionStringCredentials - def read_sql_x( conn_str: ConnectionStringCredentials = dlt.secrets.value, query: str = dlt.config.value, @@ -15,17 +14,16 @@ def read_sql_x( protocol="binary", ) - def genome_resource(): # create genome resource with merge on `upid` primary key genome = dlt.resource( - name="genome", + name="acanthochromis_polyacanthus", write_disposition="merge", - primary_key="upid", + primary_key="analysis_id", standalone=True, )(read_sql_x)( - "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type] - "SELECT * FROM genome ORDER BY created LIMIT 1000", + "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type] + "SELECT * FROM analysis LIMIT 20", ) # add incremental on created at genome.apply_hints(incremental=dlt.sources.incremental("created")) diff --git a/docs/examples/google_sheets/google_sheets.py b/docs/examples/google_sheets/google_sheets.py index 1ba330e4ca..8a93df9970 100644 --- a/docs/examples/google_sheets/google_sheets.py +++ b/docs/examples/google_sheets/google_sheets.py @@ -9,7 +9,6 @@ ) from dlt.common.typing import DictStrAny, StrAny - def _initialize_sheets( credentials: Union[GcpOAuthCredentials, GcpServiceAccountCredentials] ) -> Any: @@ -17,7 +16,6 @@ def _initialize_sheets( service = build("sheets", "v4", credentials=credentials.to_native_credentials()) return service - @dlt.source def google_spreadsheet( spreadsheet_id: str, @@ -57,7 +55,6 @@ def get_sheet(sheet_name: str) -> 
Iterator[DictStrAny]: for name in sheet_names ] - if __name__ == "__main__": pipeline = dlt.pipeline(destination="duckdb") # see example.secrets.toml to where to put credentials @@ -70,4 +67,4 @@ def get_sheet(sheet_name: str) -> Iterator[DictStrAny]: sheet_names=range_names, ) ) - print(info) + print(info) \ No newline at end of file diff --git a/docs/examples/incremental_loading/zendesk.py b/docs/examples/incremental_loading/zendesk.py index 6113f98793..4b8597886a 100644 --- a/docs/examples/incremental_loading/zendesk.py +++ b/docs/examples/incremental_loading/zendesk.py @@ -6,11 +6,12 @@ from dlt.common.typing import TAnyDateTime from dlt.sources.helpers.requests import client - @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 + start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 + year=2000, month=1, day=1 + ), end_date: Optional[TAnyDateTime] = None, ): """ @@ -112,7 +113,6 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] - if __name__ == "__main__": # create dlt pipeline pipeline = dlt.pipeline( @@ -120,4 +120,4 @@ def get_pages( ) load_info = pipeline.run(zendesk_support()) - print(load_info) + print(load_info) \ No newline at end of file diff --git a/docs/examples/nested_data/nested_data.py b/docs/examples/nested_data/nested_data.py index 7f85f0522e..3464448de6 100644 --- a/docs/examples/nested_data/nested_data.py +++ b/docs/examples/nested_data/nested_data.py @@ -13,7 +13,6 @@ CHUNK_SIZE = 10000 - # You can limit how deep dlt goes when generating child tables. # By default, the library will descend and generate child tables # for all nested lists, without a limit. 
@@ -82,7 +81,6 @@ def load_documents(self) -> Iterator[TDataItem]: while docs_slice := list(islice(cursor, CHUNK_SIZE)): yield map_nested_in_place(convert_mongo_objs, docs_slice) - def convert_mongo_objs(value: Any) -> Any: if isinstance(value, (ObjectId, Decimal128)): return str(value) diff --git a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py index fecd842214..8f7833e7d7 100644 --- a/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py +++ b/docs/examples/pdf_to_weaviate/pdf_to_weaviate.py @@ -4,14 +4,16 @@ from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader - @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) - yield {"file_name": filename, "file_path": file_path, "mtime": os.path.getmtime(file_path)} - + yield { + "file_name": filename, + "file_path": file_path, + "mtime": os.path.getmtime(file_path), + } @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): @@ -26,7 +28,6 @@ def pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item - pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" @@ -50,4 +51,4 @@ def pdf_to_text(file_item, separate_pages: bool = False): client = weaviate.Client("http://localhost:8080") # get text of all the invoices in InvoiceText class we just created above -print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) +print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do()) \ No newline at end of file diff --git a/docs/examples/qdrant_zendesk/qdrant.py b/docs/examples/qdrant_zendesk/qdrant.py index bd0cbafc99..300d8dc6ad 100644 --- a/docs/examples/qdrant_zendesk/qdrant.py +++ b/docs/examples/qdrant_zendesk/qdrant.py @@ -10,12 +10,13 @@ from dlt.common.configuration.inject import with_config - # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk @dlt.source(max_table_nesting=2) def zendesk_support( credentials: Dict[str, str] = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = pendulum.datetime(year=2000, month=1, day=1), # noqa: B008 + start_date: Optional[TAnyDateTime] = pendulum.datetime( # noqa: B008 + year=2000, month=1, day=1 + ), end_date: Optional[TAnyDateTime] = None, ): """ @@ -79,7 +80,6 @@ def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]: return None return ensure_pendulum_datetime(value) - # modify dates to return datetime objects instead def _fix_date(ticket): ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"]) @@ -87,7 +87,6 @@ def _fix_date(ticket): ticket["due_at"] = _parse_date_or_none(ticket["due_at"]) return ticket - # function from: https://github.com/dlt-hub/verified-sources/tree/master/sources/zendesk def get_pages( url: str, @@ -128,7 +127,6 @@ def get_pages( if not response_json["end_of_stream"]: get_url = response_json["next_page"] - if __name__ == "__main__": # create a pipeline with an appropriate name pipeline = dlt.pipeline( @@ -148,6 +146,7 @@ def get_pages( print(load_info) + # running the Qdrant client to connect to your Qdrant database 
@with_config(sections=("destination", "qdrant", "credentials")) diff --git a/docs/examples/transformers/pokemon.py b/docs/examples/transformers/pokemon.py index 97b9a98b11..c17beff6a8 100644 --- a/docs/examples/transformers/pokemon.py +++ b/docs/examples/transformers/pokemon.py @@ -1,7 +1,6 @@ import dlt from dlt.sources.helpers import requests - @dlt.source(max_table_nesting=2) def source(pokemon_api_url: str): """""" @@ -47,7 +46,6 @@ def species(pokemon_details): return (pokemon_list | pokemon, pokemon_list | pokemon | species) - if __name__ == "__main__": # build duck db pipeline pipeline = dlt.pipeline( @@ -56,4 +54,4 @@ def species(pokemon_details): # the pokemon_list resource does not need to be loaded load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon")) - print(load_info) + print(load_info) \ No newline at end of file diff --git a/docs/website/docs/examples/chess_production/index.md b/docs/website/docs/examples/chess_production/index.md index b812e47ef8..f372d26d80 100644 --- a/docs/website/docs/examples/chess_production/index.md +++ b/docs/website/docs/examples/chess_production/index.md @@ -110,12 +110,12 @@ def load_data_with_retry(pipeline, data): load_info = pipeline.run(data) logger.info(str(load_info)) - # raise on failed jobs - load_info.raise_on_failed_jobs() - # send notification - send_slack_message( - pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!" - ) + # raise on failed jobs + load_info.raise_on_failed_jobs() + # send notification + send_slack_message( + pipeline.runtime_config.slack_incoming_hook, "Data was successfully loaded!" + ) except Exception: # we get here after all the failed retries # send notification diff --git a/docs/website/docs/examples/connector_x_arrow/index.md b/docs/website/docs/examples/connector_x_arrow/index.md index 6702b8bbef..92941e1988 100644 --- a/docs/website/docs/examples/connector_x_arrow/index.md +++ b/docs/website/docs/examples/connector_x_arrow/index.md @@ -56,13 +56,13 @@ def read_sql_x( def genome_resource(): # create genome resource with merge on `upid` primary key genome = dlt.resource( - name="genome", + name="acanthochromis_polyacanthus", write_disposition="merge", - primary_key="upid", + primary_key="analysis_id", standalone=True, )(read_sql_x)( - "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam", # type: ignore[arg-type] - "SELECT * FROM genome ORDER BY created LIMIT 1000", + "mysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1", # type: ignore[arg-type] + "SELECT * FROM analysis LIMIT 20", ) # add incremental on created at genome.apply_hints(incremental=dlt.sources.incremental("created")) diff --git a/docs/website/docs/examples/pdf_to_weaviate/index.md b/docs/website/docs/examples/pdf_to_weaviate/index.md index c67f1f9253..cc2ef01e33 100644 --- a/docs/website/docs/examples/pdf_to_weaviate/index.md +++ b/docs/website/docs/examples/pdf_to_weaviate/index.md @@ -28,7 +28,6 @@ import dlt from dlt.destinations.impl.weaviate import weaviate_adapter from PyPDF2 import PdfReader - @dlt.resource(selected=False) def list_files(folder_path: str): folder_path = os.path.abspath(folder_path) @@ -37,10 +36,9 @@ def list_files(folder_path: str): yield { "file_name": filename, "file_path": file_path, - "mtime": os.path.getmtime(file_path) + "mtime": os.path.getmtime(file_path), } - @dlt.transformer(primary_key="page_id", write_disposition="merge") def pdf_to_text(file_item, separate_pages: bool = False): if not separate_pages: @@ -54,10 +52,7 @@ def 
pdf_to_text(file_item, separate_pages: bool = False): page_item["page_id"] = file_item["file_name"] + "_" + str(page_no) yield page_item -pipeline = dlt.pipeline( - pipeline_name='pdf_to_text', - destination='weaviate' -) +pipeline = dlt.pipeline(pipeline_name="pdf_to_text", destination="weaviate") # this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf" # (3) sends them to pdf_to_text transformer with pipe (|) operator @@ -70,9 +65,7 @@ pdf_pipeline = list_files("assets/invoices").add_filter( pdf_pipeline.table_name = "InvoiceText" # use weaviate_adapter to tell destination to vectorize "text" column -load_info = pipeline.run( - weaviate_adapter(pdf_pipeline, vectorize="text") -) +load_info = pipeline.run(weaviate_adapter(pdf_pipeline, vectorize="text")) row_counts = pipeline.last_trace.last_normalize_info print(row_counts) print("------") diff --git a/docs/website/docs/general-usage/destination.md b/docs/website/docs/general-usage/destination.md index be3f8d8296..c20aa62d16 100644 --- a/docs/website/docs/general-usage/destination.md +++ b/docs/website/docs/general-usage/destination.md @@ -27,6 +27,7 @@ Above we want to use **filesystem** built-in destination. You can use shorthand ```py import dlt + pipeline = dlt.pipeline("pipeline", destination="dlt.destinations.filesystem") ``` @@ -37,6 +38,7 @@ Above we use built in **filesystem** destination by providing a class type `file ```py import dlt from dlt.destinations import filesystem + pipeline = dlt.pipeline("pipeline", destination=filesystem) ``` @@ -50,6 +52,7 @@ You can instantiate **destination class** yourself to configure it explicitly. W ```py import dlt + azure_bucket = filesystem("az://dlt-azure-bucket", destination_name="production_az_bucket") pipeline = dlt.pipeline("pipeline", destination=azure_bucket) ``` @@ -99,7 +102,10 @@ import dlt from dlt.destinations import postgres # pass full credentials - together with the password (not recommended) -pipeline = dlt.pipeline("pipeline", destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data")) +pipeline = dlt.pipeline( + "pipeline", + destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"), +) ``` @@ -126,7 +132,9 @@ from dlt.sources.credentials import AzureCredentials credentials = AzureCredentials() # fill only the account name, leave key to be taken from secrets credentials.azure_storage_account_name = "production_storage" -pipeline = dlt.pipeline("pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials)) +pipeline = dlt.pipeline( + "pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials) +) ``` diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 5662d6302d..ba00e593a5 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -49,7 +49,7 @@ for player in ["magnuscarlsen", "rpragchess"]: response.raise_for_status() data.append(response.json()) # Extract, normalize, and load the data -load_info = pipeline.run(data, table_name='player') +load_info = pipeline.run(data, table_name="player") ``` @@ -143,23 +143,24 @@ from sqlalchemy import create_engine # MySQL instance to get data. 
# NOTE: you'll need to install pymysql with `pip install pymysql` # NOTE: loading data from public mysql instance may take several seconds -engine = create_engine("mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam") +engine = create_engine( + "mysql+pymysql://anonymous@ensembldb.ensembl.org:3306/acanthochromis_polyacanthus_core_100_1" +) with engine.connect() as conn: # Select genome table, stream data in batches of 100 elements - query = "SELECT * FROM genome LIMIT 1000" + query = "SELECT * FROM analysis LIMIT 1000" rows = conn.execution_options(yield_per=100).exec_driver_sql(query) pipeline = dlt.pipeline( pipeline_name="from_database", destination="duckdb", - dataset_name="genome_data", + dataset_name="acanthochromis_polyacanthus_data", ) # Convert the rows into dictionaries on the fly with a map function load_info = pipeline.run( - map(lambda row: dict(row._mapping), rows), - table_name="genome" + map(lambda row: dict(row._mapping), rows), table_name="acanthochromis_polyacanthus" ) print(load_info) diff --git a/docs/website/docs/tutorial/load-data-from-an-api.md b/docs/website/docs/tutorial/load-data-from-an-api.md index 4452d22d59..0491080156 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api.md +++ b/docs/website/docs/tutorial/load-data-from-an-api.md @@ -32,9 +32,9 @@ response = requests.get(url) response.raise_for_status() pipeline = dlt.pipeline( - pipeline_name='github_issues', - destination='duckdb', - dataset_name='github_data', + pipeline_name="github_issues", + destination="duckdb", + dataset_name="github_data", ) # The response contains a list of issues load_info = pipeline.run(response.json(), table_name="issues") @@ -152,9 +152,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_incremental', - destination='duckdb', - dataset_name='github_data_append', + pipeline_name="github_issues_incremental", + destination="duckdb", + dataset_name="github_data_append", ) load_info = pipeline.run(get_issues) @@ -212,15 +212,15 @@ from dlt.sources.helpers import requests primary_key="id", ) def get_issues( - updated_at = dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") + updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z") ): # NOTE: we read only open issues to minimize number of calls to # the API. 
There's a limit of ~50 calls for not authenticated # Github users url = ( - f"https://api.github.com/repos/dlt-hub/dlt/issues" + "https://api.github.com/repos/dlt-hub/dlt/issues" f"?since={updated_at.last_value}&per_page=100&sort=updated" - f"&directions=desc&state=open" + "&directions=desc&state=open" ) while True: @@ -234,9 +234,9 @@ def get_issues( url = response.links["next"]["url"] pipeline = dlt.pipeline( - pipeline_name='github_issues_merge', - destination='duckdb', - dataset_name='github_data_merge', + pipeline_name="github_issues_merge", + destination="duckdb", + dataset_name="github_data_merge", ) load_info = pipeline.run(get_issues) row_counts = pipeline.last_trace.last_normalize_info diff --git a/docs/website/package-lock.json b/docs/website/package-lock.json index 577b362611..c45374f83b 100644 --- a/docs/website/package-lock.json +++ b/docs/website/package-lock.json @@ -25,7 +25,6 @@ }, "devDependencies": { "@docusaurus/module-type-aliases": "2.4.1" - }, "engines": { "node": ">=16.14" @@ -5714,7 +5713,6 @@ "version": "16.3.1", "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz", "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==", - "dev": true, "engines": { "node": ">=12" }, @@ -11526,8 +11524,7 @@ "node_modules/toml": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/toml/-/toml-3.0.0.tgz", - "integrity": "sha512-y/mWCZinnvxjTKYhJ+pYxwD0mRLVvOtdS2Awbgxln6iEnt4rk0yBxeSBHkGJcPucRiG0e55mwWp+g/05rsrd6w==", - "dev": true + "integrity": "sha512-y/mWCZinnvxjTKYhJ+pYxwD0mRLVvOtdS2Awbgxln6iEnt4rk0yBxeSBHkGJcPucRiG0e55mwWp+g/05rsrd6w==" }, "node_modules/totalist": { "version": "3.0.1", From b8442314b8d7c07d673883ddf7e63094654af54b Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 29 Jan 2024 17:53:50 +0100 Subject: [PATCH 24/27] fix one test --- tests/extract/test_extract_pipe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 95652de386..938d61da58 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -717,7 +717,7 @@ async def long_gen(): yield i close_pipe_yielding = False # we have a different exception here - except GeneratorExit: + except asyncio.CancelledError: close_pipe_got_exit = True # execute in a thread From 568a2ced002258d83c5e2fdc189d828daeeb9424 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 29 Jan 2024 17:59:22 +0100 Subject: [PATCH 25/27] small change to resource page --- docs/website/docs/general-usage/resource.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 96b10bdc86..b29aacea3b 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -8,7 +8,7 @@ keywords: [resource, api endpoint, dlt.resource] ## Declare a resource -A [resource](glossary.md#resource) is a function that yields data. To create a +A [resource](glossary.md#resource) is a ([optionally async](https://dlthub.com/docs/reference/performance#parallelism)) function that yields data. To create a resource, we add the `@dlt.resource` decorator to that function. 
Commonly used arguments: From 2a168e57a2cd9c2f2333acefdb382598a6166f2b Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jan 2024 10:18:41 +0100 Subject: [PATCH 26/27] fixes tests --- tests/extract/test_extract_pipe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 938d61da58..20b03a0c51 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -716,8 +716,7 @@ async def long_gen(): await asyncio.sleep(0.01) yield i close_pipe_yielding = False - # we have a different exception here - except asyncio.CancelledError: + except GeneratorExit: close_pipe_got_exit = True # execute in a thread From 8cf3c3cfcae069cc9044dc16fbccc5d298a66aa4 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jan 2024 11:40:21 +0100 Subject: [PATCH 27/27] change generator exit test --- tests/extract/test_extract_pipe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index 20b03a0c51..38dc8a9319 100644 --- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -718,6 +718,8 @@ async def long_gen(): close_pipe_yielding = False except GeneratorExit: close_pipe_got_exit = True + except asyncio.CancelledError: + close_pipe_got_exit = True # execute in a thread async def raise_gen(item: int):
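
For illustration, a minimal sketch of the usage pattern the updated resource docs hint at: a plain async generator function used directly as a resource. The resource name, the `asyncio.sleep` stand-in for an async API call, and the duckdb pipeline settings are hypothetical and not part of this patch; it assumes `@dlt.resource` accepts async generator functions once these changes land.

```py
import asyncio

import dlt


@dlt.resource(table_name="numbers", write_disposition="append")
async def async_numbers():
    # an async generator: items are awaited and collected during extraction
    for i in range(3):
        await asyncio.sleep(0.1)  # stand-in for an async API call
        yield {"value": i}


if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="async_demo", destination="duckdb", dataset_name="async_data"
    )
    load_info = pipeline.run(async_numbers)
    print(load_info)
```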