Skip to content

Commit

Permalink
simplifies cloning resources (#671)
Browse files Browse the repository at this point in the history
* does not propagate source section when standalone resources are extracted to preserve their config sections

* name is not set to resource name in table template, must be set explicitly

* removes pipe id, removes separate resource name, defines how resources are cloned and added to the source after it is created

* simplifies and documents with_name method

* does not clone parent pipes on call

* adds explicit standalone resources

* fixes finding classes in union to sub and superclass of cls

* bumps to 0.3.19a0

* implements standalone transformers, cleanup of dlt.sources module

* reorganizes dlt.sources.filesystem module

* removes too early binding of resource name in incremental

* improves tests and docstring for state reset on replace

* adds glob filesystem files + tests

* fixes glob pattern

* fixes globbing on windows
  • Loading branch information
rudolfix authored Oct 10, 2023
1 parent 13c00b5 commit 347042a
Show file tree
Hide file tree
Showing 48 changed files with 3,709 additions and 428 deletions.
3 changes: 0 additions & 3 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def initialize_credentials(hint: Any, initial_value: Any) -> CredentialsConfigur
first_credentials: CredentialsConfiguration = None
for idx, spec in enumerate(specs_in_union):
try:
# print(spec)
credentials = spec(initial_value)
if credentials.is_resolved():
return credentials
Expand Down Expand Up @@ -87,7 +86,6 @@ def inject_section(section_context: ConfigSectionContext, merge_existing: bool =
def _maybe_parse_native_value(config: TConfiguration, explicit_value: Any, embedded_sections: Tuple[str, ...]) -> Any:
# use initial value to resolve the whole configuration. if explicit value is a mapping it will be applied field by field later
if explicit_value and (not isinstance(explicit_value, C_Mapping) or isinstance(explicit_value, BaseConfiguration)):
# print(f"TRYING TO PARSE NATIVE from {explicit_value}")
try:
config.parse_native_representation(explicit_value)
except ValueError as v_err:
Expand Down Expand Up @@ -167,7 +165,6 @@ def _resolve_config_fields(
specs_in_union: List[Type[BaseConfiguration]] = []
current_value = None
if is_union(hint):
# print(f"HINT UNION?: {key}:{hint}")
# if union contains a type of explicit value which is not a valid hint, return it as current value
if explicit_value and not is_valid_hint(type(explicit_value)) and get_all_types_of_class_in_union(hint, type(explicit_value)):
current_value, traces = explicit_value, []
Expand Down
4 changes: 3 additions & 1 deletion dlt/common/configuration/specs/config_section_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def resource_merge_style(incoming: "ConfigSectionContext", existing: "ConfigSect
"""If top level section is same and there are 3 sections it replaces second element (source module) from existing and keeps the 3rd element (name)"""
incoming.pipeline_name = incoming.pipeline_name or existing.pipeline_name
if len(incoming.sections) == 3 == len(existing.sections) and incoming.sections[0] == existing.sections[0]:
incoming.sections = (incoming.sections[0], existing.sections[1], incoming.sections[2])
# existing does not have middle section then keep incoming
# standalone resources do not emit existing to not overwrite each other
incoming.sections = (incoming.sections[0], existing.sections[1] or incoming.sections[1], incoming.sections[2])
incoming.source_state_key = existing.source_state_key or incoming.source_state_key
else:
incoming.sections = incoming.sections or existing.sections
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny
return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore


def _reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Alpha version of the resource state. Resets the resource state
def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Resets the resource state with name `resource_name` by removing it from `source_state`
Args:
resource_name: The resource key to reset
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
from .load_storage import LoadStorage # noqa: F401
from .data_item_storage import DataItemStorage # noqa: F401
from .configuration import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration, TSchemaFileFormat, FilesystemConfiguration # noqa: F401
from .filesystem import filesystem_from_config, filesystem # noqa: F401
from .fsspec_filesystem import fsspec_from_config, fsspec_filesystem # noqa: F401
7 changes: 6 additions & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from urllib.parse import urlparse
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union

Expand Down Expand Up @@ -74,7 +75,11 @@ class FilesystemConfiguration(BaseConfiguration):
def protocol(self) -> str:
"""`bucket_url` protocol"""
url = urlparse(self.bucket_url)
return url.scheme or "file"
# this prevents windows absolute paths to be recognized as schemas
if not url.scheme or (os.path.isabs(self.bucket_url) and "\\" in self.bucket_url):
return "file"
else:
return url.scheme

def on_resolved(self) -> None:
url = urlparse(self.bucket_url)
Expand Down
86 changes: 0 additions & 86 deletions dlt/common/storages/filesystem.py

This file was deleted.

223 changes: 223 additions & 0 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import io
import mimetypes
import posixpath
import pathlib
from urllib.parse import urlparse
from io import BytesIO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO

from fsspec.core import url_to_fs
from fsspec import AbstractFileSystem

from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny
from dlt.common.configuration.specs import CredentialsWithDefault, GcpCredentials, AwsCredentials, AzureCredentials
from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration

from dlt import version


class FileItem(TypedDict, total=False):
    """A DataItem representing a file discovered on an (remote) filesystem.

    `total=False`: all keys are optional; in particular `file_content` is only
    present when the content was eagerly extracted (see `FileItemDict.open`/`read_bytes`).
    """
    file_url: str  # full url including scheme, e.g. "s3://bucket/path/file.csv"
    file_name: str  # path of the file relative to the bucket url
    mime_type: str  # best-effort MIME type (see guess_mime_type)
    modification_date: pendulum.DateTime  # mtime resolved via MTIME_DISPATCH
    size_in_bytes: int
    file_content: Optional[bytes]  # optional eagerly-loaded content


# Map of protocol to mtime resolver: each callable takes the raw metadata dict
# returned by the respective fsspec implementation and returns a pendulum datetime.
# we only need to support a small finite set of protocols
# NOTE(review): the metadata key names ("LastModified", "mtime", ...) follow each
# fsspec backend's detail format — confirm against the backend versions in use.
MTIME_DISPATCH = {
    "s3": lambda f: ensure_pendulum_datetime(f["LastModified"]),
    "adl": lambda f: ensure_pendulum_datetime(f["LastModified"]),
    "az": lambda f: ensure_pendulum_datetime(f["last_modified"]),
    "gcs": lambda f: ensure_pendulum_datetime(f["updated"]),
    "file": lambda f: ensure_pendulum_datetime(f["mtime"]),
    "memory": lambda f: ensure_pendulum_datetime(f["created"]),
}
# Support aliases: these protocols share the metadata format of their canonical name
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]


def fsspec_filesystem(protocol: str, credentials: FileSystemCredentials = None) -> Tuple[AbstractFileSystem, str]:
    """Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials.

    Please supply credentials instance corresponding to the protocol. The `protocol` is just the code name of the filesystem ie:
    * s3
    * az, abfs
    * gcs, gs

    also see fsspec_from_config
    """
    # `protocol` is passed as `bucket_url` — FilesystemConfiguration.protocol
    # derives the scheme from it, so a bare protocol name or a full url both work
    return fsspec_from_config(FilesystemConfiguration(protocol, credentials))



def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSystem, str]:
    """Instantiates an authenticated fsspec `FileSystem` from `config` argument.

    Authenticates following filesystems:
    * s3
    * az, abfs
    * gcs, gs

    All other filesystems are not authenticated

    Returns: (fsspec filesystem, normalized url)
    """
    protocol = config.protocol
    auth_kwargs: DictStrAny = {}
    if protocol == "s3":
        auth_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials())
    elif protocol in ("az", "abfs", "adl", "azure"):
        auth_kwargs.update(cast(AzureCredentials, config.credentials).to_adlfs_credentials())
    elif protocol in ("gcs", "gs"):
        credentials = config.credentials
        assert isinstance(credentials, GcpCredentials)
        # Default credentials are handled by gcsfs: token=None lets it resolve them
        use_default = isinstance(credentials, CredentialsWithDefault) and credentials.has_default_credentials()
        auth_kwargs["token"] = None if use_default else dict(credentials)
        auth_kwargs["project"] = credentials.project_id
    try:
        return url_to_fs(config.bucket_url, use_listings_cache=False, **auth_kwargs)  # type: ignore[no-any-return]
    except ModuleNotFoundError as e:
        # the protocol-specific fsspec backend (s3fs, adlfs, gcsfs...) is an extra
        raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{protocol}]"]) from e


