first messy version of filesystem state sync
sh-rp committed Apr 3, 2024
1 parent 0369496 commit 9a87f0f
Showing 5 changed files with 137 additions and 12 deletions.
1 change: 1 addition & 0 deletions dlt/common/destination/reference.py
@@ -427,6 +427,7 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:

     @abstractmethod
     def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
+        """retrieves the stored schema by hash"""
         pass

     @abstractmethod
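The docstring added here documents the lookup-by-hash half of the WithStateSync contract that the filesystem client below implements. As a hedged sketch of how a caller might use it during restore — `client` and the fallback logic are illustrative, not taken from this commit:

    # hypothetical caller: look up an exact schema version, fall back to the newest one
    schema_info = client.get_stored_schema_by_hash(schema.stored_version_hash)
    if schema_info is None:
        schema_info = client.get_stored_schema()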
112 changes: 106 additions & 6 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,9 +1,12 @@
 import posixpath
 import os
 from types import TracebackType
-from typing import ClassVar, List, Type, Iterable, Set, Iterator
+from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple
 from fsspec import AbstractFileSystem
 from contextlib import contextmanager
+from dlt.common import json, pendulum
+import base64
+import re

 from dlt.common import logger
 from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -16,6 +19,10 @@
     JobClientBase,
     FollowupJob,
     WithStagingDataset,
+    WithStateSync,
+    StorageSchemaInfo,
+    StateInfo,
+    DoNothingJob
 )

 from dlt.destinations.job_impl import EmptyLoadJob
@@ -87,7 +94,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
         return jobs


-class FilesystemClient(JobClientBase, WithStagingDataset):
+class FilesystemClient(JobClientBase, WithStagingDataset, WithStateSync):
     """filesystem client storing jobs in memory"""

     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -167,15 +174,67 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
                 " should be created previously!"
             )

+    def _get_schema_file_name(self, hash: str) -> Tuple[str, str]:
+        """gets tuple of dir and fullpath for schema file for a given hash"""
+        dir = f"{self.dataset_path}/{self.schema.version_table_name}"
+        # remove all special chars from hash
+        safe_hash = "".join([c for c in hash if re.match(r"\w", c)])
+        path = f"{dir}/{self.schema.name}__{safe_hash}.jsonl"
+        return dir, path
+
     def update_stored_schema(
         self, only_tables: Iterable[str] = None, expected_update: TSchemaTables = None
     ) -> TSchemaTables:
+        # create destination dirs for all tables
+        # TODO: we should only create dirs for data tables
         dirs_to_create = self._get_table_dirs(only_tables or self.schema.tables.keys())
         for directory in dirs_to_create:
             self.fs_client.makedirs(directory, exist_ok=True)

+        # get paths
+        dir, current_path = self._get_schema_file_name("current")
+        _, hash_path = self._get_schema_file_name(self.schema.stored_version_hash)
+
+        # TODO: duplicate of weaviate implementation, should be abstracted out
+        version_info = {
+            "version_hash": self.schema.stored_version_hash,
+            "schema_name": self.schema.name,
+            "version": self.schema.version,
+            "engine_version": self.schema.ENGINE_VERSION,
+            "inserted_at": pendulum.now().isoformat(),
+            "schema": json.dumps(self.schema.to_dict()),
+        }
+
+        # we always keep tabs on what the current schema is
+        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
+        self.fs_client.write_text(current_path, json.dumps(version_info), "utf-8")
+        self.fs_client.write_text(hash_path, json.dumps(version_info), "utf-8")
+
         return expected_update

+    def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
+        """Retrieves newest schema from destination storage"""
+        return self.get_stored_schema_by_hash("current")
+
+    def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]:
+        """retrieves the stored schema by hash"""
+        _, filepath = self._get_schema_file_name(version_hash)
+        if not self.fs_client.exists(filepath):
+            return None
+        schema_info = json.loads(self.fs_client.read_text(filepath))
+        schema_info["inserted_at"] = pendulum.parse(schema_info["inserted_at"])
+        return StorageSchemaInfo(**schema_info)
+
+    def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
+        """Loads compressed state from destination storage"""
+        dir = f"{self.dataset_path}/{self.schema.state_table_name}"
+        current_file_name = f"{dir}/{pipeline_name}_current.jsonl"
+        if not self.fs_client.exists(current_file_name):
+            return None
+        state_json = json.loads(self.fs_client.read_text(current_file_name))
+        state_json.pop("version_hash")
+        return StateInfo(**state_json)
+
     def _get_table_dirs(self, table_names: Iterable[str]) -> Set[str]:
         """Gets unique directories where table data is stored."""
         table_dirs: Set[str] = set()
@@ -192,6 +251,11 @@ def is_storage_initialized(self) -> bool:
         return self.fs_client.isdir(self.dataset_path)  # type: ignore[no-any-return]

     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
+
+        # do not load state file the regular way
+        if table["name"] == self.schema.state_table_name:
+            return DoNothingJob(file_path)
+
         cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob
         return cls(
             file_path,
@@ -205,10 +269,46 @@ def restore_file_load(self, file_path: str) -> LoadJob:
         return EmptyLoadJob.from_file_path(file_path, "completed")

     def complete_load(self, load_id: str) -> None:
-        schema_name = self.schema.name
-        table_name = self.schema.loads_table_name
-        file_name = f"{schema_name}.{table_name}.{load_id}"
-        self.fs_client.touch(posixpath.join(self.dataset_path, file_name))

+        # write entry to state "table"
+        from dlt import current
+        from dlt.pipeline.state_sync import state_doc
+
+        # get the state from the current pipeline
+        pipeline = current.pipeline()
+        state = pipeline._get_state()
+        doc = state_doc(state)
+
+        # convert pendulum now to iso timestamp
+        doc["created_at"] = doc["created_at"].isoformat()
+
+        dir = f"{self.dataset_path}/{self.schema.state_table_name}"
+        safe_hash = "".join([c for c in doc["version_hash"] if re.match(r"\w", c)])
+        hash_file_name = f"{dir}/{pipeline.pipeline_name}_{safe_hash}.jsonl"
+        current_file_name = f"{dir}/{pipeline.pipeline_name}_current.jsonl"
+
+        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
+        self.fs_client.write_text(hash_file_name, json.dumps(doc), "utf-8")
+        self.fs_client.write_text(current_file_name, json.dumps(doc), "utf-8")
+
+        # write entry to loads "table"
+        dir = f"{self.dataset_path}/{self.schema.loads_table_name}"
+        file_name = f"{self.schema.name}.{load_id}"
+        filepath = f"{dir}/{file_name}"
+
+        # TODO: this is also duplicated across all destinations. DRY this.
+        load_data = {
+            "load_id": load_id,
+            "schema_name": self.schema.name,
+            "status": 0,
+            "inserted_at": pendulum.now().isoformat(),
+            "schema_version_hash": self.schema.version_hash,
+        }
+
+        self.fs_client.makedirs(dir, exist_ok=True)  # may not be needed..
+        self.fs_client.write_text(filepath, json.dumps(load_data), "utf-8")
+
+
+
     def __enter__(self) -> "FilesystemClient":
         return self
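Taken together, the filesystem client now writes its bookkeeping next to the data files. A sketch of the resulting layout — assuming dlt's default internal table names (_dlt_version, _dlt_loads, _dlt_pipeline_state) and illustrative dataset, schema, and pipeline names:

    # hypothetical layout produced by update_stored_schema and complete_load
    # my_files/my_dataset/
    #   _dlt_version/my_schema__current.jsonl       newest schema, rewritten on every update
    #   _dlt_version/my_schema__<safe_hash>.jsonl   one file per schema version hash
    #   _dlt_pipeline_state/dave_current.jsonl      newest state of pipeline "dave"
    #   _dlt_pipeline_state/dave_<safe_hash>.jsonl  one file per state version hash
    #   _dlt_loads/my_schema.<load_id>              one JSON completion record per load

    # the "safe hash" in the file names mirrors _get_schema_file_name above
    import re

    def safe_hash(version_hash: str) -> str:
        # keep only word characters so the hash is a valid file name component
        return "".join(c for c in version_hash if re.match(r"\w", c))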
1 change: 0 additions & 1 deletion dlt/pipeline/pipeline.py
@@ -713,7 +713,6 @@ def sync_destination(
                     remote_state["schema_names"], always_download=True
                 )
             # TODO: we should probably wipe out pipeline here
-
             # if we didn't full refresh schemas, get only missing schemas
             if restored_schemas is None:
                 restored_schemas = self._get_schemas_from_destination(
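For orientation, the restore path that consumes these files runs through sync_destination. A simplified sketch of the sequence — `client` is hypothetical, and decompress_state is assumed to be the counterpart of compress_state in dlt.pipeline.state_sync:

    # rough restore flow against a destination implementing WithStateSync
    state_info = client.get_stored_state("dave")
    if state_info is not None:
        remote_state = decompress_state(state_info.state)
        # schemas are then fetched per name in remote_state["schema_names"],
        # e.g. via get_stored_schema_by_hash()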
15 changes: 10 additions & 5 deletions dlt/pipeline/state_sync.py
@@ -115,20 +115,25 @@ def migrate_pipeline_state(
     return cast(TPipelineState, state)


-def state_resource(state: TPipelineState) -> DltResource:
-    state = copy(state)
-    state.pop("_local")
+def state_doc(state: TPipelineState) -> DictStrAny:
+    doc = copy(state)
+    doc.pop("_local")
     state_str = compress_state(state)
-    state_doc = {
+    doc = {
         "version": state["_state_version"],
         "engine_version": state["_state_engine_version"],
         "pipeline_name": state["pipeline_name"],
         "state": state_str,
         "created_at": pendulum.now(),
         "version_hash": state["_version_hash"],
     }
+    return doc
+
+
+def state_resource(state: TPipelineState) -> DltResource:
+    doc = state_doc(state)
     return dlt.resource(
-        [state_doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS
+        [doc], name=STATE_TABLE_NAME, write_disposition="append", columns=STATE_TABLE_COLUMNS
     )


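Splitting state_doc out of state_resource lets the filesystem client above build the same document without going through a resource. A sketch of what state_doc returns, with placeholder values for the hash and the compressed payload:

    # illustrative shape of state_doc(state)
    doc = {
        "version": 2,                  # state["_state_version"]
        "engine_version": 4,           # state["_state_engine_version"]
        "pipeline_name": "dave",
        "state": "wYAC8BAe...",        # compress_state(state): compressed, encoded text
        "created_at": pendulum.now(),  # the filesystem client serializes this to ISO 8601
        "version_hash": "aGkVZ...",    # state["_version_hash"]
    }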
20 changes: 20 additions & 0 deletions fs_testing_pipe.py
@@ -0,0 +1,20 @@
+import dlt
+import os
+
+if __name__ == "__main__":
+    os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files"
+    os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"
+
+    # resource with incremental for testing restoring of pipeline state
+    @dlt.resource(name="my_table")
+    def my_resource(id=dlt.sources.incremental("id")):
+        yield from [
+            {"id": 1},
+            {"id": 2},
+            {"id": 3},
+            {"id": 4},
+            {"id": 5}
+        ]
+
+    pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem")
+    pipe.run(my_resource(), table_name="my_table")  # , loader_file_format="parquet")

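To see the round trip end to end, one could run the script once, wipe the local pipeline working directory, and sync back from the destination. A hedged sketch, assuming the same environment configuration as above:

    # hypothetical check that state survives at the destination
    import dlt
    import os

    os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://my_files"

    pipe = dlt.pipeline(pipeline_name="dave", destination="filesystem")
    pipe.sync_destination()  # pulls schema and pipeline state back from my_files
    print(pipe.state)        # should contain the incremental cursor for "my_table"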