fixes filesystem dest on windows (#1335)
* uses local path and pathlib to handle local filesystem in filesystem destination

* tests windows paths for filesystem destination

* uses datetime as load_package_timestamp

* splits remote path and remote uri in filesystem load job

* adds test cases for windows extended paths + docs
rudolfix authored May 8, 2024
1 parent e48da74 commit dfd4d89
Showing 13 changed files with 224 additions and 73 deletions.
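The gist of the fix, per the commit message above: instead of forcing POSIX path handling everywhere, the filesystem client now picks the OS-native path module for the local `file` protocol and keeps `posixpath` for buckets. A minimal sketch of that selection (illustrative only, not the dlt code; the function name is made up):

```python
import os
import posixpath

def pick_path_module(protocol: str):
    # local filesystem uses the OS-native separator (backslash on Windows),
    # while bucket protocols (s3, gs, az, ...) always use POSIX-style paths
    return os.path if protocol == "file" else posixpath

pm = pick_path_module("file")
print(pm.join("datasets", "my_dataset"))  # 'datasets\\my_dataset' on Windows, 'datasets/my_dataset' elsewhere
pm = pick_path_module("s3")
print(pm.join("datasets", "my_dataset"))  # always 'datasets/my_dataset'
```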
1 change: 0 additions & 1 deletion dlt/common/storages/configuration.py
@@ -168,5 +168,4 @@ def make_file_uri(local_path: str) -> str:
"""
p_ = pathlib.Path(local_path)
p_ = p_.expanduser().resolve()
# return "file:///" + p_.as_posix().lstrip("/")
return p_.as_uri()
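For context, `make_file_uri` (touched above) relies on `pathlib` to build `file://` URIs that are valid for both POSIX and Windows paths, including drive letters and UNC shares. A quick sketch of that behavior using pure paths, so it runs on any OS and assumes absolute inputs:

```python
import pathlib

# as_uri() handles drive letters and UNC shares on Windows-style paths
print(pathlib.PureWindowsPath(r"C:\a\b\c").as_uri())              # file:///C:/a/b/c
print(pathlib.PureWindowsPath(r"\\localhost\c$\a\b\c").as_uri())  # file://localhost/c$/a/b/c
print(pathlib.PurePosixPath("/var/local/data").as_uri())          # file:///var/local/data
```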
5 changes: 5 additions & 0 deletions dlt/destinations/fs_client.py
@@ -6,6 +6,11 @@
class FSClientBase(ABC):
fs_client: AbstractFileSystem

@property
@abstractmethod
def dataset_path(self) -> str:
pass

@abstractmethod
def get_table_dir(self, table_name: str) -> str:
"""returns directory for given table"""
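The new abstract `dataset_path` property makes every filesystem client expose where its dataset lives; `FilesystemClient` (see the diff below) resolves it to the bucket path joined with the dataset name. A tiny illustrative stand-in, not the actual implementation:

```python
import posixpath

class ClientSketch:
    """Illustrative stand-in for a client implementing the new `dataset_path` property."""

    def __init__(self, bucket_path: str, dataset_name: str) -> None:
        self.bucket_path = bucket_path
        self.dataset_name = dataset_name
        self.pathlib = posixpath  # os.path would be picked for the local filesystem

    @property
    def dataset_path(self) -> str:
        return self.pathlib.join(self.bucket_path, self.dataset_name)

print(ClientSketch("bucket/prefix", "my_dataset").dataset_path)  # bucket/prefix/my_dataset
```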
110 changes: 68 additions & 42 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,4 +1,5 @@
import posixpath
import pathlib
import os
import base64
from types import TracebackType
@@ -8,8 +9,6 @@
from dlt.common import json, pendulum
from dlt.common.typing import DictStrAny

import re

import dlt
from dlt.common import logger, time
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -52,42 +51,47 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.is_local_filesystem = config.protocol == "file"
# pick local filesystem pathlib or posix for buckets
self.pathlib = os.path if self.is_local_filesystem else posixpath
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
extra_placeholders=config.extra_placeholders,
)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
extra_placeholders=config.extra_placeholders,
)

# We would like to avoid failing for local filesystems where a
# deeply nested directory will not exist before writing a file.
# Since `auto_mkdir` is disabled by default in fsspec, we made some
# trade-offs between different options and decided on this.
item = self.make_remote_path()
if self.config.protocol == "file":
fs_client.makedirs(posixpath.dirname(item), exist_ok=True)
if self.is_local_filesystem:
fs_client.makedirs(self.pathlib.dirname(item), exist_ok=True)
fs_client.put_file(local_path, item)

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
"""Returns path on the remote filesystem to which copy the file, without scheme. For local filesystem a native path is used"""
# path.join does not normalize separators and available
# normalization functions are very invasive and may strip the trailing separator
return self.pathlib.join( # type: ignore[no-any-return]
self.dataset_path,
path_utils.normalize_path_sep(self.pathlib, self.destination_file_name),
)

def make_remote_uri(self) -> str:
"""Returns uri to the remote filesystem to which copy the file"""
remote_path = self.make_remote_path()
if self.is_local_filesystem:
return self.config.make_file_uri(remote_path)
else:
return f"{self.config.protocol}://{remote_path}"

def state(self) -> TLoadJobState:
return "completed"

@@ -100,7 +104,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(), status="running", remote_path=self.make_remote_uri()
)
jobs.append(ref_job)
return jobs
@@ -111,36 +115,49 @@ class FilesystemClient(FSClientBase, JobClientBase, WithStagingDataset, WithStat

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
fs_client: AbstractFileSystem
fs_path: str
# a path (without the scheme) to a location in the bucket where dataset is present
bucket_path: str
# name of the dataset
dataset_name: str

def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.fs_client, fs_path = fsspec_from_config(config)
self.is_local_filesystem = config.protocol == "file"
self.bucket_path = (
config.make_local_path(config.bucket_url) if self.is_local_filesystem else fs_path
)
# pick local filesystem pathlib or posix for buckets
self.pathlib = os.path if self.is_local_filesystem else posixpath

self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self._dataset_path = self.config.normalize_dataset_name(self.schema)
self.dataset_name = self.config.normalize_dataset_name(self.schema)

def drop_storage(self) -> None:
if self.is_storage_initialized():
self.fs_client.rm(self.dataset_path, recursive=True)

@property
def dataset_path(self) -> str:
return posixpath.join(self.fs_path, self._dataset_path)
"""A path within a bucket to tables in a dataset
NOTE: dataset_name changes if with_staging_dataset is active
"""
return self.pathlib.join(self.bucket_path, self.dataset_name) # type: ignore[no-any-return]

@contextmanager
def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
current_dataset_path = self._dataset_path
current_dataset_name = self.dataset_name
try:
self._dataset_path = self.schema.naming.normalize_table_identifier(
current_dataset_path + "_staging"
self.dataset_name = self.schema.naming.normalize_table_identifier(
current_dataset_name + "_staging"
)
yield self
finally:
# restore previous dataset name
self._dataset_path = current_dataset_path
self.dataset_name = current_dataset_name

def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# clean up existing files for tables selected for truncating
@@ -152,7 +169,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:

# we mark the storage folder as initialized
self.fs_client.makedirs(self.dataset_path, exist_ok=True)
self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME))
self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))

def truncate_tables(self, table_names: List[str]) -> None:
"""Truncate table with given name"""
@@ -161,7 +178,7 @@ def truncate_tables(self, table_names: List[str]) -> None:
for table_dir in table_dirs:
for table_file in self.list_files_with_prefixes(table_dir, table_prefixes):
# NOTE: deleting in chunks on s3 does not raise on access denied, non-existing files and probably other errors
# print(f"DEL {item}")
# print(f"DEL {table_file}")
try:
# NOTE: must use rm_file to get errors on delete
self.fs_client.rm_file(table_file)
@@ -188,7 +205,7 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
# we need to mark the folders of the data tables as initialized
if tables_name in self.schema.dlt_table_names():
self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))
self.fs_client.touch(self.pathlib.join(directory, INIT_FILE_NAME))

# don't store schema when used as staging
if not self.config.as_staging:
@@ -199,17 +216,21 @@ def update_stored_schema(
def get_table_dir(self, table_name: str) -> str:
# dlt tables do not respect layout (for now)
table_prefix = self.get_table_prefix(table_name)
return posixpath.dirname(table_prefix)
return self.pathlib.dirname(table_prefix) # type: ignore[no-any-return]

def get_table_prefix(self, table_name: str) -> str:
# dlt tables do not respect layout (for now)
if table_name.startswith(self.schema._dlt_tables_prefix):
table_prefix = posixpath.join(table_name, "")
# dlt tables get a layout where each table is a folder
# it is crucial to append and keep "/" at the end
table_prefix = self.pathlib.join(table_name, "")
else:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
return posixpath.join(self.dataset_path, table_prefix)
return self.pathlib.join( # type: ignore[no-any-return]
self.dataset_path, path_utils.normalize_path_sep(self.pathlib, table_prefix)
)

def get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
"""Gets directories where table data is stored."""
@@ -227,15 +248,20 @@ def list_files_with_prefixes(self, table_dir: str, prefixes: List[str]) -> List[
result = []
for current_dir, _dirs, files in self.fs_client.walk(table_dir, detail=False, refresh=True):
for file in files:
filename = posixpath.join(current_dir, file)
# skip INIT files
if file == INIT_FILE_NAME:
continue
filepath = self.pathlib.join(
path_utils.normalize_path_sep(self.pathlib, current_dir), file
)
for p in prefixes:
if filename.startswith(p):
result.append(posixpath.join(current_dir, file))
continue
if filepath.startswith(p):
result.append(filepath)
break
return result

def is_storage_initialized(self) -> bool:
return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return]
return self.fs_client.exists(self.pathlib.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return]

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip the state table, we create a jsonl file in the complete_load step
@@ -272,7 +298,7 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = posixpath.dirname(filepath)
dirname = self.pathlib.dirname(filepath)
if not self.fs_client.isdir(dirname):
return
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
@@ -283,7 +309,7 @@ def _to_path_safe_string(self, s: str) -> str:

def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
dirname = self.get_table_dir(table_name)
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.list_table_files(table_name):
filename = os.path.splitext(os.path.basename(filepath))[0]
@@ -302,7 +328,7 @@ def _store_load(self, load_id: str) -> None:
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = posixpath.join(
filepath = self.pathlib.join(
self.dataset_path,
self.schema.loads_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
@@ -320,7 +346,7 @@ def complete_load(self, load_id: str) -> None:

def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return posixpath.join(
return self.pathlib.join( # type: ignore[no-any-return]
self.get_table_dir(self.schema.state_table_name),
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)
@@ -370,7 +396,7 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""

return posixpath.join(
return self.pathlib.join( # type: ignore[no-any-return]
self.get_table_dir(self.schema.version_table_name),
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)
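In short, the load job now distinguishes the remote *path* (scheme-less, native for local filesystems, passed to fsspec) from the remote *URI* (used for reference jobs). A condensed, simplified sketch of that split, using free functions instead of the actual job methods (which read the destination configuration):

```python
import os
import pathlib
import posixpath

def make_remote_path(protocol: str, dataset_path: str, file_name: str) -> str:
    # scheme-less path: OS-native for the local filesystem, POSIX for buckets
    pm = os.path if protocol == "file" else posixpath
    return pm.join(dataset_path, file_name.replace("/", pm.sep))

def make_remote_uri(protocol: str, remote_path: str) -> str:
    # local paths become file:// URIs, bucket paths just get the scheme prefixed
    if protocol == "file":
        return pathlib.Path(remote_path).resolve().as_uri()
    return f"{protocol}://{remote_path}"

# roughly, on Windows:
#   make_remote_path("file", r"C:\data\my_dataset", "table/file.parquet")
#     -> 'C:\\data\\my_dataset\\table\\file.parquet'
#   make_remote_uri("file", r"C:\data\my_dataset\table\file.parquet")
#     -> 'file:///C:/data/my_dataset/table/file.parquet'
#   make_remote_uri("s3", "bucket/my_dataset/table/file.parquet")
#     -> 's3://bucket/my_dataset/table/file.parquet'
```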
13 changes: 11 additions & 2 deletions dlt/destinations/path_utils.py
@@ -75,6 +75,15 @@
SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


def normalize_path_sep(pathlib: Any, path: str) -> str:
"""Normalizes path in `path` separator to one used by `pathlib`"""
if pathlib.sep == "/":
return path.replace("\\", "/")
if pathlib.sep == "\\":
return path.replace("/", "\\")
return path


def get_placeholders(layout: str) -> List[str]:
return re.findall(r"\{(.*?)\}", layout)

@@ -89,7 +98,7 @@ def get_unused_placeholders(

def prepare_datetime_params(
current_datetime: Optional[pendulum.DateTime] = None,
load_package_timestamp: Optional[str] = None,
load_package_timestamp: Optional[pendulum.DateTime] = None,
) -> Dict[str, str]:
params: Dict[str, str] = {}
current_timestamp: pendulum.DateTime = None
@@ -205,7 +214,7 @@ def create_path(
file_name: str,
schema_name: str,
load_id: str,
load_package_timestamp: Optional[str] = None,
load_package_timestamp: Optional[pendulum.DateTime] = None,
current_datetime: Optional[TCurrentDateTime] = None,
extra_placeholders: Optional[Dict[str, Any]] = None,
) -> str:
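A quick usage sketch of the new `normalize_path_sep` helper from the hunk above: layout-generated paths always use `/`, so they are converted to the separator of whichever path module the client picked. `ntpath` stands in for `os.path` on Windows so the example is deterministic on any OS:

```python
import ntpath
import posixpath

def normalize_path_sep(pathlib, path: str) -> str:
    """Normalizes separators in `path` to the one used by `pathlib`"""
    if pathlib.sep == "/":
        return path.replace("\\", "/")
    if pathlib.sep == "\\":
        return path.replace("/", "\\")
    return path

print(normalize_path_sep(posixpath, r"schema\table\file.parquet"))  # schema/table/file.parquet
print(normalize_path_sep(ntpath, "schema/table/file.parquet"))      # schema\table\file.parquet
```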
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -213,6 +213,20 @@ bucket_url="file://localhost/c$/a/b/c"
bucket_url="file:////localhost/c$/a/b/c"
```

:::caution
Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a `FileNotFound` exception.

To go beyond this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). `dlt` recognizes both regular and UNC extended paths.

```toml
[destination.regular_extended]
bucket_url = '\\?\C:\a\b\c'

[destination.unc_extended]
bucket_url='\\?\UNC\localhost\c$\a\b\c'
```
:::

## Write disposition
The filesystem destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder
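As a hedged illustration of how such an extended path could be wired in code rather than TOML (assuming the `dlt.destinations.filesystem` factory accepts `bucket_url`, as in the dlt docs; pipeline and dataset names are made up):

```python
import dlt
from dlt.destinations import filesystem

# hypothetical wiring; the raw string keeps the \\?\ prefix intact
pipeline = dlt.pipeline(
    pipeline_name="local_files",
    destination=filesystem(bucket_url=r"\\?\C:\a\very\deeply\nested\path"),
    dataset_name="my_dataset",
)
```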
12 changes: 12 additions & 0 deletions docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
@@ -151,6 +151,18 @@ You can use both native local file system paths and in form of `file:` uri. Abso
You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows
the same rules to specify the `bucket_url`.

:::caution
Windows supports paths up to 255 characters. When you access a path longer than 255 characters, you'll see a `FileNotFound` exception.

To go beyond this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
**Note that Python glob does not work with extended UNC paths**, so you will not be able to use them.

```toml
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
```
:::

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by