diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index 50d973bad4..6c538d1968 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -86,7 +86,7 @@ jobs: - name: Install dependencies run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate - - run: poetry run pytest tests/load tests/cli + - run: poetry run pytest tests/load && poetry run pytest tests/cli name: Run tests Linux env: DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data diff --git a/dlt/common/configuration/inject.py b/dlt/common/configuration/inject.py index c623c400e0..1880727a0f 100644 --- a/dlt/common/configuration/inject.py +++ b/dlt/common/configuration/inject.py @@ -176,6 +176,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: def last_config(**kwargs: Any) -> Any: + """Get configuration instance used to inject function arguments """ return kwargs[_LAST_DLT_CONFIG] diff --git a/dlt/common/configuration/specs/azure_credentials.py b/dlt/common/configuration/specs/azure_credentials.py index 0843b4eea8..49393a6343 100644 --- a/dlt/common/configuration/specs/azure_credentials.py +++ b/dlt/common/configuration/specs/azure_credentials.py @@ -45,6 +45,7 @@ def on_partial(self) -> None: if not self.is_partial(): self.resolve() + @configspec class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault): def on_partial(self) -> None: diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py index d4af2e8555..59168024bf 100644 --- a/dlt/common/configuration/specs/base_configuration.py +++ b/dlt/common/configuration/specs/base_configuration.py @@ -113,7 +113,7 @@ def wrap(cls: Type[TAnyClass]) -> Type[TAnyClass]: cls = type(cls.__name__, (cls, _F_BaseConfiguration), fields) # get all annotations without corresponding attributes and set them to None for ann in cls.__annotations__: - if not hasattr(cls, ann) and not ann.startswith(("__", "_abc_impl")): + if not hasattr(cls, ann) and not ann.startswith(("__", "_abc_")): setattr(cls, ann, None) # get all attributes without corresponding annotations for att_name, att_value in list(cls.__dict__.items()): @@ -129,7 +129,7 @@ def wrap(cls: Type[TAnyClass]) -> Type[TAnyClass]: except NameError: # Dealing with BaseConfiguration itself before it is defined continue - if not att_name.startswith(("__", "_abc_impl")) and not isinstance(att_value, (staticmethod, classmethod, property)): + if not att_name.startswith(("__", "_abc_")) and not isinstance(att_value, (staticmethod, classmethod, property)): if att_name not in cls.__annotations__: raise ConfigFieldMissingTypeHintException(att_name, cls) hint = cls.__annotations__[att_name] @@ -211,7 +211,7 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]: """Yields all resolvable dataclass fields in the order they should be resolved""" # Sort dynamic type hint fields last because they depend on other values yield from sorted( - (f for f in cls.__dataclass_fields__.values() if not f.name.startswith("__")), + (f for f in cls.__dataclass_fields__.values() if cls.__is_valid_field(f)), key=lambda f: f.name in cls.__hint_resolvers__ ) @@ -264,7 +264,8 @@ def __delitem__(self, __key: str) -> None: raise KeyError("Configuration fields cannot be deleted") def __iter__(self) -> Iterator[str]: - return filter(lambda k: not k.startswith("__"), self.__dataclass_fields__.__iter__()) + """Iterator or valid key names""" + return map(lambda field: field.name, filter(lambda val: self.__is_valid_field(val), self.__dataclass_fields__.values())) def __len__(self) -> int: return sum(1 for _ in self.__iter__()) @@ -279,7 +280,11 @@ def update(self, other: Any = (), /, **kwds: Any) -> None: # helper functions def __has_attr(self, __key: str) -> bool: - return __key in self.__dataclass_fields__ and not __key.startswith("__") + return __key in self.__dataclass_fields__ and self.__is_valid_field(self.__dataclass_fields__[__key]) + + @staticmethod + def __is_valid_field(field: TDtcField) -> bool: + return not field.name.startswith("__") and field._field_type is dataclasses._FIELD # type: ignore def call_method_in_mro(config, method_name: str) -> None: # python multi-inheritance is cooperative and this would require that all configurations cooperatively diff --git a/dlt/common/storages/__init__.py b/dlt/common/storages/__init__.py index 7b4260e9d5..f318b09ee0 100644 --- a/dlt/common/storages/__init__.py +++ b/dlt/common/storages/__init__.py @@ -5,4 +5,5 @@ from .normalize_storage import NormalizeStorage # noqa: F401 from .load_storage import LoadStorage # noqa: F401 from .data_item_storage import DataItemStorage # noqa: F401 -from .configuration import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration, TSchemaFileFormat # noqa: F401 +from .configuration import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration, TSchemaFileFormat, FilesystemConfiguration # noqa: F401 +from .filesystem import filesystem_from_config, filesystem # noqa: F401 \ No newline at end of file diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 5fe4ce4a7c..0b88f3b010 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -1,6 +1,11 @@ -from typing import TYPE_CHECKING, Literal, Optional, get_args +from urllib.parse import urlparse +from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union -from dlt.common.configuration.specs import BaseConfiguration, configspec +from dlt.common.configuration.specs import BaseConfiguration, configspec, CredentialsConfiguration +from dlt.common.configuration import configspec, resolve_type +from dlt.common.configuration.specs import GcpServiceAccountCredentials, AwsCredentials, GcpOAuthCredentials, AzureCredentials, AzureCredentialsWithoutDefaults, BaseConfiguration +from dlt.common.utils import digest128 +from dlt.common.configuration.exceptions import ConfigurationValueError TSchemaFileFormat = Literal["json", "yaml"] SchemaFileExtensions = get_args(TSchemaFileFormat) @@ -36,3 +41,76 @@ class LoadStorageConfiguration(BaseConfiguration): if TYPE_CHECKING: def __init__(self, load_volume_path: str = None, delete_completed_jobs: bool = None) -> None: ... + + +FileSystemCredentials = Union[AwsCredentials, GcpServiceAccountCredentials, AzureCredentials, GcpOAuthCredentials] + +@configspec +class FilesystemConfiguration(BaseConfiguration): + """A configuration defining filesystem location and access credentials. + + When configuration is resolved, `bucket_url` is used to extract a protocol and request corresponding credentials class. + * s3 + * gs, gcs + * az, abfs, adl + * file, memory + * gdrive + """ + PROTOCOL_CREDENTIALS: ClassVar[Dict[str, Any]] = { + "gs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials], + "gcs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials], + "gdrive": GcpOAuthCredentials, + "s3": AwsCredentials, + "az": Union[AzureCredentialsWithoutDefaults, AzureCredentials], + "abfs": Union[AzureCredentialsWithoutDefaults, AzureCredentials], + "adl": Union[AzureCredentialsWithoutDefaults, AzureCredentials], + } + + bucket_url: str = None + # should be an union of all possible credentials as found in PROTOCOL_CREDENTIALS + credentials: FileSystemCredentials + + @property + def protocol(self) -> str: + """`bucket_url` protocol""" + url = urlparse(self.bucket_url) + return url.scheme or "file" + + def on_resolved(self) -> None: + url = urlparse(self.bucket_url) + if not url.path and not url.netloc: + raise ConfigurationValueError("File path or netloc missing. Field bucket_url of FilesystemClientConfiguration must contain valid url with a path or host:password component.") + # this is just a path in local file system + if url.path == self.bucket_url: + url = url._replace(scheme="file") + self.bucket_url = url.geturl() + + @resolve_type('credentials') + def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: + # use known credentials or empty credentials for unknown protocol + return self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value] + + def fingerprint(self) -> str: + """Returns a fingerprint of bucket_url""" + if self.bucket_url: + return digest128(self.bucket_url) + return "" + + def __str__(self) -> str: + """Return displayable destination location""" + url = urlparse(self.bucket_url) + # do not show passwords + if url.password: + new_netloc = f"{url.username}:****@{url.hostname}" + if url.port: + new_netloc += f":{url.port}" + return url._replace(netloc=new_netloc).geturl() + return self.bucket_url + + if TYPE_CHECKING: + def __init__( + self, + bucket_url: str, + credentials: FileSystemCredentials = None + ) -> None: + ... diff --git a/dlt/common/storages/filesystem.py b/dlt/common/storages/filesystem.py new file mode 100644 index 0000000000..f968c37b5f --- /dev/null +++ b/dlt/common/storages/filesystem.py @@ -0,0 +1,80 @@ +from typing import cast, Tuple, TypedDict, Optional, Union + +from fsspec.core import url_to_fs +from fsspec import AbstractFileSystem + +from dlt.common import pendulum +from dlt.common.exceptions import MissingDependencyException +from dlt.common.time import ensure_pendulum_datetime +from dlt.common.typing import DictStrAny +from dlt.common.configuration.specs import CredentialsWithDefault, GcpCredentials, AwsCredentials, AzureCredentials +from dlt.common.storages.configuration import FileSystemCredentials, FilesystemConfiguration + +from dlt import version + + +class FileItem(TypedDict): + """A DataItem representing a file""" + file_url: str + file_name: str + mime_type: str + modification_date: pendulum.DateTime + file_content: Optional[Union[str, bytes]] + + +# Map of protocol to mtime resolver +# we only need to support a small finite set of protocols +MTIME_DISPATCH = { + "s3": lambda f: ensure_pendulum_datetime(f["LastModified"]), + "adl": lambda f: ensure_pendulum_datetime(f["LastModified"]), + "az": lambda f: ensure_pendulum_datetime(f["last_modified"]), + "gcs": lambda f: ensure_pendulum_datetime(f["updated"]), + "file": lambda f: ensure_pendulum_datetime(f["mtime"]), + "memory": lambda f: ensure_pendulum_datetime(f["created"]), +} +# Support aliases +MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"] +MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"] +MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"] + + +def filesystem(protocol: str, credentials: FileSystemCredentials = None) -> Tuple[AbstractFileSystem, str]: + """Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials. + + Please supply credentials instance corresponding to the protocol + """ + return filesystem_from_config(FilesystemConfiguration(protocol, credentials)) + + + +def filesystem_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSystem, str]: + """Instantiates an authenticated fsspec `FileSystem` from `config` argument. + + Authenticates following filesystems: + * s3 + * az, abfs + * gcs, gs + + All other filesystems are not authenticated + + Returns: (fsspec filesystem, normalized url) + + """ + proto = config.protocol + fs_kwargs: DictStrAny = {} + if proto == "s3": + fs_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials()) + elif proto in ["az", "abfs", "adl", "azure"]: + fs_kwargs.update(cast(AzureCredentials, config.credentials).to_adlfs_credentials()) + elif proto in ['gcs', 'gs']: + assert isinstance(config.credentials, GcpCredentials) + # Default credentials are handled by gcsfs + if isinstance(config.credentials, CredentialsWithDefault) and config.credentials.has_default_credentials(): + fs_kwargs['token'] = None + else: + fs_kwargs['token'] = dict(config.credentials) + fs_kwargs['project'] = config.credentials.project_id + try: + return url_to_fs(config.bucket_url, use_listings_cache=False, **fs_kwargs) # type: ignore[no-any-return] + except ModuleNotFoundError as e: + raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e diff --git a/dlt/common/storages/transactional_file.py b/dlt/common/storages/transactional_file.py index 2174f32453..cae50d1951 100644 --- a/dlt/common/storages/transactional_file.py +++ b/dlt/common/storages/transactional_file.py @@ -12,11 +12,12 @@ from pathlib import Path import posixpath from contextlib import contextmanager -from dlt.common.pendulum import pendulum, timedelta from threading import Timer - import fsspec +from dlt.common.pendulum import pendulum, timedelta +from dlt.common.storages.filesystem import MTIME_DISPATCH + def lock_id(k: int = 4) -> str: """Generate a time based random id. @@ -44,19 +45,6 @@ def run(self) -> None: class TransactionalFile: """A transaction handler which wraps a file path.""" - # Map of protocol to mtime resolver - # we only need to support a small finite set of protocols - _mtime_dispatch = { - "s3": lambda f: pendulum.parser.parse(f["LastModified"]), - "adl": lambda f: pendulum.parser.parse(f["LastModified"]), - "gcs": lambda f: pendulum.parser.parse(f["updated"]), - "file": lambda f: pendulum.from_timestamp(f["mtime"]), - } - # Support aliases - _mtime_dispatch["gs"] = _mtime_dispatch["gcs"] - _mtime_dispatch["s3a"] = _mtime_dispatch["s3"] - _mtime_dispatch["azure"] = _mtime_dispatch["adl"] - POLLING_INTERVAL = 0.5 LOCK_TTL_SECONDS = 30.0 @@ -68,7 +56,7 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None: fs: The fsspec file system. """ proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol - self.extract_mtime = self._mtime_dispatch.get(proto, self._mtime_dispatch["file"]) + self.extract_mtime = MTIME_DISPATCH.get(proto, MTIME_DISPATCH["file"]) parsed_path = Path(path) if not parsed_path.is_absolute(): diff --git a/dlt/destinations/filesystem/__init__.py b/dlt/destinations/filesystem/__init__.py index 2b0f7bf6a2..3dc6c62480 100644 --- a/dlt/destinations/filesystem/__init__.py +++ b/dlt/destinations/filesystem/__init__.py @@ -6,11 +6,11 @@ from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import JobClientBase, DestinationClientDwhWithStagingConfiguration -from dlt.destinations.filesystem.configuration import FilesystemClientConfiguration +from dlt.destinations.filesystem.configuration import FilesystemDestinationClientConfiguration -@with_config(spec=FilesystemClientConfiguration, sections=(known_sections.DESTINATION, "filesystem",)) -def _configure(config: FilesystemClientConfiguration = config.value) -> FilesystemClientConfiguration: +@with_config(spec=FilesystemDestinationClientConfiguration, sections=(known_sections.DESTINATION, "filesystem",)) +def _configure(config: FilesystemDestinationClientConfiguration = config.value) -> FilesystemDestinationClientConfiguration: return config @@ -25,5 +25,5 @@ def client(schema: Schema, initial_config: DestinationClientDwhWithStagingConfig return FilesystemClient(schema, _configure(initial_config)) # type: ignore -def spec() -> Type[FilesystemClientConfiguration]: - return FilesystemClientConfiguration +def spec() -> Type[FilesystemDestinationClientConfiguration]: + return FilesystemDestinationClientConfiguration diff --git a/dlt/destinations/filesystem/configuration.py b/dlt/destinations/filesystem/configuration.py index 7640e2f5dc..174dfafb1a 100644 --- a/dlt/destinations/filesystem/configuration.py +++ b/dlt/destinations/filesystem/configuration.py @@ -1,71 +1,26 @@ from urllib.parse import urlparse -from typing import Final, Type, Optional, Union, TYPE_CHECKING +from typing import Final, Type, Optional, Any, TYPE_CHECKING from dlt.common.configuration import configspec, resolve_type from dlt.common.destination.reference import CredentialsConfiguration, DestinationClientStagingConfiguration -from dlt.common.configuration.specs import GcpServiceAccountCredentials, AwsCredentials, GcpOAuthCredentials, AzureCredentials, AzureCredentialsWithoutDefaults -from dlt.common.utils import digest128 -from dlt.common.configuration.exceptions import ConfigurationValueError - - -PROTOCOL_CREDENTIALS = { - "gs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials], - "gcs": Union[GcpServiceAccountCredentials, GcpOAuthCredentials], - "gdrive": GcpOAuthCredentials, - "s3": AwsCredentials, - "az": Union[AzureCredentialsWithoutDefaults, AzureCredentials], - "abfs": Union[AzureCredentialsWithoutDefaults, AzureCredentials], -} +from dlt.common.storages import FilesystemConfiguration @configspec -class FilesystemClientConfiguration(DestinationClientStagingConfiguration): +class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc] destination_name: Final[str] = "filesystem" # type: ignore - # should be an union of all possible credentials as found in PROTOCOL_CREDENTIALS - credentials: Union[AwsCredentials, GcpServiceAccountCredentials, AzureCredentials, GcpOAuthCredentials] - - @property - def protocol(self) -> str: - url = urlparse(self.bucket_url) - return url.scheme or "file" - - def on_resolved(self) -> None: - url = urlparse(self.bucket_url) - if not url.path and not url.netloc: - raise ConfigurationValueError("File path or netloc missing. Field bucket_url of FilesystemClientConfiguration must contain valid url with a path or host:password component.") - # this is just a path in local file system - if url.path == self.bucket_url: - url = url._replace(scheme="file") - self.bucket_url = url.geturl() @resolve_type('credentials') def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: # use known credentials or empty credentials for unknown protocol - return PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value] - - def fingerprint(self) -> str: - """Returns a fingerprint of bucket_url""" - if self.bucket_url: - return digest128(self.bucket_url) - return "" - - def __str__(self) -> str: - """Return displayable destination location""" - url = urlparse(self.bucket_url) - # do not show passwords - if url.password: - new_netloc = f"{url.username}:****@{url.hostname}" - if url.port: - new_netloc += f":{url.port}" - return url._replace(netloc=new_netloc).geturl() - return self.bucket_url + return self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration] # type: ignore[return-value] if TYPE_CHECKING: def __init__( self, destination_name: str = None, - credentials: Optional[GcpServiceAccountCredentials] = None, + credentials: Optional[Any] = None, dataset_name: str = None, default_schema_name: Optional[str] = None, bucket_url: str = None, diff --git a/dlt/destinations/filesystem/filesystem.py b/dlt/destinations/filesystem/filesystem.py index e214b4b533..3691c6417b 100644 --- a/dlt/destinations/filesystem/filesystem.py +++ b/dlt/destinations/filesystem/filesystem.py @@ -1,19 +1,18 @@ import posixpath import os from types import TracebackType -from typing import ClassVar, List, Optional, Type, Iterable, Set +from typing import ClassVar, List, Type, Iterable, Set from fsspec import AbstractFileSystem from dlt.common import logger from dlt.common.schema import Schema, TSchemaTables, TTableSchema -from dlt.common.storages import FileStorage +from dlt.common.storages import FileStorage, LoadStorage, filesystem_from_config from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import NewLoadJob, TLoadJobState, LoadJob, JobClientBase, FollowupJob + from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.filesystem import capabilities -from dlt.destinations.filesystem.configuration import FilesystemClientConfiguration -from dlt.destinations.filesystem.filesystem_client import client_from_config -from dlt.common.storages import LoadStorage +from dlt.destinations.filesystem.configuration import FilesystemDestinationClientConfiguration from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations import path_utils @@ -24,7 +23,7 @@ def __init__( local_path: str, dataset_path: str, *, - config: FilesystemClientConfiguration, + config: FilesystemDestinationClientConfiguration, schema_name: str, load_id: str ) -> None: @@ -34,7 +33,7 @@ def __init__( self.destination_file_name = LoadFilesystemJob.make_destination_filename(config.layout, file_name, schema_name, load_id) super().__init__(file_name) - fs_client, _ = client_from_config(config) + fs_client, _ = filesystem_from_config(config) self.destination_file_name = LoadFilesystemJob.make_destination_filename(config.layout, file_name, schema_name, load_id) item = self.make_remote_path() logger.info("PUT file {item}") @@ -76,10 +75,10 @@ class FilesystemClient(JobClientBase): fs_client: AbstractFileSystem fs_path: str - def __init__(self, schema: Schema, config: FilesystemClientConfiguration) -> None: + def __init__(self, schema: Schema, config: FilesystemDestinationClientConfiguration) -> None: super().__init__(schema, config) - self.fs_client, self.fs_path = client_from_config(config) - self.config: FilesystemClientConfiguration = config + self.fs_client, self.fs_path = filesystem_from_config(config) + self.config: FilesystemDestinationClientConfiguration = config # verify files layout. we need {table_name} and only allow {schema_name} before it, otherwise tables # cannot be replaced and we cannot initialize folders consistently self.table_prefix_layout = path_utils.get_table_prefix_layout(config.layout) diff --git a/dlt/destinations/filesystem/filesystem_client.py b/dlt/destinations/filesystem/filesystem_client.py deleted file mode 100644 index f62e291c04..0000000000 --- a/dlt/destinations/filesystem/filesystem_client.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import cast, Tuple - -from fsspec.core import url_to_fs -from fsspec import AbstractFileSystem - -from dlt.common.exceptions import MissingDependencyException -from dlt.common.typing import DictStrAny -from dlt.common.configuration.specs import CredentialsWithDefault, GcpCredentials, AwsCredentials, AzureCredentials - -from dlt.destinations.filesystem.configuration import FilesystemClientConfiguration - -from dlt import version - - -def client_from_config(config: FilesystemClientConfiguration) -> Tuple[AbstractFileSystem, str]: - proto = config.protocol - fs_kwargs: DictStrAny = {} - if proto == "s3": - fs_kwargs.update(cast(AwsCredentials, config.credentials).to_s3fs_credentials()) - elif proto in ["az", "abfs"]: - fs_kwargs.update(cast(AzureCredentials, config.credentials).to_adlfs_credentials()) - elif proto in ['gcs', 'gs']: - assert isinstance(config.credentials, GcpCredentials) - # Default credentials are handled by gcsfs - if isinstance(config.credentials, CredentialsWithDefault) and config.credentials.has_default_credentials(): - fs_kwargs['token'] = None - else: - fs_kwargs['token'] = dict(config.credentials) - fs_kwargs['project'] = config.credentials.project_id - try: - return url_to_fs(config.bucket_url, use_listings_cache=False, **fs_kwargs) # type: ignore[no-any-return] - except ModuleNotFoundError as e: - raise MissingDependencyException("filesystem destination", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e diff --git a/tests/common/storages/test_transactional_file.py b/tests/common/storages/test_transactional_file.py index 9d3d735b9c..420b1e84ae 100644 --- a/tests/common/storages/test_transactional_file.py +++ b/tests/common/storages/test_transactional_file.py @@ -6,6 +6,7 @@ import fsspec import pytest +from dlt.common.storages import filesystem from dlt.common.storages.transactional_file import TransactionalFile from tests.utils import skipifwindows @@ -13,7 +14,7 @@ @pytest.fixture(scope="session") def fs() -> fsspec.AbstractFileSystem: - return fsspec.filesystem("file") + return filesystem("file")[0] @pytest.fixture diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index bbcd011338..247558fa24 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -6,7 +6,7 @@ from dlt.common.utils import digest128, uniq_id from dlt.common.storages import LoadStorage, FileStorage -from dlt.destinations.filesystem.filesystem import LoadFilesystemJob, FilesystemClientConfiguration +from dlt.destinations.filesystem.filesystem import LoadFilesystemJob, FilesystemDestinationClientConfiguration from tests.load.filesystem.utils import perform_load from tests.utils import clean_test_storage, init_test_logging @@ -35,9 +35,9 @@ def logger_autouse() -> None: ) -def test_filesystem_configuration() -> None: - assert FilesystemClientConfiguration().fingerprint() == "" - assert FilesystemClientConfiguration(bucket_url="s3://cool").fingerprint() == digest128("s3://cool") +def test_filesystem_destination_configuration() -> None: + assert FilesystemDestinationClientConfiguration().fingerprint() == "" + assert FilesystemDestinationClientConfiguration(bucket_url="s3://cool").fingerprint() == digest128("s3://cool") @pytest.mark.parametrize('write_disposition', ('replace', 'append', 'merge')) diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py new file mode 100644 index 0000000000..5d39e91bc2 --- /dev/null +++ b/tests/load/filesystem/test_filesystem_common.py @@ -0,0 +1,49 @@ +import os +import posixpath +from typing import Union +import pytest +from dlt.common.configuration.inject import with_config + +from dlt.common import pendulum +from dlt.common.configuration.specs import AzureCredentials, AzureCredentialsWithoutDefaults +from dlt.common.storages import filesystem_from_config, FilesystemConfiguration +from dlt.common.storages.filesystem import MTIME_DISPATCH +from dlt.common.utils import uniq_id + +from tests.utils import preserve_environ, autouse_test_storage + + +@with_config(spec=FilesystemConfiguration, sections=("destination", "filesystem")) +def get_config(config: FilesystemConfiguration = None) -> FilesystemConfiguration: + return config + + +def test_filesystem_configuration() -> None: + config = FilesystemConfiguration(bucket_url="az://root") + assert config.protocol == "az" + # print(config.resolve_credentials_type()) + assert config.resolve_credentials_type() == Union[AzureCredentialsWithoutDefaults, AzureCredentials] + # make sure that only bucket_url and credentials are there + assert dict(config) == {'bucket_url': 'az://root', 'credentials': None} + + +def test_filesystem_instance(all_buckets_env: str) -> None: + bucket_url = os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] + config = get_config() + assert bucket_url.startswith(config.protocol) + filesystem, url = filesystem_from_config(config) + if config.protocol != "file": + assert bucket_url.endswith(url) + # do a few file ops + now = pendulum.now() + filename = "filesystem_common_" + uniq_id() + file_url = posixpath.join(url, filename) + try: + filesystem.pipe(file_url, b"test bytes") + files = filesystem.ls(url, detail=True) + details = next(d for d in files if d["name"] == file_url) + # print(details) + # print(MTIME_DISPATCH[config.protocol](details)) + assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 60 + finally: + filesystem.rm(file_url) \ No newline at end of file