class FileItemDict(DictStrAny):
    """A FileItem dictionary with additional methods to get fsspec filesystem, open and read files.
    """

    def __init__(
        self, mapping: FileItem, credentials: Optional[Union[FileSystemCredentials, AbstractFileSystem]] = None
    ):
        """Create a dictionary with the filesystem client.

        Args:
            mapping (FileItem): The file item TypedDict.
            credentials (Optional[Union[FileSystemCredentials, AbstractFileSystem]], optional):
                Credentials used to instantiate a client lazily, or an already
                authenticated fsspec client. Defaults to None.
        """
        self.credentials = credentials
        super().__init__(**mapping)

    @property
    def fsspec(self) -> AbstractFileSystem:
        """The filesystem client based on the given credentials.

        Returns:
            AbstractFileSystem: The fsspec client.
        """
        if isinstance(self.credentials, AbstractFileSystem):
            return self.credentials
        else:
            # file_url is accepted as the "protocol" argument: the configuration
            # derives the actual scheme from the url
            return fsspec_filesystem(self["file_url"], self.credentials)[0]

    def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]:  # noqa: A003
        """Open the file as a fsspec file.

        This method opens the file represented by this dictionary as a file-like object using
        the fsspec library.

        Args:
            mode (str): open mode, binary by default; a "t" in mode yields a text wrapper.
            **kwargs (Any): The arguments to pass to the fsspec open function.

        Returns:
            IOBase: The fsspec file.
        """
        opened_file: IO[Any]
        # if the user has already extracted the content, we use it so there will be no need to
        # download the file again.
        # NOTE: check for a non-None value (not just key presence) consistently with
        # `read_bytes` — a present-but-None `file_content` previously crashed BytesIO(None)
        if self.get("file_content") is not None:
            bytes_io = BytesIO(self["file_content"])

            if "t" in mode:
                # forward only the kwargs TextIOWrapper understands
                text_kwargs = {
                    k: kwargs.pop(k)
                    for k in ["encoding", "errors", "newline"]
                    if k in kwargs
                }
                return io.TextIOWrapper(
                    bytes_io,
                    **text_kwargs,
                )
            else:
                return bytes_io
        else:
            opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
            return opened_file

    def read_bytes(self) -> bytes:
        """Read the file content.

        Returns:
            bytes: The file content.
        """
        content: bytes
        # same as open, if the user has already extracted the content, we use it.
        if self.get("file_content") is not None:
            content = self["file_content"]
        else:
            content = self.fsspec.read_bytes(self["file_url"])
        return content


