Skip to content

Commit

Permalink
refactor: reduce redundancy
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Sep 12, 2024
1 parent f4a48c8 commit a94cb36
Showing 1 changed file with 50 additions and 53 deletions.
103 changes: 50 additions & 53 deletions tests/load/filesystem_sftp/test_filesystem_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,33 @@ def test_filesystem_sftp_server(sftp_filesystem):
fs.rm(test_file)


def test_filesystem_sftp_write(sftp_filesystem):
def test_filesystem_sftp_pipeline(sftp_filesystem):
    """Run a dlt pipeline against the SFTP filesystem destination and read the result back.

    The destination is configured through ``DESTINATION__FILESYSTEM__*`` environment
    variables, three state rows are loaded as parquet, and the first data file found
    under the ``states`` table folder is opened through ``sftp_filesystem`` to verify
    the loaded names.
    """
    # Function-scope imports, as done by the other pipeline test in this file.
    import posixpath
    import pyarrow.parquet as pq

    os.environ.update(
        {
            "DESTINATION__FILESYSTEM__BUCKET_URL": "sftp://localhost/data",
            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PORT": "2222",
            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_USERNAME": "foo",
            "DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD": "pass",
        }
    )

    pipeline = dlt.pipeline(destination="filesystem", dataset_name="test")
    pipeline.run(
        [
            dlt.resource(
                lambda: [{"id": 1, "name": "DE"}, {"id": 2, "name": "AK"}, {"id": 3, "name": "CA"}],
                # A lambda carries no meaningful name; name the resource explicitly so
                # its table folder matches the "states/*" glob used below.
                name="states",
            )
        ],
        loader_file_format="parquet",
    )

    client = pipeline.destination_client()  # type: ignore[assignment]
    data_files = client.fs_client.glob(posixpath.join(client.dataset_path, "states/*"))
    assert len(data_files) > 0

    # Use a context manager so the remote file handle is closed even if reading fails.
    with sftp_filesystem.open(data_files[0], "rb") as parquet_file:
        loaded_names = [row["name"] for row in pq.read_table(parquet_file).to_pylist()]
    assert sorted(loaded_names) == ["AK", "CA", "DE"]


def test_filesystem_sftp_pipeline(sftp_filesystem):
import posixpath
import pyarrow.parquet as pq

Expand Down Expand Up @@ -118,45 +144,37 @@ def states():
assert sorted(result_states) == sorted(expected_states)


def test_filesystem_sftp_auth_useranme_password():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost/data/standard_source/samples"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "foo"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "pass"

config = get_config()
fs, _ = fsspec_from_config(config)
def run_sftp_auth(user, password=None, key=None, passphrase=None):
    """Configure SFTP source credentials via environment variables and verify a listing.

    Args:
        user: SFTP username; always exported.
        password: Optional password; exported only when truthy.
        key: Optional path to a private key file; exported only when truthy.
        passphrase: Optional passphrase for the private key; exported only when truthy.

    Raises:
        AssertionError: If listing ``/data/standard_source/samples`` returns no entries.
    """
    env_vars = {
        "SOURCES__FILESYSTEM__BUCKET_URL": "sftp://localhost",
        "SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT": "2222",
        "SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME": user,
    }

    if password:
        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = password
    if key:
        # Use the path the caller supplied instead of recomputing it from the user name,
        # so the `key` argument is honoured rather than treated as a mere flag.
        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = key
    if passphrase:
        env_vars["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_PASSPHRASE"] = passphrase

    # NOTE(review): os.environ.update only adds/overwrites; optional credentials exported
    # by an earlier call stay set unless the test suite restores the environment
    # between tests — confirm a fixture does that.
    os.environ.update(env_vars)

    config = get_config()
    fs, _ = fsspec_from_config(config)
    assert len(fs.ls("/data/standard_source/samples")) > 0

files = fs.ls("/data/standard_source/samples")
assert len(files) > 0

def test_filesystem_sftp_auth_useranme_password():
    """Check SFTP access for user ``foo`` using a password."""
    run_sftp_auth(user="foo", password="pass")

def test_filesystem_sftp_auth_private_key_protected():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_FILENAME"] = get_key_path("bobby")
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_KEY_PASSPHRASE"] = "passphrase123"

config = get_config()
fs, _ = fsspec_from_config(config)
def test_filesystem_sftp_auth_private_key():
    """Check SFTP access for user ``foo`` using the key returned by ``get_key_path()``."""
    key_path = get_key_path()
    run_sftp_auth(user="foo", key=key_path)

files = fs.ls("/data/standard_source/samples")

assert len(files) > 0
def test_filesystem_sftp_auth_private_key_protected():
    """Check SFTP access for user ``bobby`` using a key file plus its passphrase."""
    key_path = get_key_path("bobby")
    run_sftp_auth(user="bobby", key=key_path, passphrase="passphrase123")


# Test requires an SSH agent with user bobby's key loaded. The commands and file names required are:
Expand All @@ -169,30 +187,9 @@ def test_filesystem_sftp_auth_private_key_protected():
reason="SSH agent is not running or bobby's private key isn't stored in ~/.ssh/id_rsa",
)
def test_filesystem_sftp_auth_private_ssh_agent():
os.environ["SOURCES__FILESYSTEM__BUCKET_URL"] = "sftp://localhost"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PORT"] = "2222"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_USERNAME"] = "bobby"
os.environ["SOURCES__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD"] = "passphrase123"

config = get_config()
fs, _ = fsspec_from_config(config)

files = fs.ls("/data/standard_source/samples")

assert len(files) > 0
run_sftp_auth("bobby", passphrase="passphrase123")


def test_filesystem_sftp_auth_ca_signed_pub_key():
    """Check SFTP access for user ``billy`` using billy's key file.

    The environment setup, filesystem creation and listing assertion are delegated to
    ``run_sftp_auth`` instead of being repeated inline.
    """
    # billy_rsa-cert.pub is automatically loaded too
    run_sftp_auth("billy", key=get_key_path("billy"))

0 comments on commit a94cb36

Please sign in to comment.