From 45e83ecba570731a9538d48c0c197889d77c271a Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 7 May 2024 16:48:51 +0200
Subject: [PATCH 1/5] uses local path and pathlib to handle local filesystem in filesystem destination

---
 dlt/destinations/fs_client.py                 |  5 +
 .../impl/filesystem/filesystem.py             | 97 +++++++++++--------
 dlt/destinations/path_utils.py                |  9 ++
 3 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/dlt/destinations/fs_client.py b/dlt/destinations/fs_client.py
index 73f3adb534..5153659614 100644
--- a/dlt/destinations/fs_client.py
+++ b/dlt/destinations/fs_client.py
@@ -6,6 +6,11 @@ class FSClientBase(ABC):
     fs_client: AbstractFileSystem

+    @property
+    @abstractmethod
+    def dataset_path(self) -> str:
+        pass
+
     @abstractmethod
     def get_table_dir(self, table_name: str) -> str:
         """returns directory for given table"""
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 15f81029d5..e3e57f97e4 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -8,8 +8,6 @@
 from dlt.common import json, pendulum
 from dlt.common.typing import DictStrAny

-import re
-
 import dlt
 from dlt.common import logger, time
 from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -52,6 +50,9 @@ def __init__(
         file_name = FileStorage.get_file_name_from_file_path(local_path)
         self.config = config
         self.dataset_path = dataset_path
+        self.is_local_filesystem = config.protocol == "file"
+        # pick os.path for the local filesystem or posixpath for buckets
+        self.pathlib = os.path if self.is_local_filesystem else posixpath
         self.destination_file_name = path_utils.create_path(
             config.layout,
             file_name,
@@ -64,28 +65,21 @@ def __init__(
         super().__init__(file_name)
         fs_client, _ = fsspec_from_config(config)

-        self.destination_file_name = path_utils.create_path(
-            config.layout,
-            file_name,
-            schema_name,
-            load_id,
-            current_datetime=config.current_datetime,
-            load_package_timestamp=dlt.current.load_package()["state"]["created_at"],  # type: ignore
-            extra_placeholders=config.extra_placeholders,
-        )
-
         # We would like to avoid failing for the local filesystem, where
         # a deeply nested directory will not exist before writing a file.
         # As `auto_mkdir` is disabled by default in fsspec, we made some
         # trade-offs between different options and decided on this.
item = self.make_remote_path()
-        if self.config.protocol == "file":
-            fs_client.makedirs(posixpath.dirname(item), exist_ok=True)
+        if self.is_local_filesystem:
+            fs_client.makedirs(self.pathlib.dirname(item), exist_ok=True)
         fs_client.put_file(local_path, item)

     def make_remote_path(self) -> str:
-        return (
-            f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
-        )
+        # path.join does not normalize separators and the available
+        # normalization functions are very invasive and may strip the trailing separator
+        return self.pathlib.join(  # type: ignore[no-any-return]
+            self.dataset_path,
+            path_utils.normalize_path_sep(self.pathlib, self.destination_file_name),
+        )

     def state(self) -> TLoadJobState:
         return "completed"
@@ -111,16 +105,27 @@ class FilesystemClient(FSClientBase, JobClientBase, WithStagingDataset, WithStat
     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
     fs_client: AbstractFileSystem
-    fs_path: str
+    # a path (without the scheme) to a location in the bucket where the dataset is present
+    bucket_path: str
+    # name of the dataset
+    dataset_name: str

     def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
         super().__init__(schema, config)
-        self.fs_client, self.fs_path = fsspec_from_config(config)
+        self.fs_client, fs_path = fsspec_from_config(config)
+        self.is_local_filesystem = config.protocol == "file"
+        #
+        self.bucket_path = (
+            config.make_local_path(config.bucket_url) if self.is_local_filesystem else fs_path
+        )
+        # pick os.path for the local filesystem or posixpath for buckets
+        self.pathlib = os.path if self.is_local_filesystem else posixpath
+
         self.config: FilesystemDestinationClientConfiguration = config
         # verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
         # cannot be replaced and we cannot initialize folders consistently
         self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
-        self._dataset_path = self.config.normalize_dataset_name(self.schema)
+        self.dataset_name = self.config.normalize_dataset_name(self.schema)

     def drop_storage(self) -> None:
         if self.is_storage_initialized():
@@ -128,19 +133,22 @@ def drop_storage(self) -> None:

     @property
     def dataset_path(self) -> str:
-        return posixpath.join(self.fs_path, self._dataset_path)
+        """A path within a bucket to tables in a dataset
+        NOTE: dataset_name changes if with_staging_dataset is active
+        """
+        return self.pathlib.join(self.bucket_path, self.dataset_name)  # type: ignore[no-any-return]

     @contextmanager
     def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
-        current_dataset_path = self._dataset_path
+        current_dataset_name = self.dataset_name
         try:
-            self._dataset_path = self.schema.naming.normalize_table_identifier(
-                current_dataset_path + "_staging"
+            self.dataset_name = self.schema.naming.normalize_table_identifier(
+                current_dataset_name + "_staging"
             )
             yield self
         finally:
             # restore previous dataset name
-            self._dataset_path = current_dataset_path
+            self.dataset_name = current_dataset_name

     def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
         # clean up existing files for tables selected for truncating
@@ -152,7 +160,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:

         # we mark the storage folder as initialized
         self.fs_client.makedirs(self.dataset_path, exist_ok=True)
-        self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME))
+        self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))

     def 
truncate_tables(self, table_names: List[str]) -> None:
         """Truncate tables with the given names"""
@@ -161,7 +169,7 @@ def truncate_tables(self, table_names: List[str]) -> None:
         for table_dir in table_dirs:
             for table_file in self.list_files_with_prefixes(table_dir, table_prefixes):
                 # NOTE: deleting in chunks on s3 does not raise on access denied, files not existing and probably other errors
-                # print(f"DEL {item}")
+                # print(f"DEL {table_file}")
                 try:
                     # NOTE: must use rm_file to get errors on delete
                     self.fs_client.rm_file(table_file)
@@ -188,7 +196,7 @@ def update_stored_schema(
             self.fs_client.makedirs(directory, exist_ok=True)
             # we need to mark the folders of the data tables as initialized
             if tables_name in self.schema.dlt_table_names():
-                self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))
+                self.fs_client.touch(self.pathlib.join(directory, INIT_FILE_NAME))

         # don't store schema when used as staging
         if not self.config.as_staging:
@@ -199,17 +207,21 @@ def update_stored_schema(
     def get_table_dir(self, table_name: str) -> str:
         # dlt tables do not respect layout (for now)
         table_prefix = self.get_table_prefix(table_name)
-        return posixpath.dirname(table_prefix)
+        return self.pathlib.dirname(table_prefix)  # type: ignore[no-any-return]

     def get_table_prefix(self, table_name: str) -> str:
         # dlt tables do not respect layout (for now)
         if table_name.startswith(self.schema._dlt_tables_prefix):
+            # dlt tables get a layout where each table is a folder
+            # it is crucial to append and keep the "/" at the end
             table_prefix = self.pathlib.join(table_name, "")
         else:
             table_prefix = self.table_prefix_layout.format(
                 schema_name=self.schema.name, table_name=table_name
             )
-        return posixpath.join(self.dataset_path, table_prefix)
+        return self.pathlib.join(  # type: ignore[no-any-return]
+            self.dataset_path, path_utils.normalize_path_sep(self.pathlib, table_prefix)
+        )

     def get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
         """Gets directories where table data is stored."""
@@ -227,15 +239,20 @@ def list_files_with_prefixes(self, table_dir: str, prefixes: List[str]) -> List[
         result = []
         for current_dir, _dirs, files in self.fs_client.walk(table_dir, detail=False, refresh=True):
             for file in files:
-                filename = posixpath.join(current_dir, file)
+                # skip INIT files
+                if file == INIT_FILE_NAME:
+                    continue
+                filepath = self.pathlib.join(
+                    path_utils.normalize_path_sep(self.pathlib, current_dir), file
+                )
                 for p in prefixes:
-                    if filename.startswith(p):
-                        result.append(posixpath.join(current_dir, file))
-                        continue
+                    if filepath.startswith(p):
+                        result.append(filepath)
+                        break
         return result

     def is_storage_initialized(self) -> bool:
-        return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME))  # type: ignore[no-any-return]
+        return self.fs_client.exists(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))  # type: ignore[no-any-return]

     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
         # skip the state table, we create a jsonl file in the complete_load step
@@ -272,7 +289,7 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:  #

     def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
-        dirname = posixpath.dirname(filepath)
+        dirname = self.pathlib.dirname(filepath)
         if not self.fs_client.isdir(dirname):
             return
         self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
@@ -283,7 +300,7 @@ def _to_path_safe_string(self, s: str) -> str:

     def _list_dlt_table_files(self, 
table_name: str) -> Iterator[Tuple[str, List[str]]]:
         dirname = self.get_table_dir(table_name)
-        if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
+        if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
             raise DestinationUndefinedEntity({"dir": dirname})
         for filepath in self.list_table_files(table_name):
             filename = os.path.splitext(os.path.basename(filepath))[0]
@@ -302,7 +319,7 @@ def _store_load(self, load_id: str) -> None:
             "inserted_at": pendulum.now().isoformat(),
             "schema_version_hash": self.schema.version_hash,
         }
-        filepath = posixpath.join(
+        filepath = self.pathlib.join(
             self.dataset_path,
             self.schema.loads_table_name,
             f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
@@ -320,7 +337,7 @@ def complete_load(self, load_id: str) -> None:

     def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
         """gets full path for state file for a given hash"""
-        return posixpath.join(
+        return self.pathlib.join(  # type: ignore[no-any-return]
             self.get_table_dir(self.schema.state_table_name),
             f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
         )
@@ -370,7 +387,7 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:

     def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
         """gets full path for schema file for a given hash"""
-        return posixpath.join(
+        return self.pathlib.join(  # type: ignore[no-any-return]
             self.get_table_dir(self.schema.version_table_name),
             f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
         )
diff --git a/dlt/destinations/path_utils.py b/dlt/destinations/path_utils.py
index fe28cad0db..425d72a66b 100644
--- a/dlt/destinations/path_utils.py
+++ b/dlt/destinations/path_utils.py
@@ -75,6 +75,15 @@
 SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)

+def normalize_path_sep(pathlib: Any, path: str) -> str:
+    """Normalizes the separator in `path` to the one used by `pathlib`"""
+    if pathlib.sep == "/":
+        return path.replace("\\", "/")
+    if pathlib.sep == "\\":
+        return path.replace("/", "\\")
+    return path
+
+
 def get_placeholders(layout: str) -> List[str]:
     return re.findall(r"\{(.*?)\}", layout)


From dc6f1baf7a284ee51a09651fadb035480d6d94cc Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 7 May 2024 16:49:07 +0200
Subject: [PATCH 2/5] tests windows paths for filesystem destination

---
 .../common/storages/test_local_filesystem.py  |  1 +
 .../load/filesystem/test_filesystem_client.py |  2 +-
 tests/load/filesystem/utils.py                |  2 +-
 tests/pipeline/test_pipeline.py               | 65 ++++++++++++++++++-
 4 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py
index 7b16fa8d57..19c7f82ceb 100644
--- a/tests/common/storages/test_local_filesystem.py
+++ b/tests/common/storages/test_local_filesystem.py
@@ -22,6 +22,7 @@
     (UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()),
     (UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()),
     (r"C:\hello", "file:///C:/hello"),
+    # (r"\\?\C:\hello", "file:///C:/hello"),
     (r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()),
     # same paths but with POSIX separators
     (
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index 64283869d1..860961c807 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ 
b/tests/load/filesystem/test_filesystem_client.py
@@ -67,7 +67,7 @@ def test_successful_load(write_disposition: str, layout: str, with_gdrive_bucket
     ) as load_info:
         client, jobs, _, load_id = load_info
         layout = client.config.layout
-        dataset_path = posixpath.join(client.fs_path, client.config.dataset_name)
+        dataset_path = posixpath.join(client.bucket_path, client.config.dataset_name)

         # Assert dataset dir exists
         assert client.fs_client.isdir(dataset_path)
diff --git a/tests/load/filesystem/utils.py b/tests/load/filesystem/utils.py
index 6e697fdef9..ce15997ed6 100644
--- a/tests/load/filesystem/utils.py
+++ b/tests/load/filesystem/utils.py
@@ -48,7 +48,7 @@ def perform_load(
     client.initialize_storage(truncate_tables=truncate_tables)
     client.update_stored_schema()
-    root_path = posixpath.join(client.fs_path, client.config.dataset_name)
+    root_path = posixpath.join(client.bucket_path, client.config.dataset_name)

     files = load.load_storage.list_new_jobs(load_id)
     try:
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 0cbaa37735..01ab303afc 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -1,4 +1,5 @@
 import asyncio
+import pathlib
 from concurrent.futures import ThreadPoolExecutor
 import itertools
 import logging
@@ -37,6 +38,7 @@
 from dlt.common.schema import Schema

 from dlt.destinations import filesystem, redshift, dummy
+from dlt.destinations.impl.filesystem.filesystem import INIT_FILE_NAME
 from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
 from dlt.extract.extract import ExtractStorage
 from dlt.extract import DltResource, DltSource
@@ -47,7 +49,7 @@
 from tests.common.utils import TEST_SENTRY_DSN
 from tests.common.configuration.utils import environment
-from tests.utils import TEST_STORAGE_ROOT
+from tests.utils import TEST_STORAGE_ROOT, skipifnotwindows
 from tests.extract.utils import expect_extracted_file
 from tests.pipeline.utils import (
     assert_data_table_counts,
@@ -2161,3 +2163,64 @@ def test_yielding_empty_list_creates_table() -> None:
         rows = list(cur.fetchall())
         assert len(rows) == 1
         assert rows[0] == (1, None)
+
+
+@skipifnotwindows
+def test_local_filesystem_destination() -> None:
+    # make it unc path
+    unc_path = "\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$")
+    print(unc_path)
+
+    dataset_name = "mydata_" + uniq_id()
+
+    @dlt.resource
+    def stateful_resource():
+        dlt.current.source_state()["mark"] = 1
+        yield [1, 2, 3]
+
+    pipeline = dlt.pipeline(
+        pipeline_name="local_files",
+        destination=dlt.destinations.filesystem(unc_path),
+        dataset_name=dataset_name,
+    )
+    info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace")
+    assert_load_info(info)
+
+    info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace")
+    assert_load_info(info)
+
+    pipeline = pipeline.drop()
+
+    # must be able to restore the schema and state
+    pipeline.sync_destination()
+    assert pipeline.state["sources"]["local_files"]["mark"] == 1
+    assert pipeline.default_schema.get_table("numbers") is not None
+
+    # check all the files; paths may get messed up in many different ways
+    # and data may land anywhere, especially on Windows
+    expected_dataset = pathlib.Path("_storage").joinpath(dataset_name).resolve()
+    assert expected_dataset.exists()
+    assert expected_dataset.is_dir()
+
+    # count files in tables
+    assert expected_dataset.joinpath(INIT_FILE_NAME).is_file()
+    # one numbers table (replaced)
+    assert 
len(list(expected_dataset.joinpath("numbers").glob("*"))) == 1
+    # two loads + init
+    assert len(list(expected_dataset.joinpath("_dlt_loads").glob("*"))) == 3
+    # one schema (dedup on hash) + init
+    assert len(list(expected_dataset.joinpath("_dlt_version").glob("*"))) == 2
+    # one state (not sent twice) + init
+    assert len(list(expected_dataset.joinpath("_dlt_pipeline_state").glob("*"))) == 2
+
+    fs_client = pipeline._fs_client()
+    # all path formats we use must lead to "_storage" relative to tests
+    assert (
+        pathlib.Path(fs_client.dataset_path).resolve()
+        == pathlib.Path(unc_path).joinpath(dataset_name).resolve()
+    )
+    # same for client
+    assert len(fs_client.list_table_files("numbers")) == 1
+    assert len(fs_client.list_table_files("_dlt_loads")) == 2
+    assert len(fs_client.list_table_files("_dlt_version")) == 1
+    assert len(fs_client.list_table_files("_dlt_pipeline_state")) == 1

From 3af94e4a7e18f6551d4e2516839655c076314c5f Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 7 May 2024 21:41:38 +0200
Subject: [PATCH 3/5] uses datetime as load_package_timestamp

---
 dlt/destinations/path_utils.py        |  4 ++--
 tests/destinations/test_path_utils.py | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/dlt/destinations/path_utils.py b/dlt/destinations/path_utils.py
index 425d72a66b..532b0aa9dc 100644
--- a/dlt/destinations/path_utils.py
+++ b/dlt/destinations/path_utils.py
@@ -98,7 +98,7 @@ def get_unused_placeholders(

 def prepare_datetime_params(
     current_datetime: Optional[pendulum.DateTime] = None,
-    load_package_timestamp: Optional[str] = None,
+    load_package_timestamp: Optional[pendulum.DateTime] = None,
 ) -> Dict[str, str]:
     params: Dict[str, str] = {}
     current_timestamp: pendulum.DateTime = None
@@ -214,7 +214,7 @@ def create_path(
     file_name: str,
     schema_name: str,
     load_id: str,
-    load_package_timestamp: Optional[str] = None,
+    load_package_timestamp: Optional[pendulum.DateTime] = None,
     current_datetime: Optional[TCurrentDateTime] = None,
     extra_placeholders: Optional[Dict[str, Any]] = None,
 ) -> str:
diff --git a/tests/destinations/test_path_utils.py b/tests/destinations/test_path_utils.py
index 64dbc371fc..21ee83b4eb 100644
--- a/tests/destinations/test_path_utils.py
+++ b/tests/destinations/test_path_utils.py
@@ -314,7 +314,7 @@ def test_layout_validity(
         return_value=job_info,
     )

-    now_timestamp = frozen_datetime.to_iso8601_string()
+    now_timestamp = frozen_datetime
     if is_valid:
         path = create_path(
             layout,
@@ -404,7 +404,7 @@ def test_create_path_uses_provided_load_package_timestamp(test_load: TestLoad) -
         "{schema_name}/{table_name}/{load_id}.{file_id}.{timestamp}.{ext}",
         schema_name="schema_name",
         load_id=load_id,
-        load_package_timestamp=now.to_iso8601_string(),
+        load_package_timestamp=now,
         file_name=job_info.file_name(),
     )

@@ -425,7 +425,7 @@ def test_create_path_resolves_current_datetime(test_load: TestLoad) -> None:
     load_id, job_info = test_load
     now = pendulum.now()
     timestamp = int(now.timestamp())
-    now_timestamp = now.to_iso8601_string()
+    now_timestamp = now
     calls = 0

     def current_datetime_callback():
@@ -508,7 +508,7 @@ def test_create_path_uses_load_package_timestamp_as_current_datetime(
     load_id, job_info = test_load
     now = pendulum.now()
     timestamp = int(now.timestamp())
-    now_timestamp = now.to_iso8601_string()
+    now_timestamp = now
     logger_spy = mocker.spy(logger, "info")
     ensure_pendulum_datetime_spy = mocker.spy(
         dlt.destinations.path_utils, "ensure_pendulum_datetime"
     )
@@ -557,7 +557,7 @@ def inc(self, *args, **kwargs):
     }
     now = pendulum.now()
     timestamp = 
int(now.timestamp())
-    now_timestamp = now.to_iso8601_string()
+    now_timestamp = now
     created_path = create_path(
         "{schema_name}/{table_name}/{callable_1}-{otter}/{load_id}.{file_id}.{timestamp}.{ext}",
         schema_name="schema_name",

From 0b806f2a658d51ab3fc585ecc0b622864d4b9ac8 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 7 May 2024 21:42:06 +0200
Subject: [PATCH 4/5] splits remote path and remote uri in filesystem load job

---
 dlt/common/storages/configuration.py            |  1 -
 dlt/destinations/impl/filesystem/filesystem.py  | 14 ++++++++++++--
 .../test_destination_name_and_config.py         | 11 ++++++-----
 tests/load/filesystem/test_filesystem_client.py |  8 +++++---
 4 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py
index 1c5f39ea82..a1838fab6e 100644
--- a/dlt/common/storages/configuration.py
+++ b/dlt/common/storages/configuration.py
@@ -168,5 +168,4 @@ def make_file_uri(local_path: str) -> str:
     """
     p_ = pathlib.Path(local_path)
     p_ = p_.expanduser().resolve()
-    # return "file:///" + p_.as_posix().lstrip("/")
     return p_.as_uri()
diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index e3e57f97e4..eec8ae85ca 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -1,4 +1,5 @@
 import posixpath
+import pathlib
 import os
 import base64
 from types import TracebackType
@@ -59,7 +60,7 @@ def __init__(
             schema_name,
             load_id,
             current_datetime=config.current_datetime,
-            load_package_timestamp=dlt.current.load_package()["state"]["created_at"],  # type: ignore
+            load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
             extra_placeholders=config.extra_placeholders,
         )

@@ -75,6 +76,7 @@ def __init__(
         fs_client.put_file(local_path, item)

     def make_remote_path(self) -> str:
+        """Returns the path on the remote filesystem to which the file will be copied, without the scheme. 
For the local filesystem a native path is used"""
         # path.join does not normalize separators and the available
         # normalization functions are very invasive and may strip the trailing separator
         return self.pathlib.join(  # type: ignore[no-any-return]
             self.dataset_path,
             path_utils.normalize_path_sep(self.pathlib, self.destination_file_name),
         )

+    def make_remote_uri(self) -> str:
+        """Returns the URI of the remote location to which the file will be copied"""
+        remote_path = self.make_remote_path()
+        if self.is_local_filesystem:
+            return self.config.make_file_uri(remote_path)
+        else:
+            return f"{self.config.protocol}://{remote_path}"
+
     def state(self) -> TLoadJobState:
         return "completed"

@@ -94,7 +104,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
         jobs = super().create_followup_jobs(final_state)
         if final_state == "completed":
             ref_job = NewReferenceJob(
-                file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
+                file_name=self.file_name(), status="running", remote_path=self.make_remote_uri()
             )
             jobs.append(ref_job)
         return jobs
diff --git a/tests/destinations/test_destination_name_and_config.py b/tests/destinations/test_destination_name_and_config.py
index 930a72d95d..11de706722 100644
--- a/tests/destinations/test_destination_name_and_config.py
+++ b/tests/destinations/test_destination_name_and_config.py
@@ -5,8 +5,9 @@
 import dlt
 from dlt.common.configuration.exceptions import ConfigFieldMissingException
 from dlt.common.typing import DictStrStr
-from dlt.destinations import duckdb, dummy, filesystem
 from dlt.common.utils import uniq_id
+from dlt.common.storages import FilesystemConfiguration
+from dlt.destinations import duckdb, dummy, filesystem

 from tests.common.configuration.utils import environment
 from tests.utils import TEST_STORAGE_ROOT
@@ -59,7 +60,7 @@ def test_set_name_and_environment() -> None:

 def test_preserve_destination_instance() -> None:
     dummy1 = dummy(destination_name="dummy1", environment="dev/null/1")
     filesystem1 = filesystem(
-        posixpath.join("file://", posixpath.abspath(TEST_STORAGE_ROOT)),
+        FilesystemConfiguration.make_file_uri(TEST_STORAGE_ROOT),
         destination_name="local_fs",
         environment="devel",
     )
@@ -209,7 +210,7 @@ def test_destination_config_in_name(environment: DictStrStr) -> None:
     with pytest.raises(ConfigFieldMissingException):
         p.destination_client()

-    environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = "file://" + posixpath.abspath(
-        TEST_STORAGE_ROOT
+    environment["DESTINATION__FILESYSTEM-PROD__BUCKET_URL"] = FilesystemConfiguration.make_file_uri(
+        "_storage"
     )
-    assert p.destination_client().fs_path.endswith(TEST_STORAGE_ROOT)  # type: ignore[attr-defined]
+    assert p._fs_client().dataset_path.endswith(p.dataset_name)
diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py
index 860961c807..ca962adb16 100644
--- a/tests/load/filesystem/test_filesystem_client.py
+++ b/tests/load/filesystem/test_filesystem_client.py
@@ -4,6 +4,7 @@

 import pytest

+from dlt.common.time import ensure_pendulum_datetime
 from dlt.common.utils import digest128, uniq_id
 from dlt.common.storages import FileStorage, ParsedLoadJobFileName

@@ -55,7 +56,7 @@ def test_successful_load(write_disposition: str, layout: str, with_gdrive_bucket
     os.environ.pop("DESTINATION__FILESYSTEM__LAYOUT", None)

     dataset_name = "test_" + uniq_id()
-    timestamp = "2024-04-05T09:16:59.942779Z"
+    timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z")
     mocked_timestamp = 
{"state": {"created_at": timestamp}} with mock.patch( "dlt.current.load_package", @@ -102,7 +103,8 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non dataset_name = "test_" + uniq_id() # NOTE: context manager will delete the dataset at the end so keep it open until the end - timestamp = "2024-04-05T09:16:59.942779Z" + # state is typed now + timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z") mocked_timestamp = {"state": {"created_at": timestamp}} with mock.patch( "dlt.current.load_package", @@ -173,7 +175,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None dataset_name = "test_" + uniq_id() # NOTE: context manager will delete the dataset at the end so keep it open until the end # also we would like to have reliable timestamp for this test so we patch it - timestamp = "2024-04-05T09:16:59.942779Z" + timestamp = ensure_pendulum_datetime("2024-04-05T09:16:59.942779Z") mocked_timestamp = {"state": {"created_at": timestamp}} with mock.patch( "dlt.current.load_package", From aca319b84425b2bb36c206799727d4199f7b1215 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 7 May 2024 23:20:48 +0200 Subject: [PATCH 5/5] adds tests cases for windows extended paths + docs --- .../impl/filesystem/filesystem.py | 1 - .../dlt-ecosystem/destinations/filesystem.md | 14 +++++++++ .../verified-sources/filesystem.md | 12 ++++++++ .../common/storages/test_local_filesystem.py | 29 ++++++++++++++----- .../load/pipeline/test_filesystem_pipeline.py | 10 +++---- tests/pipeline/test_pipeline.py | 20 ++++++++----- 6 files changed, 65 insertions(+), 21 deletions(-) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index eec8ae85ca..5070ff061c 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -124,7 +124,6 @@ def __init__(self, schema: Schema, config: FilesystemDestinationClientConfigurat super().__init__(schema, config) self.fs_client, fs_path = fsspec_from_config(config) self.is_local_filesystem = config.protocol == "file" - # self.bucket_path = ( config.make_local_path(config.bucket_url) if self.is_local_filesystem else fs_path ) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 56012ac773..0d719b4cfa 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -213,6 +213,20 @@ bucket_url="file://localhost/c$/a/b/c" bucket_url="file:////localhost/c$/a/b/c" ``` +:::caution +Windows supports paths up to 255 characters. When you access a path longer than 255 characters you'll see `FileNotFound` exception. + + To go over this limit you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). 
`dlt` recognizes both regular and UNC extended paths:
+
+```toml
+[destination.regular_extended]
+bucket_url = '\\?\C:\a\b\c'
+
+[destination.unc_extended]
+bucket_url='\\?\UNC\localhost\c$\a\b\c'
+```
+:::
+
 ## Write disposition

 The filesystem destination handles the write dispositions as follows:
 - `append` - files belonging to such tables are added to the dataset folder
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
index 7184d4ccf1..5c322db108 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
@@ -151,6 +151,18 @@ You can use both native local file system paths and in form of `file:` uri. Absolu

 You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows the same rules to specify the `bucket_url`.

+:::caution
+Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a `FileNotFound` exception.
+
+To go beyond this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
+**Note that Python glob does not work with extended UNC paths**, so you will not be able to use them.
+
+```toml
+[sources.filesystem]
+bucket_url = '\\?\C:\a\b\c'
+```
+:::
+
 ## Run the pipeline

 1. Before running the pipeline, ensure that you have installed all the necessary dependencies by
diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py
index 19c7f82ceb..14e3cc23d4 100644
--- a/tests/common/storages/test_local_filesystem.py
+++ b/tests/common/storages/test_local_filesystem.py
@@ -2,6 +2,7 @@
 import pytest
 import pathlib
 from urllib.parse import quote
+from typing import Tuple

 from dlt.common.configuration.exceptions import ConfigurationValueError
 from dlt.common.configuration.resolve import resolve_configuration
@@ -12,6 +13,7 @@
 from tests.utils import skipifnotwindows, skipifwindows

 UNC_LOCAL_PATH = r"\\localhost\c$\tests\common\test.csv"
+UNC_LOCAL_EXT_PATH = r"\\?\UNC\localhost\c$\tests\common\test.csv"
 UNC_WSL_PATH = r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\ .dlt"


@@ -20,9 +22,10 @@
     "bucket_url,file_url",
     (
         (UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()),
+        (UNC_LOCAL_EXT_PATH, pathlib.PureWindowsPath(UNC_LOCAL_EXT_PATH).as_uri()),
         (UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()),
         (r"C:\hello", "file:///C:/hello"),
-        # (r"\\?\C:\hello", "file:///C:/hello"),
+        (r"\\?\C:\hello", "file://%3F/C%3A/hello"),
         (r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()),
         # same paths but with POSIX separators
         (
@@ -232,21 +235,31 @@ def test_filesystem_decompress() -> None:

 # create windows UNC paths, on POSIX systems they are not used
 WIN_ABS_PATH = os.path.abspath(TEST_SAMPLE_FILES)
+WIN_ABS_EXT_PATH = "\\\\?\\" + os.path.abspath(TEST_SAMPLE_FILES)
 WIN_UNC_PATH = "\\\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower()
+WIN_UNC_EXT_PATH = "\\\\?\\UNC\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower()


-@skipifnotwindows
-@pytest.mark.parametrize(
-    "bucket_url",
-    (
+if os.name == "nt":
+    windows_local_files: Tuple[str, ...] 
= ( WIN_UNC_PATH, "file:///" + pathlib.Path(WIN_UNC_PATH).as_posix(), "file://localhost/" + pathlib.Path(WIN_ABS_PATH).as_posix().replace(":", "$"), + # WIN_UNC_EXT_PATH, + # "file:///" + pathlib.Path(WIN_UNC_EXT_PATH).as_posix(), + # "file://localhost/" + pathlib.Path(WIN_UNC_EXT_PATH).as_posix().replace(":", "$"), WIN_ABS_PATH, - "file:///" + pathlib.Path(WIN_ABS_PATH).as_posix(), + WIN_ABS_EXT_PATH, + pathlib.Path(WIN_ABS_PATH).as_uri(), + pathlib.Path(WIN_ABS_EXT_PATH).as_uri(), # r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\src\dlt\tests\common\storages\samples" - ), -) + ) +else: + windows_local_files = () + + +@skipifnotwindows +@pytest.mark.parametrize("bucket_url", windows_local_files) @pytest.mark.parametrize("load_content", [True, False]) @pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) def test_windows_unc_path(load_content: bool, bucket_url: str, glob_filter: str) -> None: diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 9cf4a925c4..7680bc6e90 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -83,7 +83,7 @@ def test_pipeline_csv_filesystem_destination(item_type: TestDataItemFormat) -> N os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" os.environ["RESTORE_FROM_DESTINATION"] = "False" # store locally - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage" + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" pipeline = dlt.pipeline( pipeline_name="parquet_test_" + uniq_id(), @@ -110,7 +110,7 @@ def test_csv_options(item_type: TestDataItemFormat) -> None: os.environ["NORMALIZE__DATA_WRITER__DELIMITER"] = "|" os.environ["NORMALIZE__DATA_WRITER__INCLUDE_HEADER"] = "False" # store locally - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage" + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" pipeline = dlt.pipeline( pipeline_name="parquet_test_" + uniq_id(), destination="filesystem", @@ -139,7 +139,7 @@ def test_csv_quoting_style(item_type: TestDataItemFormat) -> None: os.environ["NORMALIZE__DATA_WRITER__QUOTING"] = "quote_all" os.environ["NORMALIZE__DATA_WRITER__INCLUDE_HEADER"] = "False" # store locally - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage" + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" pipeline = dlt.pipeline( pipeline_name="parquet_test_" + uniq_id(), destination="filesystem", @@ -170,7 +170,7 @@ def test_pipeline_parquet_filesystem_destination() -> None: import pyarrow.parquet as pq # Module is evaluated by other tests # store locally - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage" + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" pipeline = dlt.pipeline( pipeline_name="parquet_test_" + uniq_id(), destination="filesystem", @@ -264,7 +264,7 @@ def count(*args, **kwargs) -> Any: "hiphip": counter("Hurraaaa"), } now = pendulum.now() - os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage" + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" # the reason why we are patching pendulum.from_timestamp is that diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 01ab303afc..a828de40fd 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -2165,12 +2165,18 @@ def test_yielding_empty_list_creates_table() -> None: 
assert rows[0] == (1, None) -@skipifnotwindows -def test_local_filesystem_destination() -> None: - # make it unc path - unc_path = "\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$") - print(unc_path) +local_paths = [os.path.abspath("_storage"), "_storage"] +if os.name == "nt": + local_paths += [ + # UNC extended path + "\\\\?\\UNC\\localhost\\" + os.path.abspath("_storage").replace(":", "$"), + # UNC path + "\\\\localhost\\" + os.path.abspath("_storage").replace(":", "$"), + ] + +@pytest.mark.parametrize("local_path", local_paths) +def test_local_filesystem_destination(local_path: str) -> None: dataset_name = "mydata_" + uniq_id() @dlt.resource @@ -2180,7 +2186,7 @@ def stateful_resource(): pipeline = dlt.pipeline( pipeline_name="local_files", - destination=dlt.destinations.filesystem(unc_path), + destination=dlt.destinations.filesystem(local_path), dataset_name=dataset_name, ) info = pipeline.run(stateful_resource(), table_name="numbers", write_disposition="replace") @@ -2217,7 +2223,7 @@ def stateful_resource(): # all path formats we use must lead to "_storage" relative to tests assert ( pathlib.Path(fs_client.dataset_path).resolve() - == pathlib.Path(unc_path).joinpath(dataset_name).resolve() + == pathlib.Path(local_path).joinpath(dataset_name).resolve() ) # same for client assert len(fs_client.list_table_files("numbers")) == 1
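
---

A minimal, runnable sketch of the core mechanism this series introduces: picking `os.path` for the local filesystem and `posixpath` for buckets, then normalizing layout-produced separators with the `normalize_path_sep` helper added to `dlt/destinations/path_utils.py` in patch 1. The helper body mirrors the patch; the protocol value, layout prefix, and dataset directory are made-up sample inputs.

```py
import os
import posixpath
from typing import Any


def normalize_path_sep(pathlib: Any, path: str) -> str:
    """Normalizes the separator in `path` to the one used by `pathlib` (as added in patch 1)."""
    if pathlib.sep == "/":
        return path.replace("\\", "/")
    if pathlib.sep == "\\":
        return path.replace("/", "\\")
    return path


# assumption: local destinations use the "file" protocol; buckets use e.g. "s3"
protocol = "file"
pathlib_mod = os.path if protocol == "file" else posixpath

# layout-produced prefixes always use "/", so they must be normalized before
# being joined with a native (possibly "\"-separated) dataset path
table_prefix = normalize_path_sep(pathlib_mod, "schema_name/numbers/1714988931.0.jsonl")
dataset_path = os.path.join(os.path.abspath("_storage"), "mydata")  # hypothetical dataset dir
print(pathlib_mod.join(dataset_path, table_prefix))
# on Windows this prints a "\"-separated native path, on POSIX a "/"-separated one,
# which is why the patches avoid a hardcoded posixpath.join for the local filesystem
```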