From 2a749a50207f806cee5b4fd0481fd3bde613cf6a Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Tue, 10 Dec 2024 17:35:22 -0500 Subject: [PATCH] Add open/closed range arguments for incremental (#1991) * Add open/closed range arguments for incremental * Docs for incremental range args * Docstring * Typo * Ensure deduplication is disabled when range_start=='open' * Cache transformer settings --- dlt/common/incremental/typing.py | 4 + dlt/extract/incremental/__init__.py | 60 +++-- dlt/extract/incremental/transform.py | 75 ++++-- dlt/sources/sql_database/helpers.py | 12 +- .../verified-sources/sql_database/advanced.md | 49 +++- .../docs/general-usage/incremental-loading.md | 5 +- tests/extract/test_incremental.py | 111 +++++++- .../load/sources/sql_database/test_helpers.py | 237 ++++++++++++------ .../sql_database/test_sql_database_source.py | 17 +- 9 files changed, 434 insertions(+), 136 deletions(-) diff --git a/dlt/common/incremental/typing.py b/dlt/common/incremental/typing.py index 460e2f234b..2ca981bff0 100644 --- a/dlt/common/incremental/typing.py +++ b/dlt/common/incremental/typing.py @@ -8,6 +8,8 @@ LastValueFunc = Callable[[Sequence[TCursorValue]], Any] OnCursorValueMissing = Literal["raise", "include", "exclude"] +TIncrementalRange = Literal["open", "closed"] + class IncrementalColumnState(TypedDict): initial_value: Optional[Any] @@ -26,3 +28,5 @@ class IncrementalArgs(TypedDict, total=False): allow_external_schedulers: Optional[bool] lag: Optional[Union[float, int]] on_cursor_value_missing: Optional[OnCursorValueMissing] + range_start: Optional[TIncrementalRange] + range_end: Optional[TIncrementalRange] diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 28d33bb71f..5e7bae49c6 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -42,6 +42,7 @@ LastValueFunc, OnCursorValueMissing, IncrementalArgs, + TIncrementalRange, ) from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -104,6 +105,11 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude lag: Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds. For other types, it uses the + or - operator depending on the last_value_func. + range_start: Decide whether the incremental filtering range is `open` or `closed` on the start value side. Default is `closed`. + Setting this to `open` means that items with the same cursor value as the last value from the previous run (or `initial_value`) are excluded from the result. + The `open` range disables deduplication logic so it can serve as an optimization when you know cursors don't overlap between pipeline runs. + range_end: Decide whether the incremental filtering range is `open` or `closed` on the end value side. Default is `open` (exact `end_value` is excluded). + Setting this to `closed` means that items with the exact same cursor value as the `end_value` are included in the result. 
""" # this is config/dataclass so declare members @@ -116,6 +122,8 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa on_cursor_value_missing: OnCursorValueMissing = "raise" lag: Optional[float] = None duplicate_cursor_warning_threshold: ClassVar[int] = 200 + range_start: TIncrementalRange = "closed" + range_end: TIncrementalRange = "open" # incremental acting as empty EMPTY: ClassVar["Incremental[Any]"] = None @@ -132,6 +140,8 @@ def __init__( allow_external_schedulers: bool = False, on_cursor_value_missing: OnCursorValueMissing = "raise", lag: Optional[float] = None, + range_start: TIncrementalRange = "closed", + range_end: TIncrementalRange = "open", ) -> None: # make sure that path is valid if cursor_path: @@ -174,9 +184,11 @@ def __init__( self.start_out_of_range: bool = False """Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`""" - self._transformers: Dict[str, IncrementalTransform] = {} + self._transformers: Dict[Type[IncrementalTransform], IncrementalTransform] = {} self._bound_pipe: SupportsPipe = None """Bound pipe""" + self.range_start = range_start + self.range_end = range_end @property def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]: @@ -190,22 +202,6 @@ def primary_key(self, value: str) -> None: for transform in self._transformers.values(): transform.primary_key = value - def _make_transforms(self) -> None: - types = [("arrow", ArrowIncremental), ("json", JsonIncremental)] - for dt, kls in types: - self._transformers[dt] = kls( - self.resource_name, - self.cursor_path, - self.initial_value, - self.start_value, - self.end_value, - self.last_value_func, - self._primary_key, - set(self._cached_state["unique_hashes"]), - self.on_cursor_value_missing, - self.lag, - ) - @classmethod def from_existing_state( cls, resource_name: str, cursor_path: str @@ -489,7 +485,8 @@ def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]": ) # cache state self._cached_state = self.get_state() - self._make_transforms() + # Clear transforms so we get new instances + self._transformers.clear() return self def can_close(self) -> bool: @@ -520,15 +517,34 @@ def __str__(self) -> str: f" {self.last_value_func}" ) + def _make_or_get_transformer(self, cls: Type[IncrementalTransform]) -> IncrementalTransform: + if transformer := self._transformers.get(cls): + return transformer + transformer = self._transformers[cls] = cls( + self.resource_name, + self.cursor_path, + self.initial_value, + self.start_value, + self.end_value, + self.last_value_func, + self._primary_key, + set(self._cached_state["unique_hashes"]), + self.on_cursor_value_missing, + self.lag, + self.range_start, + self.range_end, + ) + return transformer + def _get_transformer(self, items: TDataItems) -> IncrementalTransform: # Assume list is all of the same type for item in items if isinstance(items, list) else [items]: if is_arrow_item(item): - return self._transformers["arrow"] + return self._make_or_get_transformer(ArrowIncremental) elif pandas is not None and isinstance(item, pandas.DataFrame): - return self._transformers["arrow"] - return self._transformers["json"] - return self._transformers["json"] + return self._make_or_get_transformer(ArrowIncremental) + return self._make_or_get_transformer(JsonIncremental) + return self._make_or_get_transformer(JsonIncremental) def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]: if rows is None: diff --git 
a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 22b1194b51..1d213e26c2 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -13,7 +13,12 @@ IncrementalPrimaryKeyMissing, IncrementalCursorPathHasValueNone, ) -from dlt.common.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing +from dlt.common.incremental.typing import ( + TCursorValue, + LastValueFunc, + OnCursorValueMissing, + TIncrementalRange, +) from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate @@ -57,6 +62,8 @@ def __init__( unique_hashes: Set[str], on_cursor_value_missing: OnCursorValueMissing = "raise", lag: Optional[float] = None, + range_start: TIncrementalRange = "closed", + range_end: TIncrementalRange = "open", ) -> None: self.resource_name = resource_name self.cursor_path = cursor_path @@ -71,6 +78,9 @@ def __init__( self.start_unique_hashes = set(unique_hashes) self.on_cursor_value_missing = on_cursor_value_missing self.lag = lag + self.range_start = range_start + self.range_end = range_end + # compile jsonpath self._compiled_cursor_path = compile_path(cursor_path) # for simple column name we'll fallback to search in dict @@ -107,6 +117,8 @@ def __call__( def deduplication_disabled(self) -> bool: """Skip deduplication when length of the key is 0 or if lag is applied.""" # disable deduplication if end value is set - state is not saved + if self.range_start == "open": + return True if self.end_value is not None: return True # disable deduplication if lag is applied - destination must deduplicate ranges @@ -191,10 +203,10 @@ def __call__( # Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value if self.end_value is not None: try: - if ( - last_value_func((row_value, self.end_value)) != self.end_value - or last_value_func((row_value,)) == self.end_value - ): + if last_value_func((row_value, self.end_value)) != self.end_value: + return None, False, True + + if self.range_end == "open" and last_value_func((row_value,)) == self.end_value: return None, False, True except Exception as ex: raise IncrementalCursorInvalidCoercion( @@ -221,6 +233,9 @@ def __call__( ) from ex # new_value is "less" or equal to last_value (the actual max) if last_value == new_value: + if self.range_start == "open": + # We only want greater than last_value + return None, False, False # use func to compute row_value into last_value compatible processed_row_value = last_value_func((row_value,)) # skip the record that is not a start_value or new_value: that record was already processed @@ -258,6 +273,31 @@ def __call__( class ArrowIncremental(IncrementalTransform): _dlt_index = "_dlt_index" + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + if self.last_value_func is max: + self.compute = pa.compute.max + self.end_compare = ( + pa.compute.less if self.range_end == "open" else pa.compute.less_equal + ) + self.last_value_compare = ( + pa.compute.greater_equal if self.range_start == "closed" else pa.compute.greater + ) + self.new_value_compare = pa.compute.greater + elif self.last_value_func is min: + self.compute = pa.compute.min + self.end_compare = ( + pa.compute.greater if self.range_end == "open" else pa.compute.greater_equal + ) + self.last_value_compare = ( + pa.compute.less_equal if self.range_start == "closed" else pa.compute.less + ) + self.new_value_compare = pa.compute.less + else: + raise NotImplementedError( + "Only 
min or max last_value_func is supported for arrow tables" + ) + def compute_unique_values(self, item: "TAnyArrowItem", unique_columns: List[str]) -> List[str]: if not unique_columns: return [] @@ -312,28 +352,13 @@ def __call__( if not tbl: # row is None or empty arrow table return tbl, start_out_of_range, end_out_of_range - if self.last_value_func is max: - compute = pa.compute.max - end_compare = pa.compute.less - last_value_compare = pa.compute.greater_equal - new_value_compare = pa.compute.greater - elif self.last_value_func is min: - compute = pa.compute.min - end_compare = pa.compute.greater - last_value_compare = pa.compute.less_equal - new_value_compare = pa.compute.less - else: - raise NotImplementedError( - "Only min or max last_value_func is supported for arrow tables" - ) - # TODO: Json path support. For now assume the cursor_path is a column name cursor_path = self.cursor_path # The new max/min value try: # NOTE: datetimes are always pendulum in UTC - row_value = from_arrow_scalar(compute(tbl[cursor_path])) + row_value = from_arrow_scalar(self.compute(tbl[cursor_path])) cursor_data_type = tbl.schema.field(cursor_path).type row_value_scalar = to_arrow_scalar(row_value, cursor_data_type) except KeyError as e: @@ -364,10 +389,10 @@ def __call__( cursor_data_type, str(ex), ) from ex - tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar)) + tbl = tbl.filter(self.end_compare(tbl[cursor_path], end_value_scalar)) # Is max row value higher than end value? # NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary - end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py() + end_out_of_range = not self.end_compare(row_value_scalar, end_value_scalar).as_py() if self.start_value is not None: try: @@ -383,7 +408,7 @@ def __call__( str(ex), ) from ex # Remove rows lower or equal than the last start value - keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar) + keep_filter = self.last_value_compare(tbl[cursor_path], start_value_scalar) start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) tbl = tbl.filter(keep_filter) if not self.deduplication_disabled: @@ -407,7 +432,7 @@ def __call__( if ( self.last_value is None - or new_value_compare( + or self.new_value_compare( row_value_scalar, to_arrow_scalar(self.last_value, cursor_data_type) ).as_py() ): # Last value has changed diff --git a/dlt/sources/sql_database/helpers.py b/dlt/sources/sql_database/helpers.py index a8be2a6427..ee38c7dd98 100644 --- a/dlt/sources/sql_database/helpers.py +++ b/dlt/sources/sql_database/helpers.py @@ -94,12 +94,16 @@ def __init__( self.end_value = incremental.end_value self.row_order: TSortOrder = self.incremental.row_order self.on_cursor_value_missing = self.incremental.on_cursor_value_missing + self.range_start = self.incremental.range_start + self.range_end = self.incremental.range_end else: self.cursor_column = None self.last_value = None self.end_value = None self.row_order = None self.on_cursor_value_missing = None + self.range_start = None + self.range_end = None def _make_query(self) -> SelectAny: table = self.table @@ -110,11 +114,11 @@ def _make_query(self) -> SelectAny: # generate where if last_value_func is max: # Query ordered and filtered according to last_value function - filter_op = operator.ge - filter_op_end = operator.lt + filter_op = operator.ge if self.range_start == "closed" else operator.gt + filter_op_end = operator.lt if self.range_end == "open" else operator.le elif last_value_func is min: - 
filter_op = operator.le
-            filter_op_end = operator.gt
+            filter_op = operator.le if self.range_start == "closed" else operator.lt
+            filter_op_end = operator.gt if self.range_end == "open" else operator.ge
         else:  # Custom last_value, load everything and let incremental handle filtering
             return query  # type: ignore[no-any-return]
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
index 6ff3a267d2..c532f6d357 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/sql_database/advanced.md
@@ -16,7 +16,7 @@ Efficient data management often requires loading only new or updated data from y
 Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing ID) to load only data newer than a specified initial value, enhancing efficiency by reducing processing time and resource use. Read [here](../../../walkthroughs/sql-incremental-configuration) for more details on incremental loading with `dlt`.
 
-#### How to configure
+### How to configure
 1. **Choose a cursor column**: Identify a column in your SQL table that can serve as a reliable indicator of new or updated rows. Common choices include timestamp columns or auto-incrementing IDs.
 1. **Set an initial value**: Choose a starting value for the cursor to begin loading data. This could be a specific timestamp or ID from which you wish to start loading data.
 1. **Deduplication**: When using incremental loading, the system automatically handles the deduplication of rows based on the primary key (if available) or row hash for tables without a primary key.
@@ -27,7 +27,7 @@ Incremental loading uses a cursor column (e.g., timestamp or auto-incrementing I
 If your cursor column name contains special characters (e.g., `$`) you need to escape it when passing it to the `incremental` function. For example, if your cursor column is `example_$column`, you should pass it as `"'example_$column'"` or `'"example_$column"'` to the `incremental` function: `incremental("'example_$column'", initial_value=...)`.
 :::
 
-#### Examples
+### Examples
 
 1. **Incremental loading with the resource `sql_table`**.
@@ -52,7 +52,7 @@ If your cursor column name contains special characters (e.g., `$`) you need to e
    print(extract_info)
    ```
 
-   Behind the scene, the loader generates a SQL query filtering rows with `last_modified` values greater than the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
+   Behind the scenes, the loader generates a SQL query filtering rows with `last_modified` values greater than or equal to the incremental value. In the first run, this is the initial value (midnight (00:00:00) January 1, 2024).
    In subsequent runs, it is the latest value of `last_modified` that `dlt` stores in [state](../../../general-usage/state).
 
 2. **Incremental loading with the source `sql_database`**.
@@ -78,6 +78,49 @@ If your cursor column name contains special characters (e.g., `$`) you need to e
 * `apply_hints` is a powerful method that enables schema modifications after resource creation, like adjusting write disposition and primary keys. You can choose from various tables and use `apply_hints` multiple times to create pipelines with merged, appended, or replaced resources.
:::
 
+### Inclusive and exclusive filtering
+
+By default, the incremental filtering range is inclusive on the start value side, so that
+rows with a cursor equal to the last run's cursor value are fetched again from the database.
+
+The generated SQL query looks something like this (assuming `last_value_func` is `max`):
+
+```sql
+SELECT * FROM family
+WHERE last_modified >= :start_value
+ORDER BY last_modified ASC
+```
+
+That means some rows overlapping with the previous load are fetched from the database.
+Duplicates are then filtered out by dlt using either the primary key or a hash of the row's contents.
+
+This ensures there are no gaps in the extracted sequence, but it comes with some performance overhead,
+both from the deduplication processing and from fetching redundant records from the database.
+
+This is not always needed. If you know that your data does not contain overlapping cursor values, you
+can optimize extraction by passing `range_start="open"` to `incremental`.
+
+This both disables the deduplication process and changes the operator used in the SQL `WHERE` clause from `>=` (greater than or equal) to `>` (greater than), so that no overlapping rows are fetched.
+
+For example:
+
+```py
+table = sql_table(
+    table='family',
+    incremental=dlt.sources.incremental(
+        'last_modified',  # Cursor column name
+        initial_value=pendulum.DateTime(2024, 1, 1, 0, 0, 0),  # Initial cursor value
+        range_start="open",  # exclude the start value
+    )
+)
+```
+
+It's a good option if:
+
+* The cursor is an auto-incrementing ID.
+* The cursor is a high-precision timestamp and two records are never created at exactly the same time.
+* Your pipeline runs are timed in such a way that new data is not generated during the load.
+
 ## Parallelized extraction
 
 You can extract each table in a separate thread (no multiprocessing at this point). This will decrease loading time if your queries take time to execute or your network latency/speed is low. To enable this, declare your sources/resources as follows:
diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 3f452f0d16..5008795ed4 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -693,7 +693,7 @@ august_issues = repo_issues(
 ...
 ```
 
-Note that dlt's incremental filtering considers the ranges half-closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps.
+Note that dlt's incremental filtering considers the ranges half-closed. `initial_value` is inclusive, `end_value` is exclusive, so chaining ranges like above works without overlaps. This behaviour can be changed with the `range_start` (default `"closed"`) and `range_end` (default `"open"`) arguments.
 
 ### Declare row order to not request unnecessary data
 
@@ -793,6 +793,9 @@ def some_data(last_timestamp=dlt.sources.incremental("item.ts", primary_key=()))
     yield {"delta": i, "item": {"ts": pendulum.now().timestamp()}}
 ```
 
+This deduplication process is always enabled when `range_start` is set to `"closed"` (the default).
+When you pass `range_start="open"`, no deduplication is done, because rows with the previous cursor value are excluded from the result. This is a useful optimization that avoids the performance overhead of deduplication when the cursor field is guaranteed to be unique.
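+
+For example, if the cursor is a strictly increasing ID, an open start range skips both the redundant fetch and the deduplication work. Below is a minimal sketch; the `events` resource and the `fetch_rows_after` helper are illustrative only, not part of dlt's API:
+
+```py
+import dlt
+
+@dlt.resource(primary_key="id")
+def events(
+    cursor=dlt.sources.incremental("id", initial_value=0, range_start="open"),
+):
+    # with an open start range, rows where id == cursor.last_value were already
+    # loaded by the previous run and are filtered out, so dlt does not need to
+    # keep unique hashes in state
+    yield from fetch_rows_after(cursor.last_value)  # hypothetical helper
+```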
+
 ### Using `dlt.sources.incremental` with dynamically created resources
 
 When resources are [created dynamically](source.md#create-resources-dynamically), it is possible to use the `dlt.sources.incremental` definition as well.
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 725872b621..3ebc9d1201 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -5,7 +5,7 @@
 from datetime import datetime, date  # noqa: I251
 from itertools import chain, count
 from time import sleep
-from typing import Any, Optional, Literal, Sequence, Dict
+from typing import Any, Optional, Literal, Sequence, Dict, Iterable
 from unittest import mock
 
 import duckdb
@@ -1522,6 +1522,7 @@ def some_data(last_timestamp=dlt.sources.incremental("ts", primary_key=())):
 
 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
 def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None:
+    os.environ["COMPLETED_PROB"] = "1.0"  # make it complete immediately
     p = dlt.pipeline(pipeline_name=uniq_id(), destination="dummy")
     data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}]
     source_items = data_to_item_format(item_type, data)
@@ -3851,3 +3852,111 @@ def some_data():
 
     for col in table_schema["columns"].values():
         assert "incremental" not in col
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+@pytest.mark.parametrize("last_value_func", [min, max])
+def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None:
+    data_range: Iterable[int] = range(1, 12)
+    if last_value_func == max:
+        initial_value = 5
+        # Only items higher than the initial value are extracted
+        expected_items = list(range(6, 12))
+        order_dir = "ASC"
+    elif last_value_func == min:
+        data_range = reversed(data_range)  # type: ignore[call-overload]
+        initial_value = 5
+        # Only items lower than the initial value are extracted
+        expected_items = list(reversed(range(1, 5)))
+        order_dir = "DESC"
+
+    @dlt.resource
+    def some_data(
+        updated_at: dlt.sources.incremental[int] = dlt.sources.incremental(
+            "updated_at",
+            initial_value=initial_value,
+            range_start="open",
+            last_value_func=last_value_func,
+        ),
+    ) -> Any:
+        data = [{"updated_at": i} for i in data_range]
+        yield data_to_item_format(item_type, data)
+
+    pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb")
+    pipeline.run(some_data())
+
+    with pipeline.sql_client() as client:
+        items = [
+            row[0]
+            for row in client.execute_sql(
+                f"SELECT updated_at FROM some_data ORDER BY updated_at {order_dir}"
+            )
+        ]
+
+    assert items == expected_items
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_start_range_open_no_deduplication(item_type: TestDataItemFormat) -> None:
+    @dlt.source
+    def dummy():
+        @dlt.resource
+        def some_data(
+            updated_at: dlt.sources.incremental[int] = dlt.sources.incremental(
+                "updated_at",
+                range_start="open",
+            )
+        ):
+            yield data_to_item_format(item_type, [{"updated_at": i} for i in range(3)])
+
+        yield some_data
+
+    pipeline = dlt.pipeline(pipeline_name=uniq_id())
+    pipeline.extract(dummy())
+
+    state = pipeline.state["sources"]["dummy"]["resources"]["some_data"]["incremental"][
+        "updated_at"
+    ]
+
+    # No unique values should be computed
+    assert state["unique_hashes"] == []
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+@pytest.mark.parametrize("last_value_func", [min, max])
+def test_end_range_closed(item_type: TestDataItemFormat, last_value_func: Any) -> None:
+    values = [5, 10]
+    expected_items = list(range(5, 11))
+    if last_value_func == max:
+        order_dir = "ASC"
+    elif last_value_func == min:
+        values = list(reversed(values))
+        expected_items = list(reversed(expected_items))
+        order_dir = "DESC"
+
+    @dlt.resource
+    def some_data(
+        updated_at: dlt.sources.incremental[int] = dlt.sources.incremental(
+            "updated_at",
+            initial_value=values[0],
+            end_value=values[1],
+            range_end="closed",
+            last_value_func=last_value_func,
+        ),
+    ) -> Any:
+        data = [{"updated_at": i} for i in range(1, 12)]
+        yield data_to_item_format(item_type, data)
+
+    pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb")
+    pipeline.run(some_data())
+
+    with pipeline.sql_client() as client:
+        items = [
+            row[0]
+            for row in client.execute_sql(
+                f"SELECT updated_at FROM some_data ORDER BY updated_at {order_dir}"
+            )
+        ]
+
+    # Includes values 5-10 inclusive
+    assert items == expected_items
diff --git a/tests/load/sources/sql_database/test_helpers.py b/tests/load/sources/sql_database/test_helpers.py
index def5430146..43da9c955f 100644
--- a/tests/load/sources/sql_database/test_helpers.py
+++ b/tests/load/sources/sql_database/test_helpers.py
@@ -1,3 +1,6 @@
+from typing import Callable, Any, Optional, TYPE_CHECKING
+from dataclasses import dataclass
+
 import pytest
 
 import dlt
@@ -14,6 +17,18 @@
     pytest.skip("Tests require sql alchemy", allow_module_level=True)
 
 
+@dataclass
+class MockIncremental:
+    last_value: Any
+    last_value_func: Callable[[Any], Any]
+    cursor_path: str
+    row_order: Optional[str] = None
+    end_value: Any = None
+    on_cursor_value_missing: str = "raise"
+    range_start: str = "closed"
+    range_end: str = "open"
+
+
 @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"])
 def test_cursor_or_unique_column_not_in_table(
     sql_source_db: SQLAlchemySourceDB, backend: TableBackend
@@ -36,13 +51,12 @@ def test_make_query_incremental_max(
 ) -> None:
     """Verify query is generated according to incremental settings"""
 
-    class MockIncremental:
-        last_value = dlt.common.pendulum.now()
-        last_value_func = max
-        cursor_path = "created_at"
-        row_order = "asc"
-        end_value = None
-        on_cursor_value_missing = "raise"
+    incremental = MockIncremental(
+        last_value=dlt.common.pendulum.now(),
+        last_value_func=max,
+        cursor_path="created_at",
+        row_order="asc",
+    )
 
     table = sql_source_db.get_table("chat_message")
     loader = TableLoader(
@@ -50,14 +64,14 @@ class MockIncremental:
         backend,
         table,
         table_to_columns(table),
-        incremental=MockIncremental(),  # type: ignore[arg-type]
+        incremental=incremental,  # type: ignore[arg-type]
     )
 
     query = loader.make_query()
     expected = (
         table.select()
         .order_by(table.c.created_at.asc())
-        .where(table.c.created_at >= MockIncremental.last_value)
+        .where(table.c.created_at >= incremental.last_value)
     )
 
     assert query.compare(expected)
@@ -67,13 +81,14 @@ class MockIncremental:
 def test_make_query_incremental_min(
     sql_source_db: SQLAlchemySourceDB, backend: TableBackend
 ) -> None:
-    class MockIncremental:
-        last_value = dlt.common.pendulum.now()
-        last_value_func = min
-        cursor_path = "created_at"
-        row_order = "desc"
-        end_value = None
-        on_cursor_value_missing = "raise"
+    incremental = MockIncremental(
+        last_value=dlt.common.pendulum.now(),
+        last_value_func=min,
+        cursor_path="created_at",
+        row_order="desc",
+        end_value=None,
+        on_cursor_value_missing="raise",
+    )
 
     table = sql_source_db.get_table("chat_message")
     loader = TableLoader(
@@ -81,14 +96,14 @@ class MockIncremental:
         backend,
         table,
         table_to_columns(table),
-        incremental=MockIncremental(),  # type: ignore[arg-type]
+        incremental=incremental,  # 
type: ignore[arg-type] ) query = loader.make_query() expected = ( table.select() .order_by(table.c.created_at.asc()) # `min` func swaps order - .where(table.c.created_at <= MockIncremental.last_value) + .where(table.c.created_at <= incremental.last_value) ) assert query.compare(expected) @@ -103,13 +118,14 @@ def test_make_query_incremental_on_cursor_value_missing_set( with_end_value: bool, cursor_value_missing: str, ) -> None: - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = max - cursor_path = "created_at" - row_order = "asc" - end_value = None if not with_end_value else dlt.common.pendulum.now().add(hours=1) - on_cursor_value_missing = cursor_value_missing + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=max, + cursor_path="created_at", + row_order="asc", + end_value=None if not with_end_value else dlt.common.pendulum.now().add(hours=1), + on_cursor_value_missing=cursor_value_missing, + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -117,7 +133,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() @@ -131,14 +147,14 @@ class MockIncremental: if with_end_value: where_clause = operator( sa.and_( - table.c.created_at >= MockIncremental.last_value, - table.c.created_at < MockIncremental.end_value, + table.c.created_at >= incremental.last_value, + table.c.created_at < incremental.end_value, ), missing_cond, ) else: where_clause = operator( - table.c.created_at >= MockIncremental.last_value, + table.c.created_at >= incremental.last_value, missing_cond, ) expected = table.select().order_by(table.c.created_at.asc()).where(where_clause) @@ -152,13 +168,14 @@ def test_make_query_incremental_on_cursor_value_missing_no_last_value( backend: TableBackend, cursor_value_missing: str, ) -> None: - class MockIncremental: - last_value = None - last_value_func = max - cursor_path = "created_at" - row_order = "asc" - end_value = None - on_cursor_value_missing = cursor_value_missing + incremental = MockIncremental( + last_value=None, + last_value_func=max, + cursor_path="created_at", + row_order="asc", + end_value=None, + on_cursor_value_missing=cursor_value_missing, + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -166,7 +183,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() @@ -189,13 +206,14 @@ def test_make_query_incremental_end_value( ) -> None: now = dlt.common.pendulum.now() - class MockIncremental: - last_value = now - last_value_func = min - cursor_path = "created_at" - end_value = now.add(hours=1) - row_order = None - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=now, + last_value_func=min, + cursor_path="created_at", + end_value=now.add(hours=1), + row_order=None, + on_cursor_value_missing="raise", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -203,14 +221,14 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() expected = table.select().where( sa.and_( - table.c.created_at <= MockIncremental.last_value, - table.c.created_at > 
MockIncremental.end_value, + table.c.created_at <= incremental.last_value, + table.c.created_at > incremental.end_value, ) ) @@ -221,13 +239,14 @@ class MockIncremental: def test_make_query_incremental_any_fun( sql_source_db: SQLAlchemySourceDB, backend: TableBackend ) -> None: - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = lambda x: x[-1] - cursor_path = "created_at" - row_order = "asc" - end_value = dlt.common.pendulum.now() - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=lambda x: x[-1], + cursor_path="created_at", + row_order="asc", + end_value=dlt.common.pendulum.now(), + on_cursor_value_missing="raise", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -235,7 +254,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() @@ -256,12 +275,11 @@ def test_cursor_path_field_name_with_a_special_chars( if special_field_name not in table.c: table.append_column(sa.Column(special_field_name, sa.String)) - class MockIncremental: - cursor_path = "'id$field'" - last_value = None - end_value = None - row_order = None - on_cursor_value_missing = None + incremental = MockIncremental( + cursor_path="'id$field'", + last_value=None, + last_value_func=max, + ) # Should not raise any exception loader = TableLoader( @@ -269,7 +287,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) assert loader.cursor_column == table.c[special_field_name] @@ -281,12 +299,11 @@ def test_cursor_path_multiple_fields( """Test that a cursor_path with multiple fields raises a ValueError.""" table = sql_source_db.get_table("chat_message") - class MockIncremental: - cursor_path = "created_at,updated_at" - last_value = None - end_value = None - row_order = None - on_cursor_value_missing = None + incremental = MockIncremental( + cursor_path="created_at,updated_at", + last_value=None, + last_value_func=max, + ) with pytest.raises(ValueError) as excinfo: TableLoader( @@ -294,7 +311,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) assert "must be a simple column name" in str(excinfo.value) @@ -306,12 +323,11 @@ def test_cursor_path_complex_expression( """Test that a complex JSONPath expression in cursor_path raises a ValueError.""" table = sql_source_db.get_table("chat_message") - class MockIncremental: - cursor_path = "$.users[0].id" - last_value = None - end_value = None - row_order = None - on_cursor_value_missing = None + incremental = MockIncremental( + cursor_path="$.users[0].id", + last_value=None, + last_value_func=max, + ) with pytest.raises(ValueError) as excinfo: TableLoader( @@ -319,11 +335,80 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) assert "must be a simple column name" in str(excinfo.value) +@pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"]) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_make_query_incremental_range_start_open( + sql_source_db: SQLAlchemySourceDB, backend: 
TableBackend, last_value_func: Callable[[Any], Any] +) -> None: + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=last_value_func, + cursor_path="created_at", + end_value=None, + on_cursor_value_missing="raise", + range_start="open", + ) + + table = sql_source_db.get_table("chat_message") + + loader = TableLoader( + sql_source_db.engine, + backend, + table, + table_to_columns(table), + incremental=incremental, # type: ignore[arg-type] + ) + + query = loader.make_query() + expected = table.select() + + if last_value_func == min: + expected = expected.where(table.c.created_at < incremental.last_value) + else: + expected = expected.where(table.c.created_at > incremental.last_value) + + assert query.compare(expected) + + +@pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"]) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_make_query_incremental_range_end_closed( + sql_source_db: SQLAlchemySourceDB, backend: TableBackend, last_value_func: Callable[[Any], Any] +) -> None: + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=last_value_func, + cursor_path="created_at", + end_value=None, + on_cursor_value_missing="raise", + range_end="closed", + ) + + table = sql_source_db.get_table("chat_message") + loader = TableLoader( + sql_source_db.engine, + backend, + table, + table_to_columns(table), + incremental=incremental, # type: ignore[arg-type] + ) + + query = loader.make_query() + expected = table.select() + + if last_value_func == min: + expected = expected.where(table.c.created_at <= incremental.last_value) + else: + expected = expected.where(table.c.created_at >= incremental.last_value) + + assert query.compare(expected) + + def mock_json_column(field: str) -> TDataItem: """""" import pyarrow as pa diff --git a/tests/load/sources/sql_database/test_sql_database_source.py b/tests/load/sources/sql_database/test_sql_database_source.py index 9079638586..00257471e0 100644 --- a/tests/load/sources/sql_database/test_sql_database_source.py +++ b/tests/load/sources/sql_database/test_sql_database_source.py @@ -13,6 +13,7 @@ from dlt.common.utils import uniq_id from dlt.extract.exceptions import ResourceExtractionError +from dlt.extract.incremental.transform import JsonIncremental, ArrowIncremental from dlt.sources import DltResource from tests.pipeline.utils import ( @@ -831,8 +832,12 @@ def _assert_incremental(item): else: assert _r.incremental.primary_key == ["id"] assert _r.incremental._incremental.primary_key == ["id"] - assert _r.incremental._incremental._transformers["json"].primary_key == ["id"] - assert _r.incremental._incremental._transformers["arrow"].primary_key == ["id"] + assert _r.incremental._incremental._make_or_get_transformer( + JsonIncremental + ).primary_key == ["id"] + assert _r.incremental._incremental._make_or_get_transformer( + ArrowIncremental + ).primary_key == ["id"] return item pipeline = make_pipeline("duckdb") @@ -841,8 +846,12 @@ def _assert_incremental(item): assert resource.incremental.primary_key == ["id"] assert resource.incremental._incremental.primary_key == ["id"] - assert resource.incremental._incremental._transformers["json"].primary_key == ["id"] - assert resource.incremental._incremental._transformers["arrow"].primary_key == ["id"] + assert resource.incremental._incremental._make_or_get_transformer( + JsonIncremental + ).primary_key == ["id"] + assert resource.incremental._incremental._make_or_get_transformer( + ArrowIncremental + 
).primary_key == ["id"] @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"])