def guess_mime_type(file_name: str) -> str:
    """Best-effort MIME type for `file_name`.

    Uses the stdlib `mimetypes` registry (non-strict) on the base name; when the
    type is unknown, falls back to `application/<extension>` or
    `application/octet-stream` for names without an extension.
    """
    base_name = posixpath.basename(file_name)
    mime_type, _ = mimetypes.guess_type(base_name, strict=False)
    if mime_type:
        return mime_type
    extension = posixpath.splitext(file_name)[1][1:]
    return "application/" + (extension or "octet-stream")


def glob_files(
    fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**"
) -> Iterator[FileItem]:
    """Get the files from the filesystem client.

    Args:
        fs_client (AbstractFileSystem): The filesystem client.
        bucket_url (str): The url to the bucket.
        file_glob (str): A glob for the filename filter.

    Returns:
        Iterable[FileItem]: The list of files.
    """
    import os
    bucket_url_parsed = urlparse(bucket_url)
    # if this is file path without scheme
    # the "\\" check stops Windows drive letters (e.g. "C:\...") being parsed as a scheme
    if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url):
        # this is a file so create a proper file url
        bucket_url = pathlib.Path(bucket_url).absolute().as_uri()
        bucket_url_parsed = urlparse(bucket_url)

    # strip the scheme; urlunparse-style result keeps a leading "//" before the netloc
    bucket_path = bucket_url_parsed._replace(scheme='').geturl()
    bucket_path = bucket_path[2:] if bucket_path.startswith("//") else bucket_path
    filter_url = posixpath.join(bucket_path, file_glob)

    glob_result = fs_client.glob(filter_url, detail=True)
    # detail=True should return a dict of path -> metadata; a list means the backend
    # ignored `detail` (seen with older adlfs releases)
    if isinstance(glob_result, list):
        raise NotImplementedError("Cannot request details when using fsspec.glob. For ADSL (Azure) please use version 2023.9.0 or later")

    for file, md in glob_result.items():
        # skip directories and other non-file entries
        if md["type"] != "file":
            continue
        # make that absolute path on a file://
        if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
            file = "/" + file
        file_name = posixpath.relpath(file, bucket_path)
        file_url = bucket_url_parsed.scheme + "://" + file
        yield FileItem(
            file_name=file_name,
            file_url=file_url,
            mime_type=guess_mime_type(file_name),
            # NOTE(review): raises KeyError for schemes absent from MTIME_DISPATCH — confirm intended
            modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
            size_in_bytes=int(md["size"]),
        )
2 changes: 1 addition & 1 deletion dlt/common/storages/transactional_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import fsspec

from dlt.common.pendulum import pendulum, timedelta
from dlt.common.storages.filesystem import MTIME_DISPATCH
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH


def lock_id(k: int = 4) -> str:
Expand Down
Loading

0 comments on commit 347042a

Please sign in to comment.