From 3aadc32633190ec2f786a3459116136521211b38 Mon Sep 17 00:00:00 2001 From: Julian Alves <28436330+donotpush@users.noreply.github.com> Date: Wed, 25 Sep 2024 13:13:33 +0200 Subject: [PATCH] feat: filesystem delete old pipeline state files (#1838) * feat: delete old pipeline state files * fix: common tests * text: state file cleanup * chore: improve tests and change remove function * fix: filesystem common test * chore: add max_state_files validation * chore: simplified state cleanup * docs: improve max_state_files doc * chore: extended existing function * test: shared dataset cleanup pipeline state * test: oldest pipeline state file rm when cleanup * chore: use rm helper function * docs: filesystem max_state_files config * docs: change filesystem max_state_files * docs: rm empty spaces --- dlt/common/storages/configuration.py | 2 + .../impl/filesystem/filesystem.py | 39 +++- .../dlt-ecosystem/destinations/filesystem.md | 3 +- .../load/filesystem/test_filesystem_common.py | 2 + .../load/pipeline/test_filesystem_pipeline.py | 184 ++++++++++++++++++ 5 files changed, 226 insertions(+), 4 deletions(-) diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 4da44bceee..777b51a488 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -145,6 +145,8 @@ class FilesystemConfiguration(BaseConfiguration): kwargs: Optional[DictStrAny] = None client_kwargs: Optional[DictStrAny] = None deltalake_storage_options: Optional[DictStrAny] = None + max_state_files: int = 100 + """Maximum number of pipeline state files to keep; 0 or negative value disables cleanup.""" @property def protocol(self) -> str: diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 5d244020dd..3f2f793559 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -3,7 +3,7 @@ import base64 from types import TracebackType -from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast +from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any from fsspec import AbstractFileSystem from contextlib import contextmanager @@ -479,7 +479,9 @@ def _to_path_safe_string(self, s: str) -> str: """for base64 strings""" return base64.b64decode(s).hex() if s else None - def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]: + def _list_dlt_table_files( + self, table_name: str, pipeline_name: str = None + ) -> Iterator[Tuple[str, List[str]]]: dirname = self.get_table_dir(table_name) if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)): raise DestinationUndefinedEntity({"dir": dirname}) @@ -488,7 +490,9 @@ def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str fileparts = filename.split(FILENAME_SEPARATOR) if len(fileparts) != 3: continue - yield filepath, fileparts + # Filters only if pipeline_name provided + if pipeline_name is None or fileparts[0] == pipeline_name: + yield filepath, fileparts def _store_load(self, load_id: str) -> None: # write entry to load "table" @@ -523,6 +527,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl", ) + def _cleanup_pipeline_states(self, pipeline_name: str) -> None: + state_table_files = list( + self._list_dlt_table_files(self.schema.state_table_name, pipeline_name) + ) + + if len(state_table_files) > self.config.max_state_files: + # filter and collect a list of state files + state_file_info: List[Dict[str, Any]] = [ + { + "load_id": float(fileparts[1]), # convert load_id to float for comparison + "filepath": filepath, + } + for filepath, fileparts in state_table_files + ] + + # sort state file info by load_id in descending order + state_file_info.sort(key=lambda x: x["load_id"], reverse=True) + + # keeping only the most recent MAX_STATE_HISTORY files + files_to_delete = state_file_info[self.config.max_state_files :] + + # delete the old files + for file_info in files_to_delete: + self._delete_file(file_info["filepath"]) + def _store_current_state(self, load_id: str) -> None: # don't save the state this way when used as staging if self.config.as_staging_destination: @@ -542,6 +571,10 @@ def _store_current_state(self, load_id: str) -> None: # write self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc)) + # perform state cleanup only if max_state_files is set to a positive value + if self.config.max_state_files >= 1: + self._cleanup_pipeline_states(pipeline_name) + def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]: # search newest state selected_path = None diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 004a2d7bc0..a456fa6e7d 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -697,5 +697,6 @@ This destination fully supports [dlt state sync](../../general-usage/state#synci You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations. - +**Note:** When a load generates a new state, for example when using incremental loads, a new state file appears in the `_dlt_pipeline_state` folder at the destination. To prevent data accumulation, state cleanup mechanisms automatically remove old state files, retaining only the latest 100 by default. This cleanup process can be customized or disabled using the filesystem configuration `max_state_files`, which determines the maximum number of pipeline state files to retain (default is 100). Setting this value to 0 or a negative number disables the cleanup of old states. + diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 29ca1a2b57..0db93410e5 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -50,6 +50,7 @@ def test_filesystem_configuration() -> None: "bucket_url": "az://root", "credentials": None, "client_kwargs": None, + "max_state_files": 100, "kwargs": None, "deltalake_storage_options": None, } @@ -173,6 +174,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None: "read_only": False, "bucket_url": "az://root", "credentials": None, + "max_state_files": 100, "kwargs": {"use_ssl": True}, "client_kwargs": {"verify": "public.crt"}, "deltalake_storage_options": {"AWS_S3_LOCKING_PROVIDER": "dynamodb"}, diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 92e927f438..11e0c88451 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -16,6 +16,7 @@ from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.utils import uniq_id from dlt.common.schema.typing import TWriteDisposition +from dlt.common.configuration.exceptions import ConfigurationValueError from dlt.destinations import filesystem from dlt.destinations.impl.filesystem.filesystem import FilesystemClient from dlt.destinations.impl.filesystem.typing import TExtraPlaceholders @@ -1334,3 +1335,186 @@ def table_3(): # test truncate multiple fs_client.truncate_tables(["table_1", "table_3"]) assert load_table_counts(p, "table_1", "table_2", "table_3") == {"table_2": 21} + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_cleanup_states_by_load_id(destination_config: DestinationTestConfiguration) -> None: + """ + Test the pipeline state cleanup functionality by verifying that old state files are removed based on `load_id` when multiple loads are executed. + + Specifically, the oldest state file (corresponding to the first `load_id`) should be deleted. + + This test checks that when running a pipeline with a resource that produces incremental data, older state files are cleared according to the `max_state_files` setting. + + Steps: + 1. Set `max_state_files` to 2, allowing only two newest state files to be kept. + 2. Run the pipeline three times. + 3. Verify that the state file from the first load is no longer present in the state table. + """ + + dataset_name = f"{destination_config.destination_name}{uniq_id()}" + p = destination_config.setup_pipeline("p1", dataset_name=dataset_name) + + @dlt.resource(name="items", primary_key="id") + def r1(_=dlt.sources.incremental("id")): + yield from [{"id": 0}] + + @dlt.resource(name="items", primary_key="id") + def r2(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}] + + @dlt.resource(name="items", primary_key="id") + def r3(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}] + + os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2) + + info = p.run(r1) + first_load_id = info.loads_ids[0] + + info = p.run(r2) + second_load_id = [load_id for load_id in info.loads_ids if load_id != first_load_id][0] + + info = p.run(r3) + third_load_id = [ + load_id + for load_id in info.loads_ids + if load_id != first_load_id and load_id != second_load_id + ][0] + + client: FilesystemClient = p.destination_client() # type: ignore + state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1")) + + assert not any(fileparts[1] == first_load_id for _, fileparts in state_table_files) + assert any(fileparts[1] == second_load_id for _, fileparts in state_table_files) + assert any(fileparts[1] == third_load_id for _, fileparts in state_table_files) + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +@pytest.mark.parametrize("max_state_files", [-1, 0, 1, 3]) +def test_cleanup_states( + destination_config: DestinationTestConfiguration, max_state_files: int +) -> None: + """ + Test the behavior of pipeline state cleanup based on different max_state_files configurations. + + Steps: + 1. Run the pipeline five times with max_state_files set to -1, 0, 1, and 3. + 2. Verify that state files are cleaned or retained according to the max_state_files setting: + - Negative or zero values disable cleanup. + - Positive values trigger cleanup, keeping only the specified number of state files. + """ + os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(max_state_files) + + dataset_name = f"{destination_config.destination_name}{uniq_id()}" + p = destination_config.setup_pipeline("p1", dataset_name=dataset_name) + + @dlt.resource(name="items", primary_key="id") + def r1(_=dlt.sources.incremental("id")): + yield from [{"id": 0}] + + @dlt.resource(name="items", primary_key="id") + def r2(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}] + + @dlt.resource(name="items", primary_key="id") + def r3(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}] + + @dlt.resource(name="items", primary_key="id") + def r4(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}] + + @dlt.resource(name="items", primary_key="id") + def r5(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] + + # run pipeline + run_count = 5 + + p.run(r1) + p.run(r2) + p.run(r3) + p.run(r4) + p.run(r5) + + client: FilesystemClient = p.destination_client() # type: ignore + state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1")) + + if max_state_files == -1 or max_state_files == 0: + assert len(state_table_files) == run_count + else: + assert len(state_table_files) == max_state_files + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(all_buckets_filesystem_configs=True), + ids=lambda x: x.name, +) +def test_cleanup_states_shared_dataset(destination_config: DestinationTestConfiguration) -> None: + """ + Test that two pipelines sharing the same bucket_url and dataset_name can independently + clean their _dlt_pipeline_state files with different max_state_files configurations. + + Steps: + 1. Run pipeline p1 five times with max_state_files set to 5. + 2. Run pipeline p2 five times with max_state_files set to 2. + 3. Verify that each pipeline only deletes its own state files and does not affect the other. + """ + dataset_name = f"{destination_config.destination_name}{uniq_id()}" + + p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name) + p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name) + + @dlt.resource(name="items", primary_key="id") + def r1(_=dlt.sources.incremental("id")): + yield from [{"id": 0}] + + @dlt.resource(name="items", primary_key="id") + def r2(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}] + + @dlt.resource(name="items", primary_key="id") + def r3(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}] + + @dlt.resource(name="items", primary_key="id") + def r4(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}] + + @dlt.resource(name="items", primary_key="id") + def r5(_=dlt.sources.incremental("id")): + yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}] + + os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(5) + p1.run(r1) + p1.run(r2) + p1.run(r3) + p1.run(r4) + p1.run(r5) + + os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2) + p2.run(r1) + p2.run(r2) + p2.run(r3) + p2.run(r4) + p2.run(r5) + + p1_client: FilesystemClient = p1.destination_client() # type: ignore + p1_state_files = list(p1_client._list_dlt_table_files(p1_client.schema.state_table_name, "p1")) + + p2_client: FilesystemClient = p2.destination_client() # type: ignore + p2_state_files = list(p2_client._list_dlt_table_files(p2_client.schema.state_table_name, "p2")) + + assert len(p1_state_files) == 5 + + assert len(p2_state_files) == 2