Skip to content

Commit

Permalink
add test for testing incremental with limit
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Dec 15, 2024
1 parent a05ee7c commit f109a87
Showing 1 changed file with 46 additions and 5 deletions.
51 changes: 46 additions & 5 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -3854,6 +3854,7 @@ def some_data():
for col in table_schema["columns"].values():
assert "incremental" not in col


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
@pytest.mark.parametrize("last_value_func", [min, max])
def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None:
Expand Down Expand Up @@ -3961,17 +3962,57 @@ def some_data(
# Includes values 5-10 inclusive
assert items == expected_items

def test_incremental_and_limit():

@dlt.resource(incremental=dlt.sources.incremental(cursor_path="id", initial_value=0, last_value_func=min))
def resource():
for i in range(100):
@pytest.mark.parametrize("offset_by_last_value", [True, False])
def test_incremental_and_limit(offset_by_last_value: bool):
resource_called = 0

# here we check incremental and limit when incremental once when last value cannot be used
# to offset the source, and once when it can.

@dlt.resource(
table_name="items",
)
def resource(
incremental=dlt.sources.incremental(cursor_path="id", initial_value=-1, row_order="asc")
):
range_iterator = (
range(incremental.start_value + 1, 1000) if offset_by_last_value else range(1000)
)
for i in range_iterator:
nonlocal resource_called
resource_called += 1
yield {
"id": i,
"value": str(i),
}

resource.add_limit(10)

p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True)

p.run(resource().add_limit(10))
p.run(resource())

# check we have the right number of items
assert len(p.dataset().items.df()) == 10
assert resource_called == 10
# check that we have items 0-9
assert p.dataset().items.df().id.tolist() == list(range(10))

# run the next ten
p.run(resource())

# check we have the right number of items
assert len(p.dataset().items.df()) == 20
assert resource_called == 20 if offset_by_last_value else 30
# check that we have items 0-19
assert p.dataset().items.df().id.tolist() == list(range(20))

# run the next batch
p.run(resource())

# check we have the right number of items
assert len(p.dataset().items.df()) == 30
assert resource_called == 30 if offset_by_last_value else 60
# check that we have items 0-29
assert p.dataset().items.df().id.tolist() == list(range(30))

0 comments on commit f109a87

Please sign in to comment.