Commit c5baaab: performance md

sh-rp committed Sep 11, 2023
1 parent 873b0df

Showing 1 changed file with 32 additions and 33 deletions: docs/website/docs/reference/performance.md
keywords: [scaling, parallelism, finetuning]
## Yield pages instead of rows

If you can, yield pages when producing data. This makes some processes more effective by reducing the number of necessary function calls (each chunk of data that you yield goes through the extract pipeline once, so if you yield a chunk of 10 000 items you will gain significant savings).
For example:
<!--SNIPSTART performance_chunking -->
```py
import dlt
from itertools import islice

def get_rows(limit):
    # stand-in for a database cursor returning individual rows
    yield from ({"row": n} for n in range(limit))

@dlt.resource
def database_cursor_chunked():
    # a sketch of the pattern: yield pages of 1000 rows instead of single rows
    rows = get_rows(10000)
    while item_slice := list(islice(rows, 1000)):
        yield item_slice
```
<!--SNIPEND-->

## Memory/disk management
`dlt` buffers data in memory to speed up processing and uses the file system to pass data between the **extract** and **normalize** stages. You can control the size of the buffers and the size and number of the files to fine-tune memory and CPU usage. These settings also affect parallelism, which is explained in the next chapter.

### Controlling in-memory buffers
`dlt` maintains in-memory buffers when writing intermediary files in the **extract** and **normalize** stages. The size of each buffer is controlled by specifying the number of data items it holds. Data is appended to open files when the item buffer is full, after which the buffer is cleared. You can specify the buffer size via environment variables or in `config.toml` with more or less granularity:
* set all buffers (both extract and normalize)
* set extract buffers separately from normalize buffers
* set extract buffers for a particular source or resource
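For example, a sketch of these settings in `config.toml` (assuming the `buffer_max_items` key; the source name `zendesk_support` mirrors the examples below):
```toml
# set all buffers (both extract and normalize)
[data_writer]
buffer_max_items=100

# set extract buffers for all sources, separately from normalize
[sources.data_writer]
buffer_max_items=100

# set extract buffers only for the zendesk_support source
[sources.zendesk_support.data_writer]
buffer_max_items=100

# set normalize buffers only
[normalize.data_writer]
buffer_max_items=100
```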
Unless you are trying to run `dlt` on IOT sensors or other tiny infrastructures, you might actually want to increase the buffer sizes to speed up processing.

### Controlling intermediary files size and rotation
`dlt` writes data to intermediary files. You can control the file size and the number of created files by setting the maximum number of data items stored in a single file or the maximum single file size. Keep in mind that the file size is computed after compression is performed.
* `dlt` uses a custom version of the [`jsonl` file format](../dlt-ecosystem/file-formats/jsonl.md) between the **extract** and **normalize** stages.
* files created between the **normalize** and **load** stages are the same files that will be loaded to the destination.

:::tip
The default setting is to not rotate the files, so if you have a resource with millions of records, `dlt` will still create a single intermediary file to normalize and a single file to load. **If you want such data to be normalized and loaded in parallel, you must enable file rotation as described below.**
:::
:::note
Some file formats (i.e. parquet) do not support schema changes when writing a single file, and in that case the files are automatically rotated when new columns are discovered.
:::

Below we set files to rotate after 100 000 items written or when the file size exceeds 1MiB.
```toml
# extract and normalize stages
[data_writer]
file_max_items=100000
max_file_size=1000000

# only for the extract stage - for all sources
[sources.data_writer]
file_max_items=100000
max_file_size=1000000

# only for the extract stage of a source with name zendesk_support
[sources.zendesk_support.data_writer]
file_max_items=100000
max_file_size=1000000

# only for the normalize stage
[normalize.data_writer]
file_max_items=100000
max_file_size=1000000
```


### Disabling and enabling file compression
Several [text file formats](../dlt-ecosystem/file-formats/) have `gzip` compression enabled by default. If you wish your load packages to contain uncompressed files (i.e. to debug the content easily), change `data_writer.disable_compression` in `config.toml`. The entry below will disable the compression of the files processed in the `normalize` stage.
```toml
[normalize.data_writer]
disable_compression=true
```

### Freeing disk space after loading

Keep in mind load packages are buffered to disk and are left for any troubleshooting, so you can [clear disk space by setting the `delete_completed_jobs` option](../running-in-production/running.md#data-left-behind).
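A sketch of that option in `config.toml`, assuming it lives in the `[load]` section as described on the linked page:
```toml
[load]
# remove completed jobs from the load packages on disk once they are loaded
delete_completed_jobs=true
```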

### Observing cpu and memory usage
Please make sure that you have the `psutil` package installed (note that Airflow installs it by default). Then you can dump the stats periodically by setting the [progress](../general-usage/pipeline.md#display-the-loading-progress) to `log` in `config.toml`:
```toml
progress="log"
```
Alternatively, set the `PROGRESS` environment variable when running the pipeline: `PROGRESS=log python pipeline_script.py`.
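You can also pass the progress collector directly when creating the pipeline; a sketch (the pipeline name and destination are illustrative):
```py
import dlt

# dump cpu/memory stats and progress to the log periodically
pipeline = dlt.pipeline("my_pipeline", destination="duckdb", progress="log")
```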
### Extract
You can extract data concurrently if you write your pipelines to yield callables or awaitables that can then be evaluated in a thread pool or a futures pool, respectively.

The example below simulates a typical situation where a dlt resource is used to fetch a page of items and then details of individual items are fetched separately in the transformer. The `@dlt.defer` decorator wraps the `get_details` function in another callable that will be executed in the thread pool.
<!--SNIPSTART parallel_extract_callables -->
```py
import dlt
from time import sleep

# a minimal sketch of the pattern described above
@dlt.resource
def list_items(start, limit):
    yield from range(start, start + limit)

@dlt.transformer
@dlt.defer
def get_details(item_id):
    # simulate a slow API call; each call is evaluated in the thread pool
    sleep(0.3)
    return {"row": item_id}

# transformers are chained to resources with `|`; evaluating the pipe
# runs get_details for the listed items in parallel
print(list(list_items(0, 10) | get_details))
```
<!--SNIPEND -->

You can control the number of workers in the thread pool with the **workers** setting. The default number of workers is **5**. Below are a few ways to set it with different granularity:
```toml
# for all sources and resources being extracted
[extract]
workers=1

# for all resources in the zendesk_support source
[sources.zendesk_support.extract]
workers=2

# for the tickets resource in the zendesk_support source
[sources.zendesk_support.tickets.extract]
workers=4
```

The example below does the same but uses async/await and a futures pool:
<!--SNIPSTART parallel_extract_awaitables -->
```py
import asyncio
import dlt

# a minimal sketch of the same pattern using awaitables
@dlt.resource
def list_items(start, limit):
    yield from range(start, start + limit)

@dlt.transformer
async def get_details(item_id):
    # simulate a slow API call awaited in the futures pool
    await asyncio.sleep(0.3)
    return {"row": item_id}

print(list(list_items(0, 10) | get_details))
```
<!--SNIPEND -->

You can control the number of async functions/awaitables being evaluated in parallel with the **max_parallel_items** setting. Below are a few ways to set it with different granularity:
```toml
# for all sources and resources being extracted
[extract]
max_parallel_items=10

# for all resources in the zendesk_support source
[sources.zendesk_support.extract]
max_parallel_items=10

# for the tickets resource in the zendesk_support source
[sources.zendesk_support.tickets.extract]
max_parallel_items=10
```

:::note
**max_parallel_items** applies to thread pools as well. It sets how many items may be queued for execution or currently executing in a thread pool by the workers. Imagine a situation where you have millions of callables to be evaluated in a thread pool of size 5. This limit ensures that only the desired number of callables is instantiated at any given time.
:::

:::caution
Generators and iterators are always evaluated in the main thread. If you have a loop that yields items one by one, yield functions or async functions instead; these will create the items when evaluated in the pool.
:::
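For example, a minimal sketch of that pattern, with a hypothetical `fetch_item` helper wrapped in `@dlt.defer`:
```py
import dlt
from time import sleep

@dlt.defer
def fetch_item(n):
    # a slow, I/O-bound call; each invocation is evaluated in the thread pool
    sleep(0.1)
    return {"n": n}

@dlt.resource
def items_parallel():
    # the loop itself still runs in the main thread, but it only yields
    # callables; the pool evaluates them and produces the actual items
    for n in range(10):
        yield fetch_item(n)
```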

### Normalize
The **normalize** stage uses a process pool to create load packages concurrently. Each file created by the **extract** stage is sent to a process pool. **If you have just a single resource with a lot of data, you should enable [extract file rotation](#controlling-intermediary-files-size-and-rotation)**. The number of processes in the pool is controlled with the `workers` config value:
```toml
[extract.data_writer]
# force extract file rotation if size exceeds 1MiB
max_file_size=1000000

[normalize]
Expand All @@ -236,9 +235,9 @@ Normalization is CPU bound and can easily saturate all your cores. Never allow `
:::

### Load
The **load** stage uses a thread pool for parallelization. Loading is input/output bound. `dlt` avoids any processing of the content of the load package produced by the normalizer. By default, loading happens in 20 threads, each loading a single file.

As before, **if you have just a single table with millions of records, you should enable [file rotation in the normalizer](#controlling-intermediary-files-size-and-rotation)**. The number of parallel load jobs is then controlled by the `workers` config setting.
```toml
[normalize.data_writer]
# force normalize file rotation if it exceeds 1MiB
max_file_size=1000000

[load]
# have 50 parallel load jobs
workers=50
```

### Parallel pipeline config example
The example below simulates the loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows:
* during extraction, files are rotated every 100 000 items, so there are 10 files with data for the same table
* the normalizer will process the data in 3 processes
* we use JSONL to load the data to duckdb. We rotate JSONL files every 100 000 items, so 10 files will be created.
* we use 11 threads to load the data (10 JSON files + state file)
```toml
# the pipeline name is the default source name when loading resources
[sources.parallel_load.data_writer]
file_max_items=100000

[normalize]
# use 3 worker processes to process the 10 extract files in parallel
workers=3

[normalize.data_writer]
# rotate JSONL files every 100 000 items during normalization
file_max_items=100000

[load]
# 11 parallel load jobs: 10 JSONL files + the state file
workers=11
```

```py
import os
from datetime import datetime
from itertools import islice

import dlt


# a sketch of the resource described above: 1 000 000 rows yielded in pages of 1000
@dlt.resource(name="table")
def read_table(limit):
    rows = iter(range(limit))
    while item_slice := list(islice(rows, 1000)):
        now = datetime.now().isoformat()
        yield [{"row": _id, "description": f"this is row with id {_id}", "timestamp": now} for _id in item_slice]


# this prevents the process pool from running the initialization code again
if __name__ == "__main__" or "PYTEST_CURRENT_TEST" in os.environ:
    pipeline = dlt.pipeline("parallel_load", destination="duckdb", full_refresh=True)
    pipeline.extract(read_table(1000000))
    # a sketch: run the remaining stages with the parallelism configured above
    pipeline.normalize(loader_file_format="jsonl")
    pipeline.load()
```

The `next_item_mode` setting controls how `dlt` takes items from multiple resources during extraction: strictly in order (`fifo`) or alternating between resources (`round_robin`). You can change this setting in your `config.toml` as follows:
```toml
[extract] # setting for all sources
next_item_mode=round_robin

[sources.my_pipeline.extract] # setting for the "my_pipeline" pipeline
next_item_mode=fifo
```

## Using the built in requests client
`dlt` ships a drop-in replacement for the `requests` library that automatically retries failed requests. By default, a retry is triggered by:

* Server errors

  All standard HTTP server errors trigger a retry.

* Connection and timeout errors

  When the remote server is unreachable, the connection is unexpectedly dropped, or the request takes longer than the configured `timeout`.
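A minimal usage sketch of the client (the URL and header are illustrative):
```py
from dlt.sources.helpers import requests

# drop-in replacement for the `requests` API; failed calls are retried
# automatically on the conditions listed above
response = requests.get(
    "https://api.example.com/items",
    headers={"Authorization": "Bearer <token>"},
)
response.raise_for_status()
print(response.json())
```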

### Customizing retry settings

