
fixes filesystem dest on windows #1335

Merged · 5 commits · May 8, 2024
1 change: 0 additions & 1 deletion dlt/common/storages/configuration.py
@@ -168,5 +168,4 @@ def make_file_uri(local_path: str) -> str:
"""
p_ = pathlib.Path(local_path)
p_ = p_.expanduser().resolve()
# return "file:///" + p_.as_posix().lstrip("/")
return p_.as_uri()
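Side note on this hunk: the dropped commented-out line built the URI by hand, while `pathlib.Path.as_uri()` handles drive letters and UNC shares on Windows correctly. A minimal sketch of the difference (using `PureWindowsPath` only so it runs on any OS):

```python
import pathlib

# as_uri() produces a correct file:// URI for both drive and UNC paths;
# the manual 'file:///' + as_posix().lstrip('/') variant would turn a UNC
# share into file:///localhost/... , misplacing the host as a root folder.
print(pathlib.PureWindowsPath(r"C:\a\b\c").as_uri())
# -> file:///C:/a/b/c
print(pathlib.PureWindowsPath(r"\\localhost\c$\a\b\c").as_uri())
# -> file://localhost/c$/a/b/c
```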
5 changes: 5 additions & 0 deletions dlt/destinations/fs_client.py
@@ -6,6 +6,11 @@
class FSClientBase(ABC):
fs_client: AbstractFileSystem

@property
@abstractmethod
def dataset_path(self) -> str:
pass

@abstractmethod
def get_table_dir(self, table_name: str) -> str:
"""returns directory for given table"""
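For orientation, a hedged sketch of how a concrete client could satisfy the new abstract property (a hypothetical subclass, not part of this PR; only the two members visible in this hunk are assumed):

```python
import os

from fsspec.implementations.local import LocalFileSystem


class ExampleLocalClient(FSClientBase):
    """Hypothetical minimal subclass, for illustration only."""

    def __init__(self, dataset_dir: str) -> None:
        self.fs_client = LocalFileSystem()
        self._dataset_dir = dataset_dir

    @property
    def dataset_path(self) -> str:
        # concrete clients return the dataset root within their bucket
        return self._dataset_dir

    def get_table_dir(self, table_name: str) -> str:
        # one folder per table under the dataset root
        # (if FSClientBase declares further abstract methods, they
        # must be implemented as well)
        return os.path.join(self._dataset_dir, table_name)
```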
110 changes: 68 additions & 42 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -1,4 +1,5 @@
import posixpath
import pathlib
import os
import base64
from types import TracebackType
@@ -8,8 +9,6 @@
from dlt.common import json, pendulum
from dlt.common.typing import DictStrAny

import re

import dlt
from dlt.common import logger, time
from dlt.common.schema import Schema, TSchemaTables, TTableSchema
@@ -52,42 +51,47 @@ def __init__(
file_name = FileStorage.get_file_name_from_file_path(local_path)
self.config = config
self.dataset_path = dataset_path
self.is_local_filesystem = config.protocol == "file"
# pick local filesystem pathlib or posix for buckets
self.pathlib = os.path if self.is_local_filesystem else posixpath
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
load_package_timestamp=dlt.current.load_package()["state"]["created_at"],
extra_placeholders=config.extra_placeholders,
)

super().__init__(file_name)
fs_client, _ = fsspec_from_config(config)
self.destination_file_name = path_utils.create_path(
config.layout,
file_name,
schema_name,
load_id,
current_datetime=config.current_datetime,
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
extra_placeholders=config.extra_placeholders,
)

# We would like to avoid failing for local filesystem where
# a deeply nested directory will not exist before writing a file.
# `auto_mkdir` is disabled by default in fsspec, so we made some
# trade-offs between the options and decided on this.
item = self.make_remote_path()
if self.config.protocol == "file":
fs_client.makedirs(posixpath.dirname(item), exist_ok=True)
if self.is_local_filesystem:
fs_client.makedirs(self.pathlib.dirname(item), exist_ok=True)
fs_client.put_file(local_path, item)

def make_remote_path(self) -> str:
return (
f"{self.config.protocol}://{posixpath.join(self.dataset_path, self.destination_file_name)}"
"""Returns path on the remote filesystem to which copy the file, without scheme. For local filesystem a native path is used"""
# path.join does not normalize separators and available
# normalization functions are very invasive and may string the trailing separator
return self.pathlib.join( # type: ignore[no-any-return]
self.dataset_path,
path_utils.normalize_path_sep(self.pathlib, self.destination_file_name),
)

def make_remote_uri(self) -> str:
"""Returns uri to the remote filesystem to which copy the file"""
remote_path = self.make_remote_path()
if self.is_local_filesystem:
return self.config.make_file_uri(remote_path)
else:
return f"{self.config.protocol}://{remote_path}"

def state(self) -> TLoadJobState:
return "completed"

@@ -100,7 +104,7 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
jobs = super().create_followup_jobs(final_state)
if final_state == "completed":
ref_job = NewReferenceJob(
file_name=self.file_name(), status="running", remote_path=self.make_remote_path()
file_name=self.file_name(), status="running", remote_path=self.make_remote_uri()
)
jobs.append(ref_job)
return jobs
@@ -111,36 +115,49 @@ class FilesystemClient(FSClientBase, JobClientBase, WithStagingDataset, WithStat

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
fs_client: AbstractFileSystem
fs_path: str
# a path (without the scheme) to a location in the bucket where the dataset is present
bucket_path: str
# name of the dataset
dataset_name: str

def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None:
super().__init__(schema, config)
self.fs_client, self.fs_path = fsspec_from_config(config)
self.fs_client, fs_path = fsspec_from_config(config)
self.is_local_filesystem = config.protocol == "file"
self.bucket_path = (
config.make_local_path(config.bucket_url) if self.is_local_filesystem else fs_path
)
# pick local filesystem pathlib or posix for buckets
self.pathlib = os.path if self.is_local_filesystem else posixpath

self.config: FilesystemDestinationClientConfiguration = config
# verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables
# cannot be replaced and we cannot initialize folders consistently
self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout)
self._dataset_path = self.config.normalize_dataset_name(self.schema)
self.dataset_name = self.config.normalize_dataset_name(self.schema)

def drop_storage(self) -> None:
if self.is_storage_initialized():
self.fs_client.rm(self.dataset_path, recursive=True)

@property
def dataset_path(self) -> str:
return posixpath.join(self.fs_path, self._dataset_path)
"""A path within a bucket to tables in a dataset
NOTE: dataset_name changes if with_staging_dataset is active
"""
return self.pathlib.join(self.bucket_path, self.dataset_name) # type: ignore[no-any-return]

@contextmanager
def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
current_dataset_path = self._dataset_path
current_dataset_name = self.dataset_name
try:
self._dataset_path = self.schema.naming.normalize_table_identifier(
current_dataset_path + "_staging"
self.dataset_name = self.schema.naming.normalize_table_identifier(
current_dataset_name + "_staging"
)
yield self
finally:
# restore previous dataset name
self._dataset_path = current_dataset_path
self.dataset_name = current_dataset_name

def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# clean up existing files for tables selected for truncating
@@ -152,7 +169,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:

# we mark the storage folder as initialized
self.fs_client.makedirs(self.dataset_path, exist_ok=True)
self.fs_client.touch(posixpath.join(self.dataset_path, INIT_FILE_NAME))
self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))

def truncate_tables(self, table_names: List[str]) -> None:
"""Truncate table with given name"""
Expand All @@ -161,7 +178,7 @@ def truncate_tables(self, table_names: List[str]) -> None:
for table_dir in table_dirs:
for table_file in self.list_files_with_prefixes(table_dir, table_prefixes):
# NOTE: deleting in chunks on s3 does not raise on access denied, non-existing files and probably other errors
# print(f"DEL {item}")
# print(f"DEL {table_file}")
try:
# NOTE: must use rm_file to get errors on delete
self.fs_client.rm_file(table_file)
@@ -188,7 +205,7 @@ def update_stored_schema(
self.fs_client.makedirs(directory, exist_ok=True)
# we need to mark the folders of the data tables as initialized
if tables_name in self.schema.dlt_table_names():
self.fs_client.touch(posixpath.join(directory, INIT_FILE_NAME))
self.fs_client.touch(self.pathlib.join(directory, INIT_FILE_NAME))

# don't store schema when used as staging
if not self.config.as_staging:
@@ -199,17 +216,21 @@
def get_table_dir(self, table_name: str) -> str:
# dlt tables do not respect layout (for now)
table_prefix = self.get_table_prefix(table_name)
return posixpath.dirname(table_prefix)
return self.pathlib.dirname(table_prefix) # type: ignore[no-any-return]

def get_table_prefix(self, table_name: str) -> str:
# dlt tables do not respect layout (for now)
if table_name.startswith(self.schema._dlt_tables_prefix):
table_prefix = posixpath.join(table_name, "")
# dlt tables get a layout where each table is a folder
# it is crucial to append and keep "/" at the end
table_prefix = self.pathlib.join(table_name, "")
else:
table_prefix = self.table_prefix_layout.format(
schema_name=self.schema.name, table_name=table_name
)
return posixpath.join(self.dataset_path, table_prefix)
return self.pathlib.join( # type: ignore[no-any-return]
self.dataset_path, path_utils.normalize_path_sep(self.pathlib, table_prefix)
)

def get_table_dirs(self, table_names: Iterable[str]) -> List[str]:
"""Gets directories where table data is stored."""
@@ -227,15 +248,20 @@ def list_files_with_prefixes(self, table_dir: str, prefixes: List[str]) -> List[
result = []
for current_dir, _dirs, files in self.fs_client.walk(table_dir, detail=False, refresh=True):
for file in files:
filename = posixpath.join(current_dir, file)
# skip INIT files
if file == INIT_FILE_NAME:
continue
filepath = self.pathlib.join(
path_utils.normalize_path_sep(self.pathlib, current_dir), file
)
for p in prefixes:
if filename.startswith(p):
result.append(posixpath.join(current_dir, file))
continue
if filepath.startswith(p):
result.append(filepath)
break
return result

def is_storage_initialized(self) -> bool:
return self.fs_client.exists(posixpath.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return]
return self.fs_client.exists(self.pathlib.join(self.dataset_path, INIT_FILE_NAME)) # type: ignore[no-any-return]

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
# skip the state table, we create a jsonl file in the complete_load step
@@ -272,7 +298,7 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
#

def _write_to_json_file(self, filepath: str, data: DictStrAny) -> None:
dirname = posixpath.dirname(filepath)
dirname = self.pathlib.dirname(filepath)
if not self.fs_client.isdir(dirname):
return
self.fs_client.write_text(filepath, json.dumps(data), "utf-8")
@@ -283,7 +309,7 @@ def _to_path_safe_string(self, s: str) -> str:

def _list_dlt_table_files(self, table_name: str) -> Iterator[Tuple[str, List[str]]]:
dirname = self.get_table_dir(table_name)
if not self.fs_client.exists(posixpath.join(dirname, INIT_FILE_NAME)):
if not self.fs_client.exists(self.pathlib.join(dirname, INIT_FILE_NAME)):
raise DestinationUndefinedEntity({"dir": dirname})
for filepath in self.list_table_files(table_name):
filename = os.path.splitext(os.path.basename(filepath))[0]
@@ -302,7 +328,7 @@ def _store_load(self, load_id: str) -> None:
"inserted_at": pendulum.now().isoformat(),
"schema_version_hash": self.schema.version_hash,
}
filepath = posixpath.join(
filepath = self.pathlib.join(
self.dataset_path,
self.schema.loads_table_name,
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}.jsonl",
@@ -320,7 +346,7 @@ def complete_load(self, load_id: str) -> None:

def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""
return posixpath.join(
return self.pathlib.join( # type: ignore[no-any-return]
self.get_table_dir(self.schema.state_table_name),
f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)
@@ -370,7 +396,7 @@ def get_stored_state(self, pipeline_name: str) -> Optional[StateInfo]:
def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
"""gets full path for schema file for a given hash"""

return posixpath.join(
return self.pathlib.join( # type: ignore[no-any-return]
self.get_table_dir(self.schema.version_table_name),
f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
)
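The changes in this file repeat one pattern: pick `os.path` for the local filesystem and `posixpath` for buckets, then route every join and dirname through that module, normalizing separators first. A condensed, hedged sketch of that dispatch (simplified names, not the exact PR code):

```python
import os.path
import pathlib
import posixpath


def make_remote_path(protocol: str, dataset_path: str, file_name: str) -> str:
    # local filesystem gets native separators, buckets always use "/"
    pathmod = os.path if protocol == "file" else posixpath
    # flip whichever separator does not belong to the chosen module
    normalized = file_name.replace("\\" if pathmod.sep == "/" else "/", pathmod.sep)
    return pathmod.join(dataset_path, normalized)


def make_remote_uri(protocol: str, remote_path: str) -> str:
    # a native absolute path becomes a proper file:// URI (drives, UNC);
    # every other protocol keeps the plain scheme://path form
    if protocol == "file":
        return pathlib.Path(remote_path).as_uri()
    return f"{protocol}://{remote_path}"
```

On Windows, `make_remote_uri("file", r"C:\data\ds\table\file.jsonl")` would yield `file:///C:/data/ds/table/file.jsonl`, while bucket protocols keep the simple concatenated form.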
13 changes: 11 additions & 2 deletions dlt/destinations/path_utils.py
@@ -75,6 +75,15 @@
SUPPORTED_TABLE_NAME_PREFIX_PLACEHOLDERS = ("schema_name",)


def normalize_path_sep(pathlib: Any, path: str) -> str:
"""Normalizes path in `path` separator to one used by `pathlib`"""
if pathlib.sep == "/":
return path.replace("\\", "/")
if pathlib.sep == "\\":
return path.replace("/", "\\")
return path


def get_placeholders(layout: str) -> List[str]:
return re.findall(r"\{(.*?)\}", layout)

@@ -89,7 +98,7 @@ def get_unused_placeholders(

def prepare_datetime_params(
current_datetime: Optional[pendulum.DateTime] = None,
load_package_timestamp: Optional[str] = None,
load_package_timestamp: Optional[pendulum.DateTime] = None,
) -> Dict[str, str]:
params: Dict[str, str] = {}
current_timestamp: pendulum.DateTime = None
@@ -205,7 +214,7 @@ def create_path(
file_name: str,
schema_name: str,
load_id: str,
load_package_timestamp: Optional[str] = None,
load_package_timestamp: Optional[pendulum.DateTime] = None,
current_datetime: Optional[TCurrentDateTime] = None,
extra_placeholders: Optional[Dict[str, Any]] = None,
) -> str:
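A quick usage sketch of the new helper under both path modules (assuming the import path from this diff; output shown for a POSIX interpreter, where `os.path` is `posixpath`):

```python
import os.path
import posixpath

from dlt.destinations.path_utils import normalize_path_sep

mixed = "schema_name/table_name\\file.parquet"  # layout output with mixed separators
print(normalize_path_sep(posixpath, mixed))  # schema_name/table_name/file.parquet
print(normalize_path_sep(os.path, mixed))    # same here; backslashes on Windows
```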
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -213,6 +213,20 @@ bucket_url="file://localhost/c$/a/b/c"
bucket_url="file:////localhost/c$/a/b/c"
```

:::caution
Windows supports paths up to 255 characters. When you access a path longer than that, you'll see a `FileNotFound` exception.

To go beyond this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry). `dlt` recognizes both regular and UNC extended paths:

```toml
[destination.regular_extended]
bucket_url = '\\?\C:\a\b\c'

[destination.unc_extended]
bucket_url='\\?\UNC\localhost\c$\a\b\c'
```
:::
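If you configure the destination in code rather than TOML, the same extended path can be passed directly. A hedged sketch (pipeline and dataset names are illustrative; it assumes the `dlt.destinations.filesystem` factory):

```py
import dlt

# the raw string keeps the backslashes of the extended path intact
pipeline = dlt.pipeline(
    pipeline_name="local_files",
    destination=dlt.destinations.filesystem(bucket_url=r"\\?\C:\a\b\c"),
    dataset_name="my_dataset",
)
```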

## Write disposition
The filesystem destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder
12 changes: 12 additions & 0 deletions docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
@@ -151,6 +151,18 @@ You can use both native local file system paths and in form of `file:` uri. Abso
You can find relevant examples in the [filesystem destination documentation](../destinations/filesystem.md#local-file-system), which follows
the same rules for specifying the `bucket_url`.

:::caution
Windows supports paths up to 255 characters. When you access a path longer than that, you'll see a `FileNotFound` exception.

To go beyond this limit, you can use [extended paths](https://learn.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=registry).
**Note that Python's glob does not work with extended UNC paths**, so only the regular extended form is usable:

```toml
[sources.filesystem]
bucket_url = '\\?\C:\a\b\c'
```
:::
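The same caveat in code form, as a hedged sketch (it assumes the source was scaffolded with `dlt init filesystem <destination>` and exposes a `filesystem` resource; the `file_glob` parameter and glob pattern are assumptions for illustration):

```py
from filesystem import filesystem

# a regular extended path works; an extended UNC path would fail here,
# because Python's glob cannot traverse it
files = filesystem(bucket_url=r"\\?\C:\a\b\c", file_glob="**/*.csv")
```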

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by