Commit

migrate performance snippets

sh-rp committed Sep 11, 2023
1 parent 97b9d49 commit 20614f2
Showing 13 changed files with 173 additions and 168 deletions.
3 changes: 3 additions & 0 deletions dlt/common/configuration/providers/toml.py
@@ -89,6 +89,9 @@ class StringTomlProvider(BaseTomlProvider):
def __init__(self, toml_string: str) -> None:
super().__init__(StringTomlProvider.loads(toml_string))

def update(self, toml_string: str) -> None:
self._toml = self.loads(toml_string)

def dumps(self) -> str:
return tomlkit.dumps(self._toml)

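A minimal usage sketch of the new update method, assuming the import path shown in this diff (the TOML keys and values below are illustrative):

from dlt.common.configuration.providers.toml import StringTomlProvider

# build a provider from an initial TOML document
provider = StringTomlProvider("[data_writer]\nbuffer_max_items=100")

# replace the whole document in place instead of constructing a new provider
provider.update("[data_writer]\nbuffer_max_items=500")

# dumps() now reflects the updated document
print(provider.dumps())
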
11 changes: 0 additions & 11 deletions docs/snippets/reference/.dlt/config.toml

This file was deleted.

Empty file.
16 changes: 0 additions & 16 deletions docs/snippets/reference/parallel_load/.dlt/config.toml

This file was deleted.

Empty file.
34 changes: 0 additions & 34 deletions docs/snippets/reference/parallel_load/parallel_load.py

This file was deleted.

27 changes: 0 additions & 27 deletions docs/snippets/reference/performance_chunking.py

This file was deleted.

39 changes: 0 additions & 39 deletions docs/snippets/reference/performance_parallel_extract.py

This file was deleted.

13 changes: 0 additions & 13 deletions docs/snippets/reference/test_reference_snippets.py

This file was deleted.

3 changes: 3 additions & 0 deletions docs/website/docs/conftest.py
@@ -20,6 +20,9 @@ def setup_tests(request):
dname = os.path.dirname(request.module.__file__)
config_dir = dname + "/.dlt"

# clear string toml provider
string_toml_provider.update("")

# inject provider context so the original providers are restored at the end
def _initial_providers():
return [string_toml_provider, EnvironProvider(), SecretsTomlProvider(project_dir=config_dir, add_global_config=False), ConfigTomlProvider(project_dir=config_dir, add_global_config=False)]
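
A rough sketch of how a snippets file is expected to use this fixture: since setup_tests clears the string provider before each test, every snippet injects only the TOML it needs (the section and values below are illustrative):

from conftest import string_toml_provider

def some_snippet() -> None:
    # inject snippet-specific config; setup_tests resets the string provider first
    string_toml_provider.update("[sources.data_writer]\nbuffer_max_items=100")
    # ... run the snippet against the injected configuration
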
13 changes: 0 additions & 13 deletions docs/website/docs/reference/.dlt/config.toml

This file was deleted.

148 changes: 144 additions & 4 deletions docs/website/docs/reference/performance-snippets.py
@@ -1,7 +1,10 @@
from conftest import string_toml_provider

PARALLEL_CONFIG_TOML = """
# @@@DLT_SNIPPET_START parallel_config_toml
# the pipeline name is the default source name when loading resources
chess_url="https://api.chess.com/pub/"
[sources.parallel_load.data_writer]
file_max_items=100000
@@ -17,10 +20,8 @@
# @@@DLT_SNIPPET_END parallel_config_toml
"""

from conftest import string_toml_provider

def parallel_config_snippet() -> None:
string_toml_provider.loads(PARALLEL_CONFIG_TOML)
string_toml_provider.update(PARALLEL_CONFIG_TOML)
# @@@DLT_SNIPPET_START parallel_config
import os
import dlt
@@ -51,4 +52,143 @@ def read_table(limit):

assert len(extracted_files) == 11
loaded_package = pipeline.get_load_package_info(load_id)
assert len(loaded_package.jobs["completed_jobs"]) == 11
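
The values injected through PARALLEL_CONFIG_TOML could presumably also be supplied as environment variables using dlt's SECTION__KEY naming; a hedged sketch mirroring the TOML above:

import os

# equivalent of [sources.parallel_load.data_writer] file_max_items=100000
os.environ["SOURCES__PARALLEL_LOAD__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
# equivalent of the top-level chess_url value
os.environ["CHESS_URL"] = "https://api.chess.com/pub/"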


def parallel_extract_callables_snippet() -> None:
# @@@DLT_SNIPPET_START parallel_extract_callables
import dlt
from time import sleep
from threading import currentThread

@dlt.resource
def list_items(start, limit):
yield from range(start, start + limit)

@dlt.transformer
@dlt.defer
def get_details(item_id):
# simulate a slow REST API where you wait 0.3 sec for each item
sleep(0.3)
print(f"item_id {item_id} in thread {currentThread().name}")
# just return the results; if you yield, the generator will be evaluated in the main thread
return {"row": item_id}


# evaluate the pipeline and print all the items
# resources are iterators and they are evaluated the same way inside pipeline.run
print(list(list_items(0, 10) | get_details))
# @@@DLT_SNIPPET_END parallel_extract_callables

# @@@DLT_SNIPPET_START parallel_extract_awaitables
import asyncio

@dlt.transformer
async def a_get_details(item_id):
# simulate a slow REST API where you wait 0.3 sec for each item
await asyncio.sleep(0.3)
print(f"item_id {item_id} in thread {currentThread().name}")
# just return the results; if you yield, the generator will be evaluated in the main thread
return {"row": item_id}


print(list(list_items(0, 10) | a_get_details))
# @@@DLT_SNIPPET_END parallel_extract_awaitables
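
If the deferred callables or awaitables should not saturate the defaults, the extract workers setting configured later in this file can cap the parallelism; a sketch assuming the environment-variable form of that TOML setting:

import os

# assumed env-var form of the [extract] workers setting shown further down
os.environ["EXTRACT__WORKERS"] = "2"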


def performance_chunking_snippet() -> None:
# @@@DLT_SNIPPET_START performance_chunking
import dlt

def get_rows(limit):
yield from map(lambda n: {"row": n}, range(limit))

@dlt.resource
def database_cursor():
# here we yield each row returned from the database separately
yield from get_rows(10000)
# @@@DLT_SNIPPET_END performance_chunking

# @@@DLT_SNIPPET_START performance_chunking_chunk
from itertools import islice

@dlt.resource
def database_cursor_chunked():
# here we yield chunks of size 1000
rows = get_rows(10000)
while item_slice := list(islice(rows, 1000)):
print(f"got chunk of length {len(item_slice)}")
yield item_slice
# @@@DLT_SNIPPET_END performance_chunking_chunk

assert len(list(database_cursor())) == 10000
assert len(list(database_cursor_chunked())) == 10000
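
The chunked pattern maps directly onto a real DB-API cursor; a sketch where conn is an assumed, already open database connection and the query is illustrative:

import dlt

@dlt.resource
def database_cursor_fetchmany(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT id, name FROM some_table")
    cols = [d[0] for d in cursor.description]
    # fetchmany returns up to 1000 rows per call; yielding the whole chunk
    # avoids one-row-at-a-time overhead, just like the islice example above
    while chunk := cursor.fetchmany(1000):
        yield [dict(zip(cols, row)) for row in chunk]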


def test_toml_snippets() -> None:
BUFFER_TOML = """
# @@@DLT_SNIPPET_START buffer_toml
# set buffer size for extract and normalize stages
[data_writer]
buffer_max_items=100
# set buffers only in extract stage - for all sources
[sources.data_writer]
buffer_max_items=100
# set buffers only for a source with name zendesk_support
[sources.zendesk_support.data_writer]
buffer_max_items=100
# set buffers in normalize stage
[normalize.data_writer]
buffer_max_items=100
# @@@DLT_SNIPPET_END buffer_toml
"""
string_toml_provider.update(BUFFER_TOML)

FILE_SIZE_TOML = """
# @@@DLT_SNIPPET_START file_size_toml
# extract and normalize stages
[data_writer]
file_max_items=100000
max_file_size=1000000
# only for the extract stage - for all sources
[sources.data_writer]
file_max_items=100000
max_file_size=1000000
# only for the extract stage of a source with name zendesk_support
[sources.zendesk_support.data_writer]
file_max_items=100000
max_file_size=1000000
# only for the normalize stage
[normalize.data_writer]
file_max_items=100000
max_file_size=1000000
# @@@DLT_SNIPPET_END file_size_toml
"""
string_toml_provider.update(FILE_SIZE_TOML)

WORKERS_TOML = """
# @@@DLT_SNIPPET_START extract_workers_toml
# for all sources and resources being extracted
[extract]
workers=1
# for all resources in the zendesk_support source
[sources.zendesk_support.extract]
workers=2
# for the tickets resource in the zendesk_support source
[sources.zendesk_support.tickets.extract]
workers=4
# @@@DLT_SNIPPET_END extract_workers_toml
"""
string_toml_provider.update(WORKERS_TOML)
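
The three TOML fragments above can presumably also be expressed as environment variables when editing config.toml is not convenient; a sketch assuming dlt's SECTION__KEY naming, with values mirroring the TOML:

import os

# [normalize.data_writer] buffer_max_items=100
os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS"] = "100"
# [sources.zendesk_support.data_writer] file_max_items / max_file_size
os.environ["SOURCES__ZENDESK_SUPPORT__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
os.environ["SOURCES__ZENDESK_SUPPORT__DATA_WRITER__MAX_FILE_SIZE"] = "1000000"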



