Commit

Merge pull request #619 from dlt-hub/rfix/updates-performance-docs
updates performance docs
rudolfix authored Sep 11, 2023
2 parents 1c42034 + c5baaab commit e851156
Showing 19 changed files with 474 additions and 52 deletions.
2 changes: 1 addition & 1 deletion check-package.sh
@@ -19,7 +19,7 @@ while IFS= read -r d; do
error="yes"
fi
fi
done < <(find . -mindepth 1 -not -path "./docs/website*" -type d -regex "^./[^.^_].*" '!' -exec test -e "{}/__init__.py" ';' -print)
done < <(find . -mindepth 1 -not -path "./docs/website/node_modules*" -type d -regex "^./[^.^_].*" '!' -exec test -e "{}/__init__.py" ';' -print)

if [ -z $error ]; then
exit 0
27 changes: 17 additions & 10 deletions dlt/common/configuration/inject.py
@@ -1,4 +1,5 @@
import inspect
import threading
from functools import wraps
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
from inspect import Signature, Parameter
@@ -14,6 +15,7 @@
_ORIGINAL_ARGS = "_dlt_orig_args"
# keep a registry of all the decorated functions
_FUNC_SPECS: Dict[int, Type[BaseConfiguration]] = {}
_RESOLVE_LOCK = threading.Lock()

TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)

@@ -84,7 +86,6 @@ def decorator(f: TFun) -> TFun:
kwargs_arg = next((p for p in sig.parameters.values() if p.kind == Parameter.VAR_KEYWORD), None)
spec_arg: Parameter = None
pipeline_name_arg: Parameter = None
section_context = ConfigSectionContext(sections=sections, merge_style=sections_merge_style)

if spec is None:
SPEC = spec_from_signature(f, sig, include_defaults)
@@ -117,22 +118,28 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
config = last_config(**kwargs)
else:
# if section derivation function was provided then call it
nonlocal sections
if section_f:
section_context.sections = (section_f(bound_args.arguments), )
# sections may be a string
if isinstance(sections, str):
section_context.sections = (sections,)
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments), )
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
else:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
section_context.pipeline_name = bound_args.arguments.get(pipeline_name_arg.name, pipeline_name_arg_default)
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
curr_pipeline_name = bound_args.arguments.get(pipeline_name_arg.name, pipeline_name_arg_default)
else:
curr_pipeline_name = None
section_context = ConfigSectionContext(pipeline_name=curr_pipeline_name, sections=curr_sections, merge_style=sections_merge_style)
# this may be called from many threads so make sure context is not mangled
with _RESOLVE_LOCK:
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
resolved_params = dict(config)
# overwrite or add resolved params
for p in sig.parameters.values():
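
The change above makes configuration resolution thread-safe: the `ConfigSectionContext` is now built fresh on every call (instead of one instance being mutated by every invocation of the decorated function) and the resolve step is serialized behind the module-level `_RESOLVE_LOCK`. A minimal sketch of that pattern, with hypothetical names rather than dlt's actual internals:

```py
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, Tuple

_RESOLVE_LOCK = threading.Lock()
_ACTIVE = threading.local()

@contextmanager
def inject_section(context: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    # install the context for the duration of the block, then restore the previous one
    previous = getattr(_ACTIVE, "context", None)
    _ACTIVE.context = context
    try:
        yield context
    finally:
        _ACTIVE.context = previous

def resolve_in_sections(sections: Tuple[str, ...], resolve: Callable[[], Any]) -> Any:
    # build a fresh context per call so nothing is shared between invocations
    context = {"sections": sections}
    # serialize resolution so concurrent threads cannot mangle each other's context
    with _RESOLVE_LOCK:
        with inject_section(context):
            return resolve()

print(resolve_in_sections(("sources", "my_source"), lambda: "resolved"))
```
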
12 changes: 12 additions & 0 deletions dlt/common/storages/file_storage.py
@@ -270,3 +270,15 @@ def open_zipsafe_ro(path: str, mode: str = "r", **kwargs: Any) -> IO[Any]:
return cast(IO[Any], f)
except (gzip.BadGzipFile, OSError):
return open(path, origmode, encoding=encoding, **kwargs)


@staticmethod
def is_gzipped(path: str) -> bool:
"""Checks if file under path is gzipped by reading a header"""
try:
with gzip.open(path, "rt", encoding="utf-8") as f:
# Force gzip to read the first few bytes and check the magic number
f.read(2)
return True
except (gzip.BadGzipFile, OSError):
return False
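
`FileStorage.is_gzipped` detects compression by asking the `gzip` module to read a couple of characters and catching the failure. An equivalent standalone check (a sketch, not part of this commit) can inspect the two-byte gzip magic number directly:

```py
def is_gzipped_by_magic(path: str) -> bool:
    # gzip streams start with the magic bytes 0x1f 0x8b
    with open(path, "rb") as f:
        return f.read(2) == b"\x1f\x8b"
```
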
10 changes: 10 additions & 0 deletions dlt/common/utils.py
@@ -447,3 +447,13 @@ def extend_list_deduplicated(original_list: List[Any], extending_list: Iterable[
if item not in list_keys:
original_list.append(item)
return original_list


@contextmanager
def maybe_context(manager: ContextManager[TAny]) -> Iterator[TAny]:
"""Allows context manager `manager` to be None by creating dummy context. Otherwise `manager` is used"""
if manager is None:
yield None
else:
with manager as ctx:
yield ctx
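
`maybe_context` lets callers keep a single `with` block whether or not a context manager is actually available, which the duckdb copy job below uses for its optional per-table lock. A minimal usage sketch, assuming a dlt version that ships this helper:

```py
import threading
from dlt.common.utils import maybe_context  # added in this commit

lock = threading.Lock()  # may also be None when no serialization is needed
with maybe_context(lock):
    print("the body runs either way; the lock is held only when it exists")

with maybe_context(None):
    print("no-op context when the manager is None")
```
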
21 changes: 18 additions & 3 deletions dlt/destinations/duckdb/duck.py
@@ -1,3 +1,4 @@
import threading
from typing import ClassVar, Dict, Optional

from dlt.common.destination import DestinationCapabilitiesContext
@@ -6,6 +7,7 @@
from dlt.common.destination.reference import LoadJob, FollowupJob, TLoadJobState
from dlt.common.schema.typing import TTableSchema
from dlt.common.storages.file_storage import FileStorage
from dlt.common.utils import maybe_context

from dlt.destinations.insert_job_client import InsertValuesJobClient

@@ -44,21 +46,34 @@
"unique": "UNIQUE"
}

# duckdb cannot load PARQUET into the same table in parallel, so serialize loads per table
PARQUET_TABLE_LOCK = threading.Lock()
TABLES_LOCKS: Dict[str, threading.Lock] = {}


class DuckDbCopyJob(LoadJob, FollowupJob):
def __init__(self, table_name: str, file_path: str, sql_client: DuckDbSqlClient) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

qualified_table_name = sql_client.make_qualified_table_name(table_name)
if file_path.endswith("parquet"):
source_format = "PARQUET"
options = ""
# take the registry lock while creating or fetching the per-table lock
with PARQUET_TABLE_LOCK:
# create or get lock per table name
lock: threading.Lock = TABLES_LOCKS.setdefault(qualified_table_name, threading.Lock())
elif file_path.endswith("jsonl"):
# NOTE: loading JSON does not work in practice on duckdb: the missing keys fail the load instead of being interpreted as NULL
source_format = "JSON" # newline delimited, compression auto
options = ", COMPRESSION GZIP" if FileStorage.is_gzipped(file_path) else ""
lock = None
else:
raise ValueError(file_path)
qualified_table_name = sql_client.make_qualified_table_name(table_name)
with sql_client.begin_transaction():
sql_client.execute_sql(f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT {source_format} );")

with maybe_context(lock):
with sql_client.begin_transaction():
sql_client.execute_sql(f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT {source_format} {options});")


def state(self) -> TLoadJobState:
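
Because duckdb cannot COPY parquet into the same table from several threads at once, each parquet job now takes a per-table lock, and the `TABLES_LOCKS.setdefault(...)` call is itself guarded by the global `PARQUET_TABLE_LOCK` so two threads never end up with different lock objects for the same table. A standalone sketch of that create-or-get pattern (illustrative names, not dlt's API):

```py
import threading
from typing import Dict

REGISTRY_LOCK = threading.Lock()
TABLE_LOCKS: Dict[str, threading.Lock] = {}

def lock_for_table(qualified_table_name: str) -> threading.Lock:
    # guard the registry so concurrent callers always get the same lock object per table
    with REGISTRY_LOCK:
        return TABLE_LOCKS.setdefault(qualified_table_name, threading.Lock())

def copy_parquet(table: str, file_path: str) -> None:
    # only one thread at a time may COPY parquet into a given table
    with lock_for_table(table):
        print(f"COPY {table} FROM '{file_path}' ( FORMAT PARQUET );")
```
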
8 changes: 7 additions & 1 deletion dlt/extract/pipe.py
@@ -545,6 +545,7 @@ def __next__(self) -> PipeItem:
if isinstance(item, Awaitable) or callable(item):
# do we have a free slot or one of the slots is done?
if len(self._futures) < self.max_parallel_items or self._next_future() >= 0:
# check for Awaitable first; an awaitable can also be a callable
if isinstance(item, Awaitable):
future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool())
elif callable(item):
@@ -631,7 +632,12 @@ def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
loop.run_forever()

self._async_pool = asyncio.new_event_loop()
self._async_pool_thread = Thread(target=start_background_loop, args=(self._async_pool,), daemon=True)
self._async_pool_thread = Thread(
target=start_background_loop,
args=(self._async_pool,),
daemon=True,
name="DltFuturesThread"
)
self._async_pool_thread.start()

# start or return async pool
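
The futures pool evaluates awaitables on a dedicated event loop running in a daemon thread (now named `DltFuturesThread`), submitting coroutines with `asyncio.run_coroutine_threadsafe`. A minimal sketch of that pattern, independent of dlt:

```py
import asyncio
from threading import Thread

def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
Thread(target=start_background_loop, args=(loop,), daemon=True, name="DltFuturesThread").start()

async def work(n: int) -> int:
    await asyncio.sleep(0.1)
    return n * 2

# submit a coroutine from the calling thread and block on its result
future = asyncio.run_coroutine_threadsafe(work(21), loop)
print(future.result())  # 42
loop.call_soon_threadsafe(loop.stop)
```
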
3 changes: 3 additions & 0 deletions dlt/normalize/configuration.py
@@ -14,6 +14,9 @@ class NormalizeConfiguration(PoolRunnerConfiguration):
_normalize_storage_config: NormalizeStorageConfiguration
_load_storage_config: LoadStorageConfiguration

def on_resolved(self) -> None:
self.pool_type = "none" if self.workers == 1 else "process"

if TYPE_CHECKING:
def __init__(
self,
5 changes: 2 additions & 3 deletions dlt/pipeline/pipeline.py
@@ -297,8 +297,8 @@ def extract(
@with_config_section((known_sections.NORMALIZE,))
def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = None) -> NormalizeInfo:
"""Normalizes the data prepared with `extract` method, infers the schema and creates load packages for the `load` method. Requires `destination` to be known."""
if is_interactive() and workers > 1:
raise NotImplementedError("Do not use normalize workers in interactive mode ie. in notebook")
if is_interactive():
workers = 1
if loader_file_format and loader_file_format in INTERNAL_LOADER_FILE_FORMATS:
raise ValueError(f"{loader_file_format} is one of internal dlt file formats.")
# check if any schema is present, if not then no data was extracted
@@ -310,7 +310,6 @@ def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = No
# create default normalize config
normalize_config = NormalizeConfiguration(
workers=workers,
pool_type="none" if workers == 1 else "process",
_schema_storage_config=self._schema_storage_config,
_normalize_storage_config=self._normalize_storage_config,
_load_storage_config=self._load_storage_config
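
With this change `normalize()` no longer raises in notebooks: interactive sessions silently fall back to a single worker, and the pool type is derived from `workers` inside `NormalizeConfiguration.on_resolved` instead of being passed explicitly. A hedged usage sketch, assuming duckdb as the destination:

```py
import dlt

pipeline = dlt.pipeline("workers_demo", destination="duckdb")
pipeline.extract([{"row": n} for n in range(1_000)], table_name="numbers")
# in a script this normalizes with a process pool of 3 workers;
# in a notebook it now falls back to a single worker instead of raising
print(pipeline.normalize(workers=3))
print(pipeline.load())
```
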
11 changes: 11 additions & 0 deletions docs/snippets/reference/.dlt/config.toml
@@ -0,0 +1,11 @@
# [extract]
# max_parallel_items=1

[sources.extract]
max_parallel_items=1

# [sources.performance_parallel_extract.extract]
# workers=2

[sources.performance_parallel_extract.get_details.extract]
workers=2
Empty file.
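
These settings scope extract parallelism by config section: `[sources.extract]` limits every source to one parallel item, while the deeper `[sources.performance_parallel_extract.get_details.extract]` section applies `workers=2` only to the `get_details` transformer of the `performance_parallel_extract` snippet shown further below. A hedged sketch of the same overrides via environment variables, assuming dlt's usual section-to-env-var mapping (uppercase keys joined with `__`), which is not part of this commit:

```py
import os

# assumed equivalents of the toml sections above
os.environ["SOURCES__EXTRACT__MAX_PARALLEL_ITEMS"] = "1"
os.environ["SOURCES__PERFORMANCE_PARALLEL_EXTRACT__GET_DETAILS__EXTRACT__WORKERS"] = "2"
```
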
16 changes: 16 additions & 0 deletions docs/snippets/reference/parallel_load/.dlt/config.toml
@@ -0,0 +1,16 @@
# [sources.data_writer]
# file_max_items=20000

# pipeline name is default source name when loading resources
[sources.parallel_load.data_writer]
file_max_items=100000

[normalize]
workers=3

[normalize.data_writer]
disable_compression=false
file_max_items=100000

[load]
workers=11
Empty file.
34 changes: 34 additions & 0 deletions docs/snippets/reference/parallel_load/parallel_load.py
@@ -0,0 +1,34 @@
import os
os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__)
# @@@SNIPSTART parallel_config_example
import os
import dlt
from itertools import islice
from dlt.common import pendulum

@dlt.resource(name="table")
def read_table(limit):
rows = iter(range(limit))
while item_slice := list(islice(rows, 1000)):
now = pendulum.now().isoformat()
yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice]


# this prevents the process pool from running the initialization code again
if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ:
pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True)
pipeline.extract(read_table(1000000))
# we should have 11 files (10 pieces for `table` and 1 for state)
extracted_files = pipeline.list_extracted_resources()
print(extracted_files)
# normalize and print counts
print(pipeline.normalize(loader_file_format="jsonl"))
# print jobs in load package (10 + 1 as above)
load_id = pipeline.list_normalized_load_packages()[0]
print(pipeline.get_load_package_info(load_id))
print(pipeline.load())
# @@@SNIPEND

assert len(extracted_files) == 11
loaded_package = pipeline.get_load_package_info(load_id)
assert len(loaded_package.jobs["completed_jobs"]) == 11
27 changes: 27 additions & 0 deletions docs/snippets/reference/performance_chunking.py
@@ -0,0 +1,27 @@
import os
os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__)
# @@@SNIPSTART performance_chunking
import dlt

def get_rows(limit):
yield from map(lambda n: {"row": n}, range(limit))

@dlt.resource
def database_cursor():
# here we yield each row returned from database separately
yield from get_rows(10000)
# @@@SNIPEND
# @@@SNIPSTART performance_chunking_chunk
from itertools import islice

@dlt.resource
def database_cursor_chunked():
# here we yield chunks of size 1000
rows = get_rows(10000)
while item_slice := list(islice(rows, 1000)):
print(f"got chunk of length {len(item_slice)}")
yield item_slice
# @@@SNIPEND

assert len(list(database_cursor())) == 10000
assert len(list(database_cursor_chunked())) == 10000
39 changes: 39 additions & 0 deletions docs/snippets/reference/performance_parallel_extract.py
@@ -0,0 +1,39 @@
import os
os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__)
# @@@SNIPSTART parallel_extract_callables
import dlt
from time import sleep
from threading import currentThread

@dlt.resource
def list_items(start, limit):
yield from range(start, start + limit)

@dlt.transformer
@dlt.defer
def get_details(item_id):
# simulate a slow REST API where you wait 0.3 sec for each item
sleep(0.3)
print(f"item_id {item_id} in thread {currentThread().name}")
# just return the results; if you yield, the generator will be evaluated in the main thread
return {"row": item_id}


# evaluate the pipeline and print all the items
# resources are iterators and they are evaluated in the same way in the pipeline.run
print(list(list_items(0, 10) | get_details))
# @@@SNIPEND
# @@@SNIPSTART parallel_extract_awaitables
import asyncio

@dlt.transformer
async def a_get_details(item_id):
# simulate a slow REST API where you wait 0.3 sec for each item
await asyncio.sleep(0.3)
print(f"item_id {item_id} in thread {currentThread().name}")
# just return the results; if you yield, the generator will be evaluated in the main thread
return {"row": item_id}


print(list(list_items(0, 10) | a_get_details))
# @@@SNIPEND
13 changes: 13 additions & 0 deletions docs/snippets/reference/test_reference_snippets.py
@@ -0,0 +1,13 @@
import os
import pytest

from tests.pipeline.utils import assert_load_info
from docs.snippets.utils import run_snippet, list_snippets

# we do not want github events to run because it consumes too much of the free github credits
RUN_SNIPPETS = list_snippets("reference") + ["parallel_load/parallel_load.py"]


# @pytest.mark.parametrize("snippet_name", RUN_SNIPPETS)
# def test_snippet(snippet_name: str) -> None:
# run_snippet(os.path.join("reference", snippet_name))
5 changes: 4 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/duckdb.md
@@ -33,7 +33,10 @@ All write dispositions are supported
You can configure the following file formats to load data to duckdb
* [insert-values](../file-formats/insert-format.md) is used by default
* [parquet](../file-formats/parquet.md) is supported
* [jsonl](../file-formats/jsonl.md) is supported but does not work in practice. the missing keys fail the COPY instead of being interpreted as NULL
:::note
`duckdb` cannot COPY many parquet files to a single table from multiple threads. In this situation `dlt` serializes the loads. Still, that may be faster than INSERT.
:::
* [jsonl](../file-formats/jsonl.md) **is supported but does not work if JSON fields are optional. The missing keys fail the COPY instead of being interpreted as NULL**

## Supported column hints
`duckdb` may create unique indexes for all columns with `unique` hints but this behavior **is disabled by default** because it slows the loading down significantly.
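
For reference, a file format can also be chosen per run with the standard `dlt` API; a hedged sketch assuming the parquet dependencies (`pyarrow`) are installed:

```py
import dlt

pipeline = dlt.pipeline("duck_formats", destination="duckdb")
data = [{"row": n, "description": f"row {n}"} for n in range(1_000)]
# parquet load jobs into the same table are serialized by dlt as described in the note above
print(pipeline.run(data, table_name="rows", loader_file_format="parquet"))
```
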