diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md
index f1a405684f..af2d791324 100644
--- a/docs/website/docs/reference/performance.md
+++ b/docs/website/docs/reference/performance.md
@@ -139,9 +139,9 @@ PROGRESS=log python pipeline_script.py
 You can create pipelines that extract, normalize and load data in parallel.
 
 ### Extract
-You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively.
+You can extract data concurrently if you write your pipelines to yield callables or awaitables, or use async generators for your resources. Callables are then evaluated in a thread pool; awaitables and async generators are evaluated in a futures pool.
 
-Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool.
+The example below simulates a typical situation where a dlt resource is used to fetch a page of items and then the details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool.
 
 ```py
 import dlt
@@ -185,11 +185,20 @@ workers=4
 
-The example below does the same but using an async/await and futures pool:
+The example below does the same, but uses an async generator as the main resource and async/await with a futures pool for the transformer:
 
 ```py
 import asyncio
 
+@dlt.resource
+async def a_list_items(start, limit):
+    # simulate a slow REST API where you wait 0.3 sec for each item
+    index = start
+    while index < start + limit:
+        await asyncio.sleep(0.3)
+        yield index
+        index += 1
+
 @dlt.transformer
 async def a_get_details(item_id):
     # simulate a slow REST API where you wait 0.3 sec for each item
@@ -198,7 +207,7 @@ async def a_get_details(item_id):
     # just return the results, if you yield, generator will be evaluated in main thread
     return {"row": item_id}
 
-print(list(list_items(0, 10) | a_get_details))
+print(list(a_list_items(0, 10) | a_get_details))
 ```
diff --git a/docs/website/docs/reference/performance_snippets/performance-snippets.py b/docs/website/docs/reference/performance_snippets/performance-snippets.py
index a6ad2f2618..a2ebd102a6 100644
--- a/docs/website/docs/reference/performance_snippets/performance-snippets.py
+++ b/docs/website/docs/reference/performance_snippets/performance-snippets.py
@@ -68,6 +68,15 @@ def get_details(item_id):
     # @@@DLT_SNIPPET_START parallel_extract_awaitables
     import asyncio
 
+    @dlt.resource
+    async def a_list_items(start, limit):
+        # simulate a slow REST API where you wait 0.3 sec for each item
+        index = start
+        while index < start + limit:
+            await asyncio.sleep(0.3)
+            yield index
+            index += 1
+
     @dlt.transformer
     async def a_get_details(item_id):
         # simulate a slow REST API where you wait 0.3 sec for each item
@@ -76,7 +85,7 @@ async def a_get_details(item_id):
         # just return the results, if you yield, generator will be evaluated in main thread
         return {"row": item_id}
 
-    print(list(list_items(0, 10) | a_get_details))
+    print(list(a_list_items(0, 10) | a_get_details))
     # @@@DLT_SNIPPET_END parallel_extract_awaitables
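
For context: the thread-pool example that the first hunk's prose refers to lives in the unchanged part of `performance.md`, so only its opening `import dlt` is visible in the diff. Below is a minimal sketch of that pattern; the diff confirms the names `list_items`, `get_details`, `@dlt.defer`, and the `list_items(0, 10) | ...` composition, but the function bodies here are assumptions reconstructed from the surrounding comments:

```py
import dlt
from time import sleep

@dlt.resource
def list_items(start, limit):
    # a fast resource that only lists item ids
    yield from range(start, start + limit)

@dlt.transformer
@dlt.defer  # wraps get_details in a callable executed in dlt's thread pool
def get_details(item_id):
    # simulate a slow REST API where you wait 0.3 sec for each item
    sleep(0.3)
    # return (do not yield) so the work stays in the worker thread
    return {"row": item_id}

# resources and transformers compose with | and are evaluated lazily
print(list(list_items(0, 10) | get_details))
```

Returning rather than yielding from the deferred function matters: as the snippet's own comment notes, a yielding generator would be evaluated in the main thread, defeating the thread pool.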