feat: filesystem delete old pipeline state files (#1838)
* feat: delete old pipeline state files

* fix: common tests

* test: state file cleanup

* chore: improve tests and change remove function

* fix: filesystem common test

* chore: add max_state_files validation

* chore: simplified state cleanup

* docs: improve max_state_files doc

* chore: extended existing function

* test: shared dataset cleanup pipeline state

* test: oldest pipeline state file rm when cleanup

* chore: use rm helper function

* docs: filesystem max_state_files config

* docs: change filesystem max_state_files

* docs: rm empty spaces
donotpush authored Sep 25, 2024
1 parent 8a86a73 commit 3aadc32
Showing 5 changed files with 226 additions and 4 deletions.
2 changes: 2 additions & 0 deletions dlt/common/storages/configuration.py
@@ -145,6 +145,8 @@ class FilesystemConfiguration(BaseConfiguration):
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None
deltalake_storage_options: Optional[DictStrAny] = None
max_state_files: int = 100
"""Maximum number of pipeline state files to keep; 0 or negative value disables cleanup."""

@property
def protocol(self) -> str:
39 changes: 36 additions & 3 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -3,7 +3,7 @@
import base64

from types import TracebackType
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any
from fsspec import AbstractFileSystem
from contextlib import contextmanager

@@ -479,7 +479,9 @@ def _to_path_safe_string(self, s: str) -> str:
"""for base64 strings"""
return base64.b64decode(s).hex() if s else None

def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
def _list_dlt_table_files(
self, table_name: str, pipeline_name: str = None
) -> Iterator[Tuple[str, List[str]]]:
dirname = self.get_table_dir(table_name)
if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
@@ -488,7 +490,9 @@ def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
fileparts = filename.split(FILENAME_SEPARATOR)
if len(fileparts) != 3:
continue
yield filepath, fileparts
# Filters only if pipeline_name provided
if pipeline_name is None or fileparts[0] == pipeline_name:
yield filepath, fileparts

def _store_load(self, load_id: str) -> None:
# write entry to load "table"
@@ -523,6 +527,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _cleanup_pipeline_states(self, pipeline_name: str) -> None:
state_table_files = list(
self._list_dlt_table_files(self.schema.state_table_name, pipeline_name)
)

if len(state_table_files) > self.config.max_state_files:
# filter and collect a list of state files
state_file_info: List[Dict[str, Any]] = [
{
"load_id": float(fileparts[1]), # convert load_id to float for comparison
"filepath": filepath,
}
for filepath, fileparts in state_table_files
]

# sort state file info by load_id in descending order
state_file_info.sort(key=lambda x: x["load_id"], reverse=True)

            # keep only the newest max_state_files state files
files_to_delete = state_file_info[self.config.max_state_files :]

# delete the old files
for file_info in files_to_delete:
self._delete_file(file_info["filepath"])

def _store_current_state(self, load_id: str) -> None:
# don't save the state this way when used as staging
if self.config.as_staging_destination:
@@ -542,6 +571,10 @@ def _store_current_state(self, load_id: str) -> None:
# write
self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))

# perform state cleanup only if max_state_files is set to a positive value
if self.config.max_state_files >= 1:
self._cleanup_pipeline_states(pipeline_name)

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# search newest state
selected_path = None
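For reference, here is a standalone sketch (not the library's API) of the retention rule that `_cleanup_pipeline_states` applies above, using hypothetical state file names and load ids; the real client lists files with `_list_dlt_table_files` and removes them with `_delete_file`:

```python
from typing import List, Tuple

FILENAME_SEPARATOR = "__"  # hypothetical separator, used here only to build example file names


def state_files_to_delete(
    state_files: List[Tuple[str, List[str]]], max_state_files: int
) -> List[str]:
    """Return the paths of state files beyond the newest `max_state_files`."""
    if max_state_files < 1 or len(state_files) <= max_state_files:
        return []
    # fileparts = [pipeline_name, load_id, version_hash]; load_id is a numeric timestamp,
    # so converting it to float orders files from newest to oldest
    newest_first = sorted(state_files, key=lambda f: float(f[1][1]), reverse=True)
    return [filepath for filepath, _ in newest_first[max_state_files:]]


# hypothetical state files for pipeline "p1"
files = [
    (
        f"_dlt_pipeline_state/p1{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}abcd.jsonl",
        ["p1", load_id, "abcd"],
    )
    for load_id in ("1695640000.1", "1695640100.2", "1695640200.3")
]
print(state_files_to_delete(files, max_state_files=2))  # only the oldest file is returned
```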
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -697,5 +697,6 @@ This destination fully supports [dlt state sync](../../general-usage/state#synci

You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.

<!--@@@DLT_TUBA filesystem-->
**Note:** Each load that produces a new pipeline state, for example an incremental load, writes an additional state file to the `_dlt_pipeline_state` folder at the destination. To prevent these files from accumulating, `dlt` automatically removes old state files, keeping only the 100 most recent by default. The retention limit is controlled by the filesystem configuration option `max_state_files`; setting it to 0 or a negative number disables the cleanup.
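
As a quick illustration (a minimal sketch, assuming the environment-variable form of the option that the tests in this change use):

```python
import os

# keep only the two newest pipeline state files per pipeline
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "2"

# a value of 0 or below disables the cleanup entirely
# os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "-1"
```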

<!--@@@DLT_TUBA filesystem-->
2 changes: 2 additions & 0 deletions tests/load/filesystem/test_filesystem_common.py
@@ -50,6 +50,7 @@ def test_filesystem_configuration() -> None:
"bucket_url": "az://root",
"credentials": None,
"client_kwargs": None,
"max_state_files": 100,
"kwargs": None,
"deltalake_storage_options": None,
}
@@ -173,6 +174,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"read_only": False,
"bucket_url": "az://root",
"credentials": None,
"max_state_files": 100,
"kwargs": {"use_ssl": True},
"client_kwargs": {"verify": "public.crt"},
"deltalake_storage_options": {"AWS_S3_LOCKING_PROVIDER": "dynamodb"},
184 changes: 184 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -16,6 +16,7 @@
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.utils import uniq_id
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.destinations import filesystem
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.destinations.impl.filesystem.typing import TExtraPlaceholders
@@ -1334,3 +1335,186 @@ def table_3():
# test truncate multiple
fs_client.truncate_tables(["table_1", "table_3"])
assert load_table_counts(p, "table_1", "table_2", "table_3") == {"table_2": 21}


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_cleanup_states_by_load_id(destination_config: DestinationTestConfiguration) -> None:
"""
    Verify that old pipeline state files are removed based on `load_id` when multiple loads are executed.
    With a resource that produces incremental data, every run writes a new state file, and older files are
    cleared according to the `max_state_files` setting; the oldest state file (from the first `load_id`) should be deleted.
Steps:
    1. Set `max_state_files` to 2, so that only the two newest state files are kept.
2. Run the pipeline three times.
3. Verify that the state file from the first load is no longer present in the state table.
"""

dataset_name = f"{destination_config.destination_name}{uniq_id()}"
p = destination_config.setup_pipeline("p1", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2)

info = p.run(r1)
first_load_id = info.loads_ids[0]

info = p.run(r2)
second_load_id = [load_id for load_id in info.loads_ids if load_id != first_load_id][0]

info = p.run(r3)
third_load_id = [
load_id
for load_id in info.loads_ids
if load_id != first_load_id and load_id != second_load_id
][0]

client: FilesystemClient = p.destination_client() # type: ignore
state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1"))

assert not any(fileparts[1] == first_load_id for _, fileparts in state_table_files)
assert any(fileparts[1] == second_load_id for _, fileparts in state_table_files)
assert any(fileparts[1] == third_load_id for _, fileparts in state_table_files)


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("max_state_files", [-1, 0, 1, 3])
def test_cleanup_states(
destination_config: DestinationTestConfiguration, max_state_files: int
) -> None:
"""
Test the behavior of pipeline state cleanup based on different max_state_files configurations.
Steps:
    1. For each `max_state_files` value (-1, 0, 1, and 3), run the pipeline five times.
2. Verify that state files are cleaned or retained according to the max_state_files setting:
- Negative or zero values disable cleanup.
- Positive values trigger cleanup, keeping only the specified number of state files.
"""
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(max_state_files)

dataset_name = f"{destination_config.destination_name}{uniq_id()}"
p = destination_config.setup_pipeline("p1", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

@dlt.resource(name="items", primary_key="id")
def r4(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]

@dlt.resource(name="items", primary_key="id")
def r5(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]

# run pipeline
run_count = 5

p.run(r1)
p.run(r2)
p.run(r3)
p.run(r4)
p.run(r5)

client: FilesystemClient = p.destination_client() # type: ignore
state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name, "p1"))

if max_state_files == -1 or max_state_files == 0:
assert len(state_table_files) == run_count
else:
assert len(state_table_files) == max_state_files


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_cleanup_states_shared_dataset(destination_config: DestinationTestConfiguration) -> None:
"""
Test that two pipelines sharing the same bucket_url and dataset_name can independently
clean their _dlt_pipeline_state files with different max_state_files configurations.
Steps:
1. Run pipeline p1 five times with max_state_files set to 5.
2. Run pipeline p2 five times with max_state_files set to 2.
3. Verify that each pipeline only deletes its own state files and does not affect the other.
"""
dataset_name = f"{destination_config.destination_name}{uniq_id()}"

p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name)
p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name)

@dlt.resource(name="items", primary_key="id")
def r1(_=dlt.sources.incremental("id")):
yield from [{"id": 0}]

@dlt.resource(name="items", primary_key="id")
def r2(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}]

@dlt.resource(name="items", primary_key="id")
def r3(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}]

@dlt.resource(name="items", primary_key="id")
def r4(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]

@dlt.resource(name="items", primary_key="id")
def r5(_=dlt.sources.incremental("id")):
yield from [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(5)
p1.run(r1)
p1.run(r2)
p1.run(r3)
p1.run(r4)
p1.run(r5)

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = str(2)
p2.run(r1)
p2.run(r2)
p2.run(r3)
p2.run(r4)
p2.run(r5)

p1_client: FilesystemClient = p1.destination_client() # type: ignore
p1_state_files = list(p1_client._list_dlt_table_files(p1_client.schema.state_table_name, "p1"))

p2_client: FilesystemClient = p2.destination_client() # type: ignore
p2_state_files = list(p2_client._list_dlt_table_files(p2_client.schema.state_table_name, "p2"))

assert len(p1_state_files) == 5

assert len(p2_state_files) == 2
