From 9a87f0f0e51d882a19bcc9e8bf2e29b2caa77359 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 3 Apr 2024 18:19:12 +0200
Subject: [PATCH] first messy version of filesystem state sync

---
 dlt/common/destination/reference.py            |   1 +
 .../impl/filesystem/filesystem.py              | 112 +++++++++++++++++-
 dlt/pipeline/pipeline.py                       |   1 -
 dlt/pipeline/state_sync.py                     |  15 ++-
 fs_testing_pipe.py                             |  20 ++++
 5 files changed, 137 insertions(+), 12 deletions(-)
 create mode 100644 fs_testing_pipe.py

diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index ddcc5d1146..c71532fb70 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -427,6 +427,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
 
     @abstractmethod
     def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
+        """retrieves the stored schema by hash"""
         pass
 
     @abstractmethod
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 33a597f915..61516d5e30 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -1,9 +1,12 @@
 import posixpath
 import os
 from types import TracebackType
-from typing import ClassVar, List, Type, Iterable, Set, Iterator
+from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
+from dlt.common import json, pendulum
+import base64
+import re
 
 from dlt.common import logger
 from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -16,6 +19,10 @@
     JobClientBase,
     FollowupJob,
     WithStagingDataset,
+    WithStateSync,
+    StorageSchemaInfo,
+    StateInfo,
+    DoNothingJob
 )
 
 from dlt.destinations.job_impl import EmptyLoadJob
@@ -87,7 +94,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
         return jobs
 
 
-class FilesystemClient(JobClientBase, WithStagingDataset):
+class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync):
     """filesystem client storing jobs in memory"""
 
     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -167,15 +174,67 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
                 " should be created previously!"
             )
 
+    def _get_schema_file_name(self, hash: str) -> Tuple[str, str]:
+        """gets tuple of dir and fullpath for schema file for a given hash"""
+        dir = f"{self.dataset_path}/{self.schema.version_table_name}"
+        # remove all special chars from hash
+        safe_hash = "".join([c for c in hash if re.match(r"\w", c)])
+        path = f"{dir}/{self.schema.name}__{safe_hash}.jsonl"
+        return dir, path
+
     def update_stored_schema(
         self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
     ) -> TSchemaTables:
         # create destination dirs for all tables
+        # TODO we should only create dirs for datatables
         dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
         for directory in dirs_to_create:
             self.fs_client.makedirs(directory, exist_ok=True)
+
+        # get paths
+        dir, current_path = self._get_schema_file_name("current")
+        _, hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
+
+        # TODO: duplicate of weaviate implementation, should be abstracted out
+        version_info = {
+            "version_hash": self.schema.stored_version_hash,
+            "schema_name": self.schema.name,
+            "version": self.schema.version,
+            "engine_version": self.schema.ENGINE_VERSION,
+            "inserted_at": pendulum.now().isoformat(),
+            "schema": json.dumps(self.schema.to_dict()),
+        }
+
+        # we always keep tabs on what the current schema is
+        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
+        self.fs_client.write_text(current_path, json.dumps(version_info), "utf-8")
+        self.fs_client.write_text(hash_path, json.dumps(version_info), "utf-8")
+
         return expected_update
 
+    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
+        """Retrieves newest schema from destination storage"""
+        return self.get_stored_schema_by_hash("current")
+
+    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
+        """retrieves the stored schema by hash"""
+        _, filepath = self._get_schema_file_name(version_hash)
+        if not self.fs_client.exists(filepath):
+            return None
+        schema_info = json.loads(self.fs_client.read_text(filepath))
+        schema_info["inserted_at"] = pendulum.parse(schema_info["inserted_at"])
+        return StorageSchemaInfo(**schema_info)
+
+    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
+        """Loads compressed state from destination storage"""
+        dir = f"{self.dataset_path}/{self.schema.state_table_name}"
+        current_file_name = f"{dir}/{pipeline_name}_current.jsonl"
+        if not self.fs_client.exists(current_file_name):
+            return None
+        state_json = json.loads(self.fs_client.read_text(current_file_name))
+        state_json.pop("version_hash")
+        return StateInfo(**state_json)
+
     def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
         """Gets unique directories where table data is stored."""
         table_dirs: Set[str] = set()
@@ -192,6 +251,11 @@ def is_storage_initialized(self) -> bool:
         return self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]
 
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+
+        # do not load state file the regular way
+        if table["name"] == self.schema.state_table_name:
+            return DoNothingJob(file_path)
+
         cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
         return cls(
             file_path,
@@ -205,10 +269,46 @@ def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")
 
     def complete_load(self, load_id: str) -> None:
-        schema_name = self.schema.name
-        table_name = self.schema.loads_table_name
-        file_name = f"{schema_name}.{table_name}.{load_id}"
f"{schema_name}.{table_name}.{load_id}" - self.fs_client.touch(posixpath.join(self.dataset_path, file_name)) + + # write entry to state "table" + from dlt import current + from dlt.pipeline.state_sync import state_doc + + # get the state from the current pipeline + pipeline = current.pipeline() + state = pipeline._get_state() + doc = state_doc(state) + + # convert pendulum now to iso timestamp + doc["created_at"] = doc["created_at"].isoformat() + + dir = f"{self.dataset_path}/{self.schema.state_table_name}" + safe_hash = "".join([c for c in doc["version_hash"] if re.match(r"\w", c)]) + hash_file_name = f"{dir}/{pipeline.pipeline_name}_{safe_hash}.jsonl" + current_file_name = f"{dir}/{pipeline.pipeline_name}_current.jsonl" + + self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. + self.fs_client.write_text(hash_file_name, json.dumps(doc), "utf-8") + self.fs_client.write_text(current_file_name, json.dumps(doc), "utf-8") + + # write entry to load "table" + dir = f"{self.dataset_path}/{self.schema.loads_table_name}" + file_name = f"{self.schema.name}.{load_id}" + filepath = f"{dir}/{file_name}" + + # TODO: this is also duplicate across all destinations. DRY this. + load_data = { + "load_id": load_id, + "schema_name": self.schema.name, + "status": 0, + "inserted_at": pendulum.now().isoformat(), + "schema_version_hash": self.schema.version_hash, + } + + self.fs_client.makedirs(dir, exist_ok=True) # may not be needed.. + self.fs_client.write_text(filepath, json.dumps(load_data), "utf-8") + + def __enter__(self) -> "FilesystemClient": return self diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index de1f7afced..c46e012ab5 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -713,7 +713,6 @@ def sync_destination( remote_state["schema_names"], always_download=True ) # TODO: we should probably wipe out pipeline here - # if we didn't full refresh schemas, get only missing schemas if restored_schemas is None: restored_schemas = self._get_schemas_from_destination( diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index 5366b9c46d..c5f2448d61 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -115,11 +115,11 @@ def migrate_pipeline_state( return cast(TPipelineState, state) -def state_resource(state: TPipelineState) -> DltResource: - state = copy(state) - state.pop("_local") +def state_doc(state: TPipelineState) -> DictStrAny: + doc = copy(state) + doc.pop("_local") state_str = compress_state(state) - state_doc = { + doc = { "version": state["_state_version"], "engine_version": state["_state_engine_version"], "pipeline_name": state["pipeline_name"], @@ -127,8 +127,13 @@ def state_resource(state: TPipelineState) -> DltResource: "created_at": pendulum.now(), "version_hash": state["_version_hash"], } + return doc + + +def state_resource(state: TPipelineState) -> DltResource: + doc = state_doc(state) return dlt.resource( - [state_doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS + [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS ) diff --git a/fs_testing_pipe.py b/fs_testing_pipe.py new file mode 100644 index 0000000000..a3d3275f71 --- /dev/null +++ b/fs_testing_pipe.py @@ -0,0 +1,20 @@ +import dlt +import os + +if __name__ == "__main__": + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files" + os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" + + # resource with incremental for testing restoring of pipeline state + 
+    @dlt.resource(name="my_table")
+    def my_resource(id=dlt.sources.incremental("id")):
+        yield from [
+            {"id": 1},
+            {"id": 2},
+            {"id": 3},
+            {"id": 4},
+            {"id": 5}
+        ]
+
+    pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem")
+    pipe.run(my_resource(), table_name="my_table")  # , loader_file_format="parquet")
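
A minimal sketch for inspecting what this patch writes to disk, not part of the patch itself: it reads back the state document that complete_load() stores. It assumes the file://my_files bucket and the "dave" pipeline from fs_testing_pipe.py, plus dlt's default dataset name ("dave_dataset") and default state table name ("_dlt_pipeline_state"); adjust those names if your setup differs.

import json

import fsspec

# assumptions: local bucket directory and default dataset/table names for the test script above
bucket_dir = "my_files"                 # backing directory of file://my_files
dataset = "dave_dataset"                # dlt's default dataset name for pipeline "dave"
state_table = "_dlt_pipeline_state"     # dlt's default pipeline state table name
pipeline_name = "dave"

fs = fsspec.filesystem("file")
current_state_path = f"{bucket_dir}/{dataset}/{state_table}/{pipeline_name}_current.jsonl"

if fs.exists(current_state_path):
    # the "_current.jsonl" file always holds the latest state doc written by complete_load()
    with fs.open(current_state_path, "r", encoding="utf-8") as f:
        state_doc = json.load(f)
    # the compressed state blob sits under "state"; version metadata is stored alongside it
    print(state_doc["pipeline_name"], state_doc["version"], state_doc["version_hash"])
else:
    print("no state stored yet - run fs_testing_pipe.py first")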