From c6a65f32c5b8a8536d3e24e663ccc3131a4833a1 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 15 Apr 2024 18:20:53 +0200 Subject: [PATCH] create init file also to mark datasets --- dlt/common/storages/load_package.py | 2 +- dlt/destinations/impl/filesystem/filesystem.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 8870024de9..6d4f4a0521 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -692,7 +692,7 @@ def destination_state() -> DictStrAny: def load_package_source_state() -> DictStrAny: - """Get segment of load package state that is specific to the current destination.""" + """Get segment of load package state that is specific to the sources.""" lp = load_package() return lp["state"].setdefault("source_state", {}) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 8e3f65f4de..58f8aea853 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -178,6 +178,9 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: " should be created previously!" ) + # we mark the storage folder as initialized + self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME)) + def update_stored_schema( self, load_id: str = None, @@ -215,7 +218,7 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]: return table_dirs def is_storage_initialized(self) -> bool: - return self.fs_client.isdir(self.dataset_path) # type: ignore[no-any-return] + return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return] def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: # skip the state table, we create a jsonl file in the complete_load step @@ -363,8 +366,9 @@ def _get_stored_schema_by_hash_or_newest( break if selected_path: + print("got state") return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path))) - + print("no state") return None def _store_current_schema(self, load_id: str) -> None: