diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 343a737c07..00cc306b0c 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -40,6 +40,7 @@ TCursorValue, LastValueFunc, OnCursorValueMissing, + TIncrementalRange, ) from dlt.extract.pipe import Pipe from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform @@ -111,6 +112,8 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa row_order: Optional[TSortOrder] = None allow_external_schedulers: bool = False on_cursor_value_missing: OnCursorValueMissing = "raise" + range_start: TIncrementalRange = "closed" + range_end: TIncrementalRange = "open" # incremental acting as empty EMPTY: ClassVar["Incremental[Any]"] = None @@ -126,6 +129,8 @@ def __init__( row_order: Optional[TSortOrder] = None, allow_external_schedulers: bool = False, on_cursor_value_missing: OnCursorValueMissing = "raise", + range_start: TIncrementalRange = "closed", + range_end: TIncrementalRange = "open", ) -> None: # make sure that path is valid if cursor_path: @@ -159,6 +164,8 @@ def __init__( self._transformers: Dict[str, IncrementalTransform] = {} self._bound_pipe: SupportsPipe = None """Bound pipe""" + self.range_start = range_start + self.range_end = range_end @property def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]: @@ -185,6 +192,8 @@ def _make_transforms(self) -> None: self._primary_key, set(self._cached_state["unique_hashes"]), self.on_cursor_value_missing, + self.range_start, + self.range_end, ) @classmethod diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 209caabc17..5c7c773675 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -13,7 +13,12 @@ IncrementalPrimaryKeyMissing, IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing +from dlt.extract.incremental.typing import ( + TCursorValue, + LastValueFunc, + OnCursorValueMissing, + TIncrementalRange, +) from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate from dlt.common.schema.typing import TColumnNames @@ -57,6 +62,8 @@ def __init__( primary_key: Optional[TTableHintTemplate[TColumnNames]], unique_hashes: Set[str], on_cursor_value_missing: OnCursorValueMissing = "raise", + range_start: TIncrementalRange = "closed", + range_end: TIncrementalRange = "open", ) -> None: self.resource_name = resource_name self.cursor_path = cursor_path @@ -70,6 +77,8 @@ def __init__( self.unique_hashes = unique_hashes self.start_unique_hashes = set(unique_hashes) self.on_cursor_value_missing = on_cursor_value_missing + self.range_start = range_start + self.range_end = range_end # compile jsonpath self._compiled_cursor_path = compile_path(cursor_path) @@ -188,10 +197,10 @@ def __call__( # Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value if self.end_value is not None: try: - if ( - last_value_func((row_value, self.end_value)) != self.end_value - or last_value_func((row_value,)) == self.end_value - ): + if last_value_func((row_value, self.end_value)) != self.end_value: + return None, False, True + + if self.range_end == "open" and last_value_func((row_value,)) == self.end_value: return None, False, True except Exception as ex: raise IncrementalCursorInvalidCoercion( @@ -218,6 +227,8 @@ def __call__( ) from ex # new_value is "less" or equal to last_value (the actual max) if last_value == new_value: + if self.range_start == "open": + return None, False, False # use func to compute row_value into last_value compatible processed_row_value = last_value_func((row_value,)) # skip the record that is not a start_value or new_value: that record was already processed @@ -311,13 +322,19 @@ def __call__( if self.last_value_func is max: compute = pa.compute.max - end_compare = pa.compute.less - last_value_compare = pa.compute.greater_equal + end_compare = pa.compute.less if self.range_end == "open" else pa.compute.less_equal + last_value_compare = ( + pa.compute.greater_equal if self.range_start == "closed" else pa.compute.greater + ) new_value_compare = pa.compute.greater elif self.last_value_func is min: compute = pa.compute.min - end_compare = pa.compute.greater - last_value_compare = pa.compute.less_equal + end_compare = ( + pa.compute.greater if self.range_end == "open" else pa.compute.greater_equal + ) + last_value_compare = ( + pa.compute.less_equal if self.range_start == "closed" else pa.compute.less + ) new_value_compare = pa.compute.less else: raise NotImplementedError( diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py index 6829e6b370..8e9ce1670c 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/extract/incremental/typing.py @@ -10,6 +10,8 @@ LastValueFunc = Callable[[Sequence[TCursorValue]], Any] OnCursorValueMissing = Literal["raise", "include", "exclude"] +TIncrementalRange = Literal["open", "closed"] + class IncrementalColumnState(TypedDict): initial_value: Optional[Any] diff --git a/dlt/sources/sql_database/helpers.py b/dlt/sources/sql_database/helpers.py index 235b96ac64..c13c07ba59 100644 --- a/dlt/sources/sql_database/helpers.py +++ b/dlt/sources/sql_database/helpers.py @@ -71,12 +71,16 @@ def __init__( self.end_value = incremental.end_value self.row_order: TSortOrder = self.incremental.row_order self.on_cursor_value_missing = self.incremental.on_cursor_value_missing + self.range_start = self.incremental.range_start + self.range_end = self.incremental.range_end else: self.cursor_column = None self.last_value = None self.end_value = None self.row_order = None self.on_cursor_value_missing = None + self.range_start = None + self.range_end = None def _make_query(self) -> SelectAny: table = self.table @@ -87,11 +91,11 @@ def _make_query(self) -> SelectAny: # generate where if last_value_func is max: # Query ordered and filtered according to last_value function - filter_op = operator.ge - filter_op_end = operator.lt + filter_op = operator.ge if self.range_start == "closed" else operator.gt + filter_op_end = operator.lt if self.range_end == "open" else operator.le elif last_value_func is min: - filter_op = operator.le - filter_op_end = operator.gt + filter_op = operator.le if self.range_start == "closed" else operator.lt + filter_op_end = operator.gt if self.range_end == "open" else operator.ge else: # Custom last_value, load everything and let incremental handle filtering return query # type: ignore[no-any-return] diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 0a0de75987..f1d78217f3 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -5,7 +5,7 @@ from datetime import datetime # noqa: I251 from itertools import chain, count from time import sleep -from typing import Any, Optional +from typing import Any, Optional, Iterable from unittest import mock import duckdb @@ -1462,6 +1462,7 @@ def some_data(last_timestamp=dlt.sources.incremental("ts", primary_key=())): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None: + os.environ["COMPLETED_PROB"] = "1.0" # make it complete immediately p = dlt.pipeline(pipeline_name=uniq_id(), destination="dummy") data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}] source_items = data_to_item_format(item_type, data) @@ -2586,3 +2587,85 @@ def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_valu pipeline.run(updated_is_int()) assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion) assert pip_ex.value.__cause__.cursor_path == "updated_at" + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None: + data_range: Iterable[int] = range(1, 12) + if last_value_func == max: + initial_value = 5 + # Only items higher than inital extracted + expected_items = list(range(6, 12)) + order_dir = "ASC" + elif last_value_func == min: + data_range = reversed(data_range) + initial_value = 5 + # Only items lower than inital extracted + expected_items = list(reversed(range(1, 5))) + order_dir = "DESC" + + @dlt.resource + def some_data( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", + initial_value=initial_value, + range_start="open", + last_value_func=last_value_func, + ), + ) -> Any: + data = [{"updated_at": i} for i in data_range] + yield data_to_item_format(item_type, data) + + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb") + pipeline.run(some_data()) + + with pipeline.sql_client() as client: + items = [ + row[0] + for row in client.execute_sql( + f"SELECT updated_at FROM some_data ORDER BY updated_at {order_dir}" + ) + ] + + assert items == expected_items + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_end_range_closed(item_type: TestDataItemFormat, last_value_func: Any) -> None: + values = [5, 10] + expected_items = list(range(5, 11)) + if last_value_func == max: + order_dir = "ASC" + elif last_value_func == min: + values = list(reversed(values)) + expected_items = list(reversed(expected_items)) + order_dir = "DESC" + + @dlt.resource + def some_data( + updated_at: dlt.sources.incremental[int] = dlt.sources.incremental( + "updated_at", + initial_value=values[0], + end_value=values[1], + range_end="closed", + last_value_func=last_value_func, + ), + ) -> Any: + data = [{"updated_at": i} for i in range(1, 12)] + yield data_to_item_format(item_type, data) + + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb") + pipeline.run(some_data()) + + with pipeline.sql_client() as client: + items = [ + row[0] + for row in client.execute_sql( + f"SELECT updated_at FROM some_data ORDER BY updated_at {order_dir}" + ) + ] + + # Includes values 5-10 inclusive + assert items == expected_items diff --git a/tests/load/sources/sql_database/test_helpers.py b/tests/load/sources/sql_database/test_helpers.py index 4748f226a9..8a5720429e 100644 --- a/tests/load/sources/sql_database/test_helpers.py +++ b/tests/load/sources/sql_database/test_helpers.py @@ -1,5 +1,7 @@ -import pytest +from typing import Callable, Any, TYPE_CHECKING +from dataclasses import dataclass +import pytest import dlt from dlt.common.typing import TDataItem @@ -16,6 +18,18 @@ pytest.skip("Tests require sql alchemy", allow_module_level=True) +@dataclass +class MockIncremental: + last_value: Any + last_value_func: Callable[[Any], Any] + cursor_path: str + row_order: str = None + end_value: Any = None + on_cursor_value_missing: str = "raise" + range_start: str = "closed" + range_end: str = "open" + + @pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"]) def test_cursor_or_unique_column_not_in_table( sql_source_db: SQLAlchemySourceDB, backend: TableBackend @@ -38,13 +52,12 @@ def test_make_query_incremental_max( ) -> None: """Verify query is generated according to incremental settings""" - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = max - cursor_path = "created_at" - row_order = "asc" - end_value = None - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=max, + cursor_path="created_at", + row_order="asc", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -52,14 +65,14 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() expected = ( table.select() .order_by(table.c.created_at.asc()) - .where(table.c.created_at >= MockIncremental.last_value) + .where(table.c.created_at >= incremental.last_value) ) assert query.compare(expected) @@ -69,13 +82,14 @@ class MockIncremental: def test_make_query_incremental_min( sql_source_db: SQLAlchemySourceDB, backend: TableBackend ) -> None: - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = min - cursor_path = "created_at" - row_order = "desc" - end_value = None - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=min, + cursor_path="created_at", + row_order="desc", + end_value=None, + on_cursor_value_missing="raise", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -83,14 +97,14 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() expected = ( table.select() .order_by(table.c.created_at.asc()) # `min` func swaps order - .where(table.c.created_at <= MockIncremental.last_value) + .where(table.c.created_at <= incremental.last_value) ) assert query.compare(expected) @@ -105,13 +119,14 @@ def test_make_query_incremental_on_cursor_value_missing_set( with_end_value: bool, cursor_value_missing: str, ) -> None: - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = max - cursor_path = "created_at" - row_order = "asc" - end_value = None if not with_end_value else dlt.common.pendulum.now().add(hours=1) - on_cursor_value_missing = cursor_value_missing + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=max, + cursor_path="created_at", + row_order="asc", + end_value=None if not with_end_value else dlt.common.pendulum.now().add(hours=1), + on_cursor_value_missing=cursor_value_missing, + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -119,7 +134,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() @@ -133,14 +148,14 @@ class MockIncremental: if with_end_value: where_clause = operator( sa.and_( - table.c.created_at >= MockIncremental.last_value, - table.c.created_at < MockIncremental.end_value, + table.c.created_at >= incremental.last_value, + table.c.created_at < incremental.end_value, ), missing_cond, ) else: where_clause = operator( - table.c.created_at >= MockIncremental.last_value, + table.c.created_at >= incremental.last_value, missing_cond, ) expected = table.select().order_by(table.c.created_at.asc()).where(where_clause) @@ -154,13 +169,14 @@ def test_make_query_incremental_on_cursor_value_missing_no_last_value( backend: TableBackend, cursor_value_missing: str, ) -> None: - class MockIncremental: - last_value = None - last_value_func = max - cursor_path = "created_at" - row_order = "asc" - end_value = None - on_cursor_value_missing = cursor_value_missing + incremental = MockIncremental( + last_value=None, + last_value_func=max, + cursor_path="created_at", + row_order="asc", + end_value=None, + on_cursor_value_missing=cursor_value_missing, + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -168,7 +184,7 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() @@ -191,13 +207,14 @@ def test_make_query_incremental_end_value( ) -> None: now = dlt.common.pendulum.now() - class MockIncremental: - last_value = now - last_value_func = min - cursor_path = "created_at" - end_value = now.add(hours=1) - row_order = None - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=now, + last_value_func=min, + cursor_path="created_at", + end_value=now.add(hours=1), + row_order=None, + on_cursor_value_missing="raise", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -205,14 +222,14 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() expected = table.select().where( sa.and_( - table.c.created_at <= MockIncremental.last_value, - table.c.created_at > MockIncremental.end_value, + table.c.created_at <= incremental.last_value, + table.c.created_at > incremental.end_value, ) ) @@ -223,13 +240,43 @@ class MockIncremental: def test_make_query_incremental_any_fun( sql_source_db: SQLAlchemySourceDB, backend: TableBackend ) -> None: - class MockIncremental: - last_value = dlt.common.pendulum.now() - last_value_func = lambda x: x[-1] - cursor_path = "created_at" - row_order = "asc" - end_value = dlt.common.pendulum.now() - on_cursor_value_missing = "raise" + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=lambda x: x[-1], + cursor_path="created_at", + row_order="asc", + end_value=dlt.common.pendulum.now(), + on_cursor_value_missing="raise", + ) + + table = sql_source_db.get_table("chat_message") + loader = TableLoader( + sql_source_db.engine, + backend, + table, + table_to_columns(table), + incremental=incremental, # type: ignore[arg-type] + ) + + query = loader.make_query() + expected = table.select() + + assert query.compare(expected) + + +@pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"]) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_make_query_incremental_range_start_open( + sql_source_db: SQLAlchemySourceDB, backend: TableBackend, last_value_func: Callable[[Any], Any] +) -> None: + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=last_value_func, + cursor_path="created_at", + end_value=None, + on_cursor_value_missing="raise", + range_start="open", + ) table = sql_source_db.get_table("chat_message") loader = TableLoader( @@ -237,12 +284,51 @@ class MockIncremental: backend, table, table_to_columns(table), - incremental=MockIncremental(), # type: ignore[arg-type] + incremental=incremental, # type: ignore[arg-type] ) query = loader.make_query() expected = table.select() + if last_value_func == min: + expected = expected.where(table.c.created_at < incremental.last_value) + else: + expected = expected.where(table.c.created_at > incremental.last_value) + + assert query.compare(expected) + + +@pytest.mark.parametrize("backend", ["sqlalchemy", "pyarrow", "pandas", "connectorx"]) +@pytest.mark.parametrize("last_value_func", [min, max]) +def test_make_query_incremental_range_end_closed( + sql_source_db: SQLAlchemySourceDB, backend: TableBackend, last_value_func: Callable[[Any], Any] +) -> None: + incremental = MockIncremental( + last_value=dlt.common.pendulum.now(), + last_value_func=last_value_func, + cursor_path="created_at", + end_value=None, + on_cursor_value_missing="raise", + range_end="closed", + ) + + table = sql_source_db.get_table("chat_message") + loader = TableLoader( + sql_source_db.engine, + backend, + table, + table_to_columns(table), + incremental=incremental, # type: ignore[arg-type] + ) + + query = loader.make_query() + expected = table.select() + + if last_value_func == min: + expected = expected.where(table.c.created_at <= incremental.last_value) + else: + expected = expected.where(table.c.created_at >= incremental.last_value) + assert query.compare(expected)