Commit
Support aws config endpoint_url with fsspec (#701)
* Support aws config `endpoint_url` with fsspec

* Test r2 bucket
steinitzu authored Oct 22, 2023
1 parent 1fb60d7 commit e71bd37
Showing 11 changed files with 113 additions and 17 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/test_destinations.yml
@@ -16,6 +16,12 @@ env:
DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME: dltdata
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY: ${{ secrets.AZURE_STORAGE_ACCOUNT_KEY }}

# For s3 compatible tests
TESTS__R2_AWS_ACCESS_KEY_ID: a4950a5003b26f5a71ac97ef3848ff4c
TESTS__R2_AWS_SECRET_ACCESS_KEY: ${{ secrets.CLOUDFLARE_R2_SECRET_ACCESS_KEY }}
TESTS__R2_ENDPOINT_URL: https://9830548e4e4b582989be0811f2a0a97f.r2.cloudflarestorage.com

# DESTINATION__ATHENA__CREDENTIALS__AWS_ACCESS_KEY_ID: AKIAT4QMVMC4J46G55G4
# DESTINATION__ATHENA__CREDENTIALS__AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# DESTINATION__ATHENA__CREDENTIALS__REGION_NAME: eu-central-1
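
As an aside on how these variables are consumed: dlt's environment provider treats double underscores as config section separators, so `TESTS__R2_ENDPOINT_URL` surfaces as `tests.r2_endpoint_url`. A minimal sketch (the endpoint value is a placeholder):

```python
import os
import dlt

# Sketch, not part of this diff: dlt's environ provider maps "__" to config
# section separators, so TESTS__R2_ENDPOINT_URL resolves to section "tests",
# key "r2_endpoint_url" (tests/load/utils.py below reads it exactly this way).
os.environ["TESTS__R2_ENDPOINT_URL"] = "https://<account_id>.r2.cloudflarestorage.com"
assert dlt.config.get("tests.r2_endpoint_url", str) == os.environ["TESTS__R2_ENDPOINT_URL"]
```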
4 changes: 3 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
@@ -15,14 +15,16 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
aws_session_token: Optional[TSecretStrValue] = None
profile_name: Optional[str] = None
region_name: Optional[str] = None
endpoint_url: Optional[str] = None

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
return dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.profile_name
profile=self.profile_name,
endpoint_url=self.endpoint_url,
)

def to_native_representation(self) -> Dict[str, Optional[str]]:
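
For reference, the kwargs returned by `to_s3fs_credentials()` are passed to `s3fs.S3FileSystem`, which accepts `endpoint_url` as a constructor argument (the new test in `test_filesystem_common.py` below asserts this). A minimal sketch with placeholder values, not part of this diff:

```python
from s3fs import S3FileSystem

# Placeholder credentials for illustration only.
fs = S3FileSystem(
    key="fake_access_key",
    secret="fake_secret_key",
    token=None,
    profile=None,
    endpoint_url="https://<account_id>.r2.cloudflarestorage.com",
)
# s3fs forwards endpoint_url to the underlying botocore client, so every
# bucket operation targets the S3-compatible service instead of AWS S3.
```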
14 changes: 14 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -90,6 +90,20 @@ You need to create a S3 bucket and a user who can access that bucket. `dlt` is n
5. To grab the access and secret key for the user, go to IAM > Users, open the “Security Credentials” tab, click “Create Access Key”, and preferably select “Command Line Interface” when creating it.
6. Copy the newly created “Access Key” and “Secret Access Key”; these are the values used in "secrets.toml".

##### Using S3-compatible storage

To use S3-compatible storage other than AWS S3, such as [MinIO](https://min.io/) or [Cloudflare R2](https://www.cloudflare.com/en-ca/developer-platform/r2/), supply an `endpoint_url` in the config. Set it alongside your AWS credentials:

```toml
[destination.filesystem]
bucket_url = "s3://[your_bucket_name]" # replace with your bucket name

[destination.filesystem.credentials]
aws_access_key_id = "please set me up!" # copy the access key here
aws_secret_access_key = "please set me up!" # copy the secret access key here
endpoint_url = "https://<account_id>.r2.cloudflarestorage.com" # copy your endpoint URL here
```
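
Once these credentials are configured, loading to R2 or MinIO looks identical to loading to plain S3. A minimal sketch, assuming the `secrets.toml` above is in place (pipeline and dataset names are hypothetical):

```python
import dlt

# A sketch assuming the secrets.toml above; names are hypothetical.
pipeline = dlt.pipeline(
    pipeline_name="r2_demo",
    destination="filesystem",
    dataset_name="r2_demo_data",
)
# endpoint_url is resolved from the credentials automatically, so the run
# call is identical to a plain S3 load.
load_info = pipeline.run([{"id": 1, "name": "example"}], table_name="items")
print(load_info)
```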

#### Google Storage
Run `pip install dlt[gs]`, which installs the `gcsfs` package.

3 changes: 3 additions & 0 deletions mypy.ini
@@ -97,4 +97,7 @@ ignore_missing_imports=true
ignore_missing_imports=true

[mypy-utils.*]
ignore_missing_imports=true

[mypy-s3fs.*]
ignore_missing_imports=true
1 change: 1 addition & 0 deletions tests/.dlt/config.toml
@@ -6,5 +6,6 @@ bucket_url_gs="gs://ci-test-bucket"
bucket_url_s3="s3://dlt-ci-test-bucket"
bucket_url_file="file://_storage"
bucket_url_az="az://dlt-ci-test-bucket"
bucket_url_r2="s3://dlt-ci-test-bucket"
memory="memory://m"
# gdrive_url="gdrive://dlt-ci-tests"
19 changes: 16 additions & 3 deletions tests/load/conftest.py
@@ -2,14 +2,27 @@
import pytest
from typing import Iterator

from tests.load.utils import ALL_BUCKETS
from tests.load.utils import DEFAULT_BUCKETS, ALL_BUCKETS
from tests.utils import preserve_environ


@pytest.fixture(scope='function', params=ALL_BUCKETS)
def all_buckets_env(request) -> Iterator[str]:
@pytest.fixture(scope='function', params=DEFAULT_BUCKETS)
def default_buckets_env(request) -> Iterator[str]:
"""Parametrized fixture to configure filesystem destination bucket in env for each test bucket
"""
os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = request.param
yield request.param



@pytest.fixture(scope='function', params=ALL_BUCKETS)
def all_buckets_env(request) -> Iterator[str]:
if isinstance(request.param, dict):
bucket_url = request.param['bucket_url']
# R2 bucket needs to override all credentials
for key, value in request.param['credentials'].items():
os.environ[f'DESTINATION__FILESYSTEM__CREDENTIALS__{key.upper()}'] = value
else:
bucket_url = request.param
os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = bucket_url
yield bucket_url
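
A hypothetical usage sketch, not part of this commit: a test runs against every bucket, including the R2 dict entry, simply by requesting `all_buckets_env`; the fixture exports any override credentials before the test body runs:

```python
import os

# Hypothetical test for illustration; not added by this commit.
def test_roundtrip_any_bucket(all_buckets_env: str) -> None:
    # The fixture yields the plain bucket_url string; for the R2 entry the
    # override credentials (including ENDPOINT_URL) are already in os.environ.
    assert all_buckets_env == os.environ['DESTINATION__FILESYSTEM__BUCKET_URL']
```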
20 changes: 17 additions & 3 deletions tests/load/filesystem/test_aws_credentials.py
@@ -37,7 +37,6 @@ def test_aws_credentials_resolved_from_default(environment: Dict[str, str]) -> N
# assert config.profile_name == "default"


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None:
set_aws_credentials_env(environment)

@@ -63,7 +62,6 @@ def test_aws_credentials_from_botocore(environment: Dict[str, str]) -> None:
c.parse_native_representation("boto3")


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None:
try:
import boto3
@@ -89,7 +87,6 @@ def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None:
assert c.aws_access_key_id == "fake_access_key"


@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured')
def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None:
import botocore.exceptions

@@ -107,6 +104,23 @@ def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None:
pytest.skip("This test requires dlt-ci-user aws profile to be present")


def test_aws_credentials_with_endpoint_url(environment: Dict[str, str]) -> None:
set_aws_credentials_env(environment)
environment['CREDENTIALS__ENDPOINT_URL'] = 'https://123.r2.cloudflarestorage.com'

config = resolve_configuration(AwsCredentials())

assert config.endpoint_url == 'https://123.r2.cloudflarestorage.com'

assert config.to_s3fs_credentials() == {
"key": "fake_access_key",
"secret": "fake_secret_key",
"token": "fake_session_token",
"profile": None,
"endpoint_url": "https://123.r2.cloudflarestorage.com",
}


def set_aws_credentials_env(environment: Dict[str, str]) -> None:
environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key'
environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key'
6 changes: 3 additions & 3 deletions tests/load/filesystem/test_filesystem_client.py
@@ -42,7 +42,7 @@ def test_filesystem_destination_configuration() -> None:

@pytest.mark.parametrize('write_disposition', ('replace', 'append', 'merge'))
@pytest.mark.parametrize('layout', ALL_LAYOUTS)
def test_successful_load(write_disposition: str, layout: str, all_buckets_env: str) -> None:
def test_successful_load(write_disposition: str, layout: str, default_buckets_env: str) -> None:
"""Test load is successful with an empty destination dataset"""
if layout:
os.environ['DESTINATION__FILESYSTEM__LAYOUT'] = layout
@@ -74,7 +74,7 @@ def test_successful_load(write_disposition: str, layout: str, all_buckets_env: s


@pytest.mark.parametrize('layout', ALL_LAYOUTS)
def test_replace_write_disposition(layout: str, all_buckets_env: str) -> None:
def test_replace_write_disposition(layout: str, default_buckets_env: str) -> None:
if layout:
os.environ['DESTINATION__FILESYSTEM__LAYOUT'] = layout
else:
@@ -111,7 +111,7 @@ def test_replace_write_disposition(layout: str, all_buckets_env: str) -> None:


@pytest.mark.parametrize('layout', ALL_LAYOUTS)
def test_append_write_disposition(layout: str, all_buckets_env: str) -> None:
def test_append_write_disposition(layout: str, default_buckets_env: str) -> None:
"""Run load twice with append write_disposition and assert that there are two copies of each file in destination"""
if layout:
os.environ['DESTINATION__FILESYSTEM__LAYOUT'] = layout
28 changes: 26 additions & 2 deletions tests/load/filesystem/test_filesystem_common.py
@@ -1,6 +1,6 @@
import os
import posixpath
from typing import Union
from typing import Union, Dict
import pytest
from dlt.common.configuration.inject import with_config

@@ -11,7 +11,9 @@
from dlt.common.utils import uniq_id

from tests.utils import preserve_environ, autouse_test_storage
from tests.common.configuration.utils import environment
from tests.common.storages.utils import assert_sample_files
from tests.load.utils import ALL_FILESYSTEM_DRIVERS


@with_config(spec=FilesystemConfiguration, sections=("destination", "filesystem"))
@@ -51,7 +53,7 @@ def test_filesystem_instance(all_buckets_env: str) -> None:


@pytest.mark.parametrize("load_content", (True, False))
def test_filesystem_dict(all_buckets_env: str, load_content: bool) -> None:
def test_filesystem_dict(default_buckets_env: str, load_content: bool) -> None:
bucket_url = os.environ['DESTINATION__FILESYSTEM__BUCKET_URL']
config = get_config()
if config.protocol in ["memory", "file"]:
@@ -64,3 +66,25 @@ def test_filesystem_dict(all_buckets_env: str, load_content: bool) -> None:
assert_sample_files(all_file_items, filesystem, config, load_content)
except NotImplementedError as ex:
pytest.skip("Skipping due to " + str(ex))


@pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured")
def test_filesystem_instance_from_s3_endpoint(environment: Dict[str, str]) -> None:
"""Test that fsspec instance is correctly configured when using endpoint URL.
E.g. when using an S3 compatible service such as Cloudflare R2
"""
from s3fs import S3FileSystem
environment['DESTINATION__FILESYSTEM__BUCKET_URL'] = 's3://dummy-bucket'
environment['CREDENTIALS__ENDPOINT_URL'] = 'https://fake-s3-endpoint.example.com'
environment['CREDENTIALS__AWS_ACCESS_KEY_ID'] = 'fake-access-key'
environment['CREDENTIALS__AWS_SECRET_ACCESS_KEY'] = 'fake-secret-key'

config = get_config()

filesystem, bucket_name = fsspec_from_config(config)

assert isinstance(filesystem, S3FileSystem)
assert filesystem.endpoint_url == 'https://fake-s3-endpoint.example.com'
assert bucket_name == 'dummy-bucket'
assert filesystem.key == 'fake-access-key'
assert filesystem.secret == 'fake-secret-key'
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_filesystem_pipeline.py
@@ -23,7 +23,7 @@ def assert_file_matches(layout: str, job: LoadJobInfo, load_id: str, client: Fil
assert local_path.read_bytes() == client.fs_client.read_bytes(destination_path)


def test_pipeline_merge_write_disposition(all_buckets_env: str) -> None:
def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None:
"""Run pipeline twice with merge write disposition
Resource with primary key falls back to append. Resource without keys falls back to replace.
"""
27 changes: 23 additions & 4 deletions tests/load/utils.py
@@ -34,13 +34,32 @@
GCS_BUCKET = dlt.config.get("tests.bucket_url_gs", str)
AZ_BUCKET = dlt.config.get("tests.bucket_url_az", str)
FILE_BUCKET = dlt.config.get("tests.bucket_url_file", str)
R2_BUCKET = dlt.config.get("tests.bucket_url_r2", str)
MEMORY_BUCKET = dlt.config.get("tests.memory", str)

ALL_FILESYSTEM_DRIVERS = dlt.config.get("ALL_FILESYSTEM_DRIVERS", list) or ["s3", "gs", "az", "file", "memory"]
ALL_FILESYSTEM_DRIVERS = dlt.config.get("ALL_FILESYSTEM_DRIVERS", list) or ["s3", "gs", "az", "file", "memory", "r2"]

# Filter out buckets not in all filesystem drivers
ALL_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET]
ALL_BUCKETS = [bucket for bucket in ALL_BUCKETS if bucket.split(':')[0] in ALL_FILESYSTEM_DRIVERS]
DEFAULT_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET]
DEFAULT_BUCKETS = [bucket for bucket in DEFAULT_BUCKETS if bucket.split(':')[0] in ALL_FILESYSTEM_DRIVERS]

# Add r2 as an extra bucket so it's not used in every test
R2_BUCKET_CONFIG = dict(
bucket_url=R2_BUCKET,
# Credentials included so we can override aws credentials in env later
credentials=dict(
aws_access_key_id=dlt.config.get("tests.r2_aws_access_key_id", str),
aws_secret_access_key=dlt.config.get("tests.r2_aws_secret_access_key", str),
endpoint_url=dlt.config.get("tests.r2_endpoint_url", str),
)
)

EXTRA_BUCKETS: List[Dict[str, Any]] = []
if "r2" in ALL_FILESYSTEM_DRIVERS:
EXTRA_BUCKETS.append(R2_BUCKET_CONFIG)

ALL_BUCKETS = DEFAULT_BUCKETS + EXTRA_BUCKETS


@dataclass
class DestinationTestConfiguration:
@@ -145,7 +164,7 @@ def destinations_configs(
destination_configs += [DestinationTestConfiguration(destination="filesystem", bucket_url=FILE_BUCKET, file_format="jsonl")]

if all_buckets_filesystem_configs:
for bucket in ALL_BUCKETS:
for bucket in DEFAULT_BUCKETS:
destination_configs += [DestinationTestConfiguration(destination="filesystem", bucket_url=bucket, extra_info=bucket)]

# filter out non active destinations
