Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filesystem state sync #1184

Merged
merged 30 commits into from
Apr 17, 2024
Merged

filesystem state sync #1184

merged 30 commits into from
Apr 17, 2024

Conversation

sh-rp
Copy link
Collaborator

@sh-rp sh-rp commented Apr 3, 2024

Description

Implements support for synching state and schema with a filesystem destination. Also saves the info to the loads table the way it is saved in other destinations. The saving of the state is done a bit different, mostly to have the same fileformat always and making things easier. I'm not sure if exracting the state at such a late time creates any problems?

Notes:

  • This approach does not respect a custom layout for the dlt tables for now and always uses jsonl (i think this is fine). Maybe we could provide a configurable path to the dlt internal tables that the user can change, and by default it is _dlt/ (in which all the dlt table folders then lie)
  • I touch an "init" file in each dlt table folder to differentiate between "table does not exist" and "table exists but no state found"
  • Should we gzip the schema and state files?
  • Don't upload files if they exist already (if the hash is the same, the contents should be the same)?

ToDo:

  • Write a couple of filesystem specific state tests, may not be necessary, but I had one instance where the schema was created too often, so maybe a simple test for this would be good.

Copy link

netlify bot commented Apr 3, 2024

Deploy Preview for dlt-hub-docs ready!

Name Link
🔨 Latest commit abfc170
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/661fec5fd232f0000805326b
😎 Deploy Preview https://deploy-preview-1184--dlt-hub-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

@sh-rp sh-rp force-pushed the d#/filesystem_state_sync branch from 8791aaa to f6d5c9c Compare April 3, 2024 16:45
@sh-rp
Copy link
Collaborator Author

sh-rp commented Apr 3, 2024

Screenshot 2024-04-03 at 18 49 08
This is what you end up with when running the example pipeline currently included. You can always find the current state and current schema with a direct key as well as all the previous versions via hash. I think this is the easiest and fastest way of doing this.

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also make sure that it works when state sync is disabled. otherwise it is really good!

schema_name=self.schema.name, table_name=table_name
)
# dlt tables do not respect layout (for now)
if table_name in self.schema.dlt_table_names():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is totally fine and should stay like that (we need to document this)

#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = os.path.dirname(filepath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use os.path, use posixpath. here paths are normalized from fsspec.


def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
# raise if dir not initialized
filepath = self._get_state_file_name(pipeline_name, "current")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to reproduce the WHERE clause of other destinations.

  • must start with pipeline_name
  • find one with highest load_id and return

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what is does, no? I mean it starts with the pipeline name, so you can look up the state with the pipeline, and instead of looking for the highest load_id (which we shouldn't do anyway, since we should not rely on load ids being timestamps) it has this current marker. I have a screenshot above of the file layout this pr produces. This way we can avoid iterating through a list of files to find the newest one which will eventually slow down against destinations with many loads, at least that would be my expectation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this now.

safe_hash = "".join(
[c for c in version_hash if re.match(r"\w", c)]
) # remove all special chars from hash
return f"{self.dataset_path}/{self.schema.version_table_name}/{self.schema.name}__{safe_hash}.jsonl"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO hash is enough. also it would be good to have load_id

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the name in the filepath so I can find the right schema when looking for the newest version of a schema, so I will keep it.


def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
"""Retrieves newest schema from destination storage"""
return self.get_stored_schema_by_hash("current")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing like in state: find the oldest load id

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done (assuming you mean the newest load id :) )

@@ -0,0 +1,20 @@
import dlt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep those in some ignored folder ;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i have that also, but since i am working on two different machines i need to do this sometimes ;)

@sh-rp
Copy link
Collaborator Author

sh-rp commented Apr 15, 2024

Screenshot 2024-04-03 at 18 49 08 This is what you end up with when running the example pipeline currently included. You can always find the current state and current schema with a direct key as well as all the previous versions via hash. I think this is the easiest and fastest way of doing this.

this image is not quite up to date, there will now also be these init files in each folder.

self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))

# write schema to destination
self.store_current_schema(load_id or "1")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When "sync_destination" is called, we are not inside the context of a load. I am not quite sure how to handle this case. I first just did not store the schema, but there is a test that verifies that there is a schema in the destination after "sync_destination" is called on a pipeline with nothing in the versions folder. Either we change the the tests or think of some default value. I am not sure..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also find the newest load_id for this schema present and increase it by one, but that also does not feel right..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we use a current timestamp, then it should be in line with the other destinations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have changed it to be like this though. For lineage purposes it would be interesting to also have the load_id (if available) in the file/table, but for now it is inline with the other destinations

@sh-rp sh-rp force-pushed the d#/filesystem_state_sync branch from c6a65f3 to e7e0192 Compare April 15, 2024 16:27
@sh-rp
Copy link
Collaborator Author

sh-rp commented Apr 15, 2024

There is a lot of state and schema testing going on in test_job_client, I will think about if we can adapt these tests to also run for the filesystem destinations.

sh-rp added 3 commits April 16, 2024 12:11
also fixes table count loading to work for all bucket destinations
@sh-rp
Copy link
Collaborator Author

sh-rp commented Apr 16, 2024

@rudolfix I have added some more tests for the statesync. Imho the best approach would be to standardize the state loading and retrieving further and run all standard sql tests for the filesystem also, right now many standard sql tests rely on executing sql, so this would be a bigger refactor including probably an extension of the "WithStateSync" interface. Additionally the drop command will currently not work on the filesystem, since that also relies on the sql client, but I don't think it needs to if the destinations would have an improved interface that could be implemented by both the sql and filesystem destinations. Maybe you could have a look if the coverage seems enough at this point and we tackle the harmonization of (possibly all?) destinations when we also add the ability to route statesync to a separate place.

