Skip to content

Commit

Permalink
text: state file cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Sep 18, 2024
1 parent 577d604 commit 8baaf0b
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,44 @@ def my_resource_inc(prim_key=dlt.sources.incremental("id")):
assert load_table_counts(p, "items") == {"items": 4 if restore else 6}


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_buckets_filesystem_configs=True),
    ids=lambda cfg: cfg.name,
)
def test_state_cleanup(
    destination_config: DestinationTestConfiguration,
) -> None:
    """Run the same pipeline many times and count the remaining state files.

    The pipeline "p1" is run once with a two-row base dataset and then 102
    more times (103 runs in total), each time with a resource that yields
    ids ``1..i``. Afterwards the files listed for the state table must
    number exactly 100.
    """
    # TODO: add test parameters to:
    #  - cleanup disabled/enable
    #  - test different values of state files
    # TODO: ask about making MAX_STATE_HISTORY configurable (very slow test)

    # NOTE(review): presumably mirrors MAX_STATE_HISTORY in the filesystem
    # destination -- confirm before changing either value.
    expected_state_files = 100

    pipeline = destination_config.setup_pipeline("p1", dataset_name="incremental_test")

    # Seed the dataset with a first, small load.
    @dlt.resource(name="items")
    def initial_resource(primary_key=dlt.sources.incremental("id")):
        yield {"id": 1}
        yield {"id": 2}

    pipeline.run(initial_resource)

    # Repeat the load often enough to exceed the expected number of kept
    # state files; adjust the range as needed.
    for upper_id in range(3, 105):

        # ``new_i`` is bound as a default so each resource keeps its own bound.
        @dlt.resource(name="items")
        def incremental_resource(
            primary_key=dlt.sources.incremental("id"), new_i=upper_id
        ):
            for j in range(1, new_i + 1):
                yield {"id": j}

        # The pipeline object is re-created for every run, as in the first load.
        pipeline = destination_config.setup_pipeline("p1", dataset_name="incremental_test")
        pipeline.run(incremental_resource)

    fs_client: FilesystemClient = pipeline.destination_client()  # type: ignore
    state_table = fs_client.schema.state_table_name
    remaining_state_files = list(fs_client._list_dlt_table_files(state_table))

    assert len(remaining_state_files) == expected_state_files


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
Expand Down

0 comments on commit 8baaf0b

Please sign in to comment.