Skip to content

Commit

Permalink
iceberg table format support for filesystem destination (#2067)
Browse files Browse the repository at this point in the history
* add pyiceberg dependency and upgrade mypy

- mypy upgrade needed to solve this issue: apache/iceberg-python#768
- uses <1.13.0 requirement on mypy because 1.13.0 gives error
- new lint errors arising due to version upgrade are simply ignored

* extend pyiceberg dependencies

* remove redundant delta annotation

* add basic local filesystem iceberg support

* add active table format setting

* disable merge tests for iceberg table format

* restore non-redundant extra info

* refactor to in-memory iceberg catalog

* add s3 support for iceberg table format

* add schema evolution support for iceberg table format

* extract _register_table function

* add partition support for iceberg table format

* update docstring

* enable child table test for iceberg table format

* enable empty source test for iceberg table format

* make iceberg catalog namespace configurable and default to dataset name

* add optional typing

* fix typo

* improve typing

* extract logic into dedicated function

* add iceberg read support to filesystem sql client

* remove unused import

* add todo

* extract logic into separate functions

* add azure support for iceberg table format

* generalize delta table format tests

* enable get tables function test for iceberg table format

* remove ignores

* undo table directory management change

* enable test_read_interfaces tests for iceberg

* fix active table format filter

* use mixin for object store rs credentials

* generalize catalog typing

* extract pyiceberg scheme mapping into separate function

* generalize credentials mixin test setup

* remove unused import

* add centralized fallback to append when merge is not supported

* Revert "add centralized fallback to append when merge is not supported"

This reverts commit 54cd0bc.

* fall back to append if merge is not supported on filesystem

* fix test for s3-compatible storage

* remove obsolete code path

* exclude gcs read interface tests for iceberg

* add gcs support for iceberg table format

* switch to UnsupportedAuthenticationMethodException

* add iceberg table format docs

* use shorter pipeline name to prevent too long sql identifiers

* add iceberg catalog note to docs

* black format

* use shorter pipeline name to prevent too long sql identifiers

* correct max id length for sqlalchemy mysql dialect

* Revert "use shorter pipeline name to prevent too long sql identifiers"

This reverts commit 6cce03b.

* Revert "use shorter pipeline name to prevent too long sql identifiers"

This reverts commit ef29aa7.

* replace show with execute to prevent useless print output

* add abfss scheme to test

* remove az support for iceberg table format

* remove iceberg bucket test exclusion

* add note to docs on azure scheme support for iceberg table format

* exclude iceberg from duckdb s3-compatibility test

* disable pyiceberg info logs for tests

* extend table format docs and move into own page

* upgrade adlfs to enable account_host attribute

* Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996-iceberg-filesystem

* fix lint errors

* re-add pyiceberg dependency

* enabled iceberg in dbt-duckdb

* upgrade pyiceberg version

* remove pyiceberg mypy errors across python version

* does not install airflow group for dev

* fixes gcp oauth iceberg credentials handling

* fixes ca cert bundle duckdb azure on ci

* allow for airflow dep to be present during type check

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
2 people authored and donotpush committed Dec 11, 2024
1 parent 5ec7386 commit 2d55e4a
Show file tree
Hide file tree
Showing 45 changed files with 1,163 additions and 422 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ jobs:
# key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: enable certificates for azure and duckdb
run: sudo mkdir -p /etc/pki/tls/certs && sudo ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline,ibis -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`

- name: Start SFTP server
run: docker compose -f "tests/load/filesystem_sftp/docker-compose.yml" up -d
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ has-poetry:
poetry --version

dev: has-poetry
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk,airflow
poetry install --all-extras --with docs,providers,pipeline,sources,sentry-sdk

lint:
./tools/check-package.sh
Expand Down
3 changes: 1 addition & 2 deletions dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def find_call_arguments_to_replace(
if not isinstance(dn_node, ast.Constant) or not isinstance(dn_node.value, str):
raise CliCommandInnerException(
"init",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as"
f" string to '{arg_name}' function in line {dn_node.lineno}",
f"The pipeline script {init_script_name} must pass the {t_arg_name} as string to '{arg_name}' function in line {dn_node.lineno}", # type: ignore[attr-defined]
)
else:
transformed_nodes.append((dn_node, ast.Constant(value=t_value, kind=None)))
Expand Down
15 changes: 14 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.configuration.specs.exceptions import (
InvalidBoto3Session,
ObjectStoreRsCredentialsException,
Expand All @@ -16,7 +17,9 @@


@configspec
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
class AwsCredentialsWithoutDefaults(
CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig
):
# credentials without boto implementation
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
Expand Down Expand Up @@ -77,6 +80,16 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:

return creds

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"s3.access-key-id": self.aws_access_key_id,
"s3.secret-access-key": self.aws_secret_access_key,
"s3.session-token": self.aws_session_token,
"s3.region": self.region_name,
"s3.endpoint": self.endpoint_url,
"s3.connect-timeout": 300,
}


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
22 changes: 19 additions & 3 deletions dlt/common/configuration/specs/azure_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
CredentialsWithDefault,
configspec,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt import version
from dlt.common.utils import without_none

_AZURE_STORAGE_EXTRA = f"{version.DLT_PKG_NAME}[az]"


@configspec
class AzureCredentialsBase(CredentialsConfiguration):
class AzureCredentialsBase(CredentialsConfiguration, WithObjectStoreRsCredentials):
azure_storage_account_name: str = None
azure_account_host: Optional[str] = None
"""Alternative host when accessing blob storage endpoint ie. my_account.dfs.core.windows.net"""
Expand All @@ -32,7 +33,7 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:


@configspec
class AzureCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
"""Credentials for Azure Blob Storage, compatible with adlfs"""

azure_storage_account_key: Optional[TSecretStrValue] = None
Expand All @@ -49,6 +50,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
account_host=self.azure_account_host,
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"adlfs.account-name": self.azure_storage_account_name,
"adlfs.account-key": self.azure_storage_account_key,
"adlfs.sas-token": self.azure_storage_sas_token,
}

def create_sas_token(self) -> None:
try:
from azure.storage.blob import generate_account_sas, ResourceTypes
Expand All @@ -72,7 +80,7 @@ def on_partial(self) -> None:


@configspec
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase):
class AzureServicePrincipalCredentialsWithoutDefaults(AzureCredentialsBase, WithPyicebergConfig):
azure_tenant_id: str = None
azure_client_id: str = None
azure_client_secret: TSecretStrValue = None
Expand All @@ -86,6 +94,14 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
return {
"adlfs.account-name": self.azure_storage_account_name,
"adlfs.tenant-id": self.azure_tenant_id,
"adlfs.client-id": self.azure_client_id,
"adlfs.client-secret": self.azure_client_secret,
}


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]:
def get_resolvable_fields(cls) -> Dict[str, type]:
"""Returns a mapping of fields to their type hints. Dunders should not be resolved and are not returned"""
return {
f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type]
f.name: eval(f.type) if isinstance(f.type, str) else f.type
for f in cls._get_resolvable_dataclass_fields()
}

Expand Down
7 changes: 1 addition & 6 deletions dlt/common/configuration/specs/config_providers_context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import dataclasses
import io
from typing import ClassVar, List

Expand All @@ -8,10 +7,6 @@
ConfigProvider,
ContextProvider,
)
from dlt.common.configuration.specs.base_configuration import (
ContainerInjectableContext,
NotResolved,
)
from dlt.common.configuration.specs import (
GcpServiceAccountCredentials,
BaseConfiguration,
Expand Down Expand Up @@ -137,7 +132,7 @@ def _airflow_providers() -> List[ConfigProvider]:
# check if we are in task context and provide more info
from airflow.operators.python import get_current_context # noqa

ti: TaskInstance = get_current_context()["ti"] # type: ignore
ti: TaskInstance = get_current_context()["ti"] # type: ignore[assignment,unused-ignore]

# log outside of stderr/out redirect
if secrets_toml_var is None:
Expand Down
4 changes: 4 additions & 0 deletions dlt/common/configuration/specs/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ def __init__(self, spec: Type[Any], native_value: Any):

class ObjectStoreRsCredentialsException(ConfigurationException):
pass


class UnsupportedAuthenticationMethodException(ConfigurationException):
pass
36 changes: 32 additions & 4 deletions dlt/common/configuration/specs/gcp_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
InvalidGoogleServicesJson,
NativeValueError,
OAuth2ScopesRequired,
UnsupportedAuthenticationMethodException,
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny, TSecretStrValue, StrAny
from dlt.common.configuration.specs.base_configuration import (
Expand All @@ -23,7 +25,7 @@


@configspec
class GcpCredentials(CredentialsConfiguration):
class GcpCredentials(CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig):
token_uri: Final[str] = dataclasses.field(
default="https://oauth2.googleapis.com/token", init=False, repr=False, compare=False
)
Expand Down Expand Up @@ -126,6 +128,12 @@ def to_native_credentials(self) -> Any:
else:
return ServiceAccountCredentials.from_service_account_info(self)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
raise UnsupportedAuthenticationMethodException(
"Service Account authentication not supported with `iceberg` table format. Use OAuth"
" authentication instead."
)

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"

Expand Down Expand Up @@ -176,11 +184,19 @@ def to_native_representation(self) -> str:
return json.dumps(self._info_dict())

def to_object_store_rs_credentials(self) -> Dict[str, str]:
raise NotImplementedError(
"`object_store` Rust crate does not support OAuth for GCP credentials. Reference:"
" https://docs.rs/object_store/latest/object_store/gcp."
raise UnsupportedAuthenticationMethodException(
"OAuth authentication not supported with `delta` table format. Use Service Account or"
" Application Default Credentials authentication instead."
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
self.auth()
return {
"gcs.project-id": self.project_id,
"gcs.oauth2.token": self.token,
"gcs.oauth2.token-expires-at": (pendulum.now().timestamp() + 60) * 1000,
}

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
Expand Down Expand Up @@ -313,6 +329,12 @@ def to_native_credentials(self) -> Any:
else:
return super().to_native_credentials()

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
raise UnsupportedAuthenticationMethodException(
"Application Default Credentials authentication not supported with `iceberg` table"
" format. Use OAuth authentication instead."
)


@configspec
class GcpServiceAccountCredentials(
Expand All @@ -334,3 +356,9 @@ def parse_native_representation(self, native_value: Any) -> None:
except NativeValueError:
pass
GcpOAuthCredentialsWithoutDefaults.parse_native_representation(self, native_value)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
if self.has_default_credentials():
return GcpDefaultCredentials.to_pyiceberg_fileio_config(self)
else:
return GcpOAuthCredentialsWithoutDefaults.to_pyiceberg_fileio_config(self)
24 changes: 24 additions & 0 deletions dlt/common/configuration/specs/mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Dict, Any
from abc import abstractmethod, ABC


class WithObjectStoreRsCredentials(ABC):
@abstractmethod
def to_object_store_rs_credentials(self) -> Dict[str, Any]:
"""Returns credentials dictionary for object_store Rust crate.
Can be used for libraries that build on top of the object_store crate, such as `deltalake`.
https://docs.rs/object_store/latest/object_store/
"""
pass


class WithPyicebergConfig(ABC):
@abstractmethod
def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
"""Returns `pyiceberg` FileIO configuration dictionary.
https://py.iceberg.apache.org/configuration/#fileio
"""
pass
2 changes: 1 addition & 1 deletion dlt/common/data_writers/buffered.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
if self.writer_spec.is_binary_format:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="") # type: ignore
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="")
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/destination/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def verify_schema_capabilities(
exception_log: List[Exception] = []
# combined casing function
case_identifier = lambda ident: capabilities.casefold_identifier(
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident) # type: ignore
(str if capabilities.has_case_sensitive_identifiers else str.casefold)(ident)
)
table_name_lookup: DictStrStr = {}
# name collision explanation
Expand Down
6 changes: 3 additions & 3 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages import FilesystemConfiguration
from dlt.common.utils import assert_min_pkg_version
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

try:
Expand Down Expand Up @@ -191,10 +192,9 @@ def get_delta_tables(

def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {} # type: ignore
creds = {}
extra_options = {}
# TODO: create a mixin with to_object_store_rs_credentials for a proper discovery
if hasattr(config.credentials, "to_object_store_rs_credentials"):
if isinstance(config.credentials, WithObjectStoreRsCredentials):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
Expand Down
Loading

0 comments on commit 2d55e4a

Please sign in to comment.