Skip to content

Commit

Permalink
Merge branch 'devel' into feat/2134-snowflake-hints
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Dec 16, 2024
2 parents 07206d7 + b8bac75 commit e912c51
Show file tree
Hide file tree
Showing 95 changed files with 2,164 additions and 653 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ jobs:
# key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: enable certificates for azure and duckdb
run: sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
Expand Down
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk,airflow
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk

lint:
./tools/check-package.sh
Expand All @@ -63,7 +63,6 @@ format:
lint-snippets:
cd docs/tools && poetry run python check_embedded_snippets.py full


lint-and-test-snippets: lint-snippets
poetry run mypy --config-file mypy.ini docs/website docs/tools --exclude docs/tools/lint_setup --exclude docs/website/docs_processed
poetry run flake8 --max-line-length=200 docs/website docs/tools --exclude docs/website/.dlt-repo
Expand All @@ -82,7 +81,7 @@ lint-security:
poetry run bandit -r dlt/ -n 3 -l

test:
(set -a && . tests/.env && poetry run pytest tests)
poetry run pytest tests

test-load-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres or duckdb)'
Expand Down
2 changes: 0 additions & 2 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
)
from dlt.pipeline import progress
from dlt import destinations
from dlt.destinations.dataset import dataset as _dataset

pipeline = _pipeline
current = _current
Expand Down Expand Up @@ -80,7 +79,6 @@
"TCredentials",
"sources",
"destinations",
"_dataset",
]

