From 94a2cbe1f459622051bb77dc2ef16ee05132b7e0 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 11 Sep 2023 11:19:47 +0200 Subject: [PATCH 1/6] fixes instance reuse and race conditions when injecting configs --- dlt/common/configuration/inject.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/dlt/common/configuration/inject.py b/dlt/common/configuration/inject.py index d7419ee378..c623c400e0 100644 --- a/dlt/common/configuration/inject.py +++ b/dlt/common/configuration/inject.py @@ -1,4 +1,5 @@ import inspect +import threading from functools import wraps from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload from inspect import Signature, Parameter @@ -14,6 +15,7 @@ _ORIGINAL_ARGS = "_dlt_orig_args" # keep a registry of all the decorated functions _FUNC_SPECS: Dict[int, Type[BaseConfiguration]] = {} +_RESOLVE_LOCK = threading.Lock() TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration) @@ -84,7 +86,6 @@ def decorator(f: TFun) -> TFun: kwargs_arg = next((p for p in sig.parameters.values() if p.kind == Parameter.VAR_KEYWORD), None) spec_arg: Parameter = None pipeline_name_arg: Parameter = None - section_context = ConfigSectionContext(sections=sections, merge_style=sections_merge_style) if spec is None: SPEC = spec_from_signature(f, sig, include_defaults) @@ -117,22 +118,28 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: config = last_config(**kwargs) else: # if section derivation function was provided then call it - nonlocal sections if section_f: - section_context.sections = (section_f(bound_args.arguments), ) - # sections may be a string - if isinstance(sections, str): - section_context.sections = (sections,) + curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments), ) + # sections may be a string + elif isinstance(sections, str): + curr_sections = (sections,) + else: + curr_sections = sections # if one of arguments is spec the use it as initial value if spec_arg: config = bound_args.arguments.get(spec_arg.name, None) # resolve SPEC, also provide section_context with pipeline_name if pipeline_name_arg: - section_context.pipeline_name = bound_args.arguments.get(pipeline_name_arg.name, pipeline_name_arg_default) - with inject_section(section_context): - # print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}") - config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments) + curr_pipeline_name = bound_args.arguments.get(pipeline_name_arg.name, pipeline_name_arg_default) + else: + curr_pipeline_name = None + section_context = ConfigSectionContext(pipeline_name=curr_pipeline_name, sections=curr_sections, merge_style=sections_merge_style) + # this may be called from many threads so make sure context is not mangled + with _RESOLVE_LOCK: + with inject_section(section_context): + # print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}") + config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments) resolved_params = dict(config) # overwrite or add resolved params for p in sig.parameters.values(): From 2ad24476f754264b6f219328d8cd89ceab33bdf8 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 11 Sep 2023 11:20:20 +0200 Subject: [PATCH 2/6] fixes duckdb JSON compression and segfaults with parquet --- dlt/common/storages/file_storage.py | 12 ++++++++++++ dlt/common/utils.py | 10 ++++++++++ dlt/destinations/duckdb/duck.py | 21 ++++++++++++++++++--- 3 files changed, 40 
insertions(+), 3 deletions(-) diff --git a/dlt/common/storages/file_storage.py b/dlt/common/storages/file_storage.py index 046116c82a..45d5287e03 100644 --- a/dlt/common/storages/file_storage.py +++ b/dlt/common/storages/file_storage.py @@ -270,3 +270,15 @@ def open_zipsafe_ro(path: str, mode: str = "r", **kwargs: Any) -> IO[Any]: return cast(IO[Any], f) except (gzip.BadGzipFile, OSError): return open(path, origmode, encoding=encoding, **kwargs) + + + @staticmethod + def is_gzipped(path: str) -> bool: + """Checks if file under path is gzipped by reading a header""" + try: + with gzip.open(path, "rt", encoding="utf-8") as f: + # Force gzip to read the first few bytes and check the magic number + f.read(2) + return True + except (gzip.BadGzipFile, OSError): + return False diff --git a/dlt/common/utils.py b/dlt/common/utils.py index f3eed395fd..d73daeeff8 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -447,3 +447,13 @@ def extend_list_deduplicated(original_list: List[Any], extending_list: Iterable[ if item not in list_keys: original_list.append(item) return original_list + + +@contextmanager +def maybe_context(manager: ContextManager[TAny]) -> Iterator[TAny]: + """Allows context manager `manager` to be None by creating dummy context. Otherwise `manager` is used""" + if manager is None: + yield None + else: + with manager as ctx: + yield ctx diff --git a/dlt/destinations/duckdb/duck.py b/dlt/destinations/duckdb/duck.py index 0076f1585f..b36164dc80 100644 --- a/dlt/destinations/duckdb/duck.py +++ b/dlt/destinations/duckdb/duck.py @@ -1,3 +1,4 @@ +import threading from typing import ClassVar, Dict, Optional from dlt.common.destination import DestinationCapabilitiesContext @@ -6,6 +7,7 @@ from dlt.common.destination.reference import LoadJob, FollowupJob, TLoadJobState from dlt.common.schema.typing import TTableSchema from dlt.common.storages.file_storage import FileStorage +from dlt.common.utils import maybe_context from dlt.destinations.insert_job_client import InsertValuesJobClient @@ -44,21 +46,34 @@ "unique": "UNIQUE" } +# duckdb cannot load PARQUET to the same table in parallel. 
so serialize it per table +PARQUET_TABLE_LOCK = threading.Lock() +TABLES_LOCKS: Dict[str, threading.Lock] = {} + class DuckDbCopyJob(LoadJob, FollowupJob): def __init__(self, table_name: str, file_path: str, sql_client: DuckDbSqlClient) -> None: super().__init__(FileStorage.get_file_name_from_file_path(file_path)) + qualified_table_name = sql_client.make_qualified_table_name(table_name) if file_path.endswith("parquet"): source_format = "PARQUET" + options = "" + # lock when creating a new lock + with PARQUET_TABLE_LOCK: + # create or get lock per table name + lock: threading.Lock = TABLES_LOCKS.setdefault(qualified_table_name, threading.Lock()) elif file_path.endswith("jsonl"): # NOTE: loading JSON does not work in practice on duckdb: the missing keys fail the load instead of being interpreted as NULL source_format = "JSON" # newline delimited, compression auto + options = ", COMPRESSION GZIP" if FileStorage.is_gzipped(file_path) else "" + lock = None else: raise ValueError(file_path) - qualified_table_name = sql_client.make_qualified_table_name(table_name) - with sql_client.begin_transaction(): - sql_client.execute_sql(f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT {source_format} );") + + with maybe_context(lock): + with sql_client.begin_transaction(): + sql_client.execute_sql(f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT {source_format} {options});") def state(self) -> TLoadJobState: From e4a43bbf4e7348e6966d56662413966e3f4289fa Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 11 Sep 2023 11:20:57 +0200 Subject: [PATCH 3/6] fixes normalizer config args passing --- dlt/normalize/configuration.py | 3 +++ dlt/pipeline/pipeline.py | 5 ++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dlt/normalize/configuration.py b/dlt/normalize/configuration.py index c4ed7aa89a..0df6dd621b 100644 --- a/dlt/normalize/configuration.py +++ b/dlt/normalize/configuration.py @@ -14,6 +14,9 @@ class NormalizeConfiguration(PoolRunnerConfiguration): _normalize_storage_config: NormalizeStorageConfiguration _load_storage_config: LoadStorageConfiguration + def on_resolved(self) -> None: + self.pool_type = "none" if self.workers == 1 else "process" + if TYPE_CHECKING: def __init__( self, diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 488a617eb3..9a0dc831b4 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -297,8 +297,8 @@ def extract( @with_config_section((known_sections.NORMALIZE,)) def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = None) -> NormalizeInfo: """Normalizes the data prepared with `extract` method, infers the schema and creates load packages for the `load` method. Requires `destination` to be known.""" - if is_interactive() and workers > 1: - raise NotImplementedError("Do not use normalize workers in interactive mode ie. 
in notebook") + if is_interactive(): + workers = 1 if loader_file_format and loader_file_format in INTERNAL_LOADER_FILE_FORMATS: raise ValueError(f"{loader_file_format} is one of internal dlt file formats.") # check if any schema is present, if not then no data was extracted @@ -310,7 +310,6 @@ def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = No # create default normalize config normalize_config = NormalizeConfiguration( workers=workers, - pool_type="none" if workers == 1 else "process", _schema_storage_config=self._schema_storage_config, _normalize_storage_config=self._normalize_storage_config, _load_storage_config=self._load_storage_config From 715a11f714a8654e86259d05b69fb2086a95cd70 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 11 Sep 2023 11:21:41 +0200 Subject: [PATCH 4/6] rewrites performance docs and adds snippets --- check-package.sh | 2 +- dlt/extract/pipe.py | 8 +- docs/snippets/reference/.dlt/config.toml | 11 + docs/snippets/reference/__init__.py | 0 .../reference/parallel_load/.dlt/config.toml | 16 + .../reference/parallel_load/__init__.py | 0 .../reference/parallel_load/parallel_load.py | 26 ++ .../reference/performance_chunking.py | 27 ++ .../reference/performance_parallel_extract.py | 37 +++ .../docs/dlt-ecosystem/destinations/duckdb.md | 5 +- docs/website/docs/reference/performance.md | 278 ++++++++++++++++-- docs/website/package-lock.json | 4 +- 12 files changed, 383 insertions(+), 31 deletions(-) create mode 100644 docs/snippets/reference/.dlt/config.toml create mode 100644 docs/snippets/reference/__init__.py create mode 100644 docs/snippets/reference/parallel_load/.dlt/config.toml create mode 100644 docs/snippets/reference/parallel_load/__init__.py create mode 100644 docs/snippets/reference/parallel_load/parallel_load.py create mode 100644 docs/snippets/reference/performance_chunking.py create mode 100644 docs/snippets/reference/performance_parallel_extract.py diff --git a/check-package.sh b/check-package.sh index 4b409a6616..17767b3eb9 100755 --- a/check-package.sh +++ b/check-package.sh @@ -19,7 +19,7 @@ while IFS= read -r d; do error="yes" fi fi -done < <(find . -mindepth 1 -not -path "./docs/website*" -type d -regex "^./[^.^_].*" '!' -exec test -e "{}/__init__.py" ';' -print) +done < <(find . -mindepth 1 -not -path "./docs/website/node_modules*" -type d -regex "^./[^.^_].*" '!' -exec test -e "{}/__init__.py" ';' -print) if [ -z $error ]; then exit 0 diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index e8a165e6aa..9e5b20374f 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -545,6 +545,7 @@ def __next__(self) -> PipeItem: if isinstance(item, Awaitable) or callable(item): # do we have a free slot or one of the slots is done? 
if len(self._futures) < self.max_parallel_items or self._next_future() >= 0: + # check if Awaitable first - awaitable can also be a callable if isinstance(item, Awaitable): future = asyncio.run_coroutine_threadsafe(item, self._ensure_async_pool()) elif callable(item): @@ -631,7 +632,12 @@ def start_background_loop(loop: asyncio.AbstractEventLoop) -> None: loop.run_forever() self._async_pool = asyncio.new_event_loop() - self._async_pool_thread = Thread(target=start_background_loop, args=(self._async_pool,), daemon=True) + self._async_pool_thread = Thread( + target=start_background_loop, + args=(self._async_pool,), + daemon=True, + name="DltFuturesThread" + ) self._async_pool_thread.start() # start or return async pool diff --git a/docs/snippets/reference/.dlt/config.toml b/docs/snippets/reference/.dlt/config.toml new file mode 100644 index 0000000000..f5139dcc96 --- /dev/null +++ b/docs/snippets/reference/.dlt/config.toml @@ -0,0 +1,11 @@ +# [extract] +# max_parallel_items=1 + +[sources.extract] +max_parallel_items=1 + +# [sources.performance_parallel_extract.extract] +# workers=2 + +[sources.performance_parallel_extract.get_details.extract] +workers=2 \ No newline at end of file diff --git a/docs/snippets/reference/__init__.py b/docs/snippets/reference/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/snippets/reference/parallel_load/.dlt/config.toml b/docs/snippets/reference/parallel_load/.dlt/config.toml new file mode 100644 index 0000000000..0f8b273ce6 --- /dev/null +++ b/docs/snippets/reference/parallel_load/.dlt/config.toml @@ -0,0 +1,16 @@ +# [sources.data_writer] +# file_max_items=20000 + +# pipeline name is default source name when loading resources +[sources.parallel_load.data_writer] +file_max_items=100000 + +[normalize] +workers=3 + +[normalize.data_writer] +disable_compression=false +file_max_items=100000 + +[load] +workers=11 diff --git a/docs/snippets/reference/parallel_load/__init__.py b/docs/snippets/reference/parallel_load/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/snippets/reference/parallel_load/parallel_load.py b/docs/snippets/reference/parallel_load/parallel_load.py new file mode 100644 index 0000000000..823561e4ac --- /dev/null +++ b/docs/snippets/reference/parallel_load/parallel_load.py @@ -0,0 +1,26 @@ +# @@@SNIPSTART parallel_config_example +import dlt +from itertools import islice +from dlt.common import pendulum + +@dlt.resource(name="table") +def read_table(limit): + rows = iter(range(limit)) + while item_slice := list(islice(rows, 1000)): + now = pendulum.now().isoformat() + yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] + + +# this prevents process pool to run the initialization code again +if __name__ == "__main__": + pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) + pipeline.extract(read_table(1000000)) + # we should have 11 files (10 pieces for `table` and 1 for state) + print(pipeline.list_extracted_resources()) + # normalize and print counts + print(pipeline.normalize(loader_file_format="jsonl")) + # print jobs in load package (10 + 1 as above) + load_id = pipeline.list_normalized_load_packages()[0] + print(pipeline.get_load_package_info(load_id)) + print(pipeline.load()) +# @@@SNIPEND \ No newline at end of file diff --git a/docs/snippets/reference/performance_chunking.py b/docs/snippets/reference/performance_chunking.py new file mode 100644 index 0000000000..0770ff9053 --- /dev/null +++ 
b/docs/snippets/reference/performance_chunking.py @@ -0,0 +1,27 @@ +# @@@SNIPSTART performance_chunking +import dlt + +def get_rows(limit): + yield from map(lambda n: {"row": n}, range(limit)) + +@dlt.resource +def database_cursor(): + # here we yield each row returned from database separately + yield from get_rows(10000) +# @@@SNIPEND +# @@@SNIPSTART performance_chunking_chunk +from itertools import islice + +@dlt.resource +def database_cursor_chunked(): + # here we yield chunks of size 1000 + rows = get_rows(10000) + while item_slice := list(islice(rows, 1000)): + print(f"got chunk of length {len(item_slice)}") + yield item_slice +# @@@SNIPEND + +assert len(list(database_cursor())) == 10000 +# assert len(list(database_cursor_chunked())) == 10 +for chunk in database_cursor_chunked(): + assert len(chunk) == 1000 diff --git a/docs/snippets/reference/performance_parallel_extract.py b/docs/snippets/reference/performance_parallel_extract.py new file mode 100644 index 0000000000..6ad83e7b4f --- /dev/null +++ b/docs/snippets/reference/performance_parallel_extract.py @@ -0,0 +1,37 @@ +# @@@SNIPSTART parallel_extract_callables +import dlt +from time import sleep +from threading import currentThread + +@dlt.resource +def list_items(start, limit): + yield from range(start, start + limit) + +@dlt.transformer +@dlt.defer +def get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + +# evaluate the pipeline and print all the items +# resources are iterators and they are evaluated in the same way in the pipeline.run +print(list(list_items(0, 10) | get_details)) +# @@@SNIPEND +# @@@SNIPSTART parallel_extract_awaitables +import asyncio + +@dlt.transformer +async def a_get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + await asyncio.sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + +print(list(list_items(0, 10) | a_get_details)) +# @@@SNIPEND \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md index 1cd6f997a9..38fd38d8e8 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/duckdb.md +++ b/docs/website/docs/dlt-ecosystem/destinations/duckdb.md @@ -33,7 +33,10 @@ All write dispositions are supported You can configure the following file formats to load data to duckdb * [insert-values](../file-formats/insert-format.md) is used by default * [parquet](../file-formats/parquet.md) is supported -* [jsonl](../file-formats/jsonl.md) is supported but does not work in practice. the missing keys fail the COPY instead of being interpreted as NULL +:::note +`duckdb` cannot COPY many parquet files to a single table from multiple threads. In this situation `dlt` serializes the loads. Still - that may be faster than INSERT +::: +* [jsonl](../file-formats/jsonl.md) **is supported but does not work if JSON fields are optional. the missing keys fail the COPY instead of being interpreted as NULL** ## Supported column hints `duckdb` may create unique indexes for all columns with `unique` hints but this behavior **is disabled by default** because it slows the loading down significantly. 
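Taken together, the duckdb changes above serialize parquet COPYs per table while letting other file formats load concurrently. Below is a minimal standalone sketch of that locking pattern, assuming a simplified client: `maybe_context` and the two module-level locks mirror the diff, while `copy_file_to_table` is a hypothetical stand-in for the real `DuckDbCopyJob` COPY call.

```py
# Standalone sketch of the per-table serialization pattern from this patch.
# `maybe_context`, PARQUET_TABLE_LOCK and TABLES_LOCKS mirror the diff;
# `copy_file_to_table` is a simplified stand-in for the real COPY job.
import threading
from contextlib import contextmanager
from typing import ContextManager, Dict, Iterator, Optional, TypeVar

TAny = TypeVar("TAny")

PARQUET_TABLE_LOCK = threading.Lock()
TABLES_LOCKS: Dict[str, threading.Lock] = {}


@contextmanager
def maybe_context(manager: Optional[ContextManager[TAny]]) -> Iterator[Optional[TAny]]:
    """Allows `manager` to be None by yielding a dummy context, otherwise enters it."""
    if manager is None:
        yield None
    else:
        with manager as ctx:
            yield ctx


def copy_file_to_table(table_name: str, file_path: str) -> None:
    if file_path.endswith("parquet"):
        # parquet COPYs into the same table must not run in parallel:
        # take (or lazily create) a lock dedicated to this table name
        with PARQUET_TABLE_LOCK:
            lock: Optional[threading.Lock] = TABLES_LOCKS.setdefault(table_name, threading.Lock())
    else:
        lock = None  # other formats may load concurrently

    with maybe_context(lock):
        # placeholder for the real `COPY {table} FROM '{file}'` statement
        print(f"COPY {table_name} FROM '{file_path}'")


if __name__ == "__main__":
    copy_file_to_table("my_table", "items.parquet")
```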
diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index 7e49414b2f..cbd850b466 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -9,45 +9,110 @@ keywords: [scaling, parallelism, finetuning] ## Yield pages instead of rows If you can, yield pages when producing data. This makes some processes more effective by lowering -the necessary function calls. +the necessary function calls (each chunk of data that you yield goes through extract pipeline once so if you yield a chunk of 10.000 items you make significant savings) +For example: + +```py +import dlt -## Memory/disk management +def get_rows(limit): + yield from map(lambda n: {"row": n}, range(limit)) + +@dlt.resource +def database_cursor(): + # here we yield each row returned from database separately + yield from get_rows(10000) +``` + +can be replaced with: + +```py +from itertools import islice + +@dlt.resource +def database_cursor_chunked(): + # here we yield chunks of size 1000 + rows = get_rows(10000) + while item_slice := list(islice(rows, 1000)): + print(f"got chunk of length {len(item_slice)}") + yield item_slice +``` + -### Controlling in-memory and filesystem buffers -`dlt` likes resources that yield data because it can request data into a buffer before processing -and releasing it. This makes it possible to manage the amount of resources used. In order to -configure this option, you can specify buffer size via env variables or by adding to the -`config.toml`. +## Memory/disk management +`dlt` buffers data in memory to speed up processing and uses file system to pass data between extract and normalize stage. You can control the size of the buffers, size and number of the files to fine-tune memory and cpu usage. Those settings impact parallelism as well, which we explain in next chapter. -Globally in your `config.toml`: +### Controlling in-memory buffers +`dlt` maintains in-memory buffers when writing intermediary files in **extract** and **normalize** stages. The size of the buffers are controlled by specifying the number of data items held in them. Data is appended to open files when item buffer is full, then the buffer is cleared. You can specify buffer size via environment variables or in `config.toml` with more or less granularity: +* set all buffers (both extract and normalize) +* set extract buffers separately from normalize buffers +* set extract buffers for particular source or resource ```toml +# set buffer size for extract and normalize stages [data_writer] -max_buffer_items=100 +buffer_max_items=100 + +# set buffers only in extract stage - for all sources +[sources.data_writer] +buffer_max_items=100 + +# set buffers only for a source with name zendesk_support +[sources.zendesk_support.data_writer] +buffer_max_items=100 + +# set buffers in normalize stage +[normalize.data_writer] +buffer_max_items=100 ``` -or specifically for the normalization and source: +The default buffer is actually set to a moderately low value (**5000 items**), so unless you are trying to run `dlt` +on IOT sensors or other tiny infrastructures, you might actually want to increase it to speed up +processing. + +### Controlling intermediary files size and rotation +`dlt` writes data to intermediary files. You can control the file size and a number of created files by setting maximum number of data items in a single file or a maximum single file size. Mind that file size is computed after compression is performed. 
+* `dlt` uses custom version of [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between **extract** and **normalize** stages. +* files created between **normalize** and **load** stages are the same files that will be loaded to destination. +:::tip +The default setting is to not rotate the files so if you have a resource with millions of records, `dlt` will still create a single intermediary file to normalize and a single file to load. **If you want such data to be normalized and loaded in parallel you must enable file rotation as described below** +::: +:::note +Some file formats (ie. parquet) do not support schema changes when writing a single file and on such event they are automatically rotated +::: + +Below we set files to rotated after 100.000 items written or when size exceeds 1MiB. ```toml -[normalize.data_writer] -max_buffer_items=100 +# extract and normalize stages +[data_writer] +file_max_items=100000 +max_file_size=1000000 +# only in extract stage - for all sources [sources.data_writer] -max_buffer_items=200 +file_max_items=100000 +max_file_size=1000000 + +# only for a source with name zendesk_support +[sources.zendesk_support.data_writer] +file_max_items=100000 +max_file_size=1000000 + +# only normalize stage +[normalize.data_writer] +file_max_items=100000 +max_file_size=1000000 ``` -The default buffer is actually set to a moderately low value, so unless you are trying to run `dlt` -on IOT sensors or other tiny infrastructures, you might actually want to increase it to speed up -processing. ### Disabling and enabling file compression Several [text file formats](../dlt-ecosystem/file-formats/) have `gzip` compression enabled by default. If you wish that your load packages have uncompressed files (ie. to debug the content easily), change `data_writer.disable_compression` in config.toml. The entry below will disable the compression of the files processed in `normalize` stage. ```toml [normalize.data_writer] -disable_compression=false +disable_compression=true ``` - ### Freeing disk space after loading Keep in mind load packages are buffered to disk and are left for any troubleshooting, so you can [clear disk space by setting `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind). @@ -64,19 +129,180 @@ PROGRESS=log python pipeline_script.py ## Parallelism -Parallelism can be limited with the config option `max_parallel_items = 5` that you can place under -a source. As `dlt` is a library can also leverage parallelism outside of `dlt` such as by placing -tasks in parallel in a dag. +### Extract +You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively. + +Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps `get_details` function in another callable that will be executed in the thread pool. 
+ +```py +import dlt +from time import sleep +from threading import currentThread + +@dlt.resource +def list_items(start, limit): + yield from range(start, start + limit) + +@dlt.transformer +@dlt.defer +def get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + +# evaluate the pipeline and print all the items +# resources are iterators and they are evaluated in the same way in the pipeline.run +print(list(list_items(0, 10) | get_details)) +``` + +You can control the number of workers in thread pool with **workers** setting. The default number of workers is **5**. Below you see a few ways to do that with different granularity ```toml -[extract] # global setting -max_parallel_items=5 +# for all sources and resources being extracted +[extract] +worker=1 -[sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline -max_parallel_items=5 +# for all resources in zendesk_support source +[sources.zendesk_support.extract] +workers=2 + +# for tickets resource in zendesk_support source +[sources.zendesk_support.tickets.extract] +workers=4 ``` -## Resources loading, `fifo` vs. `round robin` +Example below does the same but using async/await and futures pool: + +```py +import asyncio + +@dlt.transformer +async def a_get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + await asyncio.sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + +print(list(list_items(0, 10) | a_get_details)) +``` + + +You can control the number of async functions/awaitables being evaluate in parallel by setting **max_parallel_items**. The default number is *20**. Below you see a few ways to do that with different granularity +```toml +# for all sources and resources being extracted +[extract] +max_parallel_items=10 + +# for all resources in zendesk_support source +[sources.zendesk_support.extract] +max_parallel_items=10 + +# for tickets resource in zendesk_support source +[sources.zendesk_support.tickets.extract] +max_parallel_items=10 +``` + +:::note +**max_parallel_items** apply to thread pools as well. It sets how many items may be queued to be executed and currently executing in a thread pool by the workers. Imagine a situation where you have millions +of callables to be evaluated in a thread pool of size 5. This limit will instantiate only the desired amount of them. +::: + +:::caution +Generators and iterators are always evaluated in the main thread. If you have a loop that yields items, instead yield functions or async functions that will create the items when evaluated in the pool. +::: + +### Normalize +Normalize stage uses process pool to create load package concurrently. Each file created by **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. 
Number of processes in the pool is controlled with `workers` config value: +```toml +[extract.data_writer] +# force extract file rotation if it exceeds 1MiB +max_file_size=1000000 + +[normalize] +# use 3 worker processes to process 3 files in parallel +workers=3 +``` +:::note +The default is to not parallelize normalization and to perform it in the main process. +::: + +:::note +Normalization is CPU bound and can easily saturate all your cores. Never allow `dlt` to use all cores on your local machine. +::: + +### Load +Load stage uses thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default loading happens in 20 threads, each loading a single file. + +As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation).**. Then number of parallel load jobs is controlled by `workers` config. +```toml +[normalize.data_writer] +# force normalize file rotation if it exceeds 1MiB +max_file_size=1000000 + +[load] +# have 50 concurrent load jobs +workers=50 +``` + + +### Parallel pipeline config example +Example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: +* during extraction, files are rotated each 100 000 items, so there are 10 files with data for the same table +* normalizer will process the data in 3 processes +* we use JSONL to load data to duckdb. We rotate JSONL files each 100 000 items so 10 files will be created. +* we use 11 threads to load the data (10 JSON files + state file) +```toml +# pipeline name is default source name when loading resources +[sources.parallel_load.data_writer] +file_max_items=100000 + +[normalize] +workers=3 + +[normalize.data_writer] +disable_compression=false +file_max_items=100000 + +[load] +workers=11 +``` + + +```py +import dlt +from itertools import islice +from dlt.common import pendulum + +@dlt.resource(name="table") +def read_table(limit): + rows = iter(range(limit)) + while item_slice := list(islice(rows, 1000)): + now = pendulum.now().isoformat() + yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] + + +# this prevents process pool to run the initialization code again +if __name__ == "__main__": + pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) + pipeline.extract(read_table(1000000)) + # we should have 11 files (10 pieces for `table` and 1 for state) + print(pipeline.list_extracted_resources()) + # normalize and print counts + print(pipeline.normalize(loader_file_format="jsonl")) + # print jobs in load package (10 + 1 as above) + load_id = pipeline.list_normalized_load_packages()[0] + print(pipeline.get_load_package_info(load_id)) + print(pipeline.load()) +``` + + + +## Resources extraction, `fifo` vs. `round robin` When extracting from resources, you have two options to determine what the order of queries to your resources are: `fifo` and `round_robin`. 
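The extract parallelism documented in this file hinges on resources yielding callables (or awaitables) rather than finished items. The sketch below is not dlt internals; it uses only `concurrent.futures` to show why deferred callables can be fanned out to a worker pool (the `workers` setting) while a plain generator would be evaluated sequentially on the main thread.

```py
# Standalone illustration (not dlt code) of why yielding callables allows
# parallel evaluation while yielding plain items keeps everything sequential.
from concurrent.futures import ThreadPoolExecutor
from time import sleep, monotonic


def fetch_details(item_id: int) -> dict:
    sleep(0.3)  # simulate a slow REST API call
    return {"row": item_id}


def deferred_items():
    # yield callables: nothing slow happens here, the work is deferred to the pool
    for item_id in range(10):
        yield lambda i=item_id: fetch_details(i)


start = monotonic()
with ThreadPoolExecutor(max_workers=5) as pool:  # corresponds to the "workers" knob
    # a cap on submitted items would play the role of "max_parallel_items"
    futures = [pool.submit(item) for item in deferred_items()]
    results = [f.result() for f in futures]
print(f"{len(results)} items in {monotonic() - start:.1f}s")  # ~0.6s instead of ~3s sequentially
```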
diff --git a/docs/website/package-lock.json b/docs/website/package-lock.json index 1a78d94e1b..ecbfdcc72f 100644 --- a/docs/website/package-lock.json +++ b/docs/website/package-lock.json @@ -1,11 +1,11 @@ { - "name": "dltDocs", + "name": "dlt-docs", "version": "0.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "dltDocs", + "name": "dlt-docs", "version": "0.0.0", "dependencies": { "@docusaurus/core": "2.4.1", From 873b0dfd33dd0578328aee168b84bbb05b27baf5 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 11 Sep 2023 12:21:36 +0200 Subject: [PATCH 5/6] makes snippets to always use .dlt in snippet folder, not working folder --- .../reference/parallel_load/parallel_load.py | 14 +++++++++++--- docs/snippets/reference/performance_chunking.py | 6 +++--- .../reference/performance_parallel_extract.py | 2 ++ docs/snippets/reference/test_reference_snippets.py | 13 +++++++++++++ docs/website/docs/reference/performance.md | 6 ++++-- 5 files changed, 33 insertions(+), 8 deletions(-) create mode 100644 docs/snippets/reference/test_reference_snippets.py diff --git a/docs/snippets/reference/parallel_load/parallel_load.py b/docs/snippets/reference/parallel_load/parallel_load.py index 823561e4ac..7bba13e411 100644 --- a/docs/snippets/reference/parallel_load/parallel_load.py +++ b/docs/snippets/reference/parallel_load/parallel_load.py @@ -1,4 +1,7 @@ +import os +os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) # @@@SNIPSTART parallel_config_example +import os import dlt from itertools import islice from dlt.common import pendulum @@ -12,15 +15,20 @@ def read_table(limit): # this prevents process pool to run the initialization code again -if __name__ == "__main__": +if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) pipeline.extract(read_table(1000000)) # we should have 11 files (10 pieces for `table` and 1 for state) - print(pipeline.list_extracted_resources()) + extracted_files = pipeline.list_extracted_resources() + print(extracted_files) # normalize and print counts print(pipeline.normalize(loader_file_format="jsonl")) # print jobs in load package (10 + 1 as above) load_id = pipeline.list_normalized_load_packages()[0] print(pipeline.get_load_package_info(load_id)) print(pipeline.load()) -# @@@SNIPEND \ No newline at end of file +# @@@SNIPEND + + assert len(extracted_files) == 11 + loaded_package = pipeline.get_load_package_info(load_id) + assert len(loaded_package.jobs["completed_jobs"]) == 11 \ No newline at end of file diff --git a/docs/snippets/reference/performance_chunking.py b/docs/snippets/reference/performance_chunking.py index 0770ff9053..7215e80937 100644 --- a/docs/snippets/reference/performance_chunking.py +++ b/docs/snippets/reference/performance_chunking.py @@ -1,3 +1,5 @@ +import os +os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) # @@@SNIPSTART performance_chunking import dlt @@ -22,6 +24,4 @@ def database_cursor_chunked(): # @@@SNIPEND assert len(list(database_cursor())) == 10000 -# assert len(list(database_cursor_chunked())) == 10 -for chunk in database_cursor_chunked(): - assert len(chunk) == 1000 +assert len(list(database_cursor_chunked())) == 10000 \ No newline at end of file diff --git a/docs/snippets/reference/performance_parallel_extract.py b/docs/snippets/reference/performance_parallel_extract.py index 6ad83e7b4f..c2b9d9fc30 100644 --- a/docs/snippets/reference/performance_parallel_extract.py +++ 
b/docs/snippets/reference/performance_parallel_extract.py @@ -1,3 +1,5 @@ +import os +os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) # @@@SNIPSTART parallel_extract_callables import dlt from time import sleep diff --git a/docs/snippets/reference/test_reference_snippets.py b/docs/snippets/reference/test_reference_snippets.py new file mode 100644 index 0000000000..9d3b1f9804 --- /dev/null +++ b/docs/snippets/reference/test_reference_snippets.py @@ -0,0 +1,13 @@ +import os +import pytest + +from tests.pipeline.utils import assert_load_info +from docs.snippets.utils import run_snippet, list_snippets + +# we do not want github events to run because it consumes too much free github credits +RUN_SNIPPETS = list_snippets("reference") + ["parallel_load/parallel_load.py"] + + +# @pytest.mark.parametrize("snippet_name", RUN_SNIPPETS) +# def test_snippet(snippet_name: str) -> None: +# run_snippet(os.path.join("reference", snippet_name)) diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index cbd850b466..ad5c4cf13c 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -274,6 +274,7 @@ workers=11 ```py +import os import dlt from itertools import islice from dlt.common import pendulum @@ -287,11 +288,12 @@ def read_table(limit): # this prevents process pool to run the initialization code again -if __name__ == "__main__": +if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) pipeline.extract(read_table(1000000)) # we should have 11 files (10 pieces for `table` and 1 for state) - print(pipeline.list_extracted_resources()) + extracted_files = pipeline.list_extracted_resources() + print(extracted_files) # normalize and print counts print(pipeline.normalize(loader_file_format="jsonl")) # print jobs in load package (10 + 1 as above) From c5baaabed407bdd0b941b4470f8349ae19a54ddc Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 11 Sep 2023 13:22:36 +0200 Subject: [PATCH 6/6] performance md --- docs/website/docs/reference/performance.md | 65 +++++++++++----------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index ad5c4cf13c..7a089ce7a6 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -9,7 +9,7 @@ keywords: [scaling, parallelism, finetuning] ## Yield pages instead of rows If you can, yield pages when producing data. This makes some processes more effective by lowering -the necessary function calls (each chunk of data that you yield goes through extract pipeline once so if you yield a chunk of 10.000 items you make significant savings) +the necessary function calls (each chunk of data that you yield goes through the extract pipeline once so if you yield a chunk of 10.000 items you will gain significant savings) For example: ```py @@ -40,10 +40,10 @@ def database_cursor_chunked(): ## Memory/disk management -`dlt` buffers data in memory to speed up processing and uses file system to pass data between extract and normalize stage. You can control the size of the buffers, size and number of the files to fine-tune memory and cpu usage. Those settings impact parallelism as well, which we explain in next chapter. +`dlt` buffers data in memory to speed up processing and uses file system to pass data between the **extract** and **normalize** stages. 
You can control the size of the buffers and size and number of the files to fine-tune memory and cpu usage. Those settings impact parallelism as well, which is explained in the next chapter. ### Controlling in-memory buffers -`dlt` maintains in-memory buffers when writing intermediary files in **extract** and **normalize** stages. The size of the buffers are controlled by specifying the number of data items held in them. Data is appended to open files when item buffer is full, then the buffer is cleared. You can specify buffer size via environment variables or in `config.toml` with more or less granularity: +`dlt` maintains in-memory buffers when writing intermediary files in the **extract** and **normalize** stages. The size of the buffers are controlled by specifying the number of data items held in them. Data is appended to open files when the item buffer is full, after which the buffer is cleared. You can the specify buffer size via environment variables or in `config.toml` to be more or less granular: * set all buffers (both extract and normalize) * set extract buffers separately from normalize buffers * set extract buffers for particular source or resource @@ -71,41 +71,40 @@ on IOT sensors or other tiny infrastructures, you might actually want to increas processing. ### Controlling intermediary files size and rotation -`dlt` writes data to intermediary files. You can control the file size and a number of created files by setting maximum number of data items in a single file or a maximum single file size. Mind that file size is computed after compression is performed. -* `dlt` uses custom version of [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between **extract** and **normalize** stages. -* files created between **normalize** and **load** stages are the same files that will be loaded to destination. +`dlt` writes data to intermediary files. You can control the file size and the number of created files by setting the maximum number of data items stored in a single file or the maximum single file size. Keep in mind that the file size is computed after compression was performed. +* `dlt` uses a custom version of [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between the **extract** and **normalize** stages. +* files created between the **normalize** and **load** stages are the same files that will be loaded to destination. :::tip The default setting is to not rotate the files so if you have a resource with millions of records, `dlt` will still create a single intermediary file to normalize and a single file to load. **If you want such data to be normalized and loaded in parallel you must enable file rotation as described below** ::: :::note -Some file formats (ie. parquet) do not support schema changes when writing a single file and on such event they are automatically rotated +Some file formats (ie. parquet) do not support schema changes when writing a single file and in that case they are automatically rotated when new columns are discovered. ::: -Below we set files to rotated after 100.000 items written or when size exceeds 1MiB. +Below we set files to rotated after 100.000 items written or when the filesize exceeds 1MiB. 
```toml # extract and normalize stages [data_writer] file_max_items=100000 max_file_size=1000000 -# only in extract stage - for all sources +# only for the extract stage - for all sources [sources.data_writer] file_max_items=100000 max_file_size=1000000 -# only for a source with name zendesk_support +# only for the extract stage of a source with name zendesk_support [sources.zendesk_support.data_writer] file_max_items=100000 max_file_size=1000000 -# only normalize stage +# only for the normalize stage [normalize.data_writer] file_max_items=100000 max_file_size=1000000 ``` - ### Disabling and enabling file compression Several [text file formats](../dlt-ecosystem/file-formats/) have `gzip` compression enabled by default. If you wish that your load packages have uncompressed files (ie. to debug the content easily), change `data_writer.disable_compression` in config.toml. The entry below will disable the compression of the files processed in `normalize` stage. ```toml @@ -115,10 +114,10 @@ disable_compression=true ### Freeing disk space after loading -Keep in mind load packages are buffered to disk and are left for any troubleshooting, so you can [clear disk space by setting `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind). +Keep in mind load packages are buffered to disk and are left for any troubleshooting, so you can [clear disk space by setting the `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind). ### Observing cpu and memory usage -Please make sure that you have `psutils` package installed (note that Airflow installs it by default). Then you can dump the stats periodically by setting the [progress](../general-usage/pipeline.md#display-the-loading-progress) to `log` in `config.toml`: +Please make sure that you have the `psutils` package installed (note that Airflow installs it by default). Then you can dump the stats periodically by setting the [progress](../general-usage/pipeline.md#display-the-loading-progress) to `log` in `config.toml`: ```toml progress="log" ``` @@ -132,7 +131,7 @@ PROGRESS=log python pipeline_script.py ### Extract You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively. -Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps `get_details` function in another callable that will be executed in the thread pool. +Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool. ```py import dlt @@ -159,22 +158,22 @@ print(list(list_items(0, 10) | get_details)) ``` -You can control the number of workers in thread pool with **workers** setting. The default number of workers is **5**. Below you see a few ways to do that with different granularity +You can control the number of workers in the thread pool with **workers** setting. The default number of workers is **5**. 
Below you see a few ways to do that with different granularity ```toml # for all sources and resources being extracted [extract] worker=1 -# for all resources in zendesk_support source +# for all resources in the zendesk_support source [sources.zendesk_support.extract] workers=2 -# for tickets resource in zendesk_support source +# for the tickets resource in the zendesk_support source [sources.zendesk_support.tickets.extract] workers=4 ``` -Example below does the same but using async/await and futures pool: +The example below does the same but using an async/await and futures pool: ```py import asyncio @@ -198,18 +197,18 @@ You can control the number of async functions/awaitables being evaluate in paral [extract] max_parallel_items=10 -# for all resources in zendesk_support source +# for all resources in the zendesk_support source [sources.zendesk_support.extract] max_parallel_items=10 -# for tickets resource in zendesk_support source +# for the tickets resource in the zendesk_support source [sources.zendesk_support.tickets.extract] max_parallel_items=10 ``` :::note -**max_parallel_items** apply to thread pools as well. It sets how many items may be queued to be executed and currently executing in a thread pool by the workers. Imagine a situation where you have millions -of callables to be evaluated in a thread pool of size 5. This limit will instantiate only the desired amount of them. +**max_parallel_items** applies to thread pools as well. It sets how many items may be queued to be executed and currently executing in a thread pool by the workers. Imagine a situation where you have millions +of callables to be evaluated in a thread pool with a size of 5. This limit will instantiate only the desired amount of workers. ::: :::caution @@ -217,10 +216,10 @@ Generators and iterators are always evaluated in the main thread. If you have a ::: ### Normalize -Normalize stage uses process pool to create load package concurrently. Each file created by **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. Number of processes in the pool is controlled with `workers` config value: +The **normalize** stage uses a process pool to create load package concurrently. Each file created by the **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. The number of processes in the pool is controlled with `workers` config value: ```toml [extract.data_writer] -# force extract file rotation if it exceeds 1MiB +# force extract file rotation if size exceeds 1MiB max_file_size=1000000 [normalize] @@ -236,9 +235,9 @@ Normalization is CPU bound and can easily saturate all your cores. Never allow ` ::: ### Load -Load stage uses thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default loading happens in 20 threads, each loading a single file. +The **load** stage uses a thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default loading happens in 20 threads, each loading a single file. 
-As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation).**. Then number of parallel load jobs is controlled by `workers` config. +As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation).**. Then the number of parallel load jobs is controlled by the `workers` config setting. ```toml [normalize.data_writer] # force normalize file rotation if it exceeds 1MiB @@ -251,13 +250,13 @@ workers=50 ### Parallel pipeline config example -Example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: +The example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: * during extraction, files are rotated each 100 000 items, so there are 10 files with data for the same table -* normalizer will process the data in 3 processes +* the normalizer will process the data in 3 processes * we use JSONL to load data to duckdb. We rotate JSONL files each 100 000 items so 10 files will be created. * we use 11 threads to load the data (10 JSON files + state file) ```toml -# pipeline name is default source name when loading resources +# the pipeline name is default source name when loading resources [sources.parallel_load.data_writer] file_max_items=100000 @@ -287,7 +286,7 @@ def read_table(limit): yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] -# this prevents process pool to run the initialization code again +# this prevents the process pool to run the initialization code again if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) pipeline.extract(read_table(1000000)) @@ -322,7 +321,7 @@ You can change this setting in your `config.toml` as follows: next_item_mode=round_robin [sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline -next_item_mode=5 +next_item_mode=fifo ``` ## Using the built in requests client @@ -370,7 +369,7 @@ All standard HTTP server errors trigger a retry. This includes: * Connection and timeout errors - When the remote server is unreachable, connection is unexpectedly dropped or when the request takes longer than the configured `timeout`. + When the remote server is unreachable, the connection is unexpectedly dropped or when the request takes longer than the configured `timeout`. ### Customizing retry settings
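For context on the retry behaviour described above, a roughly equivalent policy can be expressed with stock `requests` and `urllib3`. This is a hedged illustration only, not dlt's built-in client or its configuration keys, and the endpoint is hypothetical.

```py
# Conceptual sketch of a retry policy like the one the docs describe,
# using plain `requests` + `urllib3` rather than dlt's built-in client.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(
    total=5,                                      # retry up to 5 times
    backoff_factor=1,                             # exponential backoff between attempts
    status_forcelist=[429, 500, 502, 503, 504],   # server errors that trigger a retry
)
session.mount("https://", HTTPAdapter(max_retries=retry))

# connection errors and timeouts also surface as retryable/raised exceptions
response = session.get("https://example.com/api/items", timeout=30)  # hypothetical endpoint
```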