Feat/1730 extend filesystem sftp (#1769)
* chore: add paramiko dev dependency

* test: add container for sftp localhost

* chore: add tmp bash scripts

* exp: sftp client with fsspec

* chore: sftp timestamp metadata discovered

* fix: docs lint

* feat: add fsspec protocol sftp

* fix: lint errors from devel

* test: sftp server localhost

* fix: filesystem SFTP docker-compose tests

* fix: json import

* chore: clean tests and dockerfile

* refactor: ci test exec for sftp server

* feat: sftp file url parser

* test: sftp reading using file samples

* chore: extended SFTP credentials class

* docs: filesystem SFTP credentials and authentication

* chore: add bobby password protected key-based authentication

* docs: sftp correction for ssh-agent

* chore: add docker volume

* chore: revert ci changes

* test: refactor sftp with auth methods

* test: sftp skip test when agent not configured

* fix: poetry lock

* fix: github workflow

* fix: run only sftp tests

* fix: merge conflict regression

* fix: ssh-agent for tests

* fix: pytest executions excluding sftp

* fix: CI test execution

* test: sftp login with signed certificate

* fix: poetry lock regenerated

* refactor: filesystem sftp tests

* fix: filesystem tests for sftp

* refactor: reduce redundancy

* fix: lint and remove duplicated test

* chore: change ubuntu version

* fix: enforce test marker

* fix: ignore sftp tests

* fix: exclude sftp from filesystem tests

* adds sftp extra dep

---------

Co-authored-by: Marcin Rudolf <[email protected]>
donotpush and rudolfix authored Sep 14, 2024
1 parent eb4b1ba commit 4e45ea4
Showing 29 changed files with 883 additions and 115 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destinations.yml
@@ -30,6 +30,7 @@ env:
   # postgres runs again here so we can test on mac/windows
   ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
   # note that all buckets are enabled for testing
+  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp
 
 jobs:
   get_docs_changes:
27 changes: 23 additions & 4 deletions .github/workflows/test_local_destinations.yml
@@ -22,7 +22,7 @@ env:
   RUNTIME__LOG_LEVEL: ERROR
   RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
   ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"qdrant\"]"
-  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"
+  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"sftp\"]"
 
   DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
   DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"
@@ -95,16 +95,35 @@ jobs:
       key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations
 
     - name: Install dependencies
-      run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant --with sentry-sdk --with pipeline -E deltalake
+      run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake
+
+    - name: Start SFTP server
+      run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
+
+    - name: Configure SSH Agent for sftp tests
+      run: |
+        mkdir -p /home/runner/.ssh
+        cp tests/load/filesystem_sftp/bootstrap/bobby_rsa /home/runner/.ssh/id_rsa
+        cp tests/load/filesystem_sftp/bootstrap/bobby_rsa.pub /home/runner/.ssh/id_rsa.pub
 
     # always run full suite, also on branches
-    - run: poetry run pytest tests/load --ignore tests/load/sources && poetry run pytest tests/cli
-      name: Run tests Linux
+    - name: Run tests Linux
+      run: |
+        eval "$(ssh-agent -s)"
+        poetry run pytest tests/load --ignore tests/load/sources
+        poetry run pytest tests/cli
       env:
         DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
         DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333
+        DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PORT: 2222
+        DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_USERNAME: foo
+        DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD: pass
 
     - name: Stop weaviate
       if: always()
       run: docker compose -f ".github/weaviate-compose.yml" down -v
+
+    - name: Stop SFTP server
+      if: always()
+      run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" down -v
8 changes: 5 additions & 3 deletions .github/workflows/test_pyarrow17.yml
@@ -23,6 +23,7 @@ env:
   RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
 
   ACTIVE_DESTINATIONS: "[\"filesystem\"]"
+  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp
 
 jobs:
   get_docs_changes:
@@ -72,6 +73,7 @@ jobs:
     - name: create secrets.toml
       run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
 
-    - run: |
-        poetry run pytest tests/libs tests/load -m needspyarrow17
-      name: Run needspyarrow17 tests Linux
+    - name: Run needspyarrow17 tests Linux
+      run: |
+        poetry run pytest tests/libs -m "needspyarrow17"
+        poetry run pytest tests/load -m "needspyarrow17"
2 changes: 2 additions & 0 deletions dlt/common/configuration/specs/__init__.py
@@ -28,6 +28,7 @@
     AnyAzureCredentials,
 )
 
+from .sftp_crendentials import SFTPCredentials
 
 # backward compatibility for service account credentials
 from .gcp_credentials import (
@@ -62,4 +63,5 @@
     "AnyAzureCredentials",
     "GcpClientCredentials",
     "GcpClientCredentialsWithDefault",
+    "SFTPCredentials",
 ]
69 changes: 69 additions & 0 deletions dlt/common/configuration/specs/sftp_crendentials.py
@@ -0,0 +1,69 @@
from typing import Any, Dict, Optional

from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec


@configspec
class SFTPCredentials(CredentialsConfiguration):
    """Credentials for SFTP filesystem, compatible with fsspec SFTP protocol.

    Authentication is attempted in the following order of priority:

    - `key_filename` may contain OpenSSH public certificate paths as well as regular
      private-key paths; when files ending in `-cert.pub` are found, they are assumed to match
      a private key, and both components will be loaded.
    - Any key found through an SSH agent: any "id_rsa", "id_dsa", or "id_ecdsa" key
      discoverable in ~/.ssh/.
    - Plain username/password authentication, if a password was provided.
    - If a private key requires a password to unlock it, and a password is provided, that
      password will be used to attempt to unlock the key.

    For more information about parameters:
    https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
    """

    sftp_port: Optional[int] = 22
    sftp_username: Optional[str] = None
    sftp_password: Optional[TSecretStrValue] = None
    sftp_key_filename: Optional[str] = None
    sftp_key_passphrase: Optional[TSecretStrValue] = None
    sftp_timeout: Optional[float] = None
    sftp_banner_timeout: Optional[float] = None
    sftp_auth_timeout: Optional[float] = None
    sftp_channel_timeout: Optional[float] = None
    sftp_allow_agent: Optional[bool] = True
    sftp_look_for_keys: Optional[bool] = True
    sftp_compress: Optional[bool] = False
    sftp_gss_auth: Optional[bool] = False
    sftp_gss_kex: Optional[bool] = False
    sftp_gss_deleg_creds: Optional[bool] = True
    sftp_gss_host: Optional[str] = None
    sftp_gss_trust_dns: Optional[bool] = True

    def to_fsspec_credentials(self) -> Dict[str, Any]:
        """Return a dict that can be passed to fsspec SFTP/SSHClient.connect method."""

        credentials: Dict[str, Any] = {
            "port": self.sftp_port,
            "username": self.sftp_username,
            "password": self.sftp_password,
            "key_filename": self.sftp_key_filename,
            "passphrase": self.sftp_key_passphrase,
            "timeout": self.sftp_timeout,
            "banner_timeout": self.sftp_banner_timeout,
            "auth_timeout": self.sftp_auth_timeout,
            "channel_timeout": self.sftp_channel_timeout,
            "allow_agent": self.sftp_allow_agent,
            "look_for_keys": self.sftp_look_for_keys,
            "compress": self.sftp_compress,
            "gss_auth": self.sftp_gss_auth,
            "gss_kex": self.sftp_gss_kex,
            "gss_deleg_creds": self.sftp_gss_deleg_creds,
            "gss_host": self.sftp_gss_host,
            "gss_trust_dns": self.sftp_gss_trust_dns,
        }

        return credentials
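
The sketch below (an editor's illustration, not part of this commit) shows how the dict from `to_fsspec_credentials()` can be handed to fsspec's `sftp` implementation, which forwards these keyword arguments to `paramiko.SSHClient.connect`; the host, username, and key path are made-up values.

```python
# Hypothetical usage sketch -- host, username, and key path are placeholders.
import fsspec

from dlt.common.configuration.specs import SFTPCredentials

credentials = SFTPCredentials()
credentials.sftp_username = "foo"
credentials.sftp_key_filename = "/path/to/id_rsa"

# fsspec's "sftp" filesystem passes these kwargs straight to paramiko.SSHClient.connect
fs = fsspec.filesystem("sftp", host="sftp.example.com", **credentials.to_fsspec_credentials())
print(fs.ls("/data"))
```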
2 changes: 1 addition & 1 deletion dlt/common/libs/deltalake.py
@@ -175,7 +175,7 @@ def get_delta_tables(

 def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
     """Returns dict that can be passed as `storage_options` in `deltalake` library."""
-    creds = {}
+    creds = {}  # type: ignore
     extra_options = {}
     # TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
     if hasattr(config.credentials, "to_object_store_rs_credentials"):
16 changes: 14 additions & 2 deletions dlt/common/storages/configuration.py
@@ -12,6 +12,7 @@
     GcpOAuthCredentials,
     AnyAzureCredentials,
     BaseConfiguration,
+    SFTPCredentials,
 )
 from dlt.common.typing import DictStrAny
 from dlt.common.utils import digest128
@@ -48,10 +49,19 @@ class LoadStorageConfiguration(BaseConfiguration):
 
 
 FileSystemCredentials = Union[
-    AwsCredentials, GcpServiceAccountCredentials, AnyAzureCredentials, GcpOAuthCredentials
+    AwsCredentials,
+    GcpServiceAccountCredentials,
+    AnyAzureCredentials,
+    GcpOAuthCredentials,
+    SFTPCredentials,
 ]
 
 
+def _make_sftp_url(scheme: str, fs_path: str, bucket_url: str) -> str:
+    parsed_bucket_url = urlparse(bucket_url)
+    return f"{scheme}://{parsed_bucket_url.hostname}{fs_path}"
+
+
 def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
     parsed_bucket_url = urlparse(bucket_url)
     if parsed_bucket_url.username:
@@ -76,7 +86,7 @@ def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
     return p_.as_uri()
 
 
-MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url}
+MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url, "sftp": _make_sftp_url}
 
 MAKE_URI_DISPATCH["adl"] = MAKE_URI_DISPATCH["az"]
 MAKE_URI_DISPATCH["abfs"] = MAKE_URI_DISPATCH["az"]
@@ -109,6 +119,7 @@ class FilesystemConfiguration(BaseConfiguration):
     * az, abfs, adl, abfss, azure
     * file, memory
     * gdrive
+    * sftp
     """
 
     PROTOCOL_CREDENTIALS: ClassVar[Dict[str, Any]] = {
@@ -121,6 +132,7 @@
         "adl": AnyAzureCredentials,
         "abfss": AnyAzureCredentials,
         "azure": AnyAzureCredentials,
+        "sftp": SFTPCredentials,
     }
 
     bucket_url: str = None
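
Note that `urlparse(...).hostname` keeps only the host, so any user info or port embedded in the configured `bucket_url` is dropped when a file URL is rebuilt. A small illustration with made-up values (editor's sketch, not part of the commit):

```python
# Mirrors _make_sftp_url above; the host and paths are made up.
from urllib.parse import urlparse

def make_sftp_url(scheme: str, fs_path: str, bucket_url: str) -> str:
    parsed_bucket_url = urlparse(bucket_url)
    return f"{scheme}://{parsed_bucket_url.hostname}{fs_path}"

print(make_sftp_url("sftp", "/data/2024/file.jsonl", "sftp://foo@sftp.example.com:2222/data"))
# -> sftp://sftp.example.com/data/2024/file.jsonl
```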
8 changes: 8 additions & 0 deletions dlt/common/storages/fsspec_filesystem.py
@@ -30,6 +30,7 @@
     GcpCredentials,
     AwsCredentials,
     AzureCredentials,
+    SFTPCredentials,
 )
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common.storages.configuration import (
@@ -64,6 +65,7 @@ class FileItem(TypedDict, total=False):
     "file": lambda f: ensure_pendulum_datetime(f["mtime"]),
     "memory": lambda f: ensure_pendulum_datetime(f["created"]),
     "gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]),
+    "sftp": lambda f: ensure_pendulum_datetime(f["mtime"]),
 }
 # Support aliases
 MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
@@ -77,6 +79,7 @@ class FileItem(TypedDict, total=False):
     "az": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
     "gs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
     "gdrive": lambda config: {"credentials": cast(GcpCredentials, config.credentials)},
+    "sftp": lambda config: cast(SFTPCredentials, config.credentials).to_fsspec_credentials(),
 }
 CREDENTIALS_DISPATCH["adl"] = CREDENTIALS_DISPATCH["az"]
 CREDENTIALS_DISPATCH["abfs"] = CREDENTIALS_DISPATCH["az"]
@@ -136,6 +139,10 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
         register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")
 
     fs_kwargs.update(DEFAULT_KWARGS.get(protocol, {}))
+
+    if protocol == "sftp":
+        fs_kwargs.clear()
+
     if config.kwargs is not None:
         fs_kwargs.update(config.kwargs)
     if config.client_kwargs is not None:
@@ -155,6 +162,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
     * s3
     * az, abfs, abfss, adl, azure
     * gcs, gs
+    * sftp
 
     All other filesystems are not authenticated
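
Taken together, these dispatch entries let a configured SFTP bucket resolve to an authenticated filesystem. A minimal sketch (editor's illustration), assuming a placeholder host and manually assigned credentials, which dlt would normally resolve from `secrets.toml` or environment variables:

```python
# Hedged sketch -- "sftp.example.com" is a placeholder; credential resolution
# normally happens via dlt's config machinery rather than manual assignment.
from dlt.common.configuration.specs import SFTPCredentials
from dlt.common.storages.configuration import FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import fsspec_from_config

config = FilesystemConfiguration(bucket_url="sftp://sftp.example.com/data")
config.credentials = SFTPCredentials()
config.credentials.sftp_username = "foo"

fs, root = fsspec_from_config(config)  # returns (AbstractFileSystem, url)
print(fs.ls(root))
```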
92 changes: 92 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -268,6 +268,98 @@ bucket_url='\\?\UNC\localhost\c$\a\b\c'
```
:::

### SFTP
Run `pip install "dlt[sftp]"`, which installs the `paramiko` package alongside `dlt`, enabling secure SFTP transfers.

Configure your SFTP credentials by editing the `.dlt/secrets.toml` file. By default, the file contains placeholders for AWS credentials; replace them with your SFTP credentials.

Below are the possible fields for SFTP credentials configuration:

```text
sftp_port # The port for SFTP, defaults to 22 (standard for SSH/SFTP)
sftp_username # Your SFTP username, defaults to None
sftp_password # Your SFTP password (if using password-based auth), defaults to None
sftp_key_filename # Path to your private key file for key-based authentication, defaults to None
sftp_key_passphrase # Passphrase for your private key (if applicable), defaults to None
sftp_timeout # Timeout for establishing a connection, defaults to None
sftp_banner_timeout # Timeout for receiving the banner during authentication, defaults to None
sftp_auth_timeout # Authentication timeout, defaults to None
sftp_channel_timeout # Channel timeout for SFTP operations, defaults to None
sftp_allow_agent # Use SSH agent for key management (if available), defaults to True
sftp_look_for_keys # Search for SSH keys in the default SSH directory (~/.ssh/), defaults to True
sftp_compress # Enable compression (can improve performance over slow networks), defaults to False
sftp_gss_auth # Use GSS-API for authentication, defaults to False
sftp_gss_kex # Use GSS-API for key exchange, defaults to False
sftp_gss_deleg_creds # Delegate credentials with GSS-API, defaults to True
sftp_gss_host # Host for GSS-API, defaults to None
sftp_gss_trust_dns # Trust DNS for GSS-API, defaults to True
```
> For more information about the credential parameters, see https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect

### Authentication Methods

SFTP authentication is attempted in the following order of priority:

1. **Key-based authentication**: If you provide a `key_filename` containing the path to a private key or a corresponding OpenSSH public certificate (e.g., `id_rsa` and `id_rsa-cert.pub`), these will be used for authentication. If the private key requires a passphrase, specify it via `sftp_key_passphrase`; it will be used to unlock the key.

2. **SSH Agent-based authentication**: If `sftp_allow_agent=True` (the default), Paramiko will use any keys loaded into your local SSH agent; with `sftp_look_for_keys=True` (also the default), it additionally searches `~/.ssh/` for `id_rsa`, `id_dsa`, or `id_ecdsa` keys.

3. **Username/Password authentication**: If a password is provided (`sftp_password`), plain username/password authentication will be attempted.

4. **GSS-API authentication**: If GSS-API (Kerberos) is enabled (`sftp_gss_auth=True`), authentication will use the Kerberos protocol. GSS-API may also be used for key exchange (`sftp_gss_kex=True`) and credential delegation (`sftp_gss_deleg_creds=True`). This method is useful in environments where Kerberos is set up, often in enterprise networks.


#### 1. **Key-based Authentication**

If you use an SSH key instead of a password, you can specify the path to your private key in the configuration.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]"
file_glob = "*"

[destination.filesystem.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa" # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
```

#### 2. **SSH Agent-based Authentication**

If you have an SSH agent running with loaded keys, you can allow Paramiko to use these keys automatically. You can omit the password and key fields if you're relying on the SSH agent.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]"
file_glob = "*"

[destination.filesystem.credentials]
sftp_username = "foo"
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
```
The loaded key must be one of the following types stored in `~/.ssh/`: `id_rsa`, `id_dsa`, or `id_ecdsa`.

#### 3. **Username/Password Authentication**

This is the simplest form of authentication, where you supply a username and password directly.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]" # The hostname of your SFTP server and the remote path
file_glob = "*" # Pattern to match the files you want to upload/download

[destination.filesystem.credentials]
sftp_username = "foo" # Replace "foo" with your SFTP username
sftp_password = "pass" # Replace "pass" with your SFTP password
```


### Notes
- **Key-based Authentication**: Make sure your private key has the correct permissions (`chmod 600`), or SSH will refuse to use it.
- **Timeouts**: It's important to adjust timeout values based on your network conditions to avoid connection issues.

This configuration allows flexible SFTP authentication, whether you're using passwords, keys, or agents, and ensures secure communication between your local environment and the SFTP server.
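
As a final end-to-end illustration (an editor's sketch, not part of the docs diff): once `bucket_url` and the `sftp_*` credentials are in `.dlt/secrets.toml` as shown above, a pipeline needs no extra code. The pipeline name, dataset name, and rows below are made up.

```python
import dlt

# bucket_url and sftp_* credentials are read from .dlt/secrets.toml
pipeline = dlt.pipeline(
    pipeline_name="sftp_demo",
    destination="filesystem",
    dataset_name="my_data",
)
load_info = pipeline.run([{"id": 1}, {"id": 2}], table_name="items")
print(load_info)
```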

## Write disposition
The filesystem destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder
Expand Down