Skip to content

Commit

Permalink
fix: filesystem tests for sftp
Browse files: browse the repository at this point in the history
  • Loading branch information
donotpush committed Sep 12, 2024
1 parent 87ab87a commit f4a48c8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 42 deletions.
2 changes: 1 addition & 1 deletion dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LoadStorageConfiguration(BaseConfiguration):

def _make_sftp_url(scheme: str, fs_path: str, bucket_url: str) -> str:
parsed_bucket_url = urlparse(bucket_url)
return f"{scheme}://{parsed_bucket_url.hostname}/{fs_path}"
return f"{scheme}://{parsed_bucket_url.hostname}{fs_path}"


def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
Expand Down
6 changes: 4 additions & 2 deletions tests/load/filesystem/test_filesystem_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ def test_replace_write_disposition(layout: str, default_buckets_env: str) -> Non

# First file from load1 remains, second file is replaced by load2
# assert that only these two files are in the destination folder
is_sftp = urlparse(default_buckets_env).scheme == "sftp"
paths = []
for basedir, _dirs, files in client.fs_client.walk(
client.dataset_path, detail=False, refresh=True
client.dataset_path, detail=False, **({"refresh": True} if not is_sftp else {})
):
# remove internal paths
if "_dlt" in basedir:
Expand Down Expand Up @@ -257,9 +258,10 @@ def test_append_write_disposition(layout: str, default_buckets_env: str) -> None
]
expected_files = sorted([Path(posixpath.join(root_path, fn)) for fn in expected_files]) # type: ignore[misc]

is_sftp = urlparse(default_buckets_env).scheme == "sftp"
paths = []
for basedir, _dirs, files in client.fs_client.walk(
client.dataset_path, detail=False, refresh=True
client.dataset_path, detail=False, **({"refresh": True} if not is_sftp else {})
):
# remove internal paths
if "_dlt" in basedir:
Expand Down
6 changes: 3 additions & 3 deletions tests/load/filesystem_sftp/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ services:
ports:
- "2222:22"
volumes:
- ../../common/storages/samples:/home/foo/sftp/data/samples
- ../../common/storages/samples:/home/bobby/sftp/data/samples
- ../../common/storages/samples:/home/billy/sftp/data/samples
- ../../common/storages/samples:/home/foo/sftp/data/standard_source/samples
- ../../common/storages/samples:/home/bobby/sftp/data/standard_source/samples
- ../../common/storages/samples:/home/billy/sftp/data/standard_source/samples

networks:
sftpserver:
Expand Down
46 changes: 10 additions & 36 deletions tests/load/filesystem_sftp/test_filesystem_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
from dlt.common.json import json
from dlt.common.configuration.inject import with_config
from dlt.common.storages import FilesystemConfiguration, fsspec_from_config
from dlt.common.storages.fsspec_filesystem import glob_files
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

from tests.common.storages.utils import assert_sample_files


@with_config(spec=FilesystemConfiguration, sections=("sources", "filesystem"))
def get_config(config: FilesystemConfiguration = None) -> FilesystemConfiguration:
Expand Down Expand Up @@ -121,57 +118,34 @@ def states():
assert sorted(result_states) == sorted(expected_states)


@pytest.mark.parametrize("load_content", (True, False))
@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv"))
def test_filesystem_sftp_read(load_content: bool, glob_filter: str) -> None:
# docker volume mount on: /home/foo/sftp/data/samples but /data/samples is the path in the SFTP server
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path()

config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")

assert len(files) > 0
# use glob to get data
all_file_items = list(glob_files(fs, config.bucket_url, file_glob=glob_filter))

print(all_file_items)
assert_sample_files(all_file_items, fs, config, load_content, glob_filter)


def test_filesystem_sftp_auth_useranme_password():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/standard_source/samples"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "pass"

config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")
files = fs.ls("/data/standard_source/samples")
assert len(files) > 0


def test_filesystem_sftp_auth_private_key():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path()

config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")

files = fs.ls("/data/standard_source/samples")
assert len(files) > 0


def test_filesystem_sftp_auth_private_key_protected():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path("bobby")
Expand All @@ -180,7 +154,7 @@ def test_filesystem_sftp_auth_private_key_protected():
config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")
files = fs.ls("/data/standard_source/samples")

assert len(files) > 0

Expand All @@ -195,21 +169,21 @@ def test_filesystem_sftp_auth_private_key_protected():
reason="SSH agent is not running or bobby's private key isn't stored in ~/.ssh/id_rsa",
)
def test_filesystem_sftp_auth_private_ssh_agent():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "passphrase123"

config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")
files = fs.ls("/data/standard_source/samples")

assert len(files) > 0


def test_filesystem_sftp_auth_ca_signed_pub_key():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/samples"
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "billy"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path(
Expand All @@ -219,6 +193,6 @@ def test_filesystem_sftp_auth_ca_signed_pub_key():
config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/samples")
files = fs.ls("/data/standard_source/samples")

assert len(files) > 0

0 comments on commit f4a48c8

Please sign in to comment.