From 925ce13b230634354c6be8e29839790063e99c8b Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Wed, 18 Oct 2023 20:10:13 -0400 Subject: [PATCH] Support aws config `endpoint_url` with fsspec --- .../configuration/specs/aws_credentials.py | 4 ++- .../dlt-ecosystem/destinations/filesystem.md | 14 ++++++++++ mypy.ini | 3 +++ tests/load/filesystem/test_aws_credentials.py | 20 +++++++++++--- .../load/filesystem/test_filesystem_common.py | 26 ++++++++++++++++++- 5 files changed, 62 insertions(+), 5 deletions(-) diff --git a/dlt/common/configuration/specs/aws_credentials.py b/dlt/common/configuration/specs/aws_credentials.py index 4e73f8ef1f..6ba661ae88 100644 --- a/dlt/common/configuration/specs/aws_credentials.py +++ b/dlt/common/configuration/specs/aws_credentials.py @@ -15,6 +15,7 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration): aws_session_token: Optional[TSecretStrValue] = None profile_name: Optional[str] = None region_name: Optional[str] = None + endpoint_url: Optional[str] = None def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: """Dict of keyword arguments that can be passed to s3fs""" @@ -22,7 +23,8 @@ def to_s3fs_credentials(self) -> Dict[str, Optional[str]]: key=self.aws_access_key_id, secret=self.aws_secret_access_key, token=self.aws_session_token, - profile=self.profile_name + profile=self.profile_name, + endpoint_url=self.endpoint_url, ) def to_native_representation(self) -> Dict[str, Optional[str]]: diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 32bf561a82..41b2714f98 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -90,6 +90,20 @@ You need to create a S3 bucket and a user who can access that bucket. `dlt` is n 5. To grab the access and secret key for the user. 
Go to IAM > Users and in the “Security Credentials”, click on “Create Access Key”, and preferably select “Command Line Interface” and create the access key. 6. Grab the “Access Key” and “Secret Access Key” created that are to be used in "secrets.toml". +##### Using S3 compatible storage + +To use an S3 compatible storage other than AWS S3, such as [MinIO](https://min.io/) or [Cloudflare R2](https://www.cloudflare.com/en-ca/developer-platform/r2/), you may supply an `endpoint_url` in the config. This should be set along with AWS credentials: + +```toml +[destination.filesystem] +bucket_url = "s3://[your_bucket_name]" # replace with your bucket name + +[destination.filesystem.credentials] +aws_access_key_id = "please set me up!" # copy the access key here +aws_secret_access_key = "please set me up!" # copy the secret access key here +endpoint_url = "https://<account_id>.r2.cloudflarestorage.com" # copy your endpoint URL here +``` + +#### Google Storage Run `pip install dlt[gs]` which will install `gcfs` package. 
diff --git a/mypy.ini b/mypy.ini index 924bc7c48b..58d617a11c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -97,4 +97,7 @@ ignore_missing_imports=true ignore_missing_imports=true [mypy-utils.*] +ignore_missing_imports=true + +[mypy-s3fs.*] ignore_missing_imports=true \ No newline at end of file diff --git a/tests/load/filesystem/test_aws_credentials.py b/tests/load/filesystem/test_aws_credentials.py index 83cf4b2cab..bf9e0bd681 100644 --- a/tests/load/filesystem/test_aws_credentials.py +++ b/tests/load/filesystem/test_aws_credentials.py @@ -37,7 +37,6 @@ def test_aws_credentials_resolved_from_default(environment: Dict[str, str]) -> N # assert config.profile_name == "default" -@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured') def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None: set_aws_credentials_env(environment) @@ -63,7 +62,6 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None: c.parse_native_representation("boto3") -@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured') def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None: try: import boto3 @@ -89,7 +87,6 @@ def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None: assert c.aws_access_key_id == "fake_access_key" -@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured') def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None: import botocore.exceptions @@ -107,6 +104,23 @@ def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None: pytest.skip("This test requires dlt-ci-user aws profile to be present") +def test_aws_credentials_with_endpoint_url(environment: Dict[str, str]) -> None: + set_aws_credentials_env(environment) + environment['CREDENTIALS__ENDPOINT_URL'] = 'https://123.r2.cloudflarestorage.com' + + config = resolve_configuration(AwsCredentials()) + + 
assert config.endpoint_url == 'https://123.r2.cloudflarestorage.com' + + assert config.to_s3fs_credentials() == { + "key": "fake_access_key", + "secret": "fake_secret_key", + "token": "fake_session_token", + "profile": None, + "endpoint_url": "https://123.r2.cloudflarestorage.com", + } + + def set_aws_credentials_env(environment: Dict[str, str]) -> None: environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key' environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key' diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 6e1fe5c549..7402b78d3c 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -1,6 +1,6 @@ import os import posixpath -from typing import Union +from typing import Union, Dict import pytest from dlt.common.configuration.inject import with_config @@ -11,7 +11,9 @@ from dlt.common.utils import uniq_id from tests.utils import preserve_environ, autouse_test_storage +from tests.common.configuration.utils import environment from tests.common.storages.utils import assert_sample_files +from tests.load.utils import ALL_FILESYSTEM_DRIVERS @with_config(spec=FilesystemConfiguration, sections=("destination", "filesystem")) @@ -64,3 +66,25 @@ def test_filesystem_dict(all_buckets_env: str, load_content: bool) -> None: assert_sample_files(all_file_items, filesystem, config, load_content) except NotImplementedError as ex: pytest.skip("Skipping due to " + str(ex)) + + +@pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured") +def test_filesystem_instance_from_s3_endpoint(environment: Dict[str, str]) -> None: + """Test that fsspec instance is correctly configured when using endpoint URL. + E.g. 
when using an S3 compatible service such as Cloudflare R2 + """ + from s3fs import S3FileSystem + environment['DESTINATION__FILESYSTEM__BUCKET_URL'] = 's3://dummy-bucket' + environment['CREDENTIALS__ENDPOINT_URL'] = 'https://fake-s3-endpoint.example.com' + environment['CREDENTIALS__AWS_ACCESS_KEY_ID'] = 'fake-access-key' + environment['CREDENTIALS__AWS_SECRET_ACCESS_KEY'] = 'fake-secret-key' + + config = get_config() + + filesystem, bucket_name = fsspec_from_config(config) + + assert isinstance(filesystem, S3FileSystem) + assert filesystem.endpoint_url == 'https://fake-s3-endpoint.example.com' + assert bucket_name == 'dummy-bucket' + assert filesystem.key == 'fake-access-key' + assert filesystem.secret == 'fake-secret-key'