move away from "current" file, rather iterate over bucket path contents
sh-rp committed Apr 15, 2024
1 parent 15ac9bf commit a6ce1b1
Showing 11 changed files with 114 additions and 62 deletions.
6 changes: 5 additions & 1 deletion dlt/common/destination/reference.py
@@ -273,14 +273,18 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
"""Updates storage to the current schema.
Implementations should not assume that `expected_update` is the exact difference between destination state and the self.schema. This is only the case if
destination has single writer and no other processes modify the schema.
Args:
load_id (str, optional): Load id during which the schema is updated
only_tables (Sequence[str], optional): Updates only listed tables. Defaults to None.
expected_update (TSchemaTables, optional): Update that is expected to be applied to the destination
Returns:
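For orientation, here is a minimal sketch (not part of the commit) of the pattern the widened signature imposes on implementations: `load_id` becomes the first parameter and is forwarded positionally to the base class. `BaseClient`, `MyClient` and the `dict` stand-in for `TSchemaTables` are invented for illustration, not dlt's real classes.

from typing import Dict, Iterable, Optional

TSchemaTables = Dict[str, dict]  # simplified stand-in for dlt's TSchemaTables


class BaseClient:
    def update_stored_schema(
        self,
        load_id: str = None,
        only_tables: Iterable[str] = None,
        expected_update: TSchemaTables = None,
    ) -> Optional[TSchemaTables]:
        # the base implementation just echoes the expected update
        return expected_update


class MyClient(BaseClient):
    def update_stored_schema(
        self,
        load_id: str = None,
        only_tables: Iterable[str] = None,
        expected_update: TSchemaTables = None,
    ) -> Optional[TSchemaTables]:
        # load_id is now the first positional argument, so positional call
        # sites (as in the destination clients below) must pass it first
        return super().update_stored_schema(load_id, only_tables, expected_update)


MyClient().update_stored_schema(load_id="1713168000.123456", only_tables=["my_table"])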
7 changes: 5 additions & 2 deletions dlt/destinations/impl/destination/destination.py
@@ -47,9 +47,12 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
return super().update_stored_schema(only_tables, expected_update)
return super().update_stored_schema(load_id, only_tables, expected_update)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip internal tables and remove columns from schema if so configured
7 changes: 5 additions & 2 deletions dlt/destinations/impl/dummy/dummy.py
@@ -127,9 +127,12 @@ def drop_storage(self) -> None:
pass

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
applied_update = super().update_stored_schema(only_tables, expected_update)
applied_update = super().update_stored_schema(load_id, only_tables, expected_update)
if self.config.fail_schema_update:
raise DestinationTransientException(
"Raise on schema update due to fail_schema_update config flag"
120 changes: 73 additions & 47 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,7 +1,7 @@
import posixpath
import os
from types import TracebackType
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional
from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
from fsspec import AbstractFileSystem
from contextlib import contextmanager
from dlt.common import json, pendulum
@@ -33,6 +33,9 @@
from dlt.destinations import path_utils


INIT_FILE_NAME = "init"


class LoadFilesystemJob(LoadJob):
def __init__(
self,
@@ -176,7 +179,10 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
)

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> TSchemaTables:
# create destination dirs for all tables
table_names = only_tables or self.schema.tables.keys()
@@ -185,11 +191,10 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
# we need to mark the folders of the data tables as initialized
if tables_name in self.schema.dlt_table_names():
print(directory + " " + tables_name)
self.fs_client.touch(f"{directory}/init")
self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))

# write schema to destination
self.store_current_schema()
self.store_current_schema(load_id or "1")

return expected_update

@@ -206,7 +211,7 @@ def _get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
)
destination_dir = posixpath.join(self.dataset_path, table_prefix)
# extract the path component
table_dirs.append(os.path.dirname(destination_dir))
table_dirs.append(posixpath.dirname(destination_dir))
return table_dirs

def is_storage_initialized(self) -> bool:
@@ -245,14 +250,27 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = os.path.dirname(filepath)
dirname = posixpath.dirname(filepath)
if not self.fs_client.isdir(dirname):
return
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")

def _to_path_safe_string(self, s: str) -> str:
return "".join([c for c in s if re.match(r"\w", c)]) if s else None

def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.listdir(dirname, detail=False):
filename = os.path.splitext(os.path.basename(filepath))[0]
fileparts = filename.split("__")
if len(fileparts) != 3:
continue
yield filepath, fileparts

def complete_load(self, load_id: str) -> None:
# store current state
self.store_current_state()
self.store_current_state(load_id)

# write entry to load "table"
# TODO: this is also duplicate across all destinations. DRY this.
@@ -263,26 +281,19 @@ def complete_load(self, load_id: str) -> None:
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = (
f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}.{load_id}.jsonl"
)
filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl"

self._write_to_json_file(filepath, load_data)

#
# state read/write
#

def _get_state_file_name(self, pipeline_name: str, version_hash: str) -> str:
def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
safe_hash = "".join(
[c for c in version_hash if re.match(r"\w", c)]
) # remove all special chars from hash
return (
f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl"
)
return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"

def store_current_state(self) -> None:
def store_current_state(self, load_id: str) -> None:
# get state doc from current pipeline
from dlt.common.configuration.container import Container
from dlt.common.pipeline import PipelineContext
@@ -293,25 +304,28 @@ def store_current_state(self) -> None:
doc = state_doc(state)

# get paths
current_path = self._get_state_file_name(pipeline.pipeline_name, "current")
hash_path = self._get_state_file_name(
pipeline.pipeline_name, self.schema.stored_version_hash
pipeline.pipeline_name, self.schema.stored_version_hash, load_id
)

# write
self._write_to_json_file(current_path, doc)
self._write_to_json_file(hash_path, doc)

def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# raise if dir not initialized
filepath = self._get_state_file_name(pipeline_name, "current")
dirname = os.path.dirname(filepath)
if not self.fs_client.isdir(dirname):
raise DestinationUndefinedEntity({"dir": dirname})
# get base dir
dirname = posixpath.dirname(self._get_state_file_name(pipeline_name, "", ""))

# search newest state
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if fileparts[0] == pipeline_name and fileparts[1] > newest_load_id:
newest_load_id = fileparts[1]
selected_path = filepath

"""Loads compressed state from destination storage"""
if self.fs_client.exists(filepath):
state_json = json.loads(self.fs_client.read_text(filepath))
if selected_path:
state_json = json.loads(self.fs_client.read_text(selected_path))
state_json.pop("version_hash")
return StateInfo(**state_json)

@@ -321,33 +335,46 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# Schema read/write
#

def _get_schema_file_name(self, version_hash: str) -> str:
def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
safe_hash = "".join(
[c for c in version_hash if re.match(r"\w", c)]
) # remove all special chars from hash
return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl"
return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"

def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
"""Retrieves newest schema from destination storage"""
return self.get_stored_schema_by_hash("current")
return self._get_stored_schema_by_hash_or_newest()

def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
"""retrieves the stored schema by hash"""
filepath = self._get_schema_file_name(version_hash)
# raise if dir not initialized
dirname = os.path.dirname(filepath)
if not self.fs_client.isdir(dirname):
raise DestinationUndefinedEntity({"dir": dirname})
if self.fs_client.exists(filepath):
return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath)))
return self._get_stored_schema_by_hash_or_newest(version_hash)

def _get_stored_schema_by_hash_or_newest(
self, version_hash: str = None
) -> Optional[StorageSchemaInfo]:
"""Get the schema by supplied hash, falls back to getting the newest version matching the existing schema name"""
version_hash = self._to_path_safe_string(version_hash)
dirname = posixpath.dirname(self._get_schema_file_name("", ""))
# find newest schema for pipeline or by version hash
selected_path = None
newest_load_id = "0"
for filepath, fileparts in self._list_dlt_dir(dirname):
if (
not version_hash
and fileparts[0] == self.schema.name
and fileparts[1] > newest_load_id
):
newest_load_id = fileparts[1]
selected_path = filepath
elif fileparts[2] == version_hash:
selected_path = filepath
break

if selected_path:
return StorageSchemaInfo(**json.loads(self.fs_client.read_text(selected_path)))

return None

def store_current_schema(self) -> None:
def store_current_schema(self, load_id: str) -> None:
# get paths
current_path = self._get_schema_file_name("current")
hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
hash_path = self._get_schema_file_name(self.schema.stored_version_hash, load_id)

# TODO: duplicate of weaviate implementation, should be abstracted out
version_info = {
Expand All @@ -360,5 +387,4 @@ def store_current_schema(self) -> None:
}

# we always keep tabs on what the current schema is
self._write_to_json_file(current_path, version_info)
self._write_to_json_file(hash_path, version_info)
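To make the new layout concrete, here is a small self-contained sketch of the `<name>__<load_id>__<path_safe_hash>.jsonl` naming scheme and the newest-file selection that replaces the former "current" pointer file. This is not part of the commit: the directory, pipeline name, load ids and hashes are invented, and the real client lists the bucket with fsspec instead of taking a Python list.

import posixpath
from typing import Iterator, List, Optional, Tuple


def iter_dlt_files(filepaths: List[str]) -> Iterator[Tuple[str, List[str]]]:
    # mirrors _list_dlt_dir: keep only files whose basename splits into
    # exactly three "__"-separated parts: (name, load_id, path_safe_hash)
    for filepath in filepaths:
        filename = posixpath.splitext(posixpath.basename(filepath))[0]
        fileparts = filename.split("__")
        if len(fileparts) == 3:
            yield filepath, fileparts


def newest_state_file(filepaths: List[str], pipeline_name: str) -> Optional[str]:
    # mirrors get_stored_state: among this pipeline's state files, the one
    # with the greatest load id wins
    selected_path, newest_load_id = None, "0"
    for filepath, (name, load_id, _hash) in iter_dlt_files(filepaths):
        if name == pipeline_name and load_id > newest_load_id:
            selected_path, newest_load_id = filepath, load_id
    return selected_path


state_dir_listing = [
    "my_dataset/_dlt_pipeline_state/my_pipe__1713168000.101__abc123.jsonl",
    "my_dataset/_dlt_pipeline_state/my_pipe__1713254400.202__def456.jsonl",
    "my_dataset/_dlt_pipeline_state/init",  # marker file, skipped by the parser
]
print(newest_state_file(state_dir_listing, "my_pipe"))
# -> my_dataset/_dlt_pipeline_state/my_pipe__1713254400.202__def456.jsonl

As in the diff, load ids are compared as strings; this agrees with numeric order as long as the timestamp-based ids have the same number of digits.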
7 changes: 5 additions & 2 deletions dlt/destinations/impl/qdrant/qdrant_client.py
@@ -283,9 +283,12 @@ def _delete_sentinel_collection(self) -> None:
self.db_client.delete_collection(self.sentinel_collection)

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
super().update_stored_schema(load_id, only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
7 changes: 5 additions & 2 deletions dlt/destinations/impl/weaviate/weaviate_client.py
@@ -424,9 +424,12 @@ def _delete_sentinel_class(self) -> None:

@wrap_weaviate_error
def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
super().update_stored_schema(load_id, only_tables, expected_update)
# Retrieve the schema from Weaviate
applied_update: TSchemaTables = {}
try:
7 changes: 5 additions & 2 deletions dlt/destinations/job_client_impl.py
@@ -180,9 +180,12 @@ def is_storage_initialized(self) -> bool:
return self.sql_client.has_dataset()

def update_stored_schema(
self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
self,
load_id: str = None,
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None,
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
super().update_stored_schema(load_id, only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
if schema_info is None:
2 changes: 2 additions & 0 deletions dlt/load/load.py
@@ -369,6 +369,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
if isinstance(job_client, WithStagingDataset)
else None
),
load_id=load_id,
)

# init staging client
Expand All @@ -385,6 +386,7 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
expected_update,
job_client.should_truncate_table_before_load_on_staging_destination,
job_client.should_load_data_to_staging_dataset_on_staging_destination,
load_id=load_id,
)

self.load_storage.commit_schema_update(load_id, applied_update)
7 changes: 5 additions & 2 deletions dlt/load/utils.py
@@ -66,6 +66,7 @@ def init_client(
expected_update: TSchemaTables,
truncate_filter: Callable[[TTableSchema], bool],
load_staging_filter: Callable[[TTableSchema], bool],
load_id: str = None,
) -> TSchemaTables:
"""Initializes destination storage including staging dataset if supported
@@ -97,7 +98,7 @@
)

applied_update = _init_dataset_and_update_schema(
job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables
job_client, expected_update, tables_with_jobs | dlt_tables, truncate_tables, load_id=load_id
)

# update the staging dataset if client supports this
@@ -117,6 +118,7 @@
staging_tables | {schema.version_table_name}, # keep only schema version
staging_tables, # all eligible tables must be also truncated
staging_info=True,
load_id=load_id,
)

return applied_update
@@ -128,6 +130,7 @@ def _init_dataset_and_update_schema(
update_tables: Iterable[str],
truncate_tables: Iterable[str] = None,
staging_info: bool = False,
load_id: str = None,
) -> TSchemaTables:
staging_text = "for staging dataset" if staging_info else ""
logger.info(
Expand All @@ -140,7 +143,7 @@ def _init_dataset_and_update_schema(
f" {staging_text}"
)
applied_update = job_client.update_stored_schema(
only_tables=update_tables, expected_update=expected_update
load_id=load_id, only_tables=update_tables, expected_update=expected_update
)
logger.info(
f"Client for {job_client.config.destination_type} will truncate tables {staging_text}"
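A compressed sketch of the propagation these loader changes add: the package's load id flows from load_single_package through init_client and _init_dataset_and_update_schema into the client's update_stored_schema. The StubJobClient and the simplified signatures below are stand-ins for illustration, not the real dlt interfaces.

from typing import Iterable, Optional


class StubJobClient:
    # stands in for a destination job client with the widened signature
    def update_stored_schema(
        self,
        load_id: str = None,
        only_tables: Iterable[str] = None,
        expected_update: Optional[dict] = None,
    ) -> Optional[dict]:
        print(f"load {load_id}: updating schema for {sorted(only_tables or [])}")
        return expected_update


def _init_dataset_and_update_schema(
    job_client: StubJobClient,
    expected_update: dict,
    update_tables: Iterable[str],
    load_id: str = None,
) -> dict:
    return job_client.update_stored_schema(
        load_id=load_id, only_tables=update_tables, expected_update=expected_update
    )


def init_client(job_client: StubJobClient, expected_update: dict, load_id: str = None) -> dict:
    tables_with_jobs = {"my_table"}
    dlt_tables = {"_dlt_loads", "_dlt_version", "_dlt_pipeline_state"}
    return _init_dataset_and_update_schema(
        job_client, expected_update, tables_with_jobs | dlt_tables, load_id=load_id
    )


# in load_single_package the load_id of the current package is passed along
init_client(StubJobClient(), expected_update={}, load_id="1713168000.123456")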
4 changes: 2 additions & 2 deletions fs_testing_pipe.py
@@ -5,7 +5,7 @@

if __name__ == "__main__":

shutil.rmtree("./my_files", ignore_errors=True)
# shutil.rmtree("./my_files", ignore_errors=True)
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files"
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"

@@ -20,7 +20,7 @@ def my_resouce(id=dlt.sources.incremental("id")):
{"id": 5}
]

pipeline_name = f"dave_{random.randint(0, 10000000)}"
pipeline_name = f"dave"

pipe = dlt.pipeline(pipeline_name=pipeline_name, destination="filesystem")
pipe.run(my_resouce(), table_name="my_table") #, loader_file_format="parquet")