From 66edcd10966e7043cdcb59997e5c9dec0bc60121 Mon Sep 17 00:00:00 2001
From: Steinthor Palsson
Date: Thu, 5 Dec 2024 16:55:51 -0500
Subject: [PATCH] Docs for incremental range args

---
 .../verified-sources/sql_database/advanced.md | 49 +++++++++++++++++--
 .../docs/general-usage/incremental-loading.md |  5 +-
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
index 6ff3a267d2..9014ef3b9b 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
@@ -16,7 +16,7 @@ Efficient data management often requires loading only new or updated data from y
 Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. Read [here](../../../walkthroughs/sql-incremental-configuration) for more details on incremental loading with `dlt`.
 
-#### How to configure
+### How to configure
 1. **Choose a cursor column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs.
 1. **Set an initial value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data.
 1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key.
@@ -27,7 +27,7 @@ Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing I
 If your cursor column name contains special characters (e.g., `$`) you need to escape it when passing it to the `incremental` function. 
 For example, if your cursor column is `example_$column`, you should pass it as `"'example_$column'"` or `'"example_$column"'` to the `incremental` function: `incremental("'example_$column'", initial_value=...)`.
 :::
 
-#### Examples
+### Examples
 
 1. **Incremental loading with the resource `sql_table`**.
 
@@ -52,7 +52,7 @@ If your cursor column name contains special characters (e.g., `$`) you need to e
        print(extract_info)
    ```
 
-   Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
+   Behind the scenes, the loader generates a SQL query filtering rows with `last_modified` values greater than or equal to the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
    In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](../../../general-usage/state).
 
 2. **Incremental loading with the source `sql_database`**.
 
@@ -78,6 +78,49 @@ If your cursor column name contains special characters (e.g., `$`) you need to e
    * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
 :::
 
+### Inclusive and exclusive filtering
+
+By default, the incremental filtering is inclusive on the start value side, so that
+rows with a cursor equal to the last run's cursor are fetched again from the database.
+
+The generated SQL query looks something like this (assuming `last_value_func` is `max`):
+
+```sql
+SELECT * FROM family
+WHERE last_modified >= :start_value
+ORDER BY last_modified ASC
+```
+
+That means some rows overlapping with the previous load are fetched from the database.
+Duplicates are then filtered out by dlt using either the primary key or a hash of the row's contents.
+
+This ensures there are no gaps in the extracted sequence. But it does come with some performance overhead,
+both due to the deduplication processing and the cost of fetching redundant records from the database.
+
+This is not always needed. If you know that your data does not contain overlapping cursor values, then you
+can optimize extraction by passing `start_range="open"` to `incremental`.
+
+This both disables the deduplication process and changes the operator used in the SQL `WHERE` clause from `>=` (greater-or-equal) to `>` (greater than), so that no overlapping rows are fetched.
+
+For example:
+
+```py
+table = sql_table(
+    table='family',
+    incremental=dlt.sources.incremental(
+        'last_modified',  # Cursor column name
+        initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0),  # Initial cursor value
+        start_range="open",  # exclude the start value
+    )
+)
+```
+
+It's a good option if:
+
+* The cursor is an auto-incrementing ID
+* The cursor is a high-precision timestamp and two records are never created at exactly the same time
+* Your pipeline runs are timed in such a way that new data is not generated during the load
+
 ## Parallelized extraction
 
 You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows:
diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 3f452f0d16..98e9c4165f 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -693,7 +693,7 @@ august_issues = repo_issues(
     ...
 ```
 
-Note that dlt's incremental filtering considers the ranges half-closed. 
-`initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps.
+Note that dlt's incremental filtering considers the ranges half-closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps.
+This behaviour can be changed with the `start_range` (default `"closed"`) and `end_range` (default `"open"`) arguments.
 
 ### Declare row order to not request unnecessary data
 
@@ -793,6 +793,9 @@ def some_data(last_timestamp=dlt.sources.incremental("item.ts", primary_key=()))
     yield {"delta": i, "item": {"ts": pendulum.now().timestamp()}}
 ```
 
+This deduplication process is always enabled when `start_range` is set to `"closed"` (the default).
+When you pass `start_range="open"`, no deduplication is performed, since rows matching the previous cursor value are excluded by the query. This can be a useful optimization to avoid the performance overhead of deduplication if the cursor field is guaranteed to be unique.
+
 ### Using `dlt.sources.incremental` with dynamically created resources
 
 When resources are [created dynamically](source.md#create-resources-dynamically), it is possible to use the `dlt.sources.incremental` definition as well.
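Reviewer note: the closed vs. open start-range semantics this patch documents can be sketched in plain Python. This is a hypothetical illustration only, not dlt's implementation; the `extract` helper, the `seen_ids` set, and the sample rows are all invented for the example.

```python
# Hypothetical sketch of "closed" vs. "open" start-range filtering.
# Rows are dicts with an "id" primary key and a "last_modified" cursor column.

def extract(rows, last_cursor, seen_ids, start_range="closed"):
    """Select rows newer than last_cursor.

    "closed" re-fetches boundary rows (cursor == last_cursor) and deduplicates
    them against previously seen primary keys; "open" uses a strict comparison,
    so no deduplication is needed.
    """
    if start_range == "closed":
        candidates = [r for r in rows if r["last_modified"] >= last_cursor]
        # Drop boundary rows already loaded in the previous run.
        return [r for r in candidates if r["id"] not in seen_ids]
    return [r for r in rows if r["last_modified"] > last_cursor]

rows = [
    {"id": 1, "last_modified": 10},
    {"id": 2, "last_modified": 20},
    {"id": 3, "last_modified": 20},  # arrived after the previous run, but
                                     # shares its boundary cursor value
    {"id": 4, "last_modified": 30},
]

# The previous run loaded ids 1 and 2, leaving the stored cursor at 20.
closed = extract(rows, last_cursor=20, seen_ids={1, 2}, start_range="closed")
open_ = extract(rows, last_cursor=20, seen_ids=set(), start_range="open")

print([r["id"] for r in closed])  # [3, 4] — the late boundary row is caught
print([r["id"] for r in open_])   # [4] — the late boundary row is missed
```

This is why `"closed"` is the safe default: the open range only trades the deduplication overhead for correctness when boundary cursor values are guaranteed unique.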