Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/1730 extend filesystem sftp #1769

Merged
merged 45 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
41fbf47
chore: add paramiko dev dependency
donotpush Aug 29, 2024
b398bd8
test: add container for sftp localhost
donotpush Aug 29, 2024
febff56
chore: add tmp bash scripts
donotpush Aug 29, 2024
f56dae3
exp: sftp client with fsspec
donotpush Aug 29, 2024
9d6bf15
chore: sftp timestamp metadata discovered
donotpush Aug 29, 2024
6eb40b4
fix: docs lint
donotpush Aug 29, 2024
e690a70
feat: add fsspec protocol sftp
donotpush Aug 29, 2024
89b097b
fix: lint errors from devel
donotpush Aug 29, 2024
5fd34be
test: sftp server localhost
donotpush Sep 2, 2024
4091119
fix: filesystem SFTP docker-compose tests
donotpush Sep 2, 2024
a1e5d66
fix: json import
donotpush Sep 2, 2024
c7b5072
chore: clean tests and dockerfile
donotpush Sep 3, 2024
2b51113
refactor: ci test exec for sftp server
donotpush Sep 3, 2024
3ff56ab
feat: sftp file url parser
donotpush Sep 3, 2024
531f282
test: sftp reading using file samples
donotpush Sep 3, 2024
99157c9
chore: extended SFTP credentials class
donotpush Sep 5, 2024
733bee2
docs: filesystem SFTP credentials and authentication
donotpush Sep 5, 2024
8f53832
chore: add bobby password protected key-based authentication
donotpush Sep 5, 2024
706e5cd
docs: sftp correction for ssh-agent
donotpush Sep 5, 2024
5ba6820
chore: add docker volume
donotpush Sep 5, 2024
eefaafe
chore: revert ci changes
donotpush Sep 5, 2024
a3833ab
test: refactor sftp with auth methods
donotpush Sep 5, 2024
6f3c56a
test: sftp skip test when agent not configured
donotpush Sep 5, 2024
cb324bb
Merge remote-tracking branch 'origin/devel' into feat/1730-extend-fil…
donotpush Sep 5, 2024
d9c6f92
fix: poetry lock
donotpush Sep 5, 2024
f16fff8
fix: github workflow
donotpush Sep 5, 2024
a94e525
fix: run only sftp tests
donotpush Sep 5, 2024
a8ad274
fix: merge conflict regression
donotpush Sep 5, 2024
4123a17
fix: ssh-agent for tests
donotpush Sep 5, 2024
f4352eb
fix: pytest executions excluding sftp
donotpush Sep 6, 2024
de96bd1
fix: CI test execution
donotpush Sep 9, 2024
69d7e9b
test: sftp login with signed certificate
donotpush Sep 9, 2024
aeac3e3
Merge remote-tracking branch 'origin/devel' into feat/1730-extend-fil…
donotpush Sep 9, 2024
08f07d3
fix: poetry lock regenerated
donotpush Sep 9, 2024
87ab87a
refactor: filesystem sftp tests
donotpush Sep 12, 2024
f4a48c8
fix: filesystem tests for sftp
donotpush Sep 12, 2024
a94cb36
refactor: reduce redundancy
donotpush Sep 12, 2024
b7eb78a
fix: lint and remove duplicated test
donotpush Sep 12, 2024
234c377
chore: change ubuntu version
donotpush Sep 12, 2024
187a55a
fix: enforce test marker
donotpush Sep 12, 2024
329c2ef
fix: ignore sftp tests
donotpush Sep 12, 2024
964db4d
fix: exclude sftp from filesystem tests
donotpush Sep 12, 2024
e494b71
Merge branch 'devel' into feat/1730-extend-filesystem-sftp
rudolfix Sep 14, 2024
9f71cf9
Merge branch 'devel' into feat/1730-extend-filesystem-sftp
rudolfix Sep 14, 2024
7a43d37
adds sftp extra dep
rudolfix Sep 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ env:
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
# note that all buckets are enabled for testing
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp

jobs:
get_docs_changes:
Expand Down
27 changes: 23 additions & 4 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ env:
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"qdrant\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"sftp\"]"

DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"
Expand Down Expand Up @@ -95,16 +95,35 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d

- name: Configure SSH Agent for sftp tests
run: |
mkdir -p /home/runner/.ssh
cp tests/load/filesystem_sftp/bootstrap/bobby_rsa /home/runner/.ssh/id_rsa
cp tests/load/filesystem_sftp/bootstrap/bobby_rsa.pub /home/runner/.ssh/id_rsa.pub

# always run full suite, also on branches
- run: poetry run pytest tests/load --ignore tests/load/sources && poetry run pytest tests/cli
name: Run tests Linux
- name: Run tests Linux
run: |
eval "$(ssh-agent -s)"
poetry run pytest tests/load --ignore tests/load/sources
poetry run pytest tests/cli
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PORT: 2222
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_USERNAME: foo
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD: pass


- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v

- name: Stop SFTP server
if: always()
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" down -v
8 changes: 5 additions & 3 deletions .github/workflows/test_pyarrow17.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"filesystem\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp

jobs:
get_docs_changes:
Expand Down Expand Up @@ -72,6 +73,7 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/libs tests/load -m needspyarrow17
name: Run needspyarrow17 tests Linux
- name: Run needspyarrow17 tests Linux
run: |
poetry run pytest tests/libs -m "needspyarrow17"
poetry run pytest tests/load -m "needspyarrow17"
2 changes: 2 additions & 0 deletions dlt/common/configuration/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AnyAzureCredentials,
)

from .sftp_crendentials import SFTPCredentials

# backward compatibility for service account credentials
from .gcp_credentials import (
Expand Down Expand Up @@ -62,4 +63,5 @@
"AnyAzureCredentials",
"GcpClientCredentials",
"GcpClientCredentialsWithDefault",
"SFTPCredentials",
]
69 changes: 69 additions & 0 deletions dlt/common/configuration/specs/sftp_crendentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Any, Dict, Optional

from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec


@configspec
class SFTPCredentials(CredentialsConfiguration):
"""Credentials for SFTP filesystem, compatible with fsspec SFTP protocol.

Authentication is attempted in the following order of priority:

- `key_filename` may contain OpenSSH public certificate paths
as well as regular private-key paths; when files ending in `-cert.pub` are found, they are assumed to match
a private key, and both components will be loaded.

- Any key found through an SSH agent: any “id_rsa”, “id_dsa”, or “id_ecdsa” key discoverable in ~/.ssh/.

- Plain username/password authentication, if a password was provided.

- If a private key requires a password to unlock it, and a password is provided, that password will be used to
attempt to unlock the key.

For more information about parameters:
https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
"""

sftp_port: Optional[int] = 22
sftp_username: Optional[str] = None
sftp_password: Optional[TSecretStrValue] = None
sftp_key_filename: Optional[str] = None
sftp_key_passphrase: Optional[TSecretStrValue] = None
sftp_timeout: Optional[float] = None
sftp_banner_timeout: Optional[float] = None
sftp_auth_timeout: Optional[float] = None
sftp_channel_timeout: Optional[float] = None
sftp_allow_agent: Optional[bool] = True
sftp_look_for_keys: Optional[bool] = True
sftp_compress: Optional[bool] = False
sftp_gss_auth: Optional[bool] = False
sftp_gss_kex: Optional[bool] = False
sftp_gss_deleg_creds: Optional[bool] = True
sftp_gss_host: Optional[str] = None
sftp_gss_trust_dns: Optional[bool] = True

def to_fsspec_credentials(self) -> Dict[str, Any]:
"""Return a dict that can be passed to fsspec SFTP/SSHClient.connect method."""

credentials: Dict[str, Any] = {
"port": self.sftp_port,
"username": self.sftp_username,
"password": self.sftp_password,
"key_filename": self.sftp_key_filename,
"passphrase": self.sftp_key_passphrase,
"timeout": self.sftp_timeout,
"banner_timeout": self.sftp_banner_timeout,
"auth_timeout": self.sftp_auth_timeout,
"channel_timeout": self.sftp_channel_timeout,
"allow_agent": self.sftp_allow_agent,
"look_for_keys": self.sftp_look_for_keys,
"compress": self.sftp_compress,
"gss_auth": self.sftp_gss_auth,
"gss_kex": self.sftp_gss_kex,
"gss_deleg_creds": self.sftp_gss_deleg_creds,
"gss_host": self.sftp_gss_host,
"gss_trust_dns": self.sftp_gss_trust_dns,
}

return credentials
2 changes: 1 addition & 1 deletion dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def get_delta_tables(

def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {}
creds = {} # type: ignore
extra_options = {}
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
Expand Down
16 changes: 14 additions & 2 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
GcpOAuthCredentials,
AnyAzureCredentials,
BaseConfiguration,
SFTPCredentials,
)
from dlt.common.typing import DictStrAny
from dlt.common.utils import digest128
Expand Down Expand Up @@ -48,10 +49,19 @@ class LoadStorageConfiguration(BaseConfiguration):


FileSystemCredentials = Union[
AwsCredentials, GcpServiceAccountCredentials, AnyAzureCredentials, GcpOAuthCredentials
AwsCredentials,
GcpServiceAccountCredentials,
AnyAzureCredentials,
GcpOAuthCredentials,
SFTPCredentials,
]


def _make_sftp_url(scheme: str, fs_path: str, bucket_url: str) -> str:
parsed_bucket_url = urlparse(bucket_url)
return f"{scheme}://{parsed_bucket_url.hostname}{fs_path}"


def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
parsed_bucket_url = urlparse(bucket_url)
if parsed_bucket_url.username:
Expand All @@ -76,7 +86,7 @@ def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
return p_.as_uri()


MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url}
MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url, "sftp": _make_sftp_url}

MAKE_URI_DISPATCH["adl"] = MAKE_URI_DISPATCH["az"]
MAKE_URI_DISPATCH["abfs"] = MAKE_URI_DISPATCH["az"]
Expand Down Expand Up @@ -109,6 +119,7 @@ class FilesystemConfiguration(BaseConfiguration):
* az, abfs, adl, abfss, azure
* file, memory
* gdrive
* sftp
"""

PROTOCOL_CREDENTIALS: ClassVar[Dict[str, Any]] = {
Expand All @@ -121,6 +132,7 @@ class FilesystemConfiguration(BaseConfiguration):
"adl": AnyAzureCredentials,
"abfss": AnyAzureCredentials,
"azure": AnyAzureCredentials,
"sftp": SFTPCredentials,
}

bucket_url: str = None
Expand Down
8 changes: 8 additions & 0 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
GcpCredentials,
AwsCredentials,
AzureCredentials,
SFTPCredentials,
)
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages.configuration import (
Expand Down Expand Up @@ -64,6 +65,7 @@ class FileItem(TypedDict, total=False):
"file": lambda f: ensure_pendulum_datetime(f["mtime"]),
"memory": lambda f: ensure_pendulum_datetime(f["created"]),
"gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]),
"sftp": lambda f: ensure_pendulum_datetime(f["mtime"]),
}
# Support aliases
MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"]
Expand All @@ -77,6 +79,7 @@ class FileItem(TypedDict, total=False):
"az": lambda config: cast(AzureCredentials, config.credentials).to_adlfs_credentials(),
"gs": lambda config: cast(GcpCredentials, config.credentials).to_gcs_credentials(),
"gdrive": lambda config: {"credentials": cast(GcpCredentials, config.credentials)},
"sftp": lambda config: cast(SFTPCredentials, config.credentials).to_fsspec_credentials(),
}
CREDENTIALS_DISPATCH["adl"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["abfs"] = CREDENTIALS_DISPATCH["az"]
Expand Down Expand Up @@ -136,6 +139,10 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:
register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")

fs_kwargs.update(DEFAULT_KWARGS.get(protocol, {}))

if protocol == "sftp":
fs_kwargs.clear()

if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
if config.client_kwargs is not None:
Expand All @@ -155,6 +162,7 @@ def fsspec_from_config(config: FilesystemConfiguration) -> Tuple[AbstractFileSys
* s3
* az, abfs, abfss, adl, azure
* gcs, gs
* sftp

All other filesystems are not authenticated

Expand Down
92 changes: 92 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,98 @@ bucket_url='\\?\UNC\localhost\c$\a\b\c'
```
:::

### SFTP
Run `pip install "dlt[sftp]` which will install the `paramiko` package alongside `dlt`, enabling secure SFTP transfers.

Configure your SFTP credentials by editing the `.dlt/secrets.toml` file. By default, the file contains placeholders for AWS credentials. You should replace these with your SFTP credentials.

Below are the possible fields for SFTP credentials configuration:

```text
sftp_port # The port for SFTP, defaults to 22 (standard for SSH/SFTP)
sftp_username # Your SFTP username, defaults to None
sftp_password # Your SFTP password (if using password-based auth), defaults to None
sftp_key_filename # Path to your private key file for key-based authentication, defaults to None
sftp_key_passphrase # Passphrase for your private key (if applicable), defaults to None
sftp_timeout # Timeout for establishing a connection, defaults to None
sftp_banner_timeout # Timeout for receiving the banner during authentication, defaults to None
sftp_auth_timeout # Authentication timeout, defaults to None
sftp_channel_timeout # Channel timeout for SFTP operations, defaults to None
sftp_allow_agent # Use SSH agent for key management (if available), defaults to True
sftp_look_for_keys # Search for SSH keys in the default SSH directory (~/.ssh/), defaults to True
sftp_compress # Enable compression (can improve performance over slow networks), defaults to False
sftp_gss_auth # Use GSS-API for authentication, defaults to False
sftp_gss_kex # Use GSS-API for key exchange, defaults to False
sftp_gss_deleg_creds # Delegate credentials with GSS-API, defaults to True
sftp_gss_host # Host for GSS-API, defaults to None
sftp_gss_trust_dns # Trust DNS for GSS-API, defaults to True
```
> For more information about credentials parameters: https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect

### Authentication Methods

SFTP authentication is attempted in the following order of priority:

1. **Key-based authentication**: If you provide a `key_filename` containing the path to a private key or a corresponding OpenSSH public certificate (e.g., `id_rsa` and `id_rsa-cert.pub`), these will be used for authentication. If the private key requires a passphrase, you can specify it via `sftp_key_passphrase`. If your private key requires a passphrase to unlock, and you’ve provided one, it will be used to attempt to unlock the key.

2. **SSH Agent-based authentication**: If `allow_agent=True` (default), Paramiko will look for any SSH keys stored in your local SSH agent (such as `id_rsa`, `id_dsa`, or `id_ecdsa` keys stored in `~/.ssh/`).

3. **Username/Password authentication**: If a password is provided (`sftp_password`), plain username/password authentication will be attempted.

4. **GSS-API authentication**: If GSS-API (Kerberos) is enabled (sftp_gss_auth=True), authentication will use the Kerberos protocol. GSS-API may also be used for key exchange (sftp_gss_kex=True) and credential delegation (sftp_gss_deleg_creds=True). This method is useful in environments where Kerberos is set up, often in enterprise networks.


#### 1. **Key-based Authentication**

If you use an SSH key instead of a password, you can specify the path to your private key in the configuration.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]"
file_glob = "*"

[destination.filesystem.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa" # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
```

#### 2. **SSH Agent-based Authentication**

If you have an SSH agent running with loaded keys, you can allow Paramiko to use these keys automatically. You can omit the password and key fields if you're relying on the SSH agent.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]"
file_glob = "*"

[destination.filesystem.credentials]
sftp_username = "foo"
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
```
The loaded key must be one of the following types stored in ~/.ssh/: id_rsa, id_dsa, or id_ecdsa.

#### 3. **Username/Password Authentication**

This is the simplest form of authentication, where you supply a username and password directly.

```toml
[destination.filesystem]
bucket_url = "sftp://[hostname]/[path]" # The hostname of your SFTP server and the remote path
file_glob = "*" # Pattern to match the files you want to upload/download

[destination.filesystem.credentials]
sftp_username = "foo" # Replace "foo" with your SFTP username
sftp_password = "pass" # Replace "pass" with your SFTP password
```


### Notes:
- **Key-based Authentication**: Make sure your private key has the correct permissions (`chmod 600`), or SSH will refuse to use it.
- **Timeouts**: It's important to adjust timeout values based on your network conditions to avoid connection issues.

This configuration allows flexible SFTP authentication, whether you're using passwords, keys, or agents, and ensures secure communication between your local environment and the SFTP server.

## Write disposition
The filesystem destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder
Expand Down
Loading
Loading