self.fs_client.write_text(filepath, json.dumps(data), "utf-8")

def _to_path_safe_string(self, s: str) -> str:
return "".join([c for c in s if re.match(r"\w", c)]) if s else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we instead use c in strings.ascii_letters?

In [2]: import string
In [3]: string.ascii_letters
Out[3]: 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed it to a hex string now

sultaniman
sultaniman previously approved these changes Apr 16, 2024
def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.listdir(dirname, detail=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listir is just an alias to fs_client.ls or is it implementing some additional behavior?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when listing directories always request a refresh!

all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)

IMO this is the only command you should use. fsspec is a mess and this one is proven to work. please replace all commands that list

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could have our own fsclient wrapper that only exposes stuff we think is reliable..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on having our own abstraction with things we really need

"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = f"{self.dataset_path}/{self.schema.loads_table_name}/{self.schema.name}__{load_id}.jsonl"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use path.join instead?


def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use path.join here as well?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only use to "encode" hash. why not convert hash to hex? still not perfect but less hacky

@@ -155,7 +155,12 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non
for basedir, _dirs, files in client.fs_client.walk(
client.dataset_path, detail=False, refresh=True
):
# remove internal paths
if "_dlt" in basedir or "init" in basedir:
Copy link
Contributor

@sultaniman sultaniman Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"init" maybe should be a reference to constant INIT_FILE_NAME

managed in the regular way by the final destination you have configured.

You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables
in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this much gpt 🙂 ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean.

@@ -19,6 +19,8 @@
from tests.common.utils import load_json_case
from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active
from dlt.destinations.path_utils import create_path
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
from tests.load.pipeline.utils import load_table_counts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two can be merged

Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! we can remove the mark. if we are running filesystem through standard state tests, it should be good


def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return f"{self.dataset_path}/{self.schema.state_table_name}/{pipeline_name}__{load_id}__{self._to_path_safe_string(version_hash)}.jsonl"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only use to "encode" hash. why not convert hash to hex? still not perfect but less hacky

def _list_dlt_dir(self, dirname: str) -> Iterator[Tuple[str, List[str]]]:
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.fs_client.listdir(dirname, detail=False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when listing directories always request a refresh!

all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)

IMO this is the only command you should use. fsspec is a mess and this one is proven to work. please replace all commands that list



def state_resource(state: TPipelineState) -> DltResource:
doc = dlt.mark.with_package_state(state_doc(state), LOAD_PACKAGE_STATE_KEY)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK! if you want you can commit pipeline state when we commit package here. and you can decide if we remove this mark.... I think package state will be available also in dlt.resource right? so you can always write to it. if so you can remove the mark... sorry for the stupid idea ;>

@sh-rp sh-rp force-pushed the d#/filesystem_state_sync branch from 2857a5c to cd4dd23 Compare April 17, 2024 10:17
# commit load packages
extract_step.commit_packages()
# commit load packages with state
extract_step.commit_packages(state)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the state only gets committed to the load package when it has changed now, this is how it is done for other destinations too, but I think we should probably always add the pipeline state to the load package and then check in the destination wether a state entry with this hash exists, right? eventually (I think) all other destinations will also use this mechanism to store the state, this would make sense to me at least.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we could add info if version was bumped. easy to check the pre and post bump and extract state. but for now it is OK

@sh-rp sh-rp requested a review from rudolfix April 17, 2024 10:27
@sh-rp sh-rp marked this pull request as ready for review April 17, 2024 10:27
@sh-rp
Copy link
Collaborator Author

sh-rp commented Apr 17, 2024

Follow up ticket for state synching to separate place: #1233

rudolfix
rudolfix previously approved these changes Apr 17, 2024
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

# commit load packages
extract_step.commit_packages()
# commit load packages with state
extract_step.commit_packages(state)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we could add info if version was bumped. easy to check the pre and post bump and extract state. but for now it is OK

# Conflicts:
#	docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@sh-rp sh-rp merged commit 02daa10 into devel Apr 17, 2024
45 of 48 checks passed
@sh-rp sh-rp deleted the d#/filesystem_state_sync branch April 17, 2024 17:29
zem360 pushed a commit that referenced this pull request Apr 17, 2024
* clean some stuff

* first messy version of filesystem state sync

* clean up a bit

* fix bug in state sync

* enable state tests for all bucket providers

* do not store state to uninitialized dataset folders

* fix linter errors

* get current pipeline from pipeline context

* fix bug in filesystem table init

* update testing pipe

* move away from "current" file, rather iterator bucket path contents

* store pipeline state in load package state and send to filesystem destination from there

* fix tests for changed number of files in filesystem destination

* remove dev code

* create init file also to mark datasets

* fix tests to respect new init file
change filesystem to fallback, to old state loading when used as staging destination

* update filesystem docs

* fix incoming tests of placeholders

* small fixes

* adds some tests for filesystem state
also fixes table count loading to work for all bucket destinations

* fix test helper

* save schema with timestamp instead of load_id

* pr fixes and move pipeline state saving to committing of extracted packages

* ensure pipeline state is only saved to load package if it has changed

* adds missing state injection into state package

* fix athena iceberg locations

* fix google drive filesystem with missing argument
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants