Skip to content

Commit

Permalink
Refactor storage and test configurations, update fsspec_filesystem (#814)
Browse files Browse the repository at this point in the history

Refactored the common storage configuration and reordered imports in line with PEP 8. Also replaced string concatenation with f-strings in the filesystem-related tests and moved the `test_s3_wrong_certificate` test to `test_filesystem_client.py`. Added optional `kwargs` and `client_kwargs` parameters to the `fsspec_filesystem` function, which are passed through to `FilesystemConfiguration` to make the function more configurable.

Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Dec 22, 2023
1 parent 07429f6 commit 527abdd
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 32 deletions.
15 changes: 11 additions & 4 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from urllib.parse import urlparse
from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
from urllib.parse import urlparse

from dlt.common.configuration.specs import BaseConfiguration, configspec, CredentialsConfiguration
from dlt.common.configuration import configspec, resolve_type
from dlt.common.configuration.exceptions import ConfigurationValueError
from dlt.common.configuration.specs import CredentialsConfiguration
from dlt.common.configuration.specs import (
GcpServiceAccountCredentials,
AwsCredentials,
Expand All @@ -14,7 +15,6 @@
)
from dlt.common.typing import DictStrAny
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError


TSchemaFileFormat = Literal["json", "yaml"]
Expand Down Expand Up @@ -94,8 +94,10 @@ class FilesystemConfiguration(BaseConfiguration):
}

bucket_url: str = None

# should be a union of all possible credentials as found in PROTOCOL_CREDENTIALS
credentials: FileSystemCredentials

kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None

Expand Down Expand Up @@ -149,4 +151,9 @@ def __init__(
credentials: FileSystemCredentials = None,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None,
) -> None: ...
) -> None:
self.bucket_url = bucket_url
self.credentials = credentials
self.kwargs = kwargs
self.client_kwargs = client_kwargs
...
11 changes: 8 additions & 3 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ class FileItem(TypedDict, total=False):


