update performance docs
sh-rp committed Jan 29, 2024
1 parent 4a61e60 commit 9446e29
Showing 2 changed files with 23 additions and 5 deletions.
17 changes: 13 additions & 4 deletions docs/website/docs/reference/performance.md
@@ -139,9 +139,9 @@ PROGRESS=log python pipeline_script.py
You can create pipelines that extract, normalize and load data in parallel.

### Extract
-You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively.
+You can extract data concurrently if you write your pipelines to yield callables or awaitables, or use async generators for your resources; these are then evaluated in a thread pool or futures pool respectively.

-Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool.
+The example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool.
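
Most of that callable-based snippet is collapsed in the hunk below; for orientation, here is a minimal sketch of the pattern it demonstrates, assuming the `list_items`/`get_details` names shown elsewhere in the diff and a stacked `@dlt.transformer` / `@dlt.defer` decorator (the exact snippet may differ):

```py
import dlt
from time import sleep

@dlt.resource
def list_items(start, limit):
    # a fast resource that only yields item ids
    yield from range(start, start + limit)

@dlt.transformer
@dlt.defer
def get_details(item_id):
    # simulate a slow REST API where you wait 0.3 sec for each item
    sleep(0.3)
    # return (do not yield) so the wrapped callable can run in the thread pool
    return {"row": item_id}

# the deferred transformer calls are evaluated concurrently in a thread pool
print(list(list_items(0, 10) | get_details))
```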
<!--@@@DLT_SNIPPET_START ./performance_snippets/performance-snippets.py::parallel_extract_callables-->
```py
import dlt
@@ -185,11 +185,20 @@ workers=4
<!--@@@DLT_SNIPPET_END ./performance_snippets/toml-snippets.toml::extract_workers_toml-->
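
The collapsed `extract_workers_toml` snippet above sets how many workers evaluate those deferred callables and futures. A hedged sketch of such a `config.toml` entry, assuming the `[extract]` section (the collapsed snippet may scope it differently, for example per source):

```toml
[extract]
# number of workers evaluating deferred callables / futures in parallel
workers = 4
```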


-The example below does the same but using an async/await and futures pool:
+The example below does the same, but uses an async generator as the main resource and async/await with a futures pool for the transformer:
<!--@@@DLT_SNIPPET_START ./performance_snippets/performance-snippets.py::parallel_extract_awaitables-->
```py
import asyncio

+@dlt.resource
+async def a_list_items(start, limit):
+    # simulate a slow REST API where you wait 0.3 sec for each item
+    index = start
+    while index < start + limit:
+        await asyncio.sleep(0.3)
+        yield index
+        index += 1
+
@dlt.transformer
async def a_get_details(item_id):
    # simulate a slow REST API where you wait 0.3 sec for each item
@@ -198,7 +207,7 @@ async def a_get_details(item_id):
    # just return the results, if you yield, generator will be evaluated in main thread
    return {"row": item_id}

-print(list(list_items(0, 10) | a_get_details))
+print(list(a_list_items(0, 10) | a_get_details))
```
<!--@@@DLT_SNIPPET_END ./performance_snippets/performance-snippets.py::parallel_extract_awaitables-->
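
Evaluating the piped resources with `list(...)` is just a quick way to run them; inside a pipeline they are evaluated the same way during the extract step. A hypothetical usage sketch (the pipeline name, destination and dataset below are illustrative, not part of the snippet):

```py
import dlt

# illustrative pipeline; swap in your own destination and names
pipeline = dlt.pipeline(
    pipeline_name="parallel_extract_demo",
    destination="duckdb",
    dataset_name="items",
)

# the async generator and async transformer are evaluated concurrently
# in the futures pool during the extract step
load_info = pipeline.run(a_list_items(0, 10) | a_get_details)
print(load_info)
```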

11 changes: 10 additions & 1 deletion docs/website/docs/reference/performance_snippets/performance-snippets.py
@@ -68,6 +68,15 @@ def get_details(item_id):
# @@@DLT_SNIPPET_START parallel_extract_awaitables
import asyncio

+@dlt.resource
+async def a_list_items(start, limit):
+    # simulate a slow REST API where you wait 0.3 sec for each item
+    index = start
+    while index < start + limit:
+        await asyncio.sleep(0.3)
+        yield index
+        index += 1
+
@dlt.transformer
async def a_get_details(item_id):
    # simulate a slow REST API where you wait 0.3 sec for each item
@@ -76,7 +85,7 @@ async def a_get_details(item_id):
    # just return the results, if you yield, generator will be evaluated in main thread
    return {"row": item_id}

-print(list(list_items(0, 10) | a_get_details))
+print(list(a_list_items(0, 10) | a_get_details))
# @@@DLT_SNIPPET_END parallel_extract_awaitables


