Skip to content

Commit

Permalink
Support aws config endpoint_url with fsspec
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Oct 19, 2023
1 parent b4365dd commit 925ce13
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 5 deletions.
4 changes: 3 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
aws_session_token: Optional[TSecretStrValue] = None
profile_name: Optional[str] = None
region_name: Optional[str] = None
endpoint_url: Optional[str] = None

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
    """Return this configuration's fields as keyword arguments for s3fs.

    Maps the AWS credential fields onto the parameter names that
    ``s3fs.S3FileSystem`` accepts (``key``, ``secret``, ``token``,
    ``profile`` and ``endpoint_url``). Unset fields pass through as None.
    """
    return {
        "key": self.aws_access_key_id,
        "secret": self.aws_secret_access_key,
        "token": self.aws_session_token,
        "profile": self.profile_name,
        "endpoint_url": self.endpoint_url,
    }

def to_native_representation(self) -> Dict[str, Optional[str]]:
Expand Down
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ You need to create a S3 bucket and a user who can access that bucket. `dlt` is n
5. To grab the access and secret key for the user. Go to IAM > Users and in the “Security Credentials”, click on “Create Access Key”, and preferably select “Command Line Interface” and create the access key.
6. Grab the “Access Key” and “Secret Access Key” created that are to be used in "secrets.toml".

##### Using S3-compatible storage

To use an S3-compatible storage service other than AWS S3, such as [MinIO](https://min.io/) or [Cloudflare R2](https://www.cloudflare.com/en-ca/developer-platform/r2/), supply an `endpoint_url` in the config. Set it alongside your AWS credentials:

```toml
[destination.filesystem]
bucket_url = "s3://[your_bucket_name]" # replace with your bucket name

[destination.filesystem.credentials]
aws_access_key_id = "please set me up!" # copy the access key here
aws_secret_access_key = "please set me up!" # copy the secret access key here
endpoint_url = "https://<account_id>.r2.cloudflarestorage.com" # copy your endpoint URL here
```

#### Google Storage
Run `pip install dlt[gs]` which will install `gcfs` package.

Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,7 @@ ignore_missing_imports=true
ignore_missing_imports=true

[mypy-utils.*]
ignore_missing_imports=true

[mypy-s3fs.*]
ignore_missing_imports=true
20 changes: 17 additions & 3 deletions tests/load/filesystem/test_aws_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def test_aws_credentials_resolved_from_default(environment: Dict[str, str]) -> N
# assert config.profile_name == "default"


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None:
set_aws_credentials_env(environment)

Expand All @@ -63,7 +62,6 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None:
c.parse_native_representation("boto3")


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None:
try:
import boto3
Expand All @@ -89,7 +87,6 @@ def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None:
assert c.aws_access_key_id == "fake_access_key"


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None:
import botocore.exceptions

Expand All @@ -107,6 +104,23 @@ def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None:
pytest.skip("This test requires dlt-ci-user aws profile to be present")


def test_aws_credentials_with_endpoint_url(environment: Dict[str, str]) -> None:
    """An endpoint_url set via env var is resolved into the credentials and
    forwarded in the s3fs keyword-argument dict."""
    endpoint = 'https://123.r2.cloudflarestorage.com'
    set_aws_credentials_env(environment)
    environment['CREDENTIALS__ENDPOINT_URL'] = endpoint

    config = resolve_configuration(AwsCredentials())

    assert config.endpoint_url == endpoint

    expected_s3fs_kwargs = {
        "key": "fake_access_key",
        "secret": "fake_secret_key",
        "token": "fake_session_token",
        "profile": None,
        "endpoint_url": endpoint,
    }
    assert config.to_s3fs_credentials() == expected_s3fs_kwargs


def set_aws_credentials_env(environment: Dict[str, str]) -> None:
environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key'
environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key'
Expand Down
26 changes: 25 additions & 1 deletion tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import posixpath
from typing import Union
from typing import Union, Dict
import pytest
from dlt.common.configuration.inject import with_config

Expand All @@ -11,7 +11,9 @@
from dlt.common.utils import uniq_id

from tests.utils import preserve_environ, autouse_test_storage
from tests.common.configuration.utils import environment
from tests.common.storages.utils import assert_sample_files
from tests.load.utils import ALL_FILESYSTEM_DRIVERS


@with_config(spec=FilesystemConfiguration, sections=("destination", "filesystem"))
Expand Down Expand Up @@ -64,3 +66,25 @@ def test_filesystem_dict(all_buckets_env: str, load_content: bool) -> None:
assert_sample_files(all_file_items, filesystem, config, load_content)
except NotImplementedError as ex:
pytest.skip("Skipping due to " + str(ex))


@pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured")
def test_filesystem_instance_from_s3_endpoint(environment: Dict[str, str]) -> None:
    """The fsspec S3 filesystem built from config picks up the custom
    endpoint_url and credentials, as needed for S3-compatible services
    such as Cloudflare R2.
    """
    from s3fs import S3FileSystem

    endpoint = 'https://fake-s3-endpoint.example.com'
    environment.update({
        'DESTINATION__FILESYSTEM__BUCKET_URL': 's3://dummy-bucket',
        'CREDENTIALS__ENDPOINT_URL': endpoint,
        'CREDENTIALS__AWS_ACCESS_KEY_ID': 'fake-access-key',
        'CREDENTIALS__AWS_SECRET_ACCESS_KEY': 'fake-secret-key',
    })

    config = get_config()
    filesystem, bucket_name = fsspec_from_config(config)

    assert isinstance(filesystem, S3FileSystem)
    assert bucket_name == 'dummy-bucket'
    assert filesystem.endpoint_url == endpoint
    assert filesystem.key == 'fake-access-key'
    assert filesystem.secret == 'fake-secret-key'

0 comments on commit 925ce13

Please sign in to comment.