From c5baaabed407bdd0b941b4470f8349ae19a54ddc Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 11 Sep 2023 13:22:36 +0200 Subject: [PATCH] performance md --- docs/website/docs/reference/performance.md | 65 +++++++++++----------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/docs/website/docs/reference/performance.md b/docs/website/docs/reference/performance.md index ad5c4cf13c..7a089ce7a6 100644 --- a/docs/website/docs/reference/performance.md +++ b/docs/website/docs/reference/performance.md @@ -9,7 +9,7 @@ keywords: [scaling, parallelism, finetuning] ## Yield pages instead of rows If you can, yield pages when producing data. This makes some processes more effective by lowering -the necessary function calls (each chunk of data that you yield goes through extract pipeline once so if you yield a chunk of 10.000 items you make significant savings) +the number of necessary function calls (each chunk of data that you yield goes through the extract pipeline once, so if you yield a chunk of 10,000 items you make significant savings). For example: ```py @@ -40,10 +40,10 @@ def database_cursor_chunked(): ## Memory/disk management -`dlt` buffers data in memory to speed up processing and uses file system to pass data between extract and normalize stage. You can control the size of the buffers, size and number of the files to fine-tune memory and cpu usage. Those settings impact parallelism as well, which we explain in next chapter. +`dlt` buffers data in memory to speed up processing and uses the file system to pass data between the **extract** and **normalize** stages. You can control the size of the buffers and the size and number of the files to fine-tune memory and CPU usage. Those settings impact parallelism as well, which is explained in the next chapter. ### Controlling in-memory buffers -`dlt` maintains in-memory buffers when writing intermediary files in **extract** and **normalize** stages. 
The size of the buffers are controlled by specifying the number of data items held in them. Data is appended to open files when item buffer is full, then the buffer is cleared. You can specify buffer size via environment variables or in `config.toml` with more or less granularity: +`dlt` maintains in-memory buffers when writing intermediary files in the **extract** and **normalize** stages. The size of the buffers is controlled by specifying the number of data items held in them. Data is appended to open files when the item buffer is full, after which the buffer is cleared. You can specify the buffer size via environment variables or in `config.toml` with more or less granularity: * set all buffers (both extract and normalize) * set extract buffers separately from normalize buffers * set extract buffers for particular source or resource @@ -71,41 +71,40 @@ on IOT sensors or other tiny infrastructures, you might actually want to increas processing. ### Controlling intermediary files size and rotation -`dlt` writes data to intermediary files. You can control the file size and a number of created files by setting maximum number of data items in a single file or a maximum single file size. Mind that file size is computed after compression is performed. -* `dlt` uses custom version of [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between **extract** and **normalize** stages. -* files created between **normalize** and **load** stages are the same files that will be loaded to destination. +`dlt` writes data to intermediary files. You can control the file size and the number of created files by setting the maximum number of data items stored in a single file or the maximum single file size. Keep in mind that the file size is computed after compression is performed. +* `dlt` uses a custom version of the [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between the **extract** and **normalize** stages. 
+* files created between the **normalize** and **load** stages are the same files that will be loaded to the destination. :::tip The default setting is to not rotate the files so if you have a resource with millions of records, `dlt` will still create a single intermediary file to normalize and a single file to load. **If you want such data to be normalized and loaded in parallel you must enable file rotation as described below** ::: :::note -Some file formats (ie. parquet) do not support schema changes when writing a single file and on such event they are automatically rotated +Some file formats (e.g. parquet) do not support schema changes when writing a single file, so such files are automatically rotated when new columns are discovered. ::: -Below we set files to rotated after 100.000 items written or when size exceeds 1MiB. +Below we set files to rotate after 100,000 items are written or when the file size exceeds 1MiB. ```toml # extract and normalize stages [data_writer] file_max_items=100000 max_file_size=1000000 -# only in extract stage - for all sources +# only for the extract stage - for all sources [sources.data_writer] file_max_items=100000 max_file_size=1000000 -# only for a source with name zendesk_support +# only for the extract stage of a source with name zendesk_support [sources.zendesk_support.data_writer] file_max_items=100000 max_file_size=1000000 -# only normalize stage +# only for the normalize stage [normalize.data_writer] file_max_items=100000 max_file_size=1000000 ``` - ### Disabling and enabling file compression Several [text file formats](../dlt-ecosystem/file-formats/) have `gzip` compression enabled by default. If you wish that your load packages have uncompressed files (ie. to debug the content easily), change `data_writer.disable_compression` in config.toml. The entry below will disable the compression of the files processed in `normalize` stage. 
```toml @@ -115,10 +114,10 @@ disable_compression=true ### Freeing disk space after loading -Keep in mind load packages are buffered to disk and are left for any troubleshooting, so you can [clear disk space by setting `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind). +Keep in mind that load packages are buffered to disk and left there for troubleshooting; you can [clear disk space by setting the `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind). ### Observing cpu and memory usage -Please make sure that you have `psutils` package installed (note that Airflow installs it by default). Then you can dump the stats periodically by setting the [progress](../general-usage/pipeline.md#display-the-loading-progress) to `log` in `config.toml`: +Please make sure that you have the `psutil` package installed (note that Airflow installs it by default). Then you can dump the stats periodically by setting the [progress](../general-usage/pipeline.md#display-the-loading-progress) to `log` in `config.toml`: ```toml progress="log" ``` @@ -132,7 +131,7 @@ PROGRESS=log python pipeline_script.py ### Extract You can extract data concurrently if you write your pipelines to yield callables or awaitables that can be then evaluated in a thread or futures pool respectively. -Example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps `get_details` function in another callable that will be executed in the thread pool. +The example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool. 
```py import dlt @@ -159,22 +158,22 @@ print(list(list_items(0, 10) | get_details)) ``` -You can control the number of workers in thread pool with **workers** setting. The default number of workers is **5**. Below you see a few ways to do that with different granularity +You can control the number of workers in the thread pool with the **workers** setting. The default number of workers is **5**. Below you see a few ways to do that with different granularity: ```toml # for all sources and resources being extracted [extract] worker=1 -# for all resources in zendesk_support source +# for all resources in the zendesk_support source [sources.zendesk_support.extract] workers=2 -# for tickets resource in zendesk_support source +# for the tickets resource in the zendesk_support source [sources.zendesk_support.tickets.extract] workers=4 ``` -Example below does the same but using async/await and futures pool: +The example below does the same but uses async/await and a futures pool: ```py import asyncio @@ -198,18 +197,18 @@ You can control the number of async functions/awaitables being evaluate in paral [extract] max_parallel_items=10 -# for all resources in zendesk_support source +# for all resources in the zendesk_support source [sources.zendesk_support.extract] max_parallel_items=10 -# for tickets resource in zendesk_support source +# for the tickets resource in the zendesk_support source [sources.zendesk_support.tickets.extract] max_parallel_items=10 ``` :::note -**max_parallel_items** apply to thread pools as well. It sets how many items may be queued to be executed and currently executing in a thread pool by the workers. Imagine a situation where you have millions -of callables to be evaluated in a thread pool of size 5. This limit will instantiate only the desired amount of them. +**max_parallel_items** applies to thread pools as well. It sets how many items may be queued for execution or currently executing in a thread pool. 
Imagine a situation where you have millions +of callables to be evaluated in a thread pool with a size of 5. This limit will instantiate only the desired number of callables. ::: :::caution @@ -217,10 +216,10 @@ Generators and iterators are always evaluated in the main thread. If you have a ::: ### Normalize -Normalize stage uses process pool to create load package concurrently. Each file created by **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. Number of processes in the pool is controlled with `workers` config value: +The **normalize** stage uses a process pool to create a load package concurrently. Each file created by the **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. The number of processes in the pool is controlled with the `workers` config value: ```toml [extract.data_writer] -# force extract file rotation if it exceeds 1MiB +# force extract file rotation if size exceeds 1MiB max_file_size=1000000 [normalize] @@ -236,9 +235,9 @@ Normalization is CPU bound and can easily saturate all your cores. Never allow ` ::: ### Load -Load stage uses thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default loading happens in 20 threads, each loading a single file. +The **load** stage uses a thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default, loading happens in 20 threads, each loading a single file. 
-As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation).**. Then number of parallel load jobs is controlled by `workers` config. +As before, **if you have just a single table with millions of records you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation)**. The number of parallel load jobs is then controlled by the `workers` config setting. ```toml [normalize.data_writer] # force normalize file rotation if it exceeds 1MiB @@ -251,13 +250,13 @@ workers=50 ### Parallel pipeline config example -Example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: +The example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: * during extraction, files are rotated each 100 000 items, so there are 10 files with data for the same table -* normalizer will process the data in 3 processes +* the normalizer will process the data in 3 processes * we use JSONL to load data to duckdb. We rotate JSONL files each 100 000 items so 10 files will be created. 
* we use 11 threads to load the data (10 JSON files + state file) ```toml -# pipeline name is default source name when loading resources +# the pipeline name is the default source name when loading resources [sources.parallel_load.data_writer] file_max_items=100000 @@ -287,7 +286,7 @@ def read_table(limit): yield [{"row": _id, "description": "this is row with id {_id}", "timestamp": now} for _id in item_slice] -# this prevents process pool to run the initialization code again +# this prevents the process pool from running the initialization code again if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ: pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True) pipeline.extract(read_table(1000000)) @@ -322,7 +321,7 @@ You can change this setting in your `config.toml` as follows: next_item_mode=round_robin [sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline -next_item_mode=5 +next_item_mode=fifo ``` ## Using the built in requests client @@ -370,7 +369,7 @@ All standard HTTP server errors trigger a retry. This includes: * Connection and timeout errors - When the remote server is unreachable, connection is unexpectedly dropped or when the request takes longer than the configured `timeout`. + When the remote server is unreachable, the connection is unexpectedly dropped, or the request takes longer than the configured `timeout`. ### Customizing retry settings