From f6d5c9ccd451074a9c12680267e9674e652c2836 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 3 Apr 2024 18:42:38 +0200
Subject: [PATCH] clean up a bit

---
 .../impl/filesystem/filesystem.py | 190 ++++++++++--------
 1 file changed, 105 insertions(+), 85 deletions(-)

diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 61516d5e30..388ed3fc69 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -5,7 +5,8 @@
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
 from dlt.common import json, pendulum
-import base64
+from dlt.common.typing import DictStrAny
+
 import re
 
 from dlt.common import logger
@@ -22,7 +23,7 @@
     WithStateSync,
     StorageSchemaInfo,
     StateInfo,
-    DoNothingJob
+    DoNothingJob,
 )
 
 from dlt.destinations.job_impl import EmptyLoadJob
@@ -174,14 +175,6 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
                 " should be created previously!"
             )
 
-    def _get_schema_file_name(self, hash: str) -> Tuple[str, str]:
-        """gets tuple of dir and fullpath for schema file for a given hash"""
-        dir = f"{self.dataset_path}/{self.schema.version_table_name}"
-        # remove all special chars from hash
-        safe_hash = "".join([c for c in hash if re.match(r"\w", c)])
-        path = f"{dir}/{self.schema.name}__{safe_hash}.jsonl"
-        return dir, path
-
     def update_stored_schema(
         self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
     ) -> TSchemaTables:
@@ -191,50 +184,11 @@ def update_stored_schema(
         for directory in dirs_to_create:
             self.fs_client.makedirs(directory, exist_ok=True)
 
-        # get paths
-        dir, current_path = self._get_schema_file_name("current")
-        _, hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
-
-        # TODO: duplicate of weaviate implementation, should be abstracted out
-        version_info = {
-            "version_hash": self.schema.stored_version_hash,
-            "schema_name": self.schema.name,
-            "version": self.schema.version,
-            "engine_version": self.schema.ENGINE_VERSION,
-            "inserted_at": pendulum.now().isoformat(),
-            "schema": json.dumps(self.schema.to_dict()),
-        }
-
-        # we always keep tabs on what the current schema is
-        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
-        self.fs_client.write_text(current_path, json.dumps(version_info), "utf-8")
-        self.fs_client.write_text(hash_path, json.dumps(version_info), "utf-8")
+        # write schema to destination
+        self.store_current_schema()
 
         return expected_update
 
-    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
-        """Retrieves newest schema from destination storage"""
-        return self.get_stored_schema_by_hash("current")
-
-    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
-        """retrieves the stored schema by hash"""
-        _, filepath = self._get_schema_file_name(version_hash)
-        if not self.fs_client.exists(filepath):
-            return None
-        schema_info = json.loads(self.fs_client.read_text(filepath))
-        schema_info["inserted_at"] = pendulum.parse(schema_info["inserted_at"])
-        return StorageSchemaInfo(**schema_info)
-
-    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
-        """Loads compressed state from destination storage"""
-        dir = f"{self.dataset_path}/{self.schema.state_table_name}"
-        current_file_name = f"{dir}/{pipeline_name}_current.jsonl"
-        if not self.fs_client.exists(current_file_name):
-            return None
-        state_json = json.loads(self.fs_client.read_text(current_file_name))
-        state_json.pop("version_hash")
-        return StateInfo(**state_json)
-
     def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
         """Gets unique directories where table data is stored."""
         table_dirs: Set[str] = set()
@@ -251,8 +205,8 @@ def is_storage_initialized(self) -> bool:
         return self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]
 
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
-
-        # do not load state file the regular way
+        # skip the state table, we create a jsonl file in the complete_load
+        # step
         if table["name"] == self.schema.state_table_name:
             return DoNothingJob(file_path)
 
@@ -268,34 +222,31 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
     def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")
 
-    def complete_load(self, load_id: str) -> None:
+    def __enter__(self) -> "FilesystemClient":
+        return self
 
-        # write entry to state "table"
-        from dlt import current
-        from dlt.pipeline.state_sync import state_doc
+    def __exit__(
+        self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
+    ) -> None:
+        pass
 
-        # get the state from the current pipeline
-        pipeline = current.pipeline()
-        state = pipeline._get_state()
-        doc = state_doc(state)
+    def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
+        return False
 
-        # convert pendulum now to iso timestamp
-        doc["created_at"] = doc["created_at"].isoformat()
-
-        dir = f"{self.dataset_path}/{self.schema.state_table_name}"
-        safe_hash = "".join([c for c in doc["version_hash"] if re.match(r"\w", c)])
-        hash_file_name = f"{dir}/{pipeline.pipeline_name}_{safe_hash}.jsonl"
-        current_file_name = f"{dir}/{pipeline.pipeline_name}_current.jsonl"
+    #
+    # state stuff
+    #
 
-        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
-        self.fs_client.write_text(hash_file_name, json.dumps(doc), "utf-8")
-        self.fs_client.write_text(current_file_name, json.dumps(doc), "utf-8")
+    def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
+        dirname = os.path.dirname(filepath)
+        self.fs_client.makedirs(dirname, exist_ok=True)
+        self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
 
-        # write entry to load "table"
-        dir = f"{self.dataset_path}/{self.schema.loads_table_name}"
-        file_name = f"{self.schema.name}.{load_id}"
-        filepath = f"{dir}/{file_name}"
+    def complete_load(self, load_id: str) -> None:
+        # store current state
+        self.store_current_state()
 
+        # write entry to load "table"
         # TODO: this is also duplicate across all destinations. DRY this.
         load_data = {
             "load_id": load_id,
@@ -304,19 +255,88 @@ def complete_load(self, load_id: str) -> None:
             "inserted_at": pendulum.now().isoformat(),
             "schema_version_hash": self.schema.version_hash,
         }
+        filepath = (
+            f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}.{load_id}.jsonl"
+        )
 
-        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
-        self.fs_client.write_text(filepath, json.dumps(load_data), "utf-8")
+        self._write_to_json_file(filepath, load_data)
 
+    #
+    # state read/write
+    #
+
+    def _get_state_file_name(self, pipeline_name: str, hash: str) -> str:
+        """gets full path of the state file for a given hash"""
+        safe_hash = "".join(
+            [c for c in hash if re.match(r"\w", c)]
+        )  # remove all special chars from hash
+        return (
+            f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl"
+        )
 
-    def __enter__(self) -> "FilesystemClient":
-        return self
+    def store_current_state(self) -> None:
+        # get state doc from current pipeline
+        from dlt import current
+        from dlt.pipeline.state_sync import state_doc
 
-    def __exit__(
-        self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: TracebackType
-    ) -> None:
-        pass
+        pipeline = current.pipeline()
+        state = pipeline._get_state()
+        doc = state_doc(state)
 
-    def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
-        return False
+        # get paths, the archived copy is keyed by the state's own version hash
+        current_path = self._get_state_file_name(pipeline.pipeline_name, "current")
+        hash_path = self._get_state_file_name(pipeline.pipeline_name, doc["version_hash"])
+
+        # write
+        self._write_to_json_file(current_path, doc)
+        self._write_to_json_file(hash_path, doc)
+
+    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
+        """Loads compressed state from destination storage"""
+        file_name = self._get_state_file_name(pipeline_name, "current")
+        if self.fs_client.exists(file_name):
+            state_json = json.loads(self.fs_client.read_text(file_name))
+            state_json.pop("version_hash")
+            return StateInfo(**state_json)
+
+    #
+    # Schema read/write
+    #
+
+    def _get_schema_file_name(self, hash: str) -> str:
+        """gets full path of the schema file for a given hash"""
+        safe_hash = "".join(
+            [c for c in hash if re.match(r"\w", c)]
+        )  # remove all special chars from hash
+        return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl"
+
+    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
+        """Retrieves newest schema from destination storage"""
+        return self.get_stored_schema_by_hash("current")
+
+    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
+        """retrieves the stored schema by hash"""
+        filepath = self._get_schema_file_name(version_hash)
+        if self.fs_client.exists(filepath):
+            return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath)))
+
+    def store_current_schema(self) -> None:
+        # get paths
+        current_path = self._get_schema_file_name("current")
+        hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
+
+        # TODO: duplicate of weaviate implementation, should be abstracted out
+        version_info = {
+            "version_hash": self.schema.stored_version_hash,
+            "schema_name": self.schema.name,
+            "version": self.schema.version,
+            "engine_version": self.schema.ENGINE_VERSION,
+            "inserted_at": pendulum.now(),
+            "schema": json.dumps(self.schema.to_dict()),
+        }
+
+        # we always keep tabs on what the current schema is
+        self._write_to_json_file(current_path, version_info)
+        self._write_to_json_file(hash_path, version_info)
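
Resulting layout, as an illustration only (not part of the patch): schema and state documents are written as single JSON documents with a .jsonl extension under the dataset path, e.g. <dataset_path>/<version_table_name>/<schema_name>__current.jsonl and <dataset_path>/<state_table_name>/<pipeline_name>__current.jsonl, plus an archived copy keyed by the safe version hash. A minimal usage sketch, assuming an already configured FilesystemClient instance; the variable name "client" and pipeline name "my_pipeline" are hypothetical:

    # read back what the refactored helpers wrote to the filesystem destination
    schema_info = client.get_stored_schema()  # reads <dataset>/<version_table>/<schema>__current.jsonl
    if schema_info is not None:
        print(schema_info.version_hash)

    state_info = client.get_stored_state("my_pipeline")  # reads <dataset>/<state_table>/my_pipeline__current.jsonl
    if state_info is not None:
        print(state_info.version)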