diff --git a/tests/load/filesystem_sftp/test_filesystem_sftp.py b/tests/load/filesystem_sftp/test_filesystem_sftp.py
index a47c433749..a6821722cb 100644
--- a/tests/load/filesystem_sftp/test_filesystem_sftp.py
+++ b/tests/load/filesystem_sftp/test_filesystem_sftp.py
@@ -88,7 +88,33 @@ def test_filesystem_sftp_server(sftp_filesystem):
     fs.rm(test_file)
 
 
+def test_filesystem_sftp_pipeline(sftp_filesystem):
+    """Load three rows as parquet via the SFTP filesystem destination and read them back."""
+    import posixpath
+    import pyarrow.parquet as pq
+
+    os.environ.update(
+        {
+            "DESTINATION__FILESYSTEM__BUCKET_URL": "sftp://localhost/data",
+            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PORT": "2222",
+            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_USERNAME": "foo",
+            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD": "pass",
+        }
+    )
+    rows = [{"id": 1, "name": "DE"}, {"id": 2, "name": "AK"}, {"id": 3, "name": "CA"}]
+    pipeline = dlt.pipeline(destination="filesystem", dataset_name="test")
+    # The resource is named explicitly so the rows land in the "states" table globbed below.
+    pipeline.run([dlt.resource(rows, name="states")], loader_file_format="parquet")
+    client = pipeline.destination_client()  # type: ignore[assignment]
+    data_files = client.fs_client.glob(posixpath.join(client.dataset_path, "states/*"))
+    assert len(data_files) > 0
+    # Close the remote handle once the parquet file has been read.
+    with sftp_filesystem.open(data_files[0], "rb") as f:
+        names = [r["name"] for r in pq.read_table(f).to_pylist()]
+    assert sorted(names) == ["AK", "CA", "DE"]
+
+
 def test_filesystem_sftp_write(sftp_filesystem):
     import posixpath
     import pyarrow.parquet as pq
 
@@ -118,45 +144,39 @@ def states():
         assert sorted(result_states) == sorted(expected_states)
 
 
-def test_filesystem_sftp_auth_useranme_password():
-    os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/standard_source/samples"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "pass"
-
-    config = get_config()
-    fs, _ = fsspec_from_config(config)
+def run_sftp_auth(user, password=None, key=None, passphrase=None):
+    """Export SFTP source credentials as env vars and assert the samples folder can be listed."""
+    env_vars = {
+        "SOURCES__FILESYSTEM__BUCKET_URL": "sftp://localhost",
+        "SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT": "2222",
+        "SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME": user,
+    }
 
-    files = fs.ls("/data/standard_source/samples")
-    assert len(files) > 0
+    if password:
+        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = password
+    if key:
+        # use the key path supplied by the caller rather than re-deriving it from the user name
+        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = key
+    if passphrase:
+        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_PASSPHRASE"] = passphrase
 
-
-def test_filesystem_sftp_auth_private_key():
-    os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path()
+    os.environ.update(env_vars)
 
     config = get_config()
     fs, _ = fsspec_from_config(config)
+    assert len(fs.ls("/data/standard_source/samples")) > 0
 
-    files = fs.ls("/data/standard_source/samples")
-    assert len(files) > 0
 
+def test_filesystem_sftp_auth_useranme_password():
+    run_sftp_auth("foo", "pass")
 
-def test_filesystem_sftp_auth_private_key_protected():
-    os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path("bobby")
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_PASSPHRASE"] = "passphrase123"
 
-    config = get_config()
-    fs, _ = fsspec_from_config(config)
+def test_filesystem_sftp_auth_private_key():
+    run_sftp_auth("foo", key=get_key_path())
 
-    files = fs.ls("/data/standard_source/samples")
 
-    assert len(files) > 0
+def test_filesystem_sftp_auth_private_key_protected():
+    run_sftp_auth("bobby", key=get_key_path("bobby"), passphrase="passphrase123")
 
 
 # Test requires - ssh_agent with user's bobby key loaded. The commands and file names required are:
@@ -169,30 +189,9 @@ def test_filesystem_sftp_auth_private_key_protected():
     reason="SSH agent is not running or bobby's private key isn't stored in ~/.ssh/id_rsa",
 )
 def test_filesystem_sftp_auth_private_ssh_agent():
-    os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "passphrase123"
-
-    config = get_config()
-    fs, _ = fsspec_from_config(config)
-
-    files = fs.ls("/data/standard_source/samples")
-
-    assert len(files) > 0
+    run_sftp_auth("bobby", passphrase="passphrase123")
 
 
 def test_filesystem_sftp_auth_ca_signed_pub_key():
-    os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "billy"
-    os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path(
-        "billy"
-    )  # billy_rsa-cert.pub is automatically loaded too
-
-    config = get_config()
-    fs, _ = fsspec_from_config(config)
-
-    files = fs.ls("/data/standard_source/samples")
-
-    assert len(files) > 0
+    # billy_rsa-cert.pub is automatically loaded too
+    run_sftp_auth("billy", key=get_key_path("billy"))