def fsspec_filesystem(
protocol: str, credentials: FileSystemCredentials = None
protocol: str,
credentials: FileSystemCredentials = None,
kwargs: Optional[DictStrAny] = None,
client_kwargs: Optional[DictStrAny] = None,
) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials.
Expand All @@ -64,7 +67,9 @@ def fsspec_filesystem(
also see filesystem_from_config
"""
return fsspec_from_config(FilesystemConfiguration(protocol, credentials))
return fsspec_from_config(
FilesystemConfiguration(protocol, credentials, kwargs=kwargs, client_kwargs=client_kwargs)
)


def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSystem, str]:
Expand Down Expand Up @@ -111,7 +116,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
fs_kwargs["token"] = dict(config.credentials)
fs_kwargs["project"] = config.credentials.project_id
try:
return url_to_fs(config.bucket_url, **fs_kwargs)
return url_to_fs(config.bucket_url, **fs_kwargs) # type: ignore
except ModuleNotFoundError as e:
raise MissingDependencyException("filesystem", [f"{version.DLT_PKG_NAME}[{proto}]"]) from e

Expand Down
36 changes: 19 additions & 17 deletions tests/load/filesystem/test_filesystem_client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import posixpath
import os
import posixpath
from typing import Dict, List, Any, Union
from typing_extensions import LiteralString

import pytest

from dlt.common.utils import digest128, uniq_id
from dlt.common.storages import FileStorage, ParsedLoadJobFileName

from dlt.common.utils import digest128, uniq_id
from dlt.destinations.impl.filesystem.filesystem import (
LoadFilesystemJob,
FilesystemDestinationClientConfiguration,
)

from tests.load.filesystem.utils import perform_load
from tests.utils import clean_test_storage, init_test_logging
from tests.utils import preserve_environ, autouse_test_storage


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -55,7 +54,7 @@ def test_successful_load(write_disposition: str, layout: str, default_buckets_en
else:
os.environ.pop("DESTINATION__FILESYSTEM__LAYOUT", None)

dataset_name = "test_" + uniq_id()
dataset_name = f"test_{uniq_id()}"

with perform_load(
dataset_name, NORMALIZED_FILES, write_disposition=write_disposition
Expand Down Expand Up @@ -83,7 +82,7 @@ def test_successful_load(write_disposition: str, layout: str, default_buckets_en
),
)

# File is created with correct filename and path
# File is created with the correct filename and path
assert client.fs_client.isfile(destination_path)


Expand All @@ -93,13 +92,13 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non
os.environ["DESTINATION__FILESYSTEM__LAYOUT"] = layout
else:
os.environ.pop("DESTINATION__FILESYSTEM__LAYOUT", None)
dataset_name = "test_" + uniq_id()
dataset_name = f"test_{uniq_id()}"
# NOTE: context manager will delete the dataset at the end so keep it open until the end
with perform_load(dataset_name, NORMALIZED_FILES, write_disposition="replace") as load_info:
client, _, root_path, load_id1 = load_info
layout = client.config.layout

# this path will be kept after replace
# this path will be kept after replacement
job_2_load_1_path = posixpath.join(
root_path,
LoadFilesystemJob.make_destination_filename(
Expand All @@ -120,14 +119,13 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non
),
)

# First file from load1 remains, second file is replaced by load2
# The first file from load1 remains, the second file is replaced by load2
# assert that only these two files are in the destination folder
paths = []
paths: List[Union[LiteralString, str, bytes, None, Any]] = []
for basedir, _dirs, files in client.fs_client.walk(
client.dataset_path, detail=False, refresh=True
):
for f in files:
paths.append(posixpath.join(basedir, f))
paths.extend(posixpath.join(basedir, f) for f in files)
ls = set(paths)
assert ls == {job_2_load_1_path, job_1_load_2_path}

Expand All @@ -139,7 +137,7 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
os.environ["DESTINATION__FILESYSTEM__LAYOUT"] = layout
else:
os.environ.pop("DESTINATION__FILESYSTEM__LAYOUT", None)
dataset_name = "test_" + uniq_id()
dataset_name = f"test_{uniq_id()}"
# NOTE: context manager will delete the dataset at the end so keep it open until the end
with perform_load(dataset_name, NORMALIZED_FILES, write_disposition="append") as load_info:
client, jobs1, root_path, load_id1 = load_info
Expand All @@ -159,10 +157,14 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
]
expected_files = sorted([posixpath.join(root_path, fn) for fn in expected_files])

paths = []
paths: List[Union[LiteralString, str, bytes, None, Any]] = []
for basedir, _dirs, files in client.fs_client.walk(
client.dataset_path, detail=False, refresh=True
):
for f in files:
paths.append(posixpath.join(basedir, f))
paths.extend(posixpath.join(basedir, f) for f in files)
assert list(sorted(paths)) == expected_files


def test_s3_wrong_certificate(environment: Dict[str, str]) -> None:
"""Test that an exception is raised when the wrong certificate is provided."""
pytest.skip("Not implemented yet")
10 changes: 2 additions & 8 deletions tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH, glob_files
from dlt.common.utils import uniq_id
from tests.common.configuration.utils import environment
from tests.common.storages.utils import assert_sample_files
from tests.load.utils import ALL_FILESYSTEM_DRIVERS

Expand Down Expand Up @@ -46,7 +45,7 @@ def test_filesystem_instance(all_buckets_env: str) -> None:
assert bucket_url.endswith(url)
# do a few file ops
now = pendulum.now()
filename = "filesystem_common_" + uniq_id()
filename = f"filesystem_common_{uniq_id()}"
file_url = posixpath.join(url, filename)
try:
filesystem.pipe(file_url, b"test bytes")
Expand Down Expand Up @@ -74,7 +73,7 @@ def test_filesystem_dict(default_buckets_env: str, load_content: bool) -> None:
)
assert_sample_files(all_file_items, filesystem, config, load_content)
except NotImplementedError as ex:
pytest.skip("Skipping due to " + str(ex))
pytest.skip(f"Skipping due to {str(ex)}")


@pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured")
Expand Down Expand Up @@ -110,8 +109,3 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"kwargs": {"use_ssl": True},
"client_kwargs": {"verify": "public.crt"},
}


def test_s3_wrong_certificate(environment: Dict[str, str]) -> None:
"""Test that an exception is raised when the wrong certificate is provided."""
pytest.skip("Not implemented yet")

0 comments on commit 527abdd

Please sign in to comment.