chore: improve tests and change remove function
donotpush committed Sep 19, 2024
1 parent 8baaf0b commit d235052
Showing 3 changed files with 42 additions and 36 deletions.
1 change: 1 addition & 0 deletions dlt/common/storages/configuration.py
@@ -146,6 +146,7 @@ class FilesystemConfiguration(BaseConfiguration):
     client_kwargs: Optional[DictStrAny] = None
     deltalake_storage_options: Optional[DictStrAny] = None
     enable_state_cleanup: bool = True
+    max_state_files: int = 100

     @property
     def protocol(self) -> str:
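
Note: the new max_state_files setting resolves like any other filesystem destination option. A minimal sketch of configuring it via environment variables (the variable paths below mirror the ones used in the test change further down; the values here are illustrative):

import os

# keep only the 50 most recent state files instead of the default 100;
# dlt resolves this into FilesystemConfiguration.max_state_files
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "50"

# the companion flag switches state cleanup off entirely
os.environ["DESTINATION__FILESYSTEM__ENABLE_STATE_CLEANUP"] = "false"
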
42 changes: 21 additions & 21 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -50,7 +50,6 @@

 INIT_FILE_NAME = "init"
 FILENAME_SEPARATOR = "__"
-MAX_STATE_HISTORY = 100


 class FilesystemLoadJob(RunnableLoadJob):

@@ -524,26 +523,27 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s
     def _cleanup_pipeline_states(self, pipeline_name: str) -> None:
         state_table_files = list(self._list_dlt_table_files(self.schema.state_table_name))

-        # filter and collect a list of state files
-        state_file_info: List[Dict[str, Any]] = [
-            {
-                "pipeline_name": fileparts[0],
-                "load_id": float(fileparts[1]),  # convert load_id to float for comparison
-                "filepath": filepath,
-            }
-            for filepath, fileparts in state_table_files
-            if fileparts[0] == pipeline_name
-        ]
-
-        # sort state file info by load_id in descending order
-        state_file_info.sort(key=lambda x: x["load_id"], reverse=True)
-
-        # keeping only the most recent MAX_STATE_HISTORY files
-        files_to_delete = state_file_info[MAX_STATE_HISTORY:]
-
-        # delete the old files
-        for file_info in files_to_delete:
-            self.fs_client.rm_file(file_info["filepath"])
+        if len(state_table_files) > self.config.max_state_files:
+            # filter and collect a list of state files
+            state_file_info: List[Dict[str, Any]] = [
+                {
+                    "pipeline_name": fileparts[0],
+                    "load_id": float(fileparts[1]),  # convert load_id to float for comparison
+                    "filepath": filepath,
+                }
+                for filepath, fileparts in state_table_files
+                if fileparts[0] == pipeline_name
+            ]
+
+            # sort state file info by load_id in descending order
+            state_file_info.sort(key=lambda x: x["load_id"], reverse=True)
+
+            # keep only the most recent max_state_files files
+            files_to_delete = state_file_info[self.config.max_state_files :]
+
+            # delete the old files
+            for file_info in files_to_delete:
+                self.fs_client.rm(file_info["filepath"])

     def _store_current_state(self, load_id: str) -> None:
         # don't save the state this way when used as staging
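
Note: the retention rule above can be checked in isolation. A standalone sketch that mimics _cleanup_pipeline_states on plain (filepath, fileparts) tuples — the fileparts indexing (pipeline name first, load_id second) follows the diff, while the concrete file names here are made up for illustration:

from typing import Any, Dict, List, Tuple

def stale_state_files(
    state_table_files: List[Tuple[str, List[str]]],
    pipeline_name: str,
    max_state_files: int,
) -> List[str]:
    # as in the diff: do nothing until the total file count exceeds the threshold
    if len(state_table_files) <= max_state_files:
        return []
    info: List[Dict[str, Any]] = [
        {"load_id": float(parts[1]), "filepath": path}
        for path, parts in state_table_files
        if parts[0] == pipeline_name
    ]
    # newest first; everything past max_state_files is a deletion candidate
    info.sort(key=lambda x: x["load_id"], reverse=True)
    return [f["filepath"] for f in info[max_state_files:]]

# five synthetic state files with increasing load_ids
files = [
    (f"p1__{1726700000 + i}__hash{i}", ["p1", str(1726700000 + i), f"hash{i}"])
    for i in range(5)
]
# with a limit of 3, the two oldest files are flagged for deletion
assert set(stale_state_files(files, "p1", 3)) == {files[0][0], files[1][0]}
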
35 changes: 20 additions & 15 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -1259,37 +1259,42 @@ def my_resource_inc(prim_key=dlt.sources.incremental("id")):
     destinations_configs(all_buckets_filesystem_configs=True),
     ids=lambda x: x.name,
 )
+@pytest.mark.parametrize("enable_state_cleanup", ["true", "false"])
 def test_state_cleanup(
     destination_config: DestinationTestConfiguration,
+    enable_state_cleanup: str,
 ) -> None:
-    # TODO: add test parameters to:
-    # - cleanup disabled/enabled
-    # - test different values of state files
-    # TODO: ask about making MAX_STATE_HISTORY configurable (very slow test)
+    os.environ["DESTINATION__FILESYSTEM__ENABLE_STATE_CLEANUP"] = enable_state_cleanup
+    os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "3"

-    p = destination_config.setup_pipeline("p1", dataset_name="incremental_test")
+    p = destination_config.setup_pipeline("p1", dataset_name=destination_config.destination_name)

-    # Initial run with a base dataset
-    @dlt.resource(name="items")
-    def initial_resource(primary_key=dlt.sources.incremental("id")):
-        yield from [{"id": 1}, {"id": 2}]
+    # Initial dataset
+    @dlt.resource(name="items", primary_key="id")
+    def initial_resource(_=dlt.sources.incremental("id")):
+        yield from [{"id": 0}]

     p.run(initial_resource)

-    # Run incremental tests multiple times
-    for i in range(3, 105):  # Adjust range as needed
+    # Run incremental multiple times
+    for i in range(1, 5):  # Adjust range as needed

-        @dlt.resource(name="items")
-        def incremental_resource(primary_key=dlt.sources.incremental("id"), new_i=i):
+        @dlt.resource(name="items", primary_key="id")
+        def incremental_resource(_=dlt.sources.incremental("id"), new_i=i):
             yield from [{"id": j} for j in range(1, new_i + 1)]

-        p = destination_config.setup_pipeline("p1", dataset_name="incremental_test")
+        p = destination_config.setup_pipeline(
+            "p1", dataset_name=destination_config.destination_name
+        )
         p.run(incremental_resource)

     client: FilesystemClient = p.destination_client()  # type: ignore
     state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name))

-    assert len(state_table_files) == 100
+    if enable_state_cleanup == "true":
+        assert len(state_table_files) == 3
+    else:
+        assert len(state_table_files) == 5


 @pytest.mark.parametrize(
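
Note on the expected counts: the pipeline writes one state file for the initial run and one per loop iteration (range(1, 5) gives four), i.e. five in total; with cleanup enabled and MAX_STATE_FILES=3 the two oldest are removed. A trivial standalone check of that arithmetic (not part of the test suite):

# 1 initial run + 4 incremental runs = 5 state files written
total_runs = 1 + len(range(1, 5))
max_state_files = 3

assert min(total_runs, max_state_files) == 3  # enable_state_cleanup == "true" branch
assert total_runs == 5                        # enable_state_cleanup == "false" branch
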
