
feat: filesystem delete old pipeline state files #1838

Merged Sep 25, 2024 · 16 commits
Changes from 6 commits
5 changes: 5 additions & 0 deletions dlt/common/storages/configuration.py
@@ -145,6 +145,8 @@ class FilesystemConfiguration(BaseConfiguration):
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None
deltalake_storage_options: Optional[DictStrAny] = None
enable_state_cleanup: bool = True
max_state_files: int = 100

@property
def protocol(self) -> str:
@@ -170,6 +172,9 @@ def on_resolved(self) -> None:
if self.is_local_path(self.bucket_url):
    self.bucket_url = self.make_file_url(self.bucket_url)

if self.max_state_files <= 1:
    raise ConfigurationValueError("The max_state_files must be greater than 1.")

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
    # use known credentials or empty credentials for unknown protocol
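
For reference, both new settings resolve like any other filesystem destination option. A minimal sketch using environment variables (the same mechanism the tests in this PR use; a config.toml entry under the corresponding section would work equally well):

import os

# keep the 500 newest pipeline state files instead of the default 100
os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "500"

# or switch the cleanup off entirely
os.environ["DESTINATION__FILESYSTEM__ENABLE_STATE_CLEANUP"] = "false"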
30 changes: 29 additions & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
@@ -3,7 +3,7 @@
import base64

from types import TracebackType
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast
from typing import Dict, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast, Any
from fsspec import AbstractFileSystem
from contextlib import contextmanager

@@ -520,6 +520,31 @@ def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: s
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)

def _cleanup_pipeline_states(self, pipeline_name: str) -> None:
Collaborator:
In the ticket it says that we should make sure not to delete state files attached to failed loads, but we are not saving state on failed loads, so we should be good here.

donotpush (Collaborator, Author) replied on Sep 20, 2024:
Yes, indeed. @rudolfix specified:

> delete only the state files that correspond to finished loads (they have a corresponding completed entry).
> This is to prevent a rare case where we have 100 unsuccessful partial loads and we delete the last right state.

Will partial loads store a state file? If not, we can keep the code as it is.

    state_table_files = list(self._list_dlt_table_files(self.schema.state_table_name))

    if len(state_table_files) > self.config.max_state_files:
        # filter and collect the state files that belong to this pipeline
        state_file_info: List[Dict[str, Any]] = [
            {
                "pipeline_name": fileparts[0],
                "load_id": float(fileparts[1]),  # convert load_id to float for comparison
                "filepath": filepath,
            }
            for filepath, fileparts in state_table_files
            if fileparts[0] == pipeline_name
        ]

        # sort state file info by load_id in descending order
        state_file_info.sort(key=lambda x: x["load_id"], reverse=True)

        # keep only the most recent max_state_files files
        files_to_delete = state_file_info[self.config.max_state_files :]

        # delete the old files
        for file_info in files_to_delete:
            self.fs_client.rm(file_info["filepath"])

def _store_current_state(self, load_id: str) -> None:
    # don't save the state this way when used as staging
    if self.config.as_staging_destination:
@@ -539,6 +564,9 @@ def _store_current_state(self, load_id: str) -> None:
# write
self._write_to_json_file(hash_path, cast(DictStrAny, pipeline_state_doc))

if self.config.enable_state_cleanup:
    self._cleanup_pipeline_states(pipeline_name)

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
    # search newest state
    selected_path = None
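
As an aside, the retention logic above can be traced with a small standalone sketch. The directory, file names, and load_ids below are made up for the example; real state files are named by _get_state_file_name and listed via _list_dlt_table_files:

from typing import Any, Dict, List

max_state_files = 3  # stands in for config.max_state_files

# (filepath, fileparts) pairs as _list_dlt_table_files would yield them,
# with fileparts = [pipeline_name, load_id, version_hash] (sample values)
state_table_files = [
    ("state/p1__1695640000.1__h1.jsonl", ["p1", "1695640000.1", "h1"]),
    ("state/p1__1695640001.2__h2.jsonl", ["p1", "1695640001.2", "h2"]),
    ("state/p1__1695640002.3__h3.jsonl", ["p1", "1695640002.3", "h3"]),
    ("state/p1__1695640003.4__h4.jsonl", ["p1", "1695640003.4", "h4"]),
]

state_file_info: List[Dict[str, Any]] = [
    {"load_id": float(fileparts[1]), "filepath": filepath}
    for filepath, fileparts in state_table_files
    if fileparts[0] == "p1"
]
# newest first, then everything past the retention threshold gets deleted
state_file_info.sort(key=lambda x: x["load_id"], reverse=True)
files_to_delete = state_file_info[max_state_files:]

# only the oldest file ends up in the delete list
assert [f["filepath"] for f in files_to_delete] == ["state/p1__1695640000.1__h1.jsonl"]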
4 changes: 4 additions & 0 deletions tests/load/filesystem/test_filesystem_common.py
@@ -50,6 +50,8 @@ def test_filesystem_configuration() -> None:
"bucket_url": "az://root",
"credentials": None,
"client_kwargs": None,
"enable_state_cleanup": True,
"max_state_files": 100,
"kwargs": None,
"deltalake_storage_options": None,
}
@@ -173,6 +175,8 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"read_only": False,
"bucket_url": "az://root",
"credentials": None,
"enable_state_cleanup": True,
"max_state_files": 100,
"kwargs": {"use_ssl": True},
"client_kwargs": {"verify": "public.crt"},
"deltalake_storage_options": {"AWS_S3_LOCKING_PROVIDER": "dynamodb"},
59 changes: 59 additions & 0 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -16,6 +16,7 @@
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.utils import uniq_id
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.destinations import filesystem
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.destinations.impl.filesystem.typing import TExtraPlaceholders
@@ -1254,6 +1255,64 @@ def my_resource_inc(prim_key=dlt.sources.incremental("id")):
    assert load_table_counts(p, "items") == {"items": 4 if restore else 6}


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_buckets_filesystem_configs=True),
    ids=lambda x: x.name,
)
def test_max_state_files_exception(destination_config: DestinationTestConfiguration) -> None:
    os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "1"

    with pytest.raises(Exception, match="The max_state_files must be greater than 1."):
        p = destination_config.setup_pipeline(
            "p1", dataset_name=destination_config.destination_name
        )
        p.run([1, 2, 3])


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_buckets_filesystem_configs=True),
    ids=lambda x: x.name,
)
@pytest.mark.parametrize("enable_state_cleanup", ["true", "false"])
def test_enable_state_cleanup(
    destination_config: DestinationTestConfiguration,
    enable_state_cleanup: str,
) -> None:
    os.environ["DESTINATION__FILESYSTEM__ENABLE_STATE_CLEANUP"] = enable_state_cleanup
    os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "3"

    p = destination_config.setup_pipeline("p1", dataset_name=destination_config.destination_name)

    # initial dataset
    @dlt.resource(name="items", primary_key="id")
    def initial_resource(_=dlt.sources.incremental("id")):
        yield from [{"id": 0}]

    p.run(initial_resource)

    # run the incremental resource four more times: five state files in total
    for i in range(1, 5):

        @dlt.resource(name="items", primary_key="id")
        def incremental_resource(_=dlt.sources.incremental("id"), new_i=i):
            yield from [{"id": j} for j in range(1, new_i + 1)]

        p = destination_config.setup_pipeline(
            "p1", dataset_name=destination_config.destination_name
        )
        p.run(incremental_resource)

    client: FilesystemClient = p.destination_client()  # type: ignore
    state_table_files = list(client._list_dlt_table_files(client.schema.state_table_name))

    if enable_state_cleanup == "true":
        assert len(state_table_files) == 3
    else:
        assert len(state_table_files) == 5


@pytest.mark.parametrize(
"destination_config",
destinations_configs(all_buckets_filesystem_configs=True),
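
Taken together, the feature can be exercised end to end much as test_enable_state_cleanup does. A minimal sketch, assuming a local bucket and that each run advances the incremental cursor (and therefore writes a new state file); the bucket URL and resource are illustrative:

import os
import dlt

os.environ["DESTINATION__FILESYSTEM__MAX_STATE_FILES"] = "3"

pipeline = dlt.pipeline(
    pipeline_name="p1",
    destination=dlt.destinations.filesystem("file:///tmp/state_cleanup_demo"),
    dataset_name="demo",
)

# five runs with a growing id range produce five state versions; after each
# run the destination trims old state files down to max_state_files
for i in range(1, 6):

    @dlt.resource(name="items", primary_key="id")
    def items(_=dlt.sources.incremental("id"), n=i):
        yield from ({"id": j} for j in range(n))

    pipeline.run(items)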