Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements in Filesystem Configuration #869

Merged
merged 50 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
03e8de5
Add preliminary tests (#814)
Pipboyguy Dec 19, 2023
939d8ea
Rename keywords, add class attrs (#814)
Pipboyguy Dec 19, 2023
55bdd64
Amend test_filesystem_configuration to reflect new optional params (#…
Pipboyguy Dec 19, 2023
b902731
Update and extend FilesystemConfiguration along with filesystem desti…
Pipboyguy Dec 19, 2023
5326396
Format (#814)
Pipboyguy Dec 19, 2023
6f127c3
Spelling (#814)
Pipboyguy Dec 19, 2023
f648e9d
Improve documentation for fsspec_from_config (#814)
Pipboyguy Dec 19, 2023
af99dee
Merge branch 'devel' into 814-additional-arguments-fsspec
Pipboyguy Dec 22, 2023
07429f6
Improve comment (#814)
Pipboyguy Dec 22, 2023
527abdd
Refactor storage and test configurations, update fsspec_filesystem (#…
Pipboyguy Dec 22, 2023
321b4bf
Merge branch 'devel' into 814-additional-arguments-fsspec
Pipboyguy Dec 23, 2023
d0e92e4
Expand test(#814)
Pipboyguy Dec 23, 2023
32efe61
Import test fixtures (#814)
Pipboyguy Dec 23, 2023
c2169d9
Explicitly mark function scoped fixtures defined in utls.py to help w…
Pipboyguy Dec 23, 2023
228b066
Revert filesystem client tests to devel (#814)
Pipboyguy Dec 23, 2023
f620745
Refactor and update SSL test cases for file systems (#814)
Pipboyguy Dec 23, 2023
c998216
Update test for S3 SSL certificate verification (#814)
Pipboyguy Dec 24, 2023
1de5b9c
Add cryptography to package requirements (#814)
Pipboyguy Dec 24, 2023
b60436e
Use dlt.common.pendulum instead of datetime (#814)
Pipboyguy Dec 26, 2023
99af17e
Move defaults before update statements (#814)
Pipboyguy Dec 26, 2023
cfbdb0b
Revert to devel's poetry.lock and pyproject.toml (#814)
Pipboyguy Dec 26, 2023
691430a
Add cryptography to dev dependency group (#814)
Pipboyguy Dec 26, 2023
8a3a25e
Remove autoused fixtures (#814)
Pipboyguy Dec 26, 2023
2265e7d
Move cert fixture to filesystem test folder (#814)
Pipboyguy Dec 28, 2023
2c6ebc2
Use env variables (#814)
Pipboyguy Dec 29, 2023
6c57688
docs(hubspot): update docs (#864)
IlyaFaer Dec 29, 2023
7c47ac4
moves destination adapters to dlt.destination.adapters
rudolfix Dec 23, 2023
9b8a01b
fixes retry attempt ident
rudolfix Dec 25, 2023
9882f12
evaluates callable as Destination class type
rudolfix Dec 27, 2023
3daabce
adds destination docs
rudolfix Dec 27, 2023
f4421d7
improves configuration docs
rudolfix Dec 28, 2023
2d91670
fixes typos and formatting
rudolfix Dec 29, 2023
079c2ef
corrected data type
Dec 28, 2023
0fe5b67
bumps to version 0.4.2 (#865)
burnash Dec 29, 2023
abe683d
add Google secrets (#859)
AstrakhantsevaAA Dec 29, 2023
8e252ca
Update the release instructions in CONTRIBUTING.md
burnash Dec 29, 2023
c51e1d3
Merge remote-tracking branch 'origin/devel' into 814-additional-argum…
Pipboyguy Jan 1, 2024
3713fc4
Format (#814)
Pipboyguy Jan 1, 2024
0175184
Update filesystem documentation for clarity and grammar
Pipboyguy Jan 1, 2024
31ce0ce
Update file system destination documentation with fsspec additional a…
Pipboyguy Jan 1, 2024
27789c8
Merge branch 'devel' into 814-additional-arguments-fsspec
Pipboyguy Jan 2, 2024
7ecae7f
Remove outdated comment, grammar (#814)
Pipboyguy Jan 2, 2024
f137adf
Move self-signed cert fixture to utils.py (#814)
Pipboyguy Jan 2, 2024
51dbb1b
Add cryptography with latest devel deps(#814)
Pipboyguy Jan 2, 2024
07a01d1
Refactor filesystem and credential handling (#814)
Pipboyguy Jan 3, 2024
0dc61ba
Simplify (#814)
Pipboyguy Jan 3, 2024
313b2d5
Merge branch 'devel' into 814-additional-arguments-fsspec
Pipboyguy Jan 3, 2024
b923715
use os.environ instead of `environment` fixture (#814)
Pipboyguy Jan 4, 2024
5ef1a8d
Merge
Pipboyguy Jan 5, 2024
9720b9d
Merge branch 'devel' into 814-additional-arguments-fsspec
sh-rp Jan 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion dlt/common/configuration/specs/gcp_credentials.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Any, Final, List, Tuple, Union
from typing import Any, Final, List, Tuple, Union, Dict

from dlt.common import json, pendulum
from dlt.common.configuration.specs.api_credentials import OAuth2Credentials
Expand Down Expand Up @@ -48,6 +48,20 @@ def _from_info_dict(self, info: StrAny) -> None:
def __str__(self) -> str:
return f"{self.project_id}"

def to_gcs_credentials(self) -> Dict[str, Any]:
"""
Dict of keyword arguments can be passed to gcsfs.
Delegates default GCS credential handling to gcsfs.
"""
return {
"project": self.project_id,
"token": (
None
if isinstance(self, CredentialsWithDefault) and self.has_default_credentials()
else dict(self)
),
}


@configspec
class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials):
Expand Down
33 changes: 24 additions & 9 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from urllib.parse import urlparse
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from urllib.parse import urlparse

from dlt.common.configuration.specs import BaseConfiguration, configspec, CredentialsConfiguration
from dlt.common.configuration import configspec, resolve_type
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.configuration.specs import CredentialsConfiguration
from dlt.common.configuration.specs import (
GcpServiceAccountCredentials,
AwsCredentials,
Expand All @@ -12,8 +13,9 @@
AzureCredentialsWithoutDefaults,
BaseConfiguration,
)
from dlt.common.typing import DictStrAny
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError


TSchemaFileFormat = Literal["json", "yaml"]
SchemaFileExtensions = get_args(TSchemaFileFormat)
Expand Down Expand Up @@ -92,9 +94,13 @@ class FilesystemConfiguration(BaseConfiguration):
}

bucket_url: str = None
# should be an union of all possible credentials as found in PROTOCOL_CREDENTIALS

# should be a union of all possible credentials as found in PROTOCOL_CREDENTIALS
credentials: FileSystemCredentials

kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None

@property
def protocol(self) -> str:
"""`bucket_url` protocol"""
Expand All @@ -112,7 +118,7 @@ def on_resolved(self) -> None:
"File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
" must contain valid url with a path or host:password component."
)
# this is just a path in local file system
# this is just a path in a local file system
if url.path == self.bucket_url:
url = url._replace(scheme="file")
self.bucket_url = url.geturl()
Expand All @@ -124,9 +130,7 @@ def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:

def fingerprint(self) -> str:
"""Returns a fingerprint of bucket_url"""
if self.bucket_url:
return digest128(self.bucket_url)
return ""
return digest128(self.bucket_url) if self.bucket_url else ""

def __str__(self) -> str:
"""Return displayable destination location"""
Expand All @@ -141,4 +145,15 @@ def __str__(self) -> str:

if TYPE_CHECKING:

def __init__(self, bucket_url: str, credentials: FileSystemCredentials = None) -> None: ...
def __init__(
self,
bucket_url: str,
credentials: FileSystemCredentials = None,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None,
) -> None:
self.bucket_url = bucket_url
self.credentials = credentials
self.kwargs = kwargs
self.client_kwargs = client_kwargs
...
124 changes: 69 additions & 55 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import io
import mimetypes
import posixpath
import pathlib
from urllib.parse import urlparse
import posixpath
from io import BytesIO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO, Dict, Callable
from urllib.parse import urlparse

from fsspec.core import url_to_fs
from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

from dlt import version
from dlt.common import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny
from dlt.common.configuration.specs import (
CredentialsWithDefault,
GcpCredentials,
AwsCredentials,
AzureCredentials,
)
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration

from dlt import version
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny


class FileItem(TypedDict, total=False):
Expand Down Expand Up @@ -50,56 +48,75 @@ class FileItem(TypedDict, total=False):
MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"]
MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"]

# Map of protocol to a filesystem type
CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = {
"s3": lambda config: cast(AwsCredentials, config.credentials).to_s3fs_credentials(),
"adl": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"az": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"gcs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"abfs": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"azure": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
}


def fsspec_filesystem(
protocol: str, credentials: FileSystemCredentials = None
protocol: str,
credentials: FileSystemCredentials = None,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None,
) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials.

Please supply credentials instance corresponding to the protocol. The `protocol` is just the code name of the filesystem ie:
Please supply credentials instance corresponding to the protocol.
The `protocol` is just the code name of the filesystem i.e.:
* s3
* az, abfs
* gcs, gs

also see filesystem_from_config
"""
return fsspec_from_config(FilesystemConfiguration(protocol, credentials))
return fsspec_from_config(
FilesystemConfiguration(protocol, credentials, kwargs=kwargs, client_kwargs=client_kwargs)
)


def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` from `config` argument.

Authenticates following filesystems:
Authenticates the following filesystems:
* s3
* az, abfs
* gcs, gs

Additional fsspec filesystem arguments and client arguments are gathered from the
FilesystemConfiguration object and passed to the `url_to_fs` factory.

All other filesystems are not authenticated

Returns: (fsspec filesystem, normalized url)

"""
proto = config.protocol
fs_kwargs: DictStrAny = {}
if proto == "s3":
fs_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials())
elif proto in ["az", "abfs", "adl", "azure"]:
fs_kwargs.update(cast(AzureCredentials, config.credentials).to_adlfs_credentials())
elif proto in ["gcs", "gs"]:
assert isinstance(config.credentials, GcpCredentials)
# Default credentials are handled by gcsfs
if (
isinstance(config.credentials, CredentialsWithDefault)
and config.credentials.has_default_credentials()
):
fs_kwargs["token"] = None
else:
fs_kwargs["token"] = dict(config.credentials)
fs_kwargs["project"] = config.credentials.project_id
fs_kwargs: DictStrAny = {"use_listings_cache": False}
credentials = CREDENTIALS_DISPATCH.get(proto, lambda _: {})(config)

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
if config.client_kwargs is not None:
fs_kwargs["client_kwargs"] = config.client_kwargs

if "client_kwargs" in fs_kwargs and "client_kwargs" in credentials:
fs_kwargs["client_kwargs"].update(credentials.pop("client_kwargs"))

fs_kwargs.update(credentials)

try:
return url_to_fs(config.bucket_url, use_listings_cache=False, **fs_kwargs) # type: ignore[no-any-return]
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
except ModuleNotFoundError as e:
raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e
raise MissingDependencyException(
"filesystem", [f"{version.DLT_PKG_NAME}[{config.protocol}]"]
) from e


class FileItemDict(DictStrAny):
Expand All @@ -122,7 +139,7 @@ def __init__(

@property
def fsspec(self) -> AbstractFileSystem:
"""The filesystem client based on the given credentials.
"""The filesystem client is based on the given credentials.

Returns:
AbstractFileSystem: The fsspec client.
Expand Down Expand Up @@ -150,16 +167,15 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
if "file_content" in self:
bytes_io = BytesIO(self["file_content"])

if "t" in mode:
text_kwargs = {
k: kwargs.pop(k) for k in ["encoding", "errors", "newline"] if k in kwargs
}
return io.TextIOWrapper(
bytes_io,
**text_kwargs,
)
else:
if "t" not in mode:
return bytes_io
text_kwargs = {
k: kwargs.pop(k) for k in ["encoding", "errors", "newline"] if k in kwargs
}
return io.TextIOWrapper(
bytes_io,
**text_kwargs,
)
else:
opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
return opened_file
Expand All @@ -171,19 +187,17 @@ def read_bytes(self) -> bytes:
bytes: The file content.
"""
content: bytes
# same as open, if the user has already extracted the content, we use it.
if "file_content" in self and self["file_content"] is not None:
content = self["file_content"]
else:
content = self.fsspec.read_bytes(self["file_url"])
return content
return ( # type: ignore
self["file_content"]
if "file_content" in self and self["file_content"] is not None
else self.fsspec.read_bytes(self["file_url"])
)


def guess_mime_type(file_name: str) -> str:
mime_type = mimetypes.guess_type(posixpath.basename(file_name), strict=False)[0]
if not mime_type:
mime_type = "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")
return mime_type
return mimetypes.guess_type(posixpath.basename(file_name), strict=False)[
0
] or "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")


def glob_files(
Expand All @@ -202,7 +216,7 @@ def glob_files(
import os

bucket_url_parsed = urlparse(bucket_url)
# if this is file path without scheme
# if this is a file path without a scheme
if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url):
# this is a file so create a proper file url
bucket_url = pathlib.Path(bucket_url).absolute().as_uri()
Expand All @@ -224,9 +238,9 @@ def glob_files(
continue
# make that absolute path on a file://
if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
file = "/" + file
file = f"/{file}"
file_name = posixpath.relpath(file, bucket_path)
file_url = bucket_url_parsed.scheme + "://" + file
file_url = f"{bucket_url_parsed.scheme}://{file}"
yield FileItem(
file_name=file_name,
file_url=file_url,
Expand Down
6 changes: 3 additions & 3 deletions docs/website/docs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ def _initial_providers():

def pytest_configure(config):
# push sentry to ci
os.environ[
"RUNTIME__SENTRY_DSN"
] = "https://[email protected]/4504819859914752"
os.environ["RUNTIME__SENTRY_DSN"] = (
"https://[email protected]/4504819859914752"
)
Loading