Skip to content

Commit

Permalink
Merge branch 'devel' into reorder_sidebar
Browse files Browse the repository at this point in the history
  • Loading branch information
burnash authored Sep 14, 2024
2 parents 71d9b96 + 4e45ea4 commit 942e659
Show file tree
Hide file tree
Showing 81 changed files with 4,185 additions and 984 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ env:
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
# note that all buckets are enabled for testing
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp

jobs:
get_docs_changes:
Expand Down
28 changes: 24 additions & 4 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ env:
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\", \"qdrant\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"sftp\"]"

DESTINATION__WEAVIATE__VECTORIZER: text2vec-contextionary
DESTINATION__WEAVIATE__MODULE_CONFIG: "{\"text2vec-contextionary\": {\"vectorizeClassName\": false, \"vectorizePropertyName\": true}}"
Expand Down Expand Up @@ -95,15 +95,35 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant --with sentry-sdk --with pipeline -E deltalake
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d

- name: Configure SSH Agent for sftp tests
run: |
mkdir -p /home/runner/.ssh
cp tests/load/filesystem_sftp/bootstrap/bobby_rsa /home/runner/.ssh/id_rsa
cp tests/load/filesystem_sftp/bootstrap/bobby_rsa.pub /home/runner/.ssh/id_rsa.pub
# always run full suite, also on branches
- run: poetry run pytest tests/load --ignore tests/load/sources && poetry run pytest tests/cli
name: Run tests Linux
- name: Run tests Linux
run: |
eval "$(ssh-agent -s)"
poetry run pytest tests/load --ignore tests/load/sources
poetry run pytest tests/cli
env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PORT: 2222
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_USERNAME: foo
DESTINATION__FILESYSTEM__CREDENTIALS__SFTP_PASSWORD: pass


- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v

- name: Stop SFTP server
if: always()
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" down -v
8 changes: 5 additions & 3 deletions .github/workflows/test_pyarrow17.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"filesystem\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\", \"r2\", \"s3\", \"gs\", \"az\", \"abfss\", \"gdrive\"]" #excludes sftp

jobs:
get_docs_changes:
Expand Down Expand Up @@ -72,6 +73,7 @@ jobs:
- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/libs tests/load -m needspyarrow17
name: Run needspyarrow17 tests Linux
- name: Run needspyarrow17 tests Linux
run: |
poetry run pytest tests/libs -m "needspyarrow17"
poetry run pytest tests/load -m "needspyarrow17"
99 changes: 99 additions & 0 deletions .github/workflows/test_sqlalchemy_destinations.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Tests destinations that can run without credentials.
# i.e. local postgres, duckdb, filesystem (with local fs/memory bucket)

name: dest | sqlalchemy mysql and sqlite

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  # NOTE: this workflow can't use github secrets!
  # DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  # NOTE(review): contradicts the "can't use github secrets" note above — on fork PRs
  # this expands to an empty string; confirm that is acceptable for telemetry.
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
  ACTIVE_DESTINATIONS: "[\"sqlalchemy\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml

  run_loader:
    name: dest | sqlalchemy mysql and sqlite
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    strategy:
      fail-fast: false
      # Run on sqlalchemy 1.4 and 2.0
      matrix:
        # quoted so YAML keeps the exact literal: bare 1.4 parses as a float and
        # values like 1.40 would silently lose the trailing zero
        sqlalchemy: ["1.4", "2"]
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    # Service containers to run with `container-job`
    services:
      # Label used to access the service container
      mysql:
        image: mysql:8
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: dlt_data
          MYSQL_USER: loader
          MYSQL_PASSWORD: loader
        ports:
          # quoted: colon-separated digit scalars are a YAML 1.1 typing trap
          - "3306:3306"
        # Wait for the service to be ready before completing the job
        options: >-
          --health-cmd="mysqladmin ping -h localhost -u root -proot"
          --health-interval=10s
          --health-timeout=5s
          --health-retries=5
    steps:
      - name: Check out
        # NOTE(review): consider pinning to a tagged release (e.g. @v4) instead of
        # the moving @master ref for reproducible builds
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

      - name: Install dependencies
        run: poetry install --no-interaction -E parquet -E filesystem -E sqlalchemy -E cli --with sentry-sdk --with pipeline && poetry run pip install mysqlclient && poetry run pip install "sqlalchemy==${{ matrix.sqlalchemy }}"

      - name: create secrets.toml
        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

      # always run full suite, also on branches
      - run: poetry run pytest tests/load -x --ignore tests/load/sources
        name: Run tests Linux
        env:
          DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:[email protected]:3306/dlt_data # Use root cause we need to create databases
          DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite
2 changes: 2 additions & 0 deletions dlt/common/configuration/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
AnyAzureCredentials,
)

from .sftp_crendentials import SFTPCredentials

# backward compatibility for service account credentials
from .gcp_credentials import (
Expand Down Expand Up @@ -62,4 +63,5 @@
"AnyAzureCredentials",
"GcpClientCredentials",
"GcpClientCredentialsWithDefault",
"SFTPCredentials",
]
69 changes: 69 additions & 0 deletions dlt/common/configuration/specs/sftp_crendentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Any, Dict, Optional

from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec


@configspec
class SFTPCredentials(CredentialsConfiguration):
    """Credentials for SFTP filesystem, compatible with fsspec SFTP protocol.

    Authentication is attempted in the following order of priority:

    - `key_filename` may contain OpenSSH public certificate paths as well as regular
      private-key paths; when files ending in `-cert.pub` are found, they are assumed
      to match a private key, and both components will be loaded.
    - Any key found through an SSH agent: any "id_rsa", "id_dsa", or "id_ecdsa" key
      discoverable in ~/.ssh/.
    - Plain username/password authentication, if a password was provided.
    - If a private key requires a password to unlock it, and a password is provided,
      that password will be used to attempt to unlock the key.

    For more information about parameters:
    https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
    """

    # connection endpoint and basic authentication
    sftp_port: Optional[int] = 22
    sftp_username: Optional[str] = None
    sftp_password: Optional[TSecretStrValue] = None
    # private-key authentication; the passphrase unlocks an encrypted key file
    sftp_key_filename: Optional[str] = None
    sftp_key_passphrase: Optional[TSecretStrValue] = None
    # timeouts forwarded to paramiko's SSHClient.connect (seconds per paramiko docs)
    sftp_timeout: Optional[float] = None
    sftp_banner_timeout: Optional[float] = None
    sftp_auth_timeout: Optional[float] = None
    sftp_channel_timeout: Optional[float] = None
    # key discovery: allow keys from a running SSH agent and scan ~/.ssh/
    sftp_allow_agent: Optional[bool] = True
    sftp_look_for_keys: Optional[bool] = True
    # enable transport-level compression
    sftp_compress: Optional[bool] = False
    # GSS-API (Kerberos) options, forwarded verbatim to paramiko
    sftp_gss_auth: Optional[bool] = False
    sftp_gss_kex: Optional[bool] = False
    sftp_gss_deleg_creds: Optional[bool] = True
    sftp_gss_host: Optional[str] = None
    sftp_gss_trust_dns: Optional[bool] = True

    def to_fsspec_credentials(self) -> Dict[str, Any]:
        """Return a dict that can be passed to fsspec SFTP/SSHClient.connect method.

        The `sftp_` prefix is stripped from each field so the keys match the keyword
        arguments of `paramiko.client.SSHClient.connect`. Unset fields are included
        as `None` values.

        Returns:
            Dict[str, Any]: keyword arguments for SSHClient.connect
        """

        credentials: Dict[str, Any] = {
            "port": self.sftp_port,
            "username": self.sftp_username,
            "password": self.sftp_password,
            "key_filename": self.sftp_key_filename,
            "passphrase": self.sftp_key_passphrase,
            "timeout": self.sftp_timeout,
            "banner_timeout": self.sftp_banner_timeout,
            "auth_timeout": self.sftp_auth_timeout,
            "channel_timeout": self.sftp_channel_timeout,
            "allow_agent": self.sftp_allow_agent,
            "look_for_keys": self.sftp_look_for_keys,
            "compress": self.sftp_compress,
            "gss_auth": self.sftp_gss_auth,
            "gss_kex": self.sftp_gss_kex,
            "gss_deleg_creds": self.sftp_gss_deleg_creds,
            "gss_host": self.sftp_gss_host,
            "gss_trust_dns": self.sftp_gss_trust_dns,
        }

        return credentials
10 changes: 8 additions & 2 deletions dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""The destination can override the parallelism strategy"""

max_query_parameters: Optional[int] = None
"""The maximum number of parameters that can be supplied in a single parametrized query"""

supports_native_boolean: bool = True
"""The destination supports a native boolean type, otherwise bool columns are usually stored as integers"""

def generates_case_sensitive_identifiers(self) -> bool:
"""Tells if capabilities as currently adjusted, will generate case sensitive identifiers"""
# must have case sensitive support and folding function must preserve casing
Expand Down Expand Up @@ -220,8 +226,8 @@ def generic_capabilities(
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
return self.type_mapper(self)
def get_type_mapper(self, *args: Any, **kwargs: Any) -> DataTypeMapper:
    """Instantiate this capabilities context's `type_mapper` class, passing `self` as
    the capabilities argument.

    Extra positional/keyword arguments are forwarded to the mapper constructor
    # NOTE(review): presumably destination-specific options — confirm against callers.
    """
    return self.type_mapper(self, *args, **kwargs)


def merge_caps_file_formats(
Expand Down
16 changes: 15 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ def from_normalized_mapping(
schema=normalized_doc[naming_convention.normalize_identifier("schema")],
)

def to_normalized_mapping(self, naming_convention: NamingConvention) -> Dict[str, Any]:
    """Build a mapping of this instance's fields, with each key normalized by the
    given naming convention.

    Args:
        naming_convention: used to normalize every field name via
            `normalize_identifier`

    Returns:
        Dict[str, Any]: field values keyed by normalized field names
        (e.g. {Version: ..., SchemaName: ...} depending on the convention)
    """
    normalized: Dict[str, Any] = {}
    for field_name, field_value in self._asdict().items():
        normalized[naming_convention.normalize_identifier(field_name)] = field_value
    return normalized


@dataclasses.dataclass
class StateInfo:
Expand Down Expand Up @@ -439,7 +453,7 @@ def __init__(
self.capabilities = capabilities

@abstractmethod
def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
def initialize_storage(self, truncate_tables: Optional[Iterable[str]] = None) -> None:
    """Prepares storage to be used ie. creates database schema or file system folder. Truncates requested tables.

    Args:
        truncate_tables: optional names of tables to truncate; `None` (default)
            truncates nothing. Abstract — implementations must override.
    """
    pass

Expand Down
Loading

0 comments on commit 942e659

Please sign in to comment.