From 180de6e385baf94fc0614b659f2c0852a026db96 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 11 Sep 2023 17:15:47 +0200 Subject: [PATCH] migrate performance snippets --- dlt/common/configuration/providers/toml.py | 3 + docs/snippets/reference/.dlt/config.toml | 11 - docs/snippets/reference/__init__.py | 0 .../reference/parallel_load/.dlt/config.toml | 16 -- .../reference/parallel_load/__init__.py | 0 .../reference/parallel_load/parallel_load.py | 34 --- .../reference/performance_chunking.py | 27 -- .../reference/performance_parallel_extract.py | 39 --- .../reference/test_reference_snippets.py | 13 - docs/website/docs/conftest.py | 8 +- .../docs/reference/performance-snippets.py | 255 ++++++++++++++++++ docs/website/docs/reference/performance.md | 76 ++++-- 12 files changed, 316 insertions(+), 166 deletions(-) delete mode 100644 docs/snippets/reference/.dlt/config.toml delete mode 100644 docs/snippets/reference/__init__.py delete mode 100644 docs/snippets/reference/parallel_load/.dlt/config.toml delete mode 100644 docs/snippets/reference/parallel_load/__init__.py delete mode 100644 docs/snippets/reference/parallel_load/parallel_load.py delete mode 100644 docs/snippets/reference/performance_chunking.py delete mode 100644 docs/snippets/reference/performance_parallel_extract.py delete mode 100644 docs/snippets/reference/test_reference_snippets.py create mode 100644 docs/website/docs/reference/performance-snippets.py diff --git a/dlt/common/configuration/providers/toml.py b/dlt/common/configuration/providers/toml.py index 19374187fb..9e8b2a0059 100644 --- a/dlt/common/configuration/providers/toml.py +++ b/dlt/common/configuration/providers/toml.py @@ -89,6 +89,9 @@ class StringTomlProvider(BaseTomlProvider): def __init__(self, toml_string: str) -> None: super().__init__(StringTomlProvider.loads(toml_string)) + def update(self, toml_string: str) -> None: + self._toml = self.loads(toml_string) + def dumps(self) -> str: return tomlkit.dumps(self._toml) diff --git a/docs/snippets/reference/.dlt/config.toml b/docs/snippets/reference/.dlt/config.toml deleted file mode 100644 index f5139dcc96..0000000000 --- a/docs/snippets/reference/.dlt/config.toml +++ /dev/null @@ -1,11 +0,0 @@ -# [extract] -# max_parallel_items=1 - -[sources.extract] -max_parallel_items=1 - -# [sources.performance_parallel_extract.extract] -# workers=2 - -[sources.performance_parallel_extract.get_details.extract] -workers=2 \ No newline at end of file diff --git a/docs/snippets/reference/__init__.py b/docs/snippets/reference/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/docs/snippets/reference/parallel_load/.dlt/config.toml b/docs/snippets/reference/parallel_load/.dlt/config.toml deleted file mode 100644 index 0f8b273ce6..0000000000 --- a/docs/snippets/reference/parallel_load/.dlt/config.toml +++ /dev/null @@ -1,16 +0,0 @@ -# [sources.data_writer] -# file_max_items=20000 - -# pipeline name is default source name when loading resources -[sources.parallel_load.data_writer] -file_max_items=100000 - -[normalize] -workers=3 - -[normalize.data_writer] -disable_compression=false -file_max_items=100000 - -[load] -workers=11 diff --git a/docs/snippets/reference/parallel_load/__init__.py b/docs/snippets/reference/parallel_load/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/docs/snippets/reference/parallel_load/parallel_load.py b/docs/snippets/reference/parallel_load/parallel_load.py deleted file mode 100644 index 7bba13e411..0000000000 --- a/docs/snippets/reference/parallel_load/parallel_load.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) -# @@@SNIPSTART parallel_config_example -import os -import dlt -from itertools import islice -from dlt.common import pendulum - -@dlt.resource(name="table") -def read_table(limit): - rows = iter(range(limit)) - while item_slice := list(islice(rows, 1000)): - now = pendulum.now().isoformat() - yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] - - -# this prevents process pool to run the initialization code again -if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: - pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) - pipeline.extract(read_table(1000000)) - # we should have 11 files (10 pieces for `table` and 1 for state) - extracted_files = pipeline.list_extracted_resources() - print(extracted_files) - # normalize and print counts - print(pipeline.normalize(loader_file_format="jsonl")) - # print jobs in load package (10 + 1 as above) - load_id = pipeline.list_normalized_load_packages()[0] - print(pipeline.get_load_package_info(load_id)) - print(pipeline.load()) -# @@@SNIPEND - - assert len(extracted_files) == 11 - loaded_package = pipeline.get_load_package_info(load_id) - assert len(loaded_package.jobs["completed_jobs"]) == 11 \ No newline at end of file diff --git a/docs/snippets/reference/performance_chunking.py b/docs/snippets/reference/performance_chunking.py deleted file mode 100644 index 7215e80937..0000000000 --- a/docs/snippets/reference/performance_chunking.py +++ /dev/null @@ -1,27 +0,0 @@ -import os -os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) -# @@@SNIPSTART performance_chunking -import dlt - -def get_rows(limit): - yield from map(lambda n: {"row": n}, range(limit)) - -@dlt.resource -def database_cursor(): - # here we yield each row returned from database separately - yield from get_rows(10000) -# @@@SNIPEND -# @@@SNIPSTART performance_chunking_chunk -from itertools import islice - -@dlt.resource -def database_cursor_chunked(): - # here we yield chunks of size 1000 - rows = get_rows(10000) - while item_slice := list(islice(rows, 1000)): - print(f"got chunk of length {len(item_slice)}") - yield item_slice -# @@@SNIPEND - -assert len(list(database_cursor())) == 10000 -assert len(list(database_cursor_chunked())) == 10000 \ No newline at end of file diff --git a/docs/snippets/reference/performance_parallel_extract.py b/docs/snippets/reference/performance_parallel_extract.py deleted file mode 100644 index c2b9d9fc30..0000000000 --- a/docs/snippets/reference/performance_parallel_extract.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -os.environ["DLT_PROJECT_DIR"] = os.path.dirname(__file__) -# @@@SNIPSTART parallel_extract_callables -import dlt -from time import sleep -from threading import currentThread - -@dlt.resource -def list_items(start, limit): - yield from range(start, start + limit) - -@dlt.transformer -@dlt.defer -def get_details(item_id): - # simulate a slow REST API where you wait 0.3 sec for each item - sleep(0.3) - print(f"item_id {item_id} in thread {currentThread().name}") - # just return the results, if you yield, generator will be evaluated in main thread - return {"row": item_id} - - -# evaluate the pipeline and print all the items -# resources are iterators and they are evaluated in the same way in the pipeline.run -print(list(list_items(0, 10) | get_details)) -# @@@SNIPEND -# @@@SNIPSTART parallel_extract_awaitables -import asyncio - -@dlt.transformer -async def a_get_details(item_id): - # simulate a slow REST API where you wait 0.3 sec for each item - await asyncio.sleep(0.3) - print(f"item_id {item_id} in thread {currentThread().name}") - # just return the results, if you yield, generator will be evaluated in main thread - return {"row": item_id} - - -print(list(list_items(0, 10) | a_get_details)) -# @@@SNIPEND \ No newline at end of file diff --git a/docs/snippets/reference/test_reference_snippets.py b/docs/snippets/reference/test_reference_snippets.py deleted file mode 100644 index 9d3b1f9804..0000000000 --- a/docs/snippets/reference/test_reference_snippets.py +++ /dev/null @@ -1,13 +0,0 @@ -import os -import pytest - -from tests.pipeline.utils import assert_load_info -from docs.snippets.utils import run_snippet, list_snippets - -# we do not want github events to run because it consumes too much free github credits -RUN_SNIPPETS = list_snippets("reference") + ["parallel_load/parallel_load.py"] - - -# @pytest.mark.parametrize("snippet_name", RUN_SNIPPETS) -# def test_snippet(snippet_name: str) -> None: -# run_snippet(os.path.join("reference", snippet_name)) diff --git a/docs/website/docs/conftest.py b/docs/website/docs/conftest.py index 17f7c0b578..36a5dbb3f9 100644 --- a/docs/website/docs/conftest.py +++ b/docs/website/docs/conftest.py @@ -7,11 +7,12 @@ from dlt.common.configuration.container import Container # patch which providers to enable -from dlt.common.configuration.providers import ConfigProvider, EnvironProvider, SecretsTomlProvider, ConfigTomlProvider +from dlt.common.configuration.providers import StringTomlProvider, EnvironProvider, SecretsTomlProvider, ConfigTomlProvider from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext, ConfigProvidersConfiguration from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, duckdb_pipeline_location, wipe_pipeline +string_toml_provider = StringTomlProvider("") @pytest.fixture(autouse=True) def setup_tests(request): @@ -19,9 +20,12 @@ def setup_tests(request): dname = os.path.dirname(request.module.__file__) config_dir = dname + "/.dlt" + # clear string toml provider + string_toml_provider.update("") + # inject provider context so the original providers are restored at the end def _initial_providers(): - return [EnvironProvider(), SecretsTomlProvider(project_dir=config_dir, add_global_config=False), ConfigTomlProvider(project_dir=config_dir, add_global_config=False)] + return [string_toml_provider, EnvironProvider(), SecretsTomlProvider(project_dir=config_dir, add_global_config=False), ConfigTomlProvider(project_dir=config_dir, add_global_config=False)] glob_ctx = ConfigProvidersContext() glob_ctx.providers = _initial_providers() diff --git a/docs/website/docs/reference/performance-snippets.py b/docs/website/docs/reference/performance-snippets.py new file mode 100644 index 0000000000..a70f5c9945 --- /dev/null +++ b/docs/website/docs/reference/performance-snippets.py @@ -0,0 +1,255 @@ +from conftest import string_toml_provider + +PARALLEL_CONFIG_TOML = """ +# @@@DLT_SNIPPET_START parallel_config_toml +# the pipeline name is default source name when loading resources +chess_url="https://api.chess.com/pub/" + +[sources.parallel_load.data_writer] +file_max_items=100000 + +[normalize] +workers=3 + +[normalize.data_writer] +disable_compression=false +file_max_items=100000 + +[load] +workers=11 +# @@@DLT_SNIPPET_END parallel_config_toml +""" + +def parallel_config_snippet() -> None: + string_toml_provider.update(PARALLEL_CONFIG_TOML) + # @@@DLT_SNIPPET_START parallel_config + import os + import dlt + from itertools import islice + from dlt.common import pendulum + + @dlt.resource(name="table") + def read_table(limit): + rows = iter(range(limit)) + while item_slice := list(islice(rows, 1000)): + now = pendulum.now().isoformat() + yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] + + # this prevents process pool to run the initialization code again + if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: + pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) + pipeline.extract(read_table(1000000)) + # we should have 11 files (10 pieces for `table` and 1 for state) + extracted_files = pipeline.list_extracted_resources() + print(extracted_files) + # normalize and print counts + print(pipeline.normalize(loader_file_format="jsonl")) + # print jobs in load package (10 + 1 as above) + load_id = pipeline.list_normalized_load_packages()[0] + print(pipeline.get_load_package_info(load_id)) + print(pipeline.load()) + # @@@DLT_SNIPPET_END parallel_config + + assert len(extracted_files) == 11 + loaded_package = pipeline.get_load_package_info(load_id) + assert len(loaded_package.jobs["completed_jobs"]) == 11 + + +def parallel_extract_callables_snippet() -> None: + # @@@DLT_SNIPPET_START parallel_extract_callables + import dlt + from time import sleep + from threading import currentThread + + @dlt.resource + def list_items(start, limit): + yield from range(start, start + limit) + + @dlt.transformer + @dlt.defer + def get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + + # evaluate the pipeline and print all the items + # resources are iterators and they are evaluated in the same way in the pipeline.run + print(list(list_items(0, 10) | get_details)) + # @@@DLT_SNIPPET_END parallel_extract_callables + + # @@@DLT_SNIPPET_START parallel_extract_awaitables + import asyncio + + @dlt.transformer + async def a_get_details(item_id): + # simulate a slow REST API where you wait 0.3 sec for each item + await asyncio.sleep(0.3) + print(f"item_id {item_id} in thread {currentThread().name}") + # just return the results, if you yield, generator will be evaluated in main thread + return {"row": item_id} + + + print(list(list_items(0, 10) | a_get_details)) + # @@@DLT_SNIPPET_END parallel_extract_awaitables + + +def performance_chunking_snippet() -> None: + # @@@DLT_SNIPPET_START performance_chunking + import dlt + + def get_rows(limit): + yield from map(lambda n: {"row": n}, range(limit)) + + @dlt.resource + def database_cursor(): + # here we yield each row returned from database separately + yield from get_rows(10000) + # @@@DLT_SNIPPET_END performance_chunking + + # @@@DLT_SNIPPET_START performance_chunking_chunk + from itertools import islice + + @dlt.resource + def database_cursor_chunked(): + # here we yield chunks of size 1000 + rows = get_rows(10000) + while item_slice := list(islice(rows, 1000)): + print(f"got chunk of length {len(item_slice)}") + yield item_slice + # @@@DLT_SNIPPET_END performance_chunking_chunk + + assert len(list(database_cursor())) == 10000 + assert len(list(database_cursor_chunked())) == 10000 + + +def test_toml_snippets() -> None: + + string_toml_provider.update(""" +# @@@DLT_SNIPPET_START buffer_toml +# set buffer size for extract and normalize stages +[data_writer] +buffer_max_items=100 + +# set buffers only in extract stage - for all sources +[sources.data_writer] +buffer_max_items=100 + +# set buffers only for a source with name zendesk_support +[sources.zendesk_support.data_writer] +buffer_max_items=100 + +# set buffers in normalize stage +[normalize.data_writer] +buffer_max_items=100 +# @@@DLT_SNIPPET_END buffer_toml +""") + + string_toml_provider.update(""" +# @@@DLT_SNIPPET_START file_size_toml +# extract and normalize stages +[data_writer] +file_max_items=100000 +max_file_size=1000000 + +# only for the extract stage - for all sources +[sources.data_writer] +file_max_items=100000 +max_file_size=1000000 + +# only for the extract stage of a source with name zendesk_support +[sources.zendesk_support.data_writer] +file_max_items=100000 +max_file_size=1000000 + +# only for the normalize stage +[normalize.data_writer] +file_max_items=100000 +max_file_size=1000000 +# @@@DLT_SNIPPET_END file_size_toml +""") + + string_toml_provider.update(""" +# @@@DLT_SNIPPET_START extract_workers_toml +# for all sources and resources being extracted +[extract] +worker=1 + +# for all resources in the zendesk_support source +[sources.zendesk_support.extract] +workers=2 + +# for the tickets resource in the zendesk_support source +[sources.zendesk_support.tickets.extract] +workers=4 +# @@@DLT_SNIPPET_END extract_workers_toml +""") + + string_toml_provider.update(""" +# @@@DLT_SNIPPET_START extract_parallel_items_toml +# for all sources and resources being extracted +[extract] +max_parallel_items=10 + +# for all resources in the zendesk_support source +[sources.zendesk_support.extract] +max_parallel_items=10 + +# for the tickets resource in the zendesk_support source +[sources.zendesk_support.tickets.extract] +max_parallel_items=10 +# @@@DLT_SNIPPET_END extract_parallel_items_toml +""") + +string_toml_provider.update(""" +# @@@DLT_SNIPPET_START normalize_workers_toml + [extract.data_writer] +# force extract file rotation if size exceeds 1MiB +max_file_size=1000000 + +[normalize] +# use 3 worker processes to process 3 files in parallel +workers=3 +# @@@DLT_SNIPPET_END normalize_workers_toml +""") + +string_toml_provider.update(""" +# @@@DLT_SNIPPET_START normalize_workers_2_toml +[normalize.data_writer] +# force normalize file rotation if it exceeds 1MiB +max_file_size=1000000 + +[load] +# have 50 concurrent load jobs +workers=50 +# @@@DLT_SNIPPET_END normalize_workers_2_toml +""") + +string_toml_provider.update(""" +# @@@DLT_SNIPPET_START retry_toml +[runtime] +request_max_attempts = 10 # Stop after 10 retry attempts instead of 5 +request_backoff_factor = 1.5 # Multiplier applied to the exponential delays. Default is 1 +request_timeout = 120 # Timeout in seconds +request_max_retry_delay = 30 # Cap exponential delay to 30 seconds +# @@@DLT_SNIPPET_END retry_toml +""") + +string_toml_provider.update(""" +# @@@DLT_SNIPPET_START item_mode_toml +[extract] # global setting +next_item_mode=round_robin + +[sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline +next_item_mode=fifo +# @@@DLT_SNIPPET_END item_mode_toml +""") + +string_toml_provider.update(""" +# @@@DLT_SNIPPET_START compression_toml +[normalize.data_writer] +disable_compression=true +# @@@DLT_SNIPPET_END compression_toml +""") diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index 7a089ce7a6..5faa0c11bd 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -11,7 +11,7 @@ keywords: [scaling, parallelism, finetuning] If you can, yield pages when producing data. This makes some processes more effective by lowering the necessary function calls (each chunk of data that you yield goes through the extract pipeline once so if you yield a chunk of 10.000 items you will gain significant savings) For example: - + ```py import dlt @@ -23,9 +23,10 @@ def database_cursor(): # here we yield each row returned from database separately yield from get_rows(10000) ``` - + can be replaced with: - + + ```py from itertools import islice @@ -37,7 +38,7 @@ def database_cursor_chunked(): print(f"got chunk of length {len(item_slice)}") yield item_slice ``` - + ## Memory/disk management `dlt` buffers data in memory to speed up processing and uses file system to pass data between the **extract** and **normalize** stages. You can control the size of the buffers and size and number of the files to fine-tune memory and cpu usage. Those settings impact parallelism as well, which is explained in the next chapter. @@ -48,7 +49,8 @@ def database_cursor_chunked(): * set extract buffers separately from normalize buffers * set extract buffers for particular source or resource -```toml + +```py # set buffer size for extract and normalize stages [data_writer] buffer_max_items=100 @@ -65,6 +67,7 @@ buffer_max_items=100 [normalize.data_writer] buffer_max_items=100 ``` + The default buffer is actually set to a moderately low value (**5000 items**), so unless you are trying to run `dlt` on IOT sensors or other tiny infrastructures, you might actually want to increase it to speed up @@ -83,7 +86,9 @@ Some file formats (ie. parquet) do not support schema changes when writing a sin ::: Below we set files to rotated after 100.000 items written or when the filesize exceeds 1MiB. -```toml + + +```py # extract and normalize stages [data_writer] file_max_items=100000 @@ -104,13 +109,17 @@ max_file_size=1000000 file_max_items=100000 max_file_size=1000000 ``` + + ### Disabling and enabling file compression Several [text file formats](../dlt-ecosystem/file-formats/) have `gzip` compression enabled by default. If you wish that your load packages have uncompressed files (ie. to debug the content easily), change `data_writer.disable_compression` in config.toml. The entry below will disable the compression of the files processed in `normalize` stage. -```toml + +```py [normalize.data_writer] disable_compression=true ``` + ### Freeing disk space after loading @@ -132,7 +141,7 @@ PROGRESS=log python pipeline_script.py You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively. Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool. - + ```py import dlt from time import sleep @@ -156,10 +165,11 @@ def get_details(item_id): # resources are iterators and they are evaluated in the same way in the pipeline.run print(list(list_items(0, 10) | get_details)) ``` - + You can control the number of workers in the thread pool with **workers** setting. The default number of workers is **5**. Below you see a few ways to do that with different granularity -```toml + +```py # for all sources and resources being extracted [extract] worker=1 @@ -172,9 +182,11 @@ workers=2 [sources.zendesk_support.tickets.extract] workers=4 ``` + + The example below does the same but using an async/await and futures pool: - + ```py import asyncio @@ -189,10 +201,11 @@ async def a_get_details(item_id): print(list(list_items(0, 10) | a_get_details)) ``` - + You can control the number of async functions/awaitables being evaluate in parallel by setting **max_parallel_items**. The default number is *20**. Below you see a few ways to do that with different granularity -```toml + +```py # for all sources and resources being extracted [extract] max_parallel_items=10 @@ -205,6 +218,7 @@ max_parallel_items=10 [sources.zendesk_support.tickets.extract] max_parallel_items=10 ``` + :::note **max_parallel_items** applies to thread pools as well. It sets how many items may be queued to be executed and currently executing in a thread pool by the workers. Imagine a situation where you have millions @@ -217,7 +231,8 @@ Generators and iterators are always evaluated in the main thread. If you have a ### Normalize The **normalize** stage uses a process pool to create load package concurrently. Each file created by the **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. The number of processes in the pool is controlled with `workers` config value: -```toml + +```py [extract.data_writer] # force extract file rotation if size exceeds 1MiB max_file_size=1000000 @@ -225,6 +240,9 @@ max_file_size=1000000 [normalize] # use 3 worker processes to process 3 files in parallel workers=3 +``` + + ``` :::note The default is to not parallelize normalization and to perform it in the main process. @@ -238,7 +256,8 @@ Normalization is CPU bound and can easily saturate all your cores. Never allow ` The **load** stage uses a thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default loading happens in 20 threads, each loading a single file. As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation).**. Then the number of parallel load jobs is controlled by the `workers` config setting. -```toml + +```py [normalize.data_writer] # force normalize file rotation if it exceeds 1MiB max_file_size=1000000 @@ -247,7 +266,7 @@ max_file_size=1000000 # have 50 concurrent load jobs workers=50 ``` - + ### Parallel pipeline config example The example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: @@ -255,8 +274,12 @@ The example below simulates loading of a large database table with 1 000 000 rec * the normalizer will process the data in 3 processes * we use JSONL to load data to duckdb. We rotate JSONL files each 100 000 items so 10 files will be created. * we use 11 threads to load the data (10 JSON files + state file) -```toml + + +```py # the pipeline name is default source name when loading resources +chess_url="https://api.chess.com/pub/" + [sources.parallel_load.data_writer] file_max_items=100000 @@ -270,8 +293,10 @@ file_max_items=100000 [load] workers=11 ``` + - + + ```py import os import dlt @@ -285,8 +310,7 @@ def read_table(limit): now = pendulum.now().isoformat() yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] - -# this prevents the process pool to run the initialization code again +# this prevents process pool to run the initialization code again if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) pipeline.extract(read_table(1000000)) @@ -300,7 +324,7 @@ if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: print(pipeline.get_load_package_info(load_id)) print(pipeline.load()) ``` - + ## Resources extraction, `fifo` vs. `round robin` @@ -316,13 +340,15 @@ second resource etc, doing as many rounds as necessary until all resources are f You can change this setting in your `config.toml` as follows: -```toml + +```py [extract] # global setting next_item_mode=round_robin [sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline next_item_mode=fifo ``` + ## Using the built in requests client @@ -373,16 +399,18 @@ All standard HTTP server errors trigger a retry. This includes: ### Customizing retry settings - Many requests settings can be added to the runtime section in your `config.toml`. For example: -```toml + +```py [runtime] request_max_attempts = 10 # Stop after 10 retry attempts instead of 5 request_backoff_factor = 1.5 # Multiplier applied to the exponential delays. Default is 1 request_timeout = 120 # Timeout in seconds request_max_retry_delay = 30 # Cap exponential delay to 30 seconds ``` + + For more control you can create your own instance of `dlt.sources.requests.Client` and use that instead of the global client.