filesystem state sync #1184
@@ -1,9 +1,13 @@
 import posixpath
 import os
 from types import TracebackType
-from typing import ClassVar, List, Type, Iterable, Set, Iterator
+from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
+from dlt.common import json, pendulum
+from dlt.common.typing import DictStrAny
+
+import re
 
 from dlt.common import logger
 from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -16,6 +20,10 @@
     JobClientBase,
     FollowupJob,
     WithStagingDataset,
+    WithStateSync,
+    StorageSchemaInfo,
+    StateInfo,
+    DoNothingJob,
 )
 
 from dlt.destinations.job_impl import EmptyLoadJob
@@ -87,7 +95,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
         return jobs
 
 
-class FilesystemClient(JobClientBase, WithStagingDataset):
+class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync):
     """filesystem client storing jobs in memory"""
 
     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -171,9 +179,14 @@ def update_stored_schema(
         self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
     ) -> TSchemaTables:
         # create destination dirs for all tables
+        # TODO: we should only create dirs for data tables
         dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
         for directory in dirs_to_create:
             self.fs_client.makedirs(directory, exist_ok=True)
+
+        # write schema to destination
+        self.store_current_schema()
+
         return expected_update
 
     def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
@@ -192,6 +205,11 @@ def is_storage_initialized(self) -> bool:
         return self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]
 
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+        # skip the state table, we create a jsonl file in the complete_load step
+        if table["name"] == self.schema.state_table_name:
+            return DoNothingJob(file_path)
+
         cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
         return cls(
             file_path,
@@ -204,12 +222,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
     def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")
 
-    def complete_load(self, load_id: str) -> None:
-        schema_name = self.schema.name
-        table_name = self.schema.loads_table_name
-        file_name = f"{schema_name}.{table_name}.{load_id}"
-        self.fs_client.touch(posixpath.join(self.dataset_path, file_name))
-
     def __enter__(self) -> "FilesystemClient":
         return self
@@ -220,3 +232,111 @@ def __exit__(
 
     def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
         return False
+
+    #
+    # state stuff
+    #
+
+    def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
+        dirname = os.path.dirname(filepath)
+        self.fs_client.makedirs(dirname, exist_ok=True)
+        self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
+    def complete_load(self, load_id: str) -> None:
+        # store current state
+        self.store_current_state()
+
+        # write entry to load "table"
+        # TODO: this is also duplicated across all destinations. DRY this.
+        load_data = {
+            "load_id": load_id,
+            "schema_name": self.schema.name,
+            "status": 0,
+            "inserted_at": pendulum.now().isoformat(),
+            "schema_version_hash": self.schema.version_hash,
+        }
+        filepath = (
+            f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}.{load_id}.jsonl"
+        )
+        self._write_to_json_file(filepath, load_data)

Review comment: why this path? maybe we should save it where the previous path was.

Reply: I wanted to align it with the way all the other dlt tables are stored; I somehow like it more. We could have a setting for backwards compatibility or something? Your call.
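For orientation, the layout this produces in the bucket would look roughly as follows (a sketch assuming dlt's default table names _dlt_loads, _dlt_pipeline_state and _dlt_version; the actual directory names come from the schema):

<dataset_path>/
    _dlt_loads/<schema_name>.<load_id>.jsonl             # one entry per completed load
    _dlt_pipeline_state/<pipeline_name>__<hash>.jsonl    # pipeline state documents
    _dlt_version/<schema_name>__<hash>.jsonl             # stored schema versions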
+    #
+    # state read/write
+    #
+
+    def _get_state_file_name(self, pipeline_name: str, hash: str) -> str:
+        """gets the full path of the state file for a given hash"""
+        safe_hash = "".join(
+            [c for c in hash if re.match(r"\w", c)]
+        )  # remove all special chars from hash
+        return (
+            f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{safe_hash}.jsonl"
+        )

Review comment: hmmm, you will not get the same hash twice. we do not emit state if the hash is not changing. also I think load_id is a must in the file name.

Reply: I put the load id there now; not sure what you mean with the "you will not get the same hash twice" comment?
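As a quick illustration of the sanitization above: re.match(r"\w", c) keeps only word characters, so a base64-style hash (hypothetical value below) loses its "+", "/" and "=" characters:

import re

version_hash = "JN6FImbyPh0Zxh+dBZwPGQ=="
safe_hash = "".join(c for c in version_hash if re.match(r"\w", c))
print(safe_hash)  # JN6FImbyPh0ZxhdBZwPGQ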
+    def store_current_state(self) -> None:
+        # get state doc from current pipeline
+        from dlt import current
+        from dlt.pipeline.state_sync import state_doc
+
+        pipeline = current.pipeline()
+        state = pipeline._get_state()
+        doc = state_doc(state)
+
+        # get paths
+        current_path = self._get_state_file_name(pipeline.pipeline_name, "current")
+        hash_path = self._get_state_file_name(
+            pipeline.pipeline_name, self.schema.stored_version_hash
+        )
+
+        # write
+        self._write_to_json_file(current_path, doc)
+        self._write_to_json_file(hash_path, doc)

Review comment: this will not work. yes, we process packages in order, but do not assume that (because we do not have to).

Reply: I am not sure what you mean here; maybe we should discuss it briefly. IMHO this setup replicates the behavior of the other destinations, with the same lookups/where clauses.

Reply: I have changed it to iterate over the files in the dir and select the correct one.
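A minimal sketch of the lookup described in the last reply: instead of relying on a fixed "current" file, list the state directory and pick the newest entry. fs_client.ls(..., detail=False) is standard fsspec; the <pipeline_name>__<load_id>.jsonl naming and the lexical sorting of load ids are assumptions based on the review exchange, not the final code:

# hypothetical helper on FilesystemClient (posixpath and Optional imported above)
def _get_newest_state_file(self, pipeline_name: str) -> Optional[str]:
    state_dir = posixpath.join(self.dataset_path, self.schema.state_table_name)
    candidates = [
        f
        for f in self.fs_client.ls(state_dir, detail=False)
        if posixpath.basename(f).startswith(f"{pipeline_name}__")
    ]
    # assuming load ids are timestamp strings of equal length, the lexical max is newest
    return max(candidates, default=None)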
+    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
+        """Loads compressed state from destination storage"""
+        file_name = self._get_state_file_name(pipeline_name, "current")
+        if self.fs_client.exists(file_name):
+            state_json = json.loads(self.fs_client.read_text(file_name))
+            state_json.pop("version_hash")
+            return StateInfo(**state_json)
+    #
+    # Schema read/write
+    #
+
+    def _get_schema_file_name(self, hash: str) -> str:
+        """gets the full path of the schema file for a given hash"""
+        safe_hash = "".join(
+            [c for c in hash if re.match(r"\w", c)]
+        )  # remove all special chars from hash
+        return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl"

Review comment: IMO the hash is enough. also it would be good to have the load_id.

Reply: I need the name in the file path so I can find the right schema when looking for the newest version of a schema, so I will keep it.
+    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
+        """Retrieves newest schema from destination storage"""
+        return self.get_stored_schema_by_hash("current")

Review comment: same thing as in state: find the oldest load id.

Reply: done (assuming you mean the newest load id :) )
+    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
+        """retrieves the stored schema by hash"""
+        filepath = self._get_schema_file_name(version_hash)
+        if self.fs_client.exists(filepath):
+            return StorageSchemaInfo(**json.loads(self.fs_client.read_text(filepath)))
+    def store_current_schema(self) -> None:
+        # get paths
+        current_path = self._get_schema_file_name("current")
+        hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
+
+        # TODO: duplicate of the weaviate implementation, should be abstracted out
+        version_info = {
+            "version_hash": self.schema.stored_version_hash,
+            "schema_name": self.schema.name,
+            "version": self.schema.version,
+            "engine_version": self.schema.ENGINE_VERSION,
+            "inserted_at": pendulum.now(),
+            "schema": json.dumps(self.schema.to_dict()),
+        }
+
+        # we always keep tabs on what the current schema is
+        self._write_to_json_file(current_path, version_info)
+        self._write_to_json_file(hash_path, version_info)
New file: a small test script used during development.

@@ -0,0 +1,20 @@
Review comment: keep those in some ignored folder ;)

Reply: yeah, I have that also, but since I am working on two different machines I need to do this sometimes ;)

+import dlt
+import os
+
+if __name__ == "__main__":
+    os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files"
+    os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"
+
+    # resource with incremental for testing restoring of pipeline state
+    @dlt.resource(name="my_table")
+    def my_resouce(id=dlt.sources.incremental("id")):
+        yield from [
+            {"id": 1},
+            {"id": 2},
+            {"id": 3},
+            {"id": 4},
+            {"id": 5},
+        ]
+
+    pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem")
+    pipe.run(my_resouce(), table_name="my_table")  # , loader_file_format="parquet")
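To exercise the restore path end to end, one could wipe the local pipeline working directory (or switch machines) and let dlt pull state back from the bucket. A sketch, not part of this PR; sync_destination is the existing dlt API that restores state and schemas from the destination:

# run after deleting the local pipeline directory (e.g. ~/.dlt/pipelines/dave)
pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem")
pipe.sync_destination()
print(pipe.state)  # state restored from the files written by FilesystemClient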
Review comment: don't use os.path, use posixpath. here paths are normalized from fsspec.
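The distinction matters on Windows, where os.path joins with backslashes while fsspec paths always use forward slashes:

import os.path
import posixpath

# on Windows this yields 'bucket/dataset\\state.jsonl', not a valid fsspec path
print(os.path.join("bucket/dataset", "state.jsonl"))
# posixpath joins with '/' on every platform, matching fsspec's normalization
print(posixpath.join("bucket/dataset", "state.jsonl"))  # bucket/dataset/state.jsonl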