diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 4765b989fc..011ef5df9c 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -373,7 +373,7 @@ def _verify_head_step(self, step: TPipeStep) -> None: # first element must be Iterable, Iterator or Callable in resource pipe if not isinstance(step, (Iterable, Iterator, AsyncIterator)) and not callable(step): raise CreatePipeException( - self.name, "A head of a resource pipe must be Iterable, Iterator or a Callable" + self.name, "A head of a resource pipe must be Iterable, Iterator, AsyncIterator or a Callable" ) def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep: diff --git a/pipeline.py b/pipeline.py deleted file mode 100644 index 5ac8ca2da3..0000000000 --- a/pipeline.py +++ /dev/null @@ -1,22 +0,0 @@ - -import dlt, asyncio - -@dlt.resource(table_name="hello") -async def async_gen_resource(idx): - for l in ["a", "b", "c"] * 3: - await asyncio.sleep(0.1) - yield {"async_gen": idx, "letter": l} - -pipeline_1 = dlt.pipeline("pipeline_1", destination="duckdb", full_refresh=True) -pipeline_1.run( - async_gen_resource(10), table_name="hello" -) -with pipeline_1.sql_client() as c: - with c.execute_query("SELECT * FROM hello") as cur: - rows = list(cur.fetchall()) - for r in rows: - print(r) - -# pipeline_1.run( -# async_gen_resource(11) -# ) \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index 643595cd5e..0000000000 --- a/test.py +++ /dev/null @@ -1,24 +0,0 @@ - -import asyncio, inspect -from typing import Awaitable -from asyncio import Future - -async def async_gen_resource(): - for l in ["a", "b", "c"]: - # await asyncio.sleep(0.1) - yield {"async_gen": 1, "letter": l} - - -async def run() -> None: - gen = async_gen_resource() - result = [] - try: - while item := await gen.__anext__(): - result.append(item)# - except StopAsyncIteration: - return [result] - - -if __name__ == "__main__": - loop = asyncio.get_event_loop() - print(loop.run_until_complete(run())) \ No newline at end of file