# verify that no injection context was created
Expand Down
3 changes: 1 addition & 2 deletions dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def find_call_arguments_to_replace(
if not isinstance(dn_node, ast.Constant) or not isinstance(dn_node.value, str):
raise CliCommandInnerException(
"init",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as"
f" string to '{arg_name}' function in line {dn_node.lineno}",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as string to '{arg_name}' function in line {dn_node.lineno}", # type: ignore[attr-defined]
)
else:
transformed_nodes.append((dn_node, ast.Constant(value=t_value, kind=None)))
Expand Down
15 changes: 14 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.configuration.specs.exceptions import (
InvalidBoto3Session,
ObjectStoreRsCredentialsException,
Expand All @@ -16,7 +17,9 @@


@configspec
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
class AwsCredentialsWithoutDefaults(
CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig
):
# credentials without boto implementation
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
Expand Down Expand Up @@ -77,6 +80,16 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:

return creds

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Return a `pyiceberg` FileIO configuration dict mapping AWS credential
        fields onto ``s3.*`` properties.

        https://py.iceberg.apache.org/configuration/#fileio
        NOTE(review): optional fields left unset (session token, endpoint) are
        emitted with value None — confirm pyiceberg tolerates/ignores None values.
        """
        return {
            "s3.access-key-id": self.aws_access_key_id,
            "s3.secret-access-key": self.aws_secret_access_key,
            "s3.session-token": self.aws_session_token,
            "s3.region": self.region_name,
            "s3.endpoint": self.endpoint_url,
            "s3.connect-timeout": 300,  # presumably seconds — TODO confirm against pyiceberg docs
        }


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
22 changes: 19 additions & 3 deletions dlt/common/configuration/specs/azure_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt import version
from dlt.common.utils import without_none

_AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"


@configspec
class AzureCredentialsBase(CredentialsConfiguration):
class AzureCredentialsBase(CredentialsConfiguration, WithObjectStoreRsCredentials):
azure_storage_account_name: str = None
azure_account_host: Optional[str] = None
"""Alternative host when accessing blob storage endpoint ie. my_account.dfs.core.windows.net"""
Expand All @@ -32,7 +33,7 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:


@configspec
class AzureCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
"""Credentials for Azure Blob Storage, compatible with adlfs"""

azure_storage_account_key: Optional[TSecretStrValue] = None
Expand All @@ -49,6 +50,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
account_host=self.azure_account_host,
)

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Return a `pyiceberg` FileIO configuration dict (``adlfs.*`` properties)
        for account-key / SAS-token authentication.

        https://py.iceberg.apache.org/configuration/#fileio
        NOTE(review): whichever of account-key / sas-token is unset is emitted as
        None — confirm pyiceberg ignores None values.
        """
        return {
            "adlfs.account-name": self.azure_storage_account_name,
            "adlfs.account-key": self.azure_storage_account_key,
            "adlfs.sas-token": self.azure_storage_sas_token,
        }

def create_sas_token(self) -> None:
try:
from azure.storage.blob import generate_account_sas, ResourceTypes
Expand All @@ -72,7 +80,7 @@ def on_partial(self) -> None:


@configspec
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
azure_tenant_id: str = None
azure_client_id: str = None
azure_client_secret: TSecretStrValue = None
Expand All @@ -86,6 +94,14 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Return a `pyiceberg` FileIO configuration dict (``adlfs.*`` properties)
        for Azure service-principal (tenant/client/secret) authentication.

        https://py.iceberg.apache.org/configuration/#fileio
        """
        return {
            "adlfs.account-name": self.azure_storage_account_name,
            "adlfs.tenant-id": self.azure_tenant_id,
            "adlfs.client-id": self.azure_client_id,
            "adlfs.client-secret": self.azure_client_secret,
        }


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]:
def get_resolvable_fields(cls) -> Dict[str, type]:
"""Returns a mapping of fields to their type hints. Dunders should not be resolved and are not returned"""
return {
f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type]
f.name: eval(f.type) if isinstance(f.type, str) else f.type
for f in cls._get_resolvable_dataclass_fields()
}

Expand Down
7 changes: 1 addition & 6 deletions dlt/common/configuration/specs/config_providers_context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import dataclasses
import io
from typing import ClassVar, List

Expand All @@ -8,10 +7,6 @@
ConfigProvider,
ContextProvider,
)
from dlt.common.configuration.specs.base_configuration import (
ContainerInjectableContext,
NotResolved,
)
from dlt.common.configuration.specs import (
GcpServiceAccountCredentials,
BaseConfiguration,
Expand Down Expand Up @@ -137,7 +132,7 @@ def _airflow_providers() -> List[ConfigProvider]:
# check if we are in task context and provide more info
from airflow.operators.python import get_current_context # noqa

ti: TaskInstance = get_current_context()["ti"] # type: ignore
ti: TaskInstance = get_current_context()["ti"] # type: ignore[assignment,unused-ignore]

# log outside of stderr/out redirect
if secrets_toml_var is None:
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/configuration/specs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ def __init__(self, spec: Type[Any], native_value: Any):

class ObjectStoreRsCredentialsException(ConfigurationException):
pass


class UnsupportedAuthenticationMethodException(ConfigurationException):
    """Raised when a credentials object cannot serve the authentication method
    required by the requested destination/table format (e.g. GCP Service Account
    auth is not supported with the `iceberg` table format).
    """

    pass
36 changes: 32 additions & 4 deletions dlt/common/configuration/specs/gcp_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
InvalidGoogleServicesJson,
NativeValueError,
OAuth2ScopesRequired,
UnsupportedAuthenticationMethodException,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny, TSecretStrValue, StrAny
from dlt.common.configuration.specs.base_configuration import (
Expand All @@ -23,7 +25,7 @@


@configspec
class GcpCredentials(CredentialsConfiguration):
class GcpCredentials(CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig):
token_uri: Final[str] = dataclasses.field(
default="https://oauth2.googleapis.com/token", init=False, repr=False, compare=False
)
Expand Down Expand Up @@ -126,6 +128,12 @@ def to_native_credentials(self) -> Any:
else:
return ServiceAccountCredentials.from_service_account_info(self)

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Always raises: Service Account authentication cannot produce a
        `pyiceberg` FileIO config; callers must use the OAuth flavor instead.
        """
        raise UnsupportedAuthenticationMethodException(
            "Service Account authentication not supported with `iceberg` table format. Use OAuth"
            " authentication instead."
        )

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"

Expand Down Expand Up @@ -176,11 +184,19 @@ def to_native_representation(self) -> str:
return json.dumps(self._info_dict())

def to_object_store_rs_credentials(self) -> Dict[str, str]:
raise NotImplementedError(
"`object_store` Rust crate does not support OAuth for GCP credentials. Reference:"
" https://docs.rs/object_store/latest/object_store/gcp."
raise UnsupportedAuthenticationMethodException(
"OAuth authentication not supported with `delta` table format. Use Service Account or"
" Application Default Credentials authentication instead."
)

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Return a `pyiceberg` FileIO configuration dict (``gcs.*`` properties)
        backed by an OAuth access token.

        https://py.iceberg.apache.org/configuration/#fileio
        """
        self.auth()  # obtain/refresh the access token before handing it to pyiceberg
        return {
            "gcs.project-id": self.project_id,
            "gcs.oauth2.token": self.token,
            # expiry set 60s from now, converted to epoch milliseconds — short expiry
            # presumably forces config re-derivation; TODO confirm units with pyiceberg
            "gcs.oauth2.token-expires-at": (pendulum.now().timestamp() + 60) * 1000,
        }

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
Expand Down Expand Up @@ -313,6 +329,12 @@ def to_native_credentials(self) -> Any:
else:
return super().to_native_credentials()

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Always raises: Application Default Credentials cannot produce a
        `pyiceberg` FileIO config; callers must use the OAuth flavor instead.
        """
        raise UnsupportedAuthenticationMethodException(
            "Application Default Credentials authentication not supported with `iceberg` table"
            " format. Use OAuth authentication instead."
        )


@configspec
class GcpServiceAccountCredentials(
Expand All @@ -334,3 +356,9 @@ def parse_native_representation(self, native_value: Any) -> None:
except NativeValueError:
pass
GcpOAuthCredentialsWithoutDefaults.parse_native_representation(self, native_value)

    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Dispatch to the FileIO config of whichever credential flavor is active:
        default credentials (which raise as unsupported) or OAuth (``gcs.*`` config).
        """
        if self.has_default_credentials():
            return GcpDefaultCredentials.to_pyiceberg_fileio_config(self)
        else:
            return GcpOAuthCredentialsWithoutDefaults.to_pyiceberg_fileio_config(self)
24 changes: 24 additions & 0 deletions dlt/common/configuration/specs/mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Dict, Any
from abc import abstractmethod, ABC


class WithObjectStoreRsCredentials(ABC):
    """Mixin for credentials that can be expressed in `object_store` Rust crate format."""

    @abstractmethod
    def to_object_store_rs_credentials(self) -> Dict[str, Any]:
        """Produce a credentials dict understood by the `object_store` Rust crate.

        Intended for libraries built on top of `object_store`, such as `deltalake`.
        https://docs.rs/object_store/latest/object_store/
        """
        ...


class WithPyicebergConfig(ABC):
    """Mixin for credentials that can be expressed as `pyiceberg` FileIO configuration."""

    @abstractmethod
    def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
        """Produce a FileIO configuration dict consumable by `pyiceberg`.

        https://py.iceberg.apache.org/configuration/#fileio
        """
        ...
2 changes: 1 addition & 1 deletion dlt/common/data_writers/buffered.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
if self.writer_spec.is_binary_format:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="") # type: ignore
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="")
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,10 @@ def __getattr__(self, table: str) -> SupportsReadableRelation: ...

def ibis(self) -> IbisBackend: ...

def row_counts(
self, *, data_tables: bool = True, dlt_tables: bool = False, table_names: List[str] = None
) -> SupportsReadableRelation: ...


class JobClientBase(ABC):
def __init__(
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/destination/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def verify_schema_capabilities(
exception_log: List[Exception] = []
# combined casing function
case_identifier = lambda ident: capabilities.casefold_identifier(
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident) # type: ignore
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident)
)
table_name_lookup: DictStrStr = {}
# name collision explanation
Expand Down
6 changes: 3 additions & 3 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages import FilesystemConfiguration
from dlt.common.utils import assert_min_pkg_version
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

try:
Expand Down Expand Up @@ -191,10 +192,9 @@ def get_delta_tables(

def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {} # type: ignore
creds = {}
extra_options = {}
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
if isinstance(config.credentials, WithObjectStoreRsCredentials):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
Expand Down
Loading

0 comments on commit e912c51

Please sign in to comment.