Skip to content

Commit

Permalink
Ensure deduplication is disabled when range_start=='open'
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Dec 5, 2024
1 parent ca07633 commit 674736c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def __call__(
def deduplication_disabled(self) -> bool:
"""Skip deduplication when length of the key is 0 or if lag is applied."""
# disable deduplication if range start is open (rows equal to last value are skipped) or if end value is set (state is not saved)
if self.range_start == "open":
return True
if self.end_value is not None:
return True
# disable deduplication if lag is applied - destination must deduplicate ranges
Expand Down Expand Up @@ -232,6 +234,7 @@ def __call__(
# new_value is "less" or equal to last_value (the actual max)
if last_value == new_value:
if self.range_start == "open":
# We only want greater than last_value
return None, False, False
# use func to compute row_value into last_value compatible
processed_row_value = last_value_func((row_value,))
Expand Down
26 changes: 26 additions & 0 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -3896,6 +3896,32 @@ def some_data(
assert items == expected_items


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_start_range_open_no_deduplication(item_type: TestDataItemFormat) -> None:
    """Check that an open start range stores no deduplication hashes in state.

    A resource yielding three rows (``updated_at`` 0, 1 and 2) is extracted
    through an incremental configured with ``range_start="open"``. The
    incremental state saved by the pipeline must then hold an empty
    ``unique_hashes`` list.

    NOTE(review): ``item_type`` is not referenced in the body, so every
    parametrized run extracts the same list of dicts - confirm whether the
    rows were meant to be converted to the given item format.
    """

    @dlt.source
    def dummy():
        @dlt.resource
        def some_data(
            updated_at: dlt.sources.incremental[int] = dlt.sources.incremental(
                "updated_at", range_start="open"
            )
        ):
            yield [{"updated_at": value} for value in range(3)]

        yield some_data

    extract_pipeline = dlt.pipeline(pipeline_name=uniq_id())
    extract_pipeline.extract(dummy())

    # Walk down to the incremental state kept for the "updated_at" cursor.
    resource_state = extract_pipeline.state["sources"]["dummy"]["resources"]["some_data"]
    cursor_state = resource_state["incremental"]["updated_at"]

    # With an open start range no unique hashes should have been recorded.
    assert cursor_state["unique_hashes"] == []


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
@pytest.mark.parametrize("last_value_func", [min, max])
def test_end_range_closed(item_type: TestDataItemFormat, last_value_func: Any) -> None:
Expand Down

0 comments on commit 674736c

Please sign in to comment.