Skip to content

Commit

Permalink
Update and extend FilesystemConfiguration along with filesystem destination factory
Browse files Browse the repository at this point in the history

Updated FilesystemConfiguration's __init__ method
to accept two additional optional parameters:
kwargs and client_kwargs.
This provides greater flexibility in configuration,
allowing custom arguments
to be passed through from the filesystem configuration to the fsspec filesystem.
Tests were adapted to reflect these changes,
and further modifications were made
to implement these new parameters within fsspec_filesystem.py.
  • Loading branch information
Pipboyguy committed Dec 19, 2023
1 parent 55bdd64 commit b902731
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
4 changes: 3 additions & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ def __str__(self) -> str:


if TYPE_CHECKING:
def __init__(self, bucket_url: str, credentials: FileSystemCredentials = None) -> None: ...
def __init__(self, bucket_url: str, credentials: FileSystemCredentials = None,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None) -> None: ...
14 changes: 13 additions & 1 deletion dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
"""
proto = config.protocol
fs_kwargs: DictStrAny = {}

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)

if config.client_kwargs is not None:
fs_kwargs["client_kwargs"] = config.client_kwargs

fs_kwargs["use_listings_cache"] = False # is this default necessary?

if proto == "s3":
fs_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials())
elif proto in ["az", "abfs", "adl", "azure"]:
Expand All @@ -98,7 +107,6 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
fs_kwargs["token"] = dict(config.credentials)
fs_kwargs["project"] = config.credentials.project_id
try:
fs_kwargs["use_listings_cache"] = False
return url_to_fs(config.bucket_url, **fs_kwargs)
except ModuleNotFoundError as e:
raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e
Expand All @@ -107,6 +115,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
class FileItemDict(DictStrAny):
"""A FileItem dictionary with additional methods to get fsspec filesystem, open and read files."""


def __init__(
self,
mapping: FileItem,
Expand All @@ -122,6 +131,7 @@ def __init__(
self.credentials = credentials
super().__init__(**mapping)


@property
def fsspec(self) -> AbstractFileSystem:
"""The filesystem client is based on the given credentials.
Expand All @@ -134,6 +144,7 @@ def fsspec(self) -> AbstractFileSystem:
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]


def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
"""Open the file as a fsspec file.
Expand Down Expand Up @@ -166,6 +177,7 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
return opened_file


def read_bytes(self) -> bytes:
"""Read the file content.
Expand Down
15 changes: 10 additions & 5 deletions tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import os
import posixpath
from typing import Union, Dict

import pytest
from dlt.common.configuration.inject import with_config

from dlt.common import pendulum
from dlt.common.configuration.inject import with_config
from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults
from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files
from dlt.common.utils import uniq_id

from tests.utils import preserve_environ, autouse_test_storage
from tests.common.configuration.utils import environment
from tests.common.storages.utils import assert_sample_files
from tests.load.utils import ALL_FILESYSTEM_DRIVERS
Expand Down Expand Up @@ -107,8 +106,14 @@ def test_filesystem_instance_from_s3_endpoint_with_additional_arguments(environm
"""Test that fsspec instance is correctly configured when using endpoint URL, along with additional arguments."""
from s3fs import S3FileSystem

config = FilesystemConfiguration(bucket_url="s3://dummy-bucket", kwargs={'use_ssl': True},
client_kwargs={'verify': 'public.crt'})
environment["DESTINATION__FILESYSTEM__BUCKET_URL"] = "s3://dummy-bucket"
environment["CREDENTIALS__ENDPOINT_URL"] = "https://fake-s3-endpoint.example.com"
environment["CREDENTIALS__AWS_ACCESS_KEY_ID"] = "fake-access-key"
environment["CREDENTIALS__AWS_SECRET_ACCESS_KEY"] = "fake-secret-key"

config = get_config(FilesystemConfiguration(bucket_url="az://root", kwargs={'use_ssl': True},
client_kwargs={'verify': 'public.crt'}))

filesystem, bucket_name = fsspec_from_config(config)

assert isinstance(filesystem, S3FileSystem)
Expand Down

0 comments on commit b902731

Please sign in to comment.