Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplifies cloning resources #671

Merged
merged 15 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def initialize_credentials(hint: Any, initial_value: Any) -> CredentialsConfigur
first_credentials: CredentialsConfiguration = None
for idx, spec in enumerate(specs_in_union):
try:
# print(spec)
credentials = spec(initial_value)
if credentials.is_resolved():
return credentials
Expand Down Expand Up @@ -87,7 +86,6 @@ def inject_section(section_context: ConfigSectionContext, merge_existing: bool =
def _maybe_parse_native_value(config: TConfiguration, explicit_value: Any, embedded_sections: Tuple[str, ...]) -> Any:
# use initial value to resolve the whole configuration. if explicit value is a mapping it will be applied field by field later
if explicit_value and (not isinstance(explicit_value, C_Mapping) or isinstance(explicit_value, BaseConfiguration)):
# print(f"TRYING TO PARSE NATIVE from {explicit_value}")
try:
config.parse_native_representation(explicit_value)
except ValueError as v_err:
Expand Down Expand Up @@ -167,7 +165,6 @@ def _resolve_config_fields(
specs_in_union: List[Type[BaseConfiguration]] = []
current_value = None
if is_union(hint):
# print(f"HINT UNION?: {key}:{hint}")
# if union contains a type of explicit value which is not a valid hint, return it as current value
if explicit_value and not is_valid_hint(type(explicit_value)) and get_all_types_of_class_in_union(hint, type(explicit_value)):
current_value, traces = explicit_value, []
Expand Down
4 changes: 3 additions & 1 deletion dlt/common/configuration/specs/config_section_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def resource_merge_style(incoming: "ConfigSectionContext", existing: "ConfigSect
"""If top level section is same and there are 3 sections it replaces second element (source module) from existing and keeps the 3rd element (name)"""
incoming.pipeline_name = incoming.pipeline_name or existing.pipeline_name
if len(incoming.sections) == 3 == len(existing.sections) and incoming.sections[0] == existing.sections[0]:
incoming.sections = (incoming.sections[0], existing.sections[1], incoming.sections[2])
# existing does not have middle section then keep incoming
# standalone resources do not emit existing to not overwrite each other
incoming.sections = (incoming.sections[0], existing.sections[1] or incoming.sections[1], incoming.sections[2])
incoming.source_state_key = existing.source_state_key or incoming.source_state_key
else:
incoming.sections = incoming.sections or existing.sections
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny
return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore


def _reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Alpha version of the resource state. Resets the resource state
def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Resets the resource state with name `resource_name` by removing it from `source_state`

Args:
resource_name: The resource key to reset
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
from .load_storage import LoadStorage # noqa: F401
from .data_item_storage import DataItemStorage # noqa: F401
from .configuration import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration, TSchemaFileFormat, FilesystemConfiguration # noqa: F401
from .filesystem import filesystem_from_config, filesystem # noqa: F401
from .fsspec_filesystem import fsspec_from_config, fsspec_filesystem # noqa: F401
7 changes: 6 additions & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from urllib.parse import urlparse
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union

Expand Down Expand Up @@ -74,7 +75,11 @@ class FilesystemConfiguration(BaseConfiguration):
def protocol(self) -> str:
"""`bucket_url` protocol"""
url = urlparse(self.bucket_url)
return url.scheme or "file"
# this prevents windows absolute paths to be recognized as schemas
if not url.scheme or (os.path.isabs(self.bucket_url) and "\\" in self.bucket_url):
return "file"
else:
return url.scheme

def on_resolved(self) -> None:
url = urlparse(self.bucket_url)
Expand Down
86 changes: 0 additions & 86 deletions dlt/common/storages/filesystem.py

This file was deleted.

223 changes: 223 additions & 0 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import io
import mimetypes
import posixpath
import pathlib
from urllib.parse import urlparse
from io import BytesIO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO

from fsspec.core import url_to_fs
from fsspec import AbstractFileSystem

from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny
from dlt.common.configuration.specs import CredentialsWithDefault, GcpCredentials, AwsCredentials, AzureCredentials
from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration

from dlt import version


class FileItem(TypedDict, total=False):
"""A DataItem representing a file"""
file_url: str
file_name: str
mime_type: str
modification_date: pendulum.DateTime
size_in_bytes: int
file_content: Optional[bytes]


# Map of protocol to mtime resolver
# we only need to support a small finite set of protocols
MTIME_DISPATCH = {
"s3": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"adl": lambda f: ensure_pendulum_datetime(f["LastModified"]),
"az": lambda f: ensure_pendulum_datetime(f["last_modified"]),
"gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]


def fsspec_filesystem(protocol: str, credentials: FileSystemCredentials = None) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials.

Please supply credentials instance corresponding to the protocol. The `protocol` is just the code name of the filesystem ie:
* s3
* az, abfs
* gcs, gs

also see filesystem_from_config
"""
return fsspec_from_config(FilesystemConfiguration(protocol, credentials))



def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` from `config` argument.

Authenticates following filesystems:
* s3
* az, abfs
* gcs, gs

All other filesystems are not authenticated

Returns: (fsspec filesystem, normalized url)

"""
proto = config.protocol
fs_kwargs: DictStrAny = {}
if proto == "s3":
fs_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials())
elif proto in ["az", "abfs", "adl", "azure"]:
fs_kwargs.update(cast(AzureCredentials, config.credentials).to_adlfs_credentials())
elif proto in ['gcs', 'gs']:
assert isinstance(config.credentials, GcpCredentials)
# Default credentials are handled by gcsfs
if isinstance(config.credentials, CredentialsWithDefault) and config.credentials.has_default_credentials():
fs_kwargs['token'] = None
else:
fs_kwargs['token'] = dict(config.credentials)
fs_kwargs['project'] = config.credentials.project_id
try:
return url_to_fs(config.bucket_url, use_listings_cache=False, **fs_kwargs) # type: ignore[no-any-return]
except ModuleNotFoundError as e:
raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e


class FileItemDict(DictStrAny):
"""A FileItem dictionary with additional methods to get fsspec filesystem, open and read files.
"""

def __init__(
self, mapping: FileItem, credentials: Optional[Union[FileSystemCredentials, AbstractFileSystem]] = None
):
"""Create a dictionary with the filesystem client.

Args:
mapping (FileItem): The file item TypedDict.
credentials (Optional[FileSystemCredentials], optional): The credentials to the
filesystem. Defaults to None.
"""
self.credentials = credentials
super().__init__(**mapping)

@property
def fsspec(self) -> AbstractFileSystem:
"""The filesystem client based on the given credentials.

Returns:
AbstractFileSystem: The fsspec client.
"""
if isinstance(self.credentials, AbstractFileSystem):
return self.credentials
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]

def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
"""Open the file as a fsspec file.

This method opens the file represented by this dictionary as a file-like object using
the fsspec library.

Args:
**kwargs (Any): The arguments to pass to the fsspec open function.

Returns:
IOBase: The fsspec file.
"""
opened_file: IO[Any]
# if the user has already extracted the content, we use it so there will be no need to
# download the file again.
if "file_content" in self:
bytes_io = BytesIO(self["file_content"])

if "t" in mode:
text_kwargs = {
k: kwargs.pop(k)
for k in ["encoding", "errors", "newline"]
if k in kwargs
}
return io.TextIOWrapper(
bytes_io,
**text_kwargs,
)
else:
return bytes_io
else:
opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
return opened_file

def read_bytes(self) -> bytes:
"""Read the file content.

Returns:
bytes: The file content.
"""
content: bytes
# same as open, if the user has already extracted the content, we use it.
if "file_content" in self and self["file_content"] is not None:
content = self["file_content"]
else:
content = self.fsspec.read_bytes(self["file_url"])
return content


def guess_mime_type(file_name: str) -> str:
mime_type = mimetypes.guess_type(posixpath.basename(file_name), strict=False)[0]
if not mime_type:
mime_type = "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")
return mime_type


def glob_files(
fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**"
) -> Iterator[FileItem]:
"""Get the files from the filesystem client.

Args:
fs_client (AbstractFileSystem): The filesystem client.
bucket_url (str): The url to the bucket.
file_glob (str): A glob for the filename filter.

Returns:
Iterable[FileItem]: The list of files.
"""
import os
bucket_url_parsed = urlparse(bucket_url)
# if this is file path without scheme
if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url):
# this is a file so create a proper file url
bucket_url = pathlib.Path(bucket_url).absolute().as_uri()
bucket_url_parsed = urlparse(bucket_url)

bucket_path = bucket_url_parsed._replace(scheme='').geturl()
bucket_path = bucket_path[2:] if bucket_path.startswith("//") else bucket_path
filter_url = posixpath.join(bucket_path, file_glob)

glob_result = fs_client.glob(filter_url, detail=True)
if isinstance(glob_result, list):
raise NotImplementedError("Cannot request details when using fsspec.glob. For ADSL (Azure) please use version 2023.9.0 or later")

for file, md in glob_result.items():
if md["type"] != "file":
continue
# make that absolute path on a file://
if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
file = "/" + file
file_name = posixpath.relpath(file, bucket_path)
file_url = bucket_url_parsed.scheme + "://" + file
yield FileItem(
file_name=file_name,
file_url=file_url,
mime_type=guess_mime_type(file_name),
modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
size_in_bytes=int(md["size"]),
)
2 changes: 1 addition & 1 deletion dlt/common/storages/transactional_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import fsspec

from dlt.common.pendulum import pendulum, timedelta
from dlt.common.storages.filesystem import MTIME_DISPATCH
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH


def lock_id(k: int = 4) -> str:
Expand Down
Loading