diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 4ab40d11f6..6b79060f07 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -123,7 +123,7 @@ jobs: shell: cmd - name: Install pipeline dependencies - run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline -E deltalake - run: | poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations diff --git a/.github/workflows/test_destinations.yml b/.github/workflows/test_destinations.yml index 037f9da3e5..e75cd6c780 100644 --- a/.github/workflows/test_destinations.yml +++ b/.github/workflows/test_destinations.yml @@ -75,7 +75,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index dfe8e56735..263d3f588c 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -90,7 +90,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline + run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/dlt/common/configuration/specs/aws_credentials.py b/dlt/common/configuration/specs/aws_credentials.py index ee49e79e40..97803a60e3 100644 --- a/dlt/common/configuration/specs/aws_credentials.py +++ b/dlt/common/configuration/specs/aws_credentials.py @@ -45,6 +45,14 @@ def to_session_credentials(self) -> Dict[str, str]: aws_session_token=self.aws_session_token, ) + def to_object_store_rs_credentials(self) -> Dict[str, str]: + # https://docs.rs/object_store/latest/object_store/aws + assert self.region_name is not None, "`object_store` Rust crate requires AWS region." 
+ creds = self.to_session_credentials() + if creds["aws_session_token"] is None: + creds.pop("aws_session_token") + return {**creds, **{"region": self.region_name}} + @configspec class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault): diff --git a/dlt/common/configuration/specs/azure_credentials.py b/dlt/common/configuration/specs/azure_credentials.py index 8b8fc259f2..7fa34fa00f 100644 --- a/dlt/common/configuration/specs/azure_credentials.py +++ b/dlt/common/configuration/specs/azure_credentials.py @@ -27,6 +27,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]: sas_token=self.azure_storage_sas_token, ) + def to_object_store_rs_credentials(self) -> Dict[str, str]: + # https://docs.rs/object_store/latest/object_store/azure + creds = self.to_adlfs_credentials() + if creds["sas_token"] is None: + creds.pop("sas_token") + return creds + def create_sas_token(self) -> None: from azure.storage.blob import generate_account_sas, ResourceTypes @@ -61,6 +68,10 @@ def to_adlfs_credentials(self) -> Dict[str, Any]: client_secret=self.azure_client_secret, ) + def to_object_store_rs_credentials(self) -> Dict[str, str]: + # https://docs.rs/object_store/latest/object_store/azure + return self.to_adlfs_credentials() + @configspec class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault): diff --git a/dlt/common/configuration/specs/base_configuration.py b/dlt/common/configuration/specs/base_configuration.py index 9ef756a2a6..1751b6ae13 100644 --- a/dlt/common/configuration/specs/base_configuration.py +++ b/dlt/common/configuration/specs/base_configuration.py @@ -1,5 +1,4 @@ import copy -import inspect import contextlib import dataclasses import warnings @@ -221,6 +220,11 @@ def wrap(cls: Type[TAnyClass]) -> Type[TAnyClass]: if att_name not in cls.__annotations__: raise ConfigFieldMissingTypeHintException(att_name, cls) hint = cls.__annotations__[att_name] + # resolve the annotation as per PEP 563 + # NOTE: we do not use get_type_hints because at this moment cls is an unknown name + # (ie. used as decorator and module is being imported) + if isinstance(hint, str): + hint = eval(hint) # context can have any type if not is_valid_hint(hint) and not is_context: @@ -321,7 +325,10 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]: @classmethod def get_resolvable_fields(cls) -> Dict[str, type]: """Returns a mapping of fields to their type hints. 
Dunders should not be resolved and are not returned""" - return {f.name: f.type for f in cls._get_resolvable_dataclass_fields()} + return { + f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type] + for f in cls._get_resolvable_dataclass_fields() + } def is_resolved(self) -> bool: return self.__is_resolved__ diff --git a/dlt/common/configuration/specs/gcp_credentials.py b/dlt/common/configuration/specs/gcp_credentials.py index 9927b81ebf..a1d82fc577 100644 --- a/dlt/common/configuration/specs/gcp_credentials.py +++ b/dlt/common/configuration/specs/gcp_credentials.py @@ -1,6 +1,6 @@ import dataclasses import sys -from typing import Any, ClassVar, Final, List, Tuple, Union, Dict +from typing import Any, ClassVar, Final, List, Tuple, Union, Dict, Optional from dlt.common.json import json from dlt.common.pendulum import pendulum @@ -74,6 +74,7 @@ def to_gcs_credentials(self) -> Dict[str, Any]: @configspec class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials): private_key: TSecretValue = None + private_key_id: Optional[str] = None client_email: str = None type: Final[str] = dataclasses.field( # noqa: A003 default="service_account", init=False, repr=False, compare=False @@ -122,6 +123,10 @@ def to_native_credentials(self) -> Any: else: return ServiceAccountCredentials.from_service_account_info(self) + def to_object_store_rs_credentials(self) -> Dict[str, str]: + # https://docs.rs/object_store/latest/object_store/gcp + return {"service_account_key": json.dumps(dict(self))} + def __str__(self) -> str: return f"{self.client_email}@{self.project_id}" @@ -171,6 +176,12 @@ def parse_native_representation(self, native_value: Any) -> None: def to_native_representation(self) -> str: return json.dumps(self._info_dict()) + def to_object_store_rs_credentials(self) -> Dict[str, str]: + raise NotImplementedError( + "`object_store` Rust crate does not support OAuth for GCP credentials. Reference:" + " https://docs.rs/object_store/latest/object_store/gcp." + ) + def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None: if not self.refresh_token: self.add_scopes(scopes) diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index 089b4a1d5e..e5ceb859f1 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -1,4 +1,15 @@ -from typing import Any, Callable, ClassVar, List, Literal, Optional, Sequence, Tuple, Set, get_args +from typing import ( + Any, + Callable, + ClassVar, + Literal, + Optional, + Sequence, + Tuple, + Set, + Protocol, + get_args, +) from dlt.common.configuration.utils import serialize_value from dlt.common.configuration import configspec @@ -9,7 +20,6 @@ DestinationLoadingWithoutStagingNotSupported, ) from dlt.common.utils import identity -from dlt.common.pendulum import pendulum from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.wei import EVM_DECIMAL_PRECISION @@ -23,12 +33,28 @@ ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat)) +class LoaderFileFormatAdapter(Protocol): + """Callback protocol for `loader_file_format_adapter` capability.""" + + def __call__( + self, + preferred_loader_file_format: TLoaderFileFormat, + supported_loader_file_formats: Sequence[TLoaderFileFormat], + /, + *, + table_schema: "TTableSchema", # type: ignore[name-defined] # noqa: F821 + ) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: ... 
+ + @configspec class DestinationCapabilitiesContext(ContainerInjectableContext): """Injectable destination capabilities required for many Pipeline stages ie. normalize""" preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + loader_file_format_adapter: LoaderFileFormatAdapter = None + """Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime.""" + supported_table_formats: Sequence["TTableFormat"] = None # type: ignore[name-defined] # noqa: F821 recommended_file_size: Optional[int] = None """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None @@ -65,14 +91,18 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): @staticmethod def generic_capabilities( preferred_loader_file_format: TLoaderFileFormat = None, + loader_file_format_adapter: LoaderFileFormatAdapter = None, + supported_table_formats: Sequence["TTableFormat"] = None, # type: ignore[name-defined] # noqa: F821 ) -> "DestinationCapabilitiesContext": from dlt.common.data_writers.escape import format_datetime_literal caps = DestinationCapabilitiesContext() caps.preferred_loader_file_format = preferred_loader_file_format caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"] + caps.loader_file_format_adapter = loader_file_format_adapter caps.preferred_staging_file_format = None caps.supported_staging_file_formats = [] + caps.supported_table_formats = supported_table_formats or [] caps.escape_identifier = identity caps.escape_literal = serialize_value caps.format_datetime_literal = format_datetime_literal diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 4919711f58..9bb843a4c5 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -48,6 +48,7 @@ from dlt.common.schema.exceptions import UnknownTableException from dlt.common.storages import FileStorage from dlt.common.storages.load_storage import ParsedLoadJobFileName +from dlt.common.storages.load_package import LoadJobInfo TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") @@ -312,7 +313,9 @@ def should_truncate_table_before_load(self, table: TTableSchema) -> bool: return table["write_disposition"] == "replace" def create_table_chain_completed_followup_jobs( - self, table_chain: Sequence[TTableSchema] + self, + table_chain: Sequence[TTableSchema], + table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, ) -> List[NewLoadJob]: """Creates a list of followup jobs that should be executed after a table chain is completed""" return [] diff --git a/dlt/common/libs/deltalake.py b/dlt/common/libs/deltalake.py new file mode 100644 index 0000000000..32847303f8 --- /dev/null +++ b/dlt/common/libs/deltalake.py @@ -0,0 +1,90 @@ +from typing import Optional, Dict, Union + +from dlt import version +from dlt.common import logger +from dlt.common.libs.pyarrow import pyarrow as pa +from dlt.common.libs.pyarrow import dataset_to_table, cast_arrow_schema_types +from dlt.common.schema.typing import TWriteDisposition +from dlt.common.exceptions import MissingDependencyException +from dlt.common.storages import FilesystemConfiguration + +try: + from deltalake import write_deltalake +except ModuleNotFoundError: + raise MissingDependencyException( + "dlt 
deltalake helpers",
+        [f"{version.DLT_PKG_NAME}[deltalake]"],
+        "Install `deltalake` so dlt can create Delta tables in the `filesystem` destination.",
+    )
+
+
+def ensure_delta_compatible_arrow_table(table: pa.Table) -> pa.Table:
+    """Returns Arrow table compatible with Delta table format.
+
+    Casts table schema to replace data types not supported by Delta.
+    """
+    ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP = {
+        # maps type check function to type factory function
+        pa.types.is_null: pa.string(),
+        pa.types.is_time: pa.string(),
+        pa.types.is_decimal256: pa.string(),  # pyarrow does not allow downcasting to decimal128
+    }
+    adjusted_schema = cast_arrow_schema_types(
+        table.schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP
+    )
+    return table.cast(adjusted_schema)
+
+
+def get_delta_write_mode(write_disposition: TWriteDisposition) -> str:
+    """Translates dlt write disposition to Delta write mode."""
+    if write_disposition in ("append", "merge"):  # `merge` disposition resolves to `append`
+        return "append"
+    elif write_disposition == "replace":
+        return "overwrite"
+    else:
+        raise ValueError(
+            "`write_disposition` must be `append`, `replace`, or `merge`,"
+            f" but `{write_disposition}` was provided."
+        )
+
+
+def write_delta_table(
+    path: str,
+    data: Union[pa.Table, pa.dataset.Dataset],
+    write_disposition: TWriteDisposition,
+    storage_options: Optional[Dict[str, str]] = None,
+) -> None:
+    """Writes in-memory Arrow table to on-disk Delta table."""
+
+    table = dataset_to_table(data)
+
+    # throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460
+    # TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500
+    # is released
+    write_deltalake(  # type: ignore[call-overload]
+        table_or_uri=path,
+        data=ensure_delta_compatible_arrow_table(table),
+        mode=get_delta_write_mode(write_disposition),
+        schema_mode="merge",  # enable schema evolution (adding new columns)
+        storage_options=storage_options,
+        engine="rust",  # `merge` schema mode requires `rust` engine
+    )
+
+
+def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
+    """Returns dict that can be passed as `storage_options` in `deltalake` library."""
+    creds = {}
+    extra_options = {}
+    if config.protocol in ("az", "gs", "s3"):
+        creds = config.credentials.to_object_store_rs_credentials()
+    if config.deltalake_storage_options is not None:
+        extra_options = config.deltalake_storage_options
+    shared_keys = creds.keys() & extra_options.keys()
+    if len(shared_keys) > 0:
+        logger.warning(
+            "The `deltalake_storage_options` configuration dictionary contains "
+            "keys also provided by dlt's credential system: "
+            + ", ".join([f"`{key}`" for key in shared_keys])
+            + ". dlt will use the values in `deltalake_storage_options`."
+ ) + return {**creds, **extra_options} diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index d6ee5be4cd..28f3ddb598 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -28,6 +28,7 @@ import pyarrow import pyarrow.parquet import pyarrow.compute + import pyarrow.dataset except ModuleNotFoundError: raise MissingDependencyException( "dlt pyarrow helpers", @@ -37,6 +38,8 @@ TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch] +ARROW_DECIMAL_MAX_PRECISION = 76 + def get_py_arrow_datatype( column: TColumnType, @@ -411,6 +414,29 @@ def pq_stream_with_new_columns( yield tbl +def dataset_to_table(data: Union[pyarrow.Table, pyarrow.dataset.Dataset]) -> pyarrow.Table: + return data.to_table() if isinstance(data, pyarrow.dataset.Dataset) else data + + +def cast_arrow_schema_types( + schema: pyarrow.Schema, + type_map: Dict[Callable[[pyarrow.DataType], bool], Callable[..., pyarrow.DataType]], +) -> pyarrow.Schema: + """Returns type-casted Arrow schema. + + Replaces data types for fields matching a type check in `type_map`. + Type check functions in `type_map` are assumed to be mutually exclusive, i.e. + a data type does not match more than one type check function. + """ + for i, e in enumerate(schema.types): + for type_check, cast_type in type_map.items(): + if type_check(e): + adjusted_field = schema.field(i).with_type(cast_type) + schema = schema.set(i, adjusted_field) + break # if type matches type check, do not do other type checks + return schema + + class NameNormalizationClash(ValueError): def __init__(self, reason: str) -> None: msg = f"Arrow column name clash after input data normalization. {reason}" diff --git a/dlt/common/libs/pydantic.py b/dlt/common/libs/pydantic.py index e6af064b8f..774a1641a7 100644 --- a/dlt/common/libs/pydantic.py +++ b/dlt/common/libs/pydantic.py @@ -106,6 +106,9 @@ def pydantic_to_table_schema_columns( inner_type = extract_inner_type(annotation) if is_union_type(inner_type): + # TODO: order those types deterministically before getting first one + # order of the types in union is in many cases not deterministic + # https://docs.python.org/3/library/typing.html#typing.get_args first_argument_type = get_args(inner_type)[0] inner_type = extract_inner_type(first_argument_type) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index b48770e4ef..fb360b38d3 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -64,7 +64,7 @@ "dedup_sort", ] """Known hints of a column used to declare hint regexes.""" -TTableFormat = Literal["iceberg", "parquet", "jsonl"] +TTableFormat = Literal["iceberg", "delta"] TTypeDetections = Literal[ "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double" ] diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index 6e100536af..09beb0015e 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -83,6 +83,7 @@ class FilesystemConfiguration(BaseConfiguration): """Indicates read only filesystem access. 
Will enable caching""" kwargs: Optional[DictStrAny] = None client_kwargs: Optional[DictStrAny] = None + deltalake_storage_options: Optional[DictStrAny] = None @property def protocol(self) -> str: diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index e7c7f7a164..4d72458e3e 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -141,7 +141,7 @@ def create_load_id() -> str: class ParsedLoadJobFileName(NamedTuple): - """Represents a file name of a job in load package. The file name contains name of a table, number of times the job was retired, extension + """Represents a file name of a job in load package. The file name contains name of a table, number of times the job was retried, extension and a 5 bytes random string to make job file name unique. The job id does not contain retry count and is immutable during loading of the data """ diff --git a/dlt/destinations/impl/athena/__init__.py b/dlt/destinations/impl/athena/__init__.py index 2577a6825e..87a11f9f41 100644 --- a/dlt/destinations/impl/athena/__init__.py +++ b/dlt/destinations/impl/athena/__init__.py @@ -11,6 +11,7 @@ def capabilities() -> DestinationCapabilitiesContext: # athena only supports loading from staged files on s3 for now caps.preferred_loader_file_format = None caps.supported_loader_file_formats = [] + caps.supported_table_formats = ["iceberg"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] caps.escape_identifier = escape_athena_identifier diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 8f043ba4d5..60ea64a4e7 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -451,11 +451,11 @@ def _get_table_update_sql( {partition_clause} LOCATION '{location.rstrip('/')}' TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""") - elif table_format == "jsonl": - sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} - ({columns}) - ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' - LOCATION '{location}';""") + # elif table_format == "jsonl": + # sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} + # ({columns}) + # ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' + # LOCATION '{location}';""") else: sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} ({columns}) diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 16affbc164..3c78493b57 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -16,6 +16,7 @@ from dlt.common.pendulum import pendulum from dlt.common.schema import Schema, TTableSchema, TSchemaTables from dlt.common.storages import FileStorage +from dlt.common.storages.load_package import LoadJobInfo from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.exceptions import ( DestinationTerminalException, @@ -157,7 +158,9 @@ def restore_file_load(self, file_path: str) -> LoadJob: return JOBS[job_id] def create_table_chain_completed_followup_jobs( - self, table_chain: Sequence[TTableSchema] + self, + table_chain: Sequence[TTableSchema], + table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, ) -> List[NewLoadJob]: """Creates a list of followup jobs that should be executed after a table chain is completed""" return [] diff --git a/dlt/destinations/impl/filesystem/__init__.py b/dlt/destinations/impl/filesystem/__init__.py index 
12e83216cf..49fabd61d7 100644 --- a/dlt/destinations/impl/filesystem/__init__.py +++ b/dlt/destinations/impl/filesystem/__init__.py @@ -1,5 +1,24 @@ -from dlt.common.destination import DestinationCapabilitiesContext +from typing import Sequence, Tuple + +from dlt.common.schema.typing import TTableSchema +from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat + + +def loader_file_format_adapter( + preferred_loader_file_format: TLoaderFileFormat, + supported_loader_file_formats: Sequence[TLoaderFileFormat], + /, + *, + table_schema: TTableSchema, +) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: + if table_schema.get("table_format") == "delta": + return ("parquet", ["parquet"]) + return (preferred_loader_file_format, supported_loader_file_formats) def capabilities() -> DestinationCapabilitiesContext: - return DestinationCapabilitiesContext.generic_capabilities("jsonl") + return DestinationCapabilitiesContext.generic_capabilities( + preferred_loader_file_format="jsonl", + loader_file_format_adapter=loader_file_format_adapter, + supported_table_formats=["delta"], + ) diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index d75226be13..9d15ba959e 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -1,18 +1,18 @@ import posixpath -import pathlib import os import base64 + from types import TracebackType -from typing import ClassVar, List, Type, Iterable, Set, Iterator, Optional, Tuple, cast +from typing import ClassVar, List, Type, Iterable, Iterator, Optional, Tuple, Sequence, cast from fsspec import AbstractFileSystem from contextlib import contextmanager -from dlt.common import json, pendulum -from dlt.common.typing import DictStrAny import dlt -from dlt.common import logger, time +from dlt.common import logger, time, json, pendulum +from dlt.common.typing import DictStrAny from dlt.common.schema import Schema, TSchemaTables, TTableSchema from dlt.common.storages import FileStorage, fsspec_from_config +from dlt.common.storages.load_package import LoadJobInfo, ParsedLoadJobFileName from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import ( NewLoadJob, @@ -25,9 +25,10 @@ StorageSchemaInfo, StateInfo, DoNothingJob, + DoNothingFollowupJob, ) from dlt.common.destination.exceptions import DestinationUndefinedEntity -from dlt.destinations.job_impl import EmptyLoadJob +from dlt.destinations.job_impl import EmptyLoadJob, NewReferenceJob from dlt.destinations.impl.filesystem import capabilities from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration from dlt.destinations.job_impl import NewReferenceJob @@ -41,57 +42,48 @@ class LoadFilesystemJob(LoadJob): def __init__( self, + client: "FilesystemClient", local_path: str, - dataset_path: str, - *, - config: FilesystemDestinationClientConfiguration, - schema_name: str, load_id: str, + table: TTableSchema, ) -> None: - file_name = FileStorage.get_file_name_from_file_path(local_path) - self.config = config - self.dataset_path = dataset_path - self.is_local_filesystem = config.protocol == "file" + self.client = client + self.table = table + self.is_local_filesystem = client.config.protocol == "file" # pick local filesystem pathlib or posix for buckets self.pathlib = os.path if self.is_local_filesystem else posixpath + + file_name = FileStorage.get_file_name_from_file_path(local_path) + 
super().__init__(file_name) + self.destination_file_name = path_utils.create_path( - config.layout, + client.config.layout, file_name, - schema_name, + client.schema.name, load_id, - current_datetime=config.current_datetime, + current_datetime=client.config.current_datetime, load_package_timestamp=dlt.current.load_package()["state"]["created_at"], - extra_placeholders=config.extra_placeholders, + extra_placeholders=client.config.extra_placeholders, ) - - super().__init__(file_name) - fs_client, _ = fsspec_from_config(config) # We would like to avoid failing for local filesystem where # deeply nested directory will not exist before writing a file. # It `auto_mkdir` is disabled by default in fsspec so we made some # trade offs between different options and decided on this. - item = self.make_remote_path() + # remote_path = f"{client.config.protocol}://{posixpath.join(dataset_path, destination_file_name)}" + remote_path = self.make_remote_path() if self.is_local_filesystem: - fs_client.makedirs(self.pathlib.dirname(item), exist_ok=True) - fs_client.put_file(local_path, item) + client.fs_client.makedirs(self.pathlib.dirname(remote_path), exist_ok=True) + client.fs_client.put_file(local_path, remote_path) def make_remote_path(self) -> str: """Returns path on the remote filesystem to which copy the file, without scheme. For local filesystem a native path is used""" # path.join does not normalize separators and available # normalization functions are very invasive and may string the trailing separator return self.pathlib.join( # type: ignore[no-any-return] - self.dataset_path, + self.client.dataset_path, path_utils.normalize_path_sep(self.pathlib, self.destination_file_name), ) - def make_remote_uri(self) -> str: - """Returns uri to the remote filesystem to which copy the file""" - remote_path = self.make_remote_path() - if self.is_local_filesystem: - return self.config.make_file_uri(remote_path) - else: - return f"{self.config.protocol}://{remote_path}" - def state(self) -> TLoadJobState: return "completed" @@ -99,12 +91,60 @@ def exception(self) -> str: raise NotImplementedError() +class DeltaLoadFilesystemJob(NewReferenceJob): + def __init__( + self, + client: "FilesystemClient", + table: TTableSchema, + table_jobs: Sequence[LoadJobInfo], + ) -> None: + self.client = client + self.table = table + self.table_jobs = table_jobs + + ref_file_name = ParsedLoadJobFileName( + table["name"], ParsedLoadJobFileName.new_file_id(), 0, "reference" + ).file_name() + super().__init__( + file_name=ref_file_name, + status="running", + remote_path=self.client.make_remote_uri(self.make_remote_path()), + ) + + self.write() + + def write(self) -> None: + from dlt.common.libs.pyarrow import pyarrow as pa + from dlt.common.libs.deltalake import ( + write_delta_table, + _deltalake_storage_options, + ) + + file_paths = [job.file_path for job in self.table_jobs] + + write_delta_table( + path=self.client.make_remote_uri(self.make_remote_path()), + data=pa.dataset.dataset(file_paths), + write_disposition=self.table["write_disposition"], + storage_options=_deltalake_storage_options(self.client.config), + ) + + def make_remote_path(self) -> str: + # directory path, not file path + return self.client.get_table_dir(self.table["name"]) + + def state(self) -> TLoadJobState: + return "completed" + + class FollowupFilesystemJob(FollowupJob, LoadFilesystemJob): def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]: jobs = super().create_followup_jobs(final_state) if final_state == "completed": ref_job = 
NewReferenceJob( - file_name=self.file_name(), status="running", remote_path=self.make_remote_uri() + file_name=self.file_name(), + status="running", + remote_path=self.client.make_remote_uri(self.make_remote_path()), ) jobs.append(ref_job) return jobs @@ -282,19 +322,24 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> # where we want to load the state the regular way if table["name"] == self.schema.state_table_name and not self.config.as_staging: return DoNothingJob(file_path) + if table.get("table_format") == "delta": + import dlt.common.libs.deltalake # assert dependencies are installed + + return DoNothingFollowupJob(file_path) cls = FollowupFilesystemJob if self.config.as_staging else LoadFilesystemJob - return cls( - file_path, - self.dataset_path, - config=self.config, - schema_name=self.schema.name, - load_id=load_id, - ) + return cls(self, file_path, load_id, table) def restore_file_load(self, file_path: str) -> LoadJob: return EmptyLoadJob.from_file_path(file_path, "completed") + def make_remote_uri(self, remote_path: str) -> str: + """Returns uri to the remote filesystem to which copy the file""" + if self.is_local_filesystem: + return self.config.make_file_uri(remote_path) + else: + return f"{self.config.protocol}://{remote_path}" + def __enter__(self) -> "FilesystemClient": return self @@ -306,6 +351,12 @@ def __exit__( def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: return False + def should_truncate_table_before_load(self, table: TTableSchema) -> bool: + return ( + table["write_disposition"] == "replace" + and not table.get("table_format") == "delta" # Delta can do a logical replace + ) + # # state stuff # @@ -473,3 +524,25 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]: def get_stored_schema_by_hash(self, version_hash: str) -> Optional[StorageSchemaInfo]: return self._get_stored_schema_by_hash_or_newest(version_hash) + + def create_table_chain_completed_followup_jobs( + self, + table_chain: Sequence[TTableSchema], + table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, + ) -> List[NewLoadJob]: + def get_table_jobs( + table_jobs: Sequence[LoadJobInfo], table_name: str + ) -> Sequence[LoadJobInfo]: + return [job for job in table_jobs if job.job_file_info.table_name == table_name] + + assert table_chain_jobs is not None + jobs = super().create_table_chain_completed_followup_jobs(table_chain, table_chain_jobs) + table_format = table_chain[0].get("table_format") + if table_format == "delta": + delta_jobs = [ + DeltaLoadFilesystemJob(self, table, get_table_jobs(table_chain_jobs, table["name"])) + for table in table_chain + ] + jobs.extend(delta_jobs) + + return jobs diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 853972fcba..ac3636db2b 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -36,6 +36,7 @@ TTableFormat, ) from dlt.common.storages import FileStorage +from dlt.common.storages.load_package import LoadJobInfo from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME from dlt.common.destination.reference import ( @@ -247,10 +248,12 @@ def _create_replace_followup_jobs( return jobs def create_table_chain_completed_followup_jobs( - self, table_chain: Sequence[TTableSchema] + self, + table_chain: Sequence[TTableSchema], + table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None, ) -> List[NewLoadJob]: 
"""Creates a list of followup jobs for merge write disposition and staging replace strategies""" - jobs = super().create_table_chain_completed_followup_jobs(table_chain) + jobs = super().create_table_chain_completed_followup_jobs(table_chain, table_chain_jobs) write_disposition = table_chain[0]["write_disposition"] if write_disposition == "append": jobs.extend(self._create_append_followup_jobs(table_chain)) diff --git a/dlt/destinations/job_impl.py b/dlt/destinations/job_impl.py index 218f73cc59..a4e4b998af 100644 --- a/dlt/destinations/job_impl.py +++ b/dlt/destinations/job_impl.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod import os import tempfile # noqa: 251 -from typing import Dict, Iterable, List +from typing import Dict, Iterable, List, Optional from dlt.common.json import json from dlt.common.destination.reference import NewLoadJob, FollowupJob, TLoadJobState, LoadJob diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 287474c82c..6fd1928970 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -158,6 +158,10 @@ def columns(self) -> TTableHintTemplate[TTableSchemaColumns]: def schema_contract(self) -> TTableHintTemplate[TSchemaContract]: return self._hints.get("schema_contract") + @property + def table_format(self) -> TTableHintTemplate[TTableFormat]: + return None if self._hints is None else self._hints.get("table_format") + def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation. `item` parameter is used to resolve table hints based on data. diff --git a/dlt/load/load.py b/dlt/load/load.py index d96a6b7116..8c7eb431e8 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -256,8 +256,12 @@ def create_followup_jobs( if table_chain := get_completed_table_chain( schema, all_jobs, top_job_table, starting_job.job_file_info().job_id() ): + table_chain_names = [table["name"] for table in table_chain] + table_chain_jobs = [ + job for job in all_jobs if job.job_file_info.table_name in table_chain_names + ] if follow_up_jobs := client.create_table_chain_completed_followup_jobs( - table_chain + table_chain, table_chain_jobs ): jobs = jobs + follow_up_jobs jobs = jobs + starting_job.create_followup_jobs(state) diff --git a/dlt/normalize/items_normalizers.py b/dlt/normalize/items_normalizers.py index 81220da2dd..eed98d7563 100644 --- a/dlt/normalize/items_normalizers.py +++ b/dlt/normalize/items_normalizers.py @@ -9,10 +9,7 @@ from dlt.common.runtime import signals from dlt.common.schema.typing import TSchemaEvolutionMode, TTableSchemaColumns, TSchemaContractDict from dlt.common.schema.utils import has_table_seen_data -from dlt.common.storages import ( - NormalizeStorage, - LoadStorage, -) +from dlt.common.storages import NormalizeStorage from dlt.common.storages.data_item_storage import DataItemStorage from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.typing import DictStrAny, TDataItem diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index b90c15a5f7..75cb9be707 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -20,7 +20,7 @@ from dlt.common.runners import TRunMetrics, Runnable, NullExecutor from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR -from dlt.common.schema.typing import TStoredSchema +from dlt.common.schema.typing import TStoredSchema, TTableSchema from dlt.common.schema.utils import 
merge_schema_updates from dlt.common.storages import ( NormalizeStorage, @@ -110,33 +110,76 @@ def w_normalize_files( ) -> TWorkerRV: destination_caps = config.destination_capabilities schema_updates: List[TSchemaUpdate] = [] - item_normalizers: Dict[TDataItemFormat, ItemsNormalizer] = {} - # Use default storage if parquet is not supported to make normalizer fallback to read rows from the file + # normalizers are cached per table name + item_normalizers: Dict[str, ItemsNormalizer] = {} + preferred_file_format = ( destination_caps.preferred_loader_file_format or destination_caps.preferred_staging_file_format ) # TODO: capabilities.supported_*_formats can be None, it should have defaults - supported_formats = destination_caps.supported_loader_file_formats or [] + supported_file_formats = destination_caps.supported_loader_file_formats or [] + supported_table_formats = destination_caps.supported_table_formats or [] # process all files with data items and write to buffered item storage with Container().injectable_context(destination_caps): schema = Schema.from_stored_schema(stored_schema) normalize_storage = NormalizeStorage(False, normalize_storage_config) - load_storage = LoadStorage(False, supported_formats, loader_storage_config) + load_storage = LoadStorage(False, supported_file_formats, loader_storage_config) + + def _get_items_normalizer( + item_format: TDataItemFormat, table_schema: Optional[TTableSchema] + ) -> ItemsNormalizer: + table_name = table_schema["name"] + if table_name in item_normalizers: + return item_normalizers[table_name] + + if ( + "table_format" in table_schema + and table_schema["table_format"] not in supported_table_formats + ): + logger.warning( + "Destination does not support the configured `table_format` value " + f"`{table_schema['table_format']}` for table `{table_schema['name']}`. " + "The setting will probably be ignored." + ) + + items_preferred_file_format = preferred_file_format + items_supported_file_formats = supported_file_formats + if destination_caps.loader_file_format_adapter is not None: + items_preferred_file_format, items_supported_file_formats = ( + destination_caps.loader_file_format_adapter( + preferred_file_format, + ( + supported_file_formats.copy() + if isinstance(supported_file_formats, list) + else supported_file_formats + ), + table_schema=table_schema, + ) + ) - def _get_items_normalizer(item_format: TDataItemFormat) -> ItemsNormalizer: - if item_format in item_normalizers: - return item_normalizers[item_format] # force file format + best_writer_spec = None if config.loader_file_format: - # TODO: pass supported_formats, when used in pipeline we already checked that - # but if normalize is used standalone `supported_loader_file_formats` may be unresolved - best_writer_spec = get_best_writer_spec(item_format, config.loader_file_format) - else: + if config.loader_file_format in items_supported_file_formats: + # TODO: pass supported_file_formats, when used in pipeline we already checked that + # but if normalize is used standalone `supported_loader_file_formats` may be unresolved + best_writer_spec = get_best_writer_spec( + item_format, config.loader_file_format + ) + else: + logger.warning( + f"The configured value `{config.loader_file_format}` " + "for `loader_file_format` is not supported for table " + f"`{table_schema['name']}` and will be ignored. Dlt " + "will use a supported format instead." 
+ ) + + if best_writer_spec is None: # find best spec among possible formats taking into account destination preference best_writer_spec = resolve_best_writer_spec( - item_format, supported_formats, preferred_file_format + item_format, items_supported_file_formats, items_preferred_file_format ) # if best_writer_spec.file_format != preferred_file_format: # logger.warning( @@ -159,7 +202,7 @@ def _get_items_normalizer(item_format: TDataItemFormat) -> ItemsNormalizer: f" {item_storage.writer_cls.__name__} for item format {item_format} and file" f" format {item_storage.writer_spec.file_format}" ) - norm = item_normalizers[item_format] = cls( + norm = item_normalizers[table_name] = cls( item_storage, normalize_storage, schema, @@ -211,7 +254,8 @@ def _gather_metrics_and_close( ) root_tables.add(root_table_name) normalizer = _get_items_normalizer( - DataWriter.item_format_from_file_extension(parsed_file_name.file_format) + DataWriter.item_format_from_file_extension(parsed_file_name.file_format), + stored_schema["tables"].get(root_table_name, {"name": root_table_name}), ) logger.debug( f"Processing extracted items in {extracted_items_file} in load_id" diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b0fb6fe57c..8dfb93b8da 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -104,7 +104,7 @@ from dlt.common.warnings import deprecated, Dlt04DeprecationWarning from dlt.common.versioned_state import json_encode_state, json_decode_state -from dlt.extract import DltSource +from dlt.extract import DltSource, DltResource from dlt.extract.exceptions import SourceExhausted from dlt.extract.extract import Extract, data_to_sources from dlt.normalize import Normalize @@ -662,6 +662,7 @@ def run( Returns: LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo. """ + signals.raise_if_signalled() self.activate() self._set_destinations(destination=destination, staging=staging) @@ -679,7 +680,6 @@ def run( self.sync_destination(destination, staging, dataset_name) # sync only once self._state_restored = True - # normalize and load pending data if self.list_extracted_load_packages(): self.normalize(loader_file_format=loader_file_format) diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 9c7d961d3a..e93ffb54d4 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -480,6 +480,40 @@ You can choose the following file formats: * [parquet](../file-formats/parquet.md) is supported * [csv](../file-formats/csv.md) is supported +## Supported table formats +You can choose the following table formats: +* [Delta](../table-formats/delta.md) is supported + +### Delta table format +You need the `deltalake` package to use this format: + +```sh +pip install "dlt[deltalake]" +``` + +Set the `table_format` argument to `delta` when defining your resource: + +```py +@dlt.resource(table_format="delta") +def my_delta_resource(): + ... +``` + +> `dlt` always uses `parquet` as `loader_file_format` when using the `delta` table format. Any setting of `loader_file_format` is disregarded. 
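For orientation, a minimal end-to-end sketch of how the `delta` table format added in this change is meant to be used; the pipeline name, bucket URL, and sample rows below are illustrative and not part of the diff:

```py
import dlt


@dlt.resource(table_format="delta")
def my_delta_resource():
    # any data yielded by a resource works; these rows are just an example
    yield [{"id": 1, "value": "foo"}, {"id": 2, "value": "bar"}]


# a local bucket is used here for illustration; s3/az/gs buckets work the same way,
# with credentials translated via the new `to_object_store_rs_credentials` methods
pipeline = dlt.pipeline(
    pipeline_name="delta_example",
    destination=dlt.destinations.filesystem(bucket_url="file:///tmp/delta_data"),
)

# job files are written as parquet and then committed to a Delta table in the table directory
pipeline.run(my_delta_resource())
```

Because the filesystem destination forces `parquet` for Delta tables (via the new `loader_file_format_adapter`), no `loader_file_format` argument is needed in `run`.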
+
+#### Storage options
+You can pass storage options by configuring `destination.filesystem.deltalake_storage_options`:
+
+```toml
+[destination.filesystem]
+deltalake_storage_options = '{"AWS_S3_LOCKING_PROVIDER": "dynamodb", "DELTA_DYNAMO_TABLE_NAME": "custom_table_name"}'
+```
+
+`dlt` passes these options to the `storage_options` argument of the `write_deltalake` method in the `deltalake` library. Look at their [documentation](https://delta-io.github.io/delta-rs/api/delta_writer/#deltalake.write_deltalake) to see which options can be used.
+
+You don't need to specify credentials here. `dlt` merges the required credentials with the options you provided, before passing them as `storage_options`.
+
+>❗When using `s3`, you need to specify storage options to [configure](https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/) locking behavior.
 
 ## Syncing of `dlt` state
 This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). To this end, special folders and files that will be created at your destination which hold information about your pipeline state, schemas and completed loads. These folders DO NOT respect your
diff --git a/docs/website/docs/dlt-ecosystem/table-formats/delta.md b/docs/website/docs/dlt-ecosystem/table-formats/delta.md
new file mode 100644
index 0000000000..7840f40d11
--- /dev/null
+++ b/docs/website/docs/dlt-ecosystem/table-formats/delta.md
@@ -0,0 +1,13 @@
+---
+title: Delta
+description: The Delta table format
+keywords: [delta, table formats]
+---
+
+# Delta table format
+
+[Delta](https://delta.io/) is an open source table format. `dlt` can store data as Delta tables.
+
+## Supported Destinations
+
+Supported by: **Databricks**, **filesystem**
diff --git a/docs/website/docs/dlt-ecosystem/table-formats/iceberg.md b/docs/website/docs/dlt-ecosystem/table-formats/iceberg.md
new file mode 100644
index 0000000000..a34bab9a0c
--- /dev/null
+++ b/docs/website/docs/dlt-ecosystem/table-formats/iceberg.md
@@ -0,0 +1,13 @@
+---
+title: Iceberg
+description: The Iceberg table format
+keywords: [iceberg, table formats]
+---
+
+# Iceberg table format
+
+[Iceberg](https://iceberg.apache.org/) is an open source table format. `dlt` can store data as Iceberg tables.
+
+## Supported Destinations
+
+Supported by: **Athena**
diff --git a/poetry.lock b/poetry.lock
index b476bc4a9f..e61d505a4a 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]] name = "about-time" @@ -2400,6 +2400,30 @@ files = [ {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"}, ] +[[package]] +name = "deltalake" +version = "0.17.4" +description = "Native Delta Lake Python binding based on delta-rs with Pandas integration" +optional = true +python-versions = ">=3.8" +files = [ + {file = "deltalake-0.17.4-cp38-abi3-macosx_10_12_x86_64.whl", hash = "sha256:3f048bd4cdd3500fbb0d1b34046966ca4b7cefd1e9df71460b881ee8ad7f844a"}, + {file = "deltalake-0.17.4-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:b539265d8293794872e1dc3b2daad50abe05ab425e961824b3ac1155bb294604"}, + {file = "deltalake-0.17.4-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:55e6be5f5ab8d5d34d2ea58d86e93eec2da5d2476e3c15e9520239457618bca4"}, + {file = "deltalake-0.17.4-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:94dde6c2d0a07e9ce47be367d016541d3a499839350852205819353441e1a9c1"}, + {file = "deltalake-0.17.4-cp38-abi3-win_amd64.whl", hash = "sha256:f51f499d50dad88bdc18c5ed7c2319114759f3220f83aa2d32166c19accee4ce"}, + {file = "deltalake-0.17.4.tar.gz", hash = "sha256:c3c10577afc46d4b10ed16246d814a8c40b3663099066681eeba89f908373814"}, +] + +[package.dependencies] +pyarrow = ">=8" +pyarrow-hotfix = "*" + +[package.extras] +devel = ["mypy (>=1.8.0,<1.9.0)", "packaging (>=20)", "pytest", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-timeout", "ruff (>=0.3.0,<0.4.0)", "sphinx (<=4.5)", "sphinx-rtd-theme", "toml", "wheel"] +pandas = ["pandas"] +pyspark = ["delta-spark", "numpy (==1.22.2)", "pyspark"] + [[package]] name = "deprecated" version = "1.2.14" @@ -3680,106 +3704,6 @@ files = [ {file = "google_re2-1.1-4-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f4d4f0823e8b2f6952a145295b1ff25245ce9bb136aff6fe86452e507d4c1dd"}, {file = "google_re2-1.1-4-cp39-cp39-win32.whl", hash = "sha256:1afae56b2a07bb48cfcfefaa15ed85bae26a68f5dc7f9e128e6e6ea36914e847"}, {file = "google_re2-1.1-4-cp39-cp39-win_amd64.whl", hash = "sha256:aa7d6d05911ab9c8adbf3c225a7a120ab50fd2784ac48f2f0d140c0b7afc2b55"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:222fc2ee0e40522de0b21ad3bc90ab8983be3bf3cec3d349c80d76c8bb1a4beb"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:d4763b0b9195b72132a4e7de8e5a9bf1f05542f442a9115aa27cfc2a8004f581"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:209649da10c9d4a93d8a4d100ecbf9cc3b0252169426bec3e8b4ad7e57d600cf"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:68813aa333c1604a2df4a495b2a6ed065d7c8aebf26cc7e7abb5a6835d08353c"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:370a23ec775ad14e9d1e71474d56f381224dcf3e72b15d8ca7b4ad7dd9cd5853"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:14664a66a3ddf6bc9e56f401bf029db2d169982c53eff3f5876399104df0e9a6"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ea3722cc4932cbcebd553b69dce1b4a73572823cff4e6a244f1c855da21d511"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e14bb264c40fd7c627ef5678e295370cd6ba95ca71d835798b6e37502fc4c690"}, - {file = "google_re2-1.1-5-cp310-cp310-win32.whl", hash = "sha256:39512cd0151ea4b3969c992579c79b423018b464624ae955be685fc07d94556c"}, - {file = 
"google_re2-1.1-5-cp310-cp310-win_amd64.whl", hash = "sha256:ac66537aa3bc5504320d922b73156909e3c2b6da19739c866502f7827b3f9fdf"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b5ea68d54890c9edb1b930dcb2658819354e5d3f2201f811798bbc0a142c2b4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:33443511b6b83c35242370908efe2e8e1e7cae749c766b2b247bf30e8616066c"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:413d77bdd5ba0bfcada428b4c146e87707452ec50a4091ec8e8ba1413d7e0619"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:5171686e43304996a34baa2abcee6f28b169806d0e583c16d55e5656b092a414"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:3b284db130283771558e31a02d8eb8fb756156ab98ce80035ae2e9e3a5f307c4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:296e6aed0b169648dc4b870ff47bd34c702a32600adb9926154569ef51033f47"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:38d50e68ead374160b1e656bbb5d101f0b95fb4cc57f4a5c12100155001480c5"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2a0416a35921e5041758948bcb882456916f22845f66a93bc25070ef7262b72a"}, - {file = "google_re2-1.1-5-cp311-cp311-win32.whl", hash = "sha256:a1d59568bbb5de5dd56dd6cdc79907db26cce63eb4429260300c65f43469e3e7"}, - {file = "google_re2-1.1-5-cp311-cp311-win_amd64.whl", hash = "sha256:72f5a2f179648b8358737b2b493549370debd7d389884a54d331619b285514e3"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:cbc72c45937b1dc5acac3560eb1720007dccca7c9879138ff874c7f6baf96005"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5fadd1417fbef7235fa9453dba4eb102e6e7d94b1e4c99d5fa3dd4e288d0d2ae"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:040f85c63cc02696485b59b187a5ef044abe2f99b92b4fb399de40b7d2904ccc"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:64e3b975ee6d9bbb2420494e41f929c1a0de4bcc16d86619ab7a87f6ea80d6bd"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:8ee370413e00f4d828eaed0e83b8af84d7a72e8ee4f4bd5d3078bc741dfc430a"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:5b89383001079323f693ba592d7aad789d7a02e75adb5d3368d92b300f5963fd"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:63cb4fdfbbda16ae31b41a6388ea621510db82feb8217a74bf36552ecfcd50ad"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9ebedd84ae8be10b7a71a16162376fd67a2386fe6361ef88c622dcf7fd679daf"}, - {file = "google_re2-1.1-5-cp312-cp312-win32.whl", hash = "sha256:c8e22d1692bc2c81173330c721aff53e47ffd3c4403ff0cd9d91adfd255dd150"}, - {file = "google_re2-1.1-5-cp312-cp312-win_amd64.whl", hash = "sha256:5197a6af438bb8c4abda0bbe9c4fbd6c27c159855b211098b29d51b73e4cbcf6"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:b6727e0b98417e114b92688ad2aa256102ece51f29b743db3d831df53faf1ce3"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:711e2b6417eb579c61a4951029d844f6b95b9b373b213232efd413659889a363"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_arm64.whl", hash = 
"sha256:71ae8b3df22c5c154c8af0f0e99d234a450ef1644393bc2d7f53fc8c0a1e111c"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:94a04e214bc521a3807c217d50cf099bbdd0c0a80d2d996c0741dbb995b5f49f"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:a770f75358508a9110c81a1257721f70c15d9bb592a2fb5c25ecbd13566e52a5"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_x86_64.whl", hash = "sha256:07c9133357f7e0b17c6694d5dcb82e0371f695d7c25faef2ff8117ef375343ff"}, - {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:204ca6b1cf2021548f4a9c29ac015e0a4ab0a7b6582bf2183d838132b60c8fda"}, - {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f0b95857c2c654f419ca684ec38c9c3325c24e6ba7d11910a5110775a557bb18"}, - {file = "google_re2-1.1-5-cp38-cp38-win32.whl", hash = "sha256:347ac770e091a0364e822220f8d26ab53e6fdcdeaec635052000845c5a3fb869"}, - {file = "google_re2-1.1-5-cp38-cp38-win_amd64.whl", hash = "sha256:ec32bb6de7ffb112a07d210cf9f797b7600645c2d5910703fa07f456dd2150e0"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:eb5adf89060f81c5ff26c28e261e6b4997530a923a6093c9726b8dec02a9a326"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a22630c9dd9ceb41ca4316bccba2643a8b1d5c198f21c00ed5b50a94313aaf10"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:544dc17fcc2d43ec05f317366375796351dec44058e1164e03c3f7d050284d58"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:19710af5ea88751c7768575b23765ce0dfef7324d2539de576f75cdc319d6654"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:f82995a205e08ad896f4bd5ce4847c834fab877e1772a44e5f262a647d8a1dec"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:63533c4d58da9dc4bc040250f1f52b089911699f0368e0e6e15f996387a984ed"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:79e00fcf0cb04ea35a22b9014712d448725ce4ddc9f08cc818322566176ca4b0"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bc41afcefee2da6c4ed883a93d7f527c4b960cd1d26bbb0020a7b8c2d341a60a"}, - {file = "google_re2-1.1-5-cp39-cp39-win32.whl", hash = "sha256:486730b5e1f1c31b0abc6d80abe174ce4f1188fe17d1b50698f2bf79dc6e44be"}, - {file = "google_re2-1.1-5-cp39-cp39-win_amd64.whl", hash = "sha256:4de637ca328f1d23209e80967d1b987d6b352cd01b3a52a84b4d742c69c3da6c"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:621e9c199d1ff0fdb2a068ad450111a84b3bf14f96dfe5a8a7a0deae5f3f4cce"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:220acd31e7dde95373f97c3d1f3b3bd2532b38936af28b1917ee265d25bebbf4"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:db34e1098d164f76251a6ece30e8f0ddfd65bb658619f48613ce71acb3f9cbdb"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:5152bac41d8073977582f06257219541d0fc46ad99b0bbf30e8f60198a43b08c"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:6191294799e373ee1735af91f55abd23b786bdfd270768a690d9d55af9ea1b0d"}, - {file = "google_re2-1.1-6-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:070cbafbb4fecbb02e98feb28a1eb292fb880f434d531f38cc33ee314b521f1f"}, - {file = 
"google_re2-1.1-6-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8437d078b405a59a576cbed544490fe041140f64411f2d91012e8ec05ab8bf86"}, - {file = "google_re2-1.1-6-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f00f9a9af8896040e37896d9b9fc409ad4979f1ddd85bb188694a7d95ddd1164"}, - {file = "google_re2-1.1-6-cp310-cp310-win32.whl", hash = "sha256:df26345f229a898b4fd3cafd5f82259869388cee6268fc35af16a8e2293dd4e5"}, - {file = "google_re2-1.1-6-cp310-cp310-win_amd64.whl", hash = "sha256:3665d08262c57c9b28a5bdeb88632ad792c4e5f417e5645901695ab2624f5059"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:b26b869d8aa1d8fe67c42836bf3416bb72f444528ee2431cfb59c0d3e02c6ce3"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:41fd4486c57dea4f222a6bb7f1ff79accf76676a73bdb8da0fcbd5ba73f8da71"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:0ee378e2e74e25960070c338c28192377c4dd41e7f4608f2688064bd2badc41e"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:a00cdbf662693367b36d075b29feb649fd7ee1b617cf84f85f2deebeda25fc64"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:4c09455014217a41499432b8c8f792f25f3df0ea2982203c3a8c8ca0e7895e69"}, - {file = "google_re2-1.1-6-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:6501717909185327935c7945e23bb5aa8fc7b6f237b45fe3647fa36148662158"}, - {file = "google_re2-1.1-6-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3510b04790355f199e7861c29234081900e1e1cbf2d1484da48aa0ba6d7356ab"}, - {file = "google_re2-1.1-6-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8c0e64c187ca406764f9e9ad6e750d62e69ed8f75bf2e865d0bfbc03b642361c"}, - {file = "google_re2-1.1-6-cp311-cp311-win32.whl", hash = "sha256:2a199132350542b0de0f31acbb3ca87c3a90895d1d6e5235f7792bb0af02e523"}, - {file = "google_re2-1.1-6-cp311-cp311-win_amd64.whl", hash = "sha256:83bdac8ceaece8a6db082ea3a8ba6a99a2a1ee7e9f01a9d6d50f79c6f251a01d"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:81985ff894cd45ab5a73025922ac28c0707759db8171dd2f2cc7a0e856b6b5ad"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5635af26065e6b45456ccbea08674ae2ab62494008d9202df628df3b267bc095"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:813b6f04de79f4a8fdfe05e2cb33e0ccb40fe75d30ba441d519168f9d958bd54"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:5ec2f5332ad4fd232c3f2d6748c2c7845ccb66156a87df73abcc07f895d62ead"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:5a687b3b32a6cbb731647393b7c4e3fde244aa557f647df124ff83fb9b93e170"}, - {file = "google_re2-1.1-6-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:39a62f9b3db5d3021a09a47f5b91708b64a0580193e5352751eb0c689e4ad3d7"}, - {file = "google_re2-1.1-6-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ca0f0b45d4a1709cbf5d21f355e5809ac238f1ee594625a1e5ffa9ff7a09eb2b"}, - {file = "google_re2-1.1-6-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a64b3796a7a616c7861247bd061c9a836b5caf0d5963e5ea8022125601cf7b09"}, - {file = "google_re2-1.1-6-cp312-cp312-win32.whl", hash = "sha256:32783b9cb88469ba4cd9472d459fe4865280a6b1acdad4480a7b5081144c4eb7"}, - {file = "google_re2-1.1-6-cp312-cp312-win_amd64.whl", hash = 
"sha256:259ff3fd2d39035b9cbcbf375995f83fa5d9e6a0c5b94406ff1cc168ed41d6c6"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:e4711bcffe190acd29104d8ecfea0c0e42b754837de3fb8aad96e6cc3c613cdc"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:4d081cce43f39c2e813fe5990e1e378cbdb579d3f66ded5bade96130269ffd75"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_13_0_arm64.whl", hash = "sha256:4f123b54d48450d2d6b14d8fad38e930fb65b5b84f1b022c10f2913bd956f5b5"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:e1928b304a2b591a28eb3175f9db7f17c40c12cf2d4ec2a85fdf1cc9c073ff91"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:3a69f76146166aec1173003c1f547931bdf288c6b135fda0020468492ac4149f"}, - {file = "google_re2-1.1-6-cp38-cp38-macosx_14_0_x86_64.whl", hash = "sha256:fc08c388f4ebbbca345e84a0c56362180d33d11cbe9ccfae663e4db88e13751e"}, - {file = "google_re2-1.1-6-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b057adf38ce4e616486922f2f47fc7d19c827ba0a7f69d540a3664eba2269325"}, - {file = "google_re2-1.1-6-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4138c0b933ab099e96f5d8defce4486f7dfd480ecaf7f221f2409f28022ccbc5"}, - {file = "google_re2-1.1-6-cp38-cp38-win32.whl", hash = "sha256:9693e45b37b504634b1abbf1ee979471ac6a70a0035954592af616306ab05dd6"}, - {file = "google_re2-1.1-6-cp38-cp38-win_amd64.whl", hash = "sha256:5674d437baba0ea287a5a7f8f81f24265d6ae8f8c09384e2ef7b6f84b40a7826"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:7783137cb2e04f458a530c6d0ee9ef114815c1d48b9102f023998c371a3b060e"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a49b7153935e7a303675f4deb5f5d02ab1305adefc436071348706d147c889e0"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:a96a8bb309182090704593c60bdb369a2756b38fe358bbf0d40ddeb99c71769f"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:dff3d4be9f27ef8ec3705eed54f19ef4ab096f5876c15fe011628c69ba3b561c"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:40f818b0b39e26811fa677978112a8108269977fdab2ba0453ac4363c35d9e66"}, - {file = "google_re2-1.1-6-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:8a7e53538cdb40ef4296017acfbb05cab0c19998be7552db1cfb85ba40b171b9"}, - {file = "google_re2-1.1-6-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6ee18e7569fb714e5bb8c42809bf8160738637a5e71ed5a4797757a1fb4dc4de"}, - {file = "google_re2-1.1-6-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1cda4f6d1a7d5b43ea92bc395f23853fba0caf8b1e1efa6e8c48685f912fcb89"}, - {file = "google_re2-1.1-6-cp39-cp39-win32.whl", hash = "sha256:6a9cdbdc36a2bf24f897be6a6c85125876dc26fea9eb4247234aec0decbdccfd"}, - {file = "google_re2-1.1-6-cp39-cp39-win_amd64.whl", hash = "sha256:73f646cecfad7cc5b4330b4192c25f2e29730a3b8408e089ffd2078094208196"}, ] [[package]] @@ -6558,6 +6482,17 @@ files = [ [package.dependencies] numpy = ">=1.16.6" +[[package]] +name = "pyarrow-hotfix" +version = "0.6" +description = "" +optional = true +python-versions = ">=3.5" +files = [ + {file = "pyarrow_hotfix-0.6-py3-none-any.whl", hash = "sha256:dcc9ae2d220dff0083be6a9aa8e0cdee5182ad358d4931fce825c545e5c89178"}, + {file = "pyarrow_hotfix-0.6.tar.gz", hash = "sha256:79d3e030f7ff890d408a100ac16d6f00b14d44a502d7897cd9fc3e3a534e9945"}, +] + [[package]] name = "pyasn1" 
version = "0.5.0" @@ -9368,6 +9303,7 @@ cli = ["cron-descriptor", "pipdeptree"] clickhouse = ["adlfs", "clickhouse-connect", "clickhouse-driver", "gcsfs", "pyarrow", "s3fs"] databricks = ["databricks-sql-connector"] dbt = ["dbt-athena-community", "dbt-bigquery", "dbt-core", "dbt-databricks", "dbt-duckdb", "dbt-redshift", "dbt-snowflake"] +deltalake = ["deltalake", "pyarrow"] dremio = ["pyarrow"] duckdb = ["duckdb"] filesystem = ["botocore", "s3fs"] @@ -9387,4 +9323,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "4584e2332c46a3a409ee605a1f03d110b765a491024ee96e44f62902c0769711" +content-hash = "d9034fc091a6e823373e742530d67b9c075d329afd2fee3bad7467716d2b2b9a" diff --git a/pyproject.toml b/pyproject.toml index 8e98445f02..9086acea9b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,6 +85,7 @@ databricks-sql-connector = {version = ">=2.9.3,<3.0.0", optional = true} dbt-databricks = {version = ">=1.7.3", optional = true} clickhouse-driver = { version = ">=0.2.7", optional = true } clickhouse-connect = { version = ">=0.7.7", optional = true } +deltalake = { version = ">=0.17.4", optional = true } [tool.poetry.extras] dbt = ["dbt-core", "dbt-redshift", "dbt-bigquery", "dbt-duckdb", "dbt-snowflake", "dbt-athena-community", "dbt-databricks"] @@ -110,6 +111,7 @@ qdrant = ["qdrant-client"] databricks = ["databricks-sql-connector"] clickhouse = ["clickhouse-driver", "clickhouse-connect", "s3fs", "gcsfs", "adlfs", "pyarrow"] dremio = ["pyarrow"] +deltalake = ["deltalake", "pyarrow"] [tool.poetry.scripts] dlt = "dlt.cli._dlt:_main" diff --git a/tests/cases.py b/tests/cases.py index 2b655fdc8b..d145ec1d94 100644 --- a/tests/cases.py +++ b/tests/cases.py @@ -257,6 +257,11 @@ def assert_all_data_types_row( else: db_mapping[binary_col] = bytes(db_mapping[binary_col]) + # `delta` table format stores `wei` type as string + if "col8" in db_mapping: + if isinstance(db_mapping["col8"], str): + db_mapping["col8"] = int(db_mapping["col8"]) + # redshift and bigquery return strings from structured fields if "col9" in db_mapping: if isinstance(db_mapping["col9"], str): @@ -270,7 +275,7 @@ def assert_all_data_types_row( if "col10" in db_mapping: db_mapping["col10"] = db_mapping["col10"].isoformat() if "col11" in db_mapping: - db_mapping["col11"] = db_mapping["col11"].isoformat() + db_mapping["col11"] = ensure_pendulum_time(db_mapping["col11"]).isoformat() if expect_filtered_null_columns: for key, expected in expected_rows.items(): @@ -291,6 +296,8 @@ def arrow_table_all_data_types( include_time: bool = True, include_binary: bool = True, include_decimal: bool = True, + include_decimal_default_precision: bool = False, + include_decimal_arrow_max_precision: bool = False, include_date: bool = True, include_not_normalized_name: bool = True, include_name_clash: bool = False, @@ -337,6 +344,20 @@ def arrow_table_all_data_types( if include_decimal: data["decimal"] = [Decimal(str(round(random.uniform(0, 100), 4))) for _ in range(num_rows)] + if include_decimal_default_precision: + from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION + + data["decimal_default_precision"] = [ + Decimal(int("1" * DEFAULT_NUMERIC_PRECISION)) for _ in range(num_rows) + ] + + if include_decimal_arrow_max_precision: + from dlt.common.libs.pyarrow import ARROW_DECIMAL_MAX_PRECISION + + data["decimal_arrow_max_precision"] = [ + Decimal(int("1" * ARROW_DECIMAL_MAX_PRECISION)) for _ in range(num_rows) + ] + if include_date: data["date"] = pd.date_range("2021-01-01", 
periods=num_rows, tz=tz).date diff --git a/tests/common/configuration/test_annotation_future.py b/tests/common/configuration/test_annotation_future.py new file mode 100644 index 0000000000..800d689fb7 --- /dev/null +++ b/tests/common/configuration/test_annotation_future.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import Optional + +from dlt.common.configuration import configspec +from dlt.common.configuration.resolve import resolve_configuration +from dlt.common.configuration.specs import BaseConfiguration + +from tests.utils import preserve_environ +from tests.common.configuration.utils import environment + + +def test_str_annotations(environment) -> None: + @configspec + class DataConf(BaseConfiguration): + x: int = None + x_7: Optional[int] = 3 + + assert DataConf.__annotations__ == {"x": "int", "x_7": "Optional[int]"} + assert DataConf.get_resolvable_fields() == {"x": int, "x_7": Optional[int]} + + # resolve it + environment["X"] = "10" + c = resolve_configuration(DataConf()) + assert c.x == 10 diff --git a/tests/libs/test_deltalake.py b/tests/libs/test_deltalake.py new file mode 100644 index 0000000000..d55f788fbe --- /dev/null +++ b/tests/libs/test_deltalake.py @@ -0,0 +1,182 @@ +import os +from typing import Iterator, Tuple, cast + +import pytest +from deltalake import DeltaTable + +import dlt +from dlt.common.libs.pyarrow import pyarrow as pa +from dlt.common.libs.deltalake import ( + write_delta_table, + _deltalake_storage_options, +) +from dlt.common.configuration.specs import AwsCredentials +from dlt.destinations.impl.filesystem.filesystem import ( + FilesystemClient, + FilesystemDestinationClientConfiguration, +) + +from tests.cases import arrow_table_all_data_types + + +@pytest.fixture() +def filesystem_client() -> Iterator[Tuple[FilesystemClient, str]]: + """Returns tuple of local `FilesystemClient` instance and remote directory string. + + Remote directory is removed on teardown. 
+ """ + # setup + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" + client = cast(FilesystemClient, dlt.pipeline(destination="filesystem").destination_client()) + remote_dir = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] + "/tmp_dir" + + yield (client, remote_dir) + + # teardown + if client.fs_client.exists(remote_dir): + client.fs_client.rm(remote_dir, recursive=True) + + +def test_deltalake_storage_options() -> None: + config = FilesystemDestinationClientConfiguration() + + # no credentials, no deltalake_storage_options + config.bucket_url = "_storage://foo" + assert _deltalake_storage_options(config) == dict() + + # no credentials, yes deltalake_storage_options + config.deltalake_storage_options = {"foo": "bar"} + assert _deltalake_storage_options(config) == {"foo": "bar"} + + # yes credentials, yes deltalake_storage_options: no shared keys + creds = AwsCredentials( + aws_access_key_id="dummy_key_id", + aws_secret_access_key="dummy_acces_key", # type: ignore[arg-type] + aws_session_token="dummy_session_token", # type: ignore[arg-type] + region_name="dummy_region_name", + ) + config.credentials = creds + config.bucket_url = "s3://foo" + assert _deltalake_storage_options(config).keys() == { + "aws_access_key_id", + "aws_secret_access_key", + "aws_session_token", + "region", + "foo", + } + + # yes credentials, yes deltalake_storage_options: yes shared keys + config.deltalake_storage_options = {"aws_access_key_id": "i_will_overwrite"} + assert _deltalake_storage_options(config).keys() == { + "aws_access_key_id", + "aws_secret_access_key", + "aws_session_token", + "region", + } + assert _deltalake_storage_options(config)["aws_access_key_id"] == "i_will_overwrite" + + +def test_write_delta_table(filesystem_client) -> None: + client, remote_dir = filesystem_client + client = cast(FilesystemClient, client) + storage_options = _deltalake_storage_options(client.config) + + with pytest.raises(Exception): + # bug in `delta-rs` causes error when writing big decimal values + # https://github.com/delta-io/delta-rs/issues/2510 + # if this test fails, the bug has been fixed and we should remove this + # note from the docs: + write_delta_table( + remote_dir + "/corrupt_delta_table", + arrow_table_all_data_types("arrow-table", include_decimal_default_precision=True)[0], + write_disposition="append", + storage_options=storage_options, + ) + + arrow_table = arrow_table_all_data_types( + "arrow-table", + include_decimal_default_precision=False, + include_decimal_arrow_max_precision=True, + num_rows=2, + )[0] + + # first write should create Delta table with same shape as input Arrow table + write_delta_table( + remote_dir, arrow_table, write_disposition="append", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, storage_options=storage_options) + assert dt.version() == 0 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (arrow_table.num_rows, arrow_table.num_columns) + + # table contents should be different because "time" column has type `string` + # in Delta table, but type `time` in Arrow source table + assert not dt_arrow_table.equals(arrow_table) + casted_cols = ("null", "time", "decimal_arrow_max_precision") + assert dt_arrow_table.drop_columns(casted_cols).equals(arrow_table.drop_columns(casted_cols)) + + # another `append` should create a new table version with twice the number of rows + write_delta_table( + remote_dir, arrow_table, write_disposition="append", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, 
storage_options=storage_options) + assert dt.version() == 1 + assert dt.to_pyarrow_table().shape == (arrow_table.num_rows * 2, arrow_table.num_columns) + + # the `replace` write disposition should trigger a "logical delete" + write_delta_table( + remote_dir, arrow_table, write_disposition="replace", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, storage_options=storage_options) + assert dt.version() == 2 + assert dt.to_pyarrow_table().shape == (arrow_table.num_rows, arrow_table.num_columns) + + # the previous table version should still exist + dt.load_version(1) + assert dt.to_pyarrow_table().shape == (arrow_table.num_rows * 2, arrow_table.num_columns) + + # `merge` should resolve to `append` behavior + write_delta_table( + remote_dir, arrow_table, write_disposition="merge", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, storage_options=storage_options) + assert dt.version() == 3 + assert dt.to_pyarrow_table().shape == (arrow_table.num_rows * 2, arrow_table.num_columns) + + # add column in source table + evolved_arrow_table = arrow_table.append_column( + "new", pa.array([1 for _ in range(arrow_table.num_rows)]) + ) + assert ( + evolved_arrow_table.num_columns == arrow_table.num_columns + 1 + ) # ensure column was appended + + # new column should be propagated to Delta table (schema evolution is supported) + write_delta_table( + remote_dir, evolved_arrow_table, write_disposition="append", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, storage_options=storage_options) + assert dt.version() == 4 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (arrow_table.num_rows * 3, evolved_arrow_table.num_columns) + assert "new" in dt_arrow_table.schema.names + assert dt_arrow_table.column("new").to_pylist() == [1, 1, None, None, None, None] + + # providing a subset of columns should lead to missing columns being null-filled + write_delta_table( + remote_dir, arrow_table, write_disposition="append", storage_options=storage_options + ) + dt = DeltaTable(remote_dir, storage_options=storage_options) + assert dt.version() == 5 + dt_arrow_table = dt.to_pyarrow_table() + assert dt_arrow_table.shape == (arrow_table.num_rows * 4, evolved_arrow_table.num_columns) + assert dt_arrow_table.column("new").to_pylist() == [None, None, 1, 1, None, None, None, None] + + with pytest.raises(ValueError): + # unsupported value for `write_disposition` should raise ValueError + write_delta_table( + remote_dir, + arrow_table, + write_disposition="foo", # type:ignore[arg-type] + storage_options=storage_options, + ) diff --git a/tests/libs/test_pydantic.py b/tests/libs/test_pydantic.py index d6dc29e0c8..951eabbde4 100644 --- a/tests/libs/test_pydantic.py +++ b/tests/libs/test_pydantic.py @@ -161,7 +161,9 @@ class User(BaseModel): address: Annotated[UserAddress, "PII", "address"] uuid_or_str: Union[str, UUID4, None] unity: Union[UserAddress, UserLabel, Dict[str, UserAddress]] - location: Annotated[Optional[Union[str, List[str]]], None] + # NOTE: added "int" because this type was clashing with a type + # in a delta-rs library that got cached and that re-orders the union + location: Annotated[Optional[Union[str, List[str], int]], None] something_required: Annotated[Union[str, int], type(None)] final_location: Final[Annotated[Union[str, int], None]] # type: ignore[misc] final_optional: Final[Annotated[Optional[str], None]] # type: ignore[misc] diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py 
index 4519f1ea83..fbfd08271b 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -14,7 +14,6 @@ INIT_FILE_NAME, ) - from dlt.destinations.path_utils import create_path, prepare_datetime_params from tests.load.filesystem.utils import perform_load from tests.utils import clean_test_storage, init_test_logging diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index c069f88a15..270e1ff70c 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -47,6 +47,7 @@ def test_filesystem_configuration() -> None: "credentials": None, "client_kwargs": None, "kwargs": None, + "deltalake_storage_options": None, } @@ -145,6 +146,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None: bucket_url="az://root", kwargs={"use_ssl": True}, client_kwargs={"verify": "public.crt"}, + deltalake_storage_options={"AWS_S3_LOCKING_PROVIDER": "dynamodb"}, ) assert dict(config) == { "read_only": False, @@ -152,6 +154,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None: "credentials": None, "kwargs": {"use_ssl": True}, "client_kwargs": {"verify": "public.crt"}, + "deltalake_storage_options": {"AWS_S3_LOCKING_PROVIDER": "dynamodb"}, } diff --git a/tests/load/filesystem/test_object_store_rs_credentials.py b/tests/load/filesystem/test_object_store_rs_credentials.py new file mode 100644 index 0000000000..4e43b7c5d8 --- /dev/null +++ b/tests/load/filesystem/test_object_store_rs_credentials.py @@ -0,0 +1,148 @@ +"""Tests translation of `dlt` credentials into `object_store` Rust crate credentials.""" + +from typing import Any, Dict, cast + +import pytest +from deltalake import DeltaTable +from deltalake.exceptions import TableNotFoundError + +import dlt +from dlt.common.typing import TSecretStrValue +from dlt.common.configuration import resolve_configuration +from dlt.common.configuration.specs import ( + AnyAzureCredentials, + AzureServicePrincipalCredentialsWithoutDefaults, + AzureCredentialsWithoutDefaults, + AwsCredentials, + AwsCredentialsWithoutDefaults, + GcpServiceAccountCredentialsWithoutDefaults, + GcpOAuthCredentialsWithoutDefaults, +) + +from tests.load.utils import AZ_BUCKET, AWS_BUCKET, GCS_BUCKET, ALL_FILESYSTEM_DRIVERS + +if all(driver not in ALL_FILESYSTEM_DRIVERS for driver in ("az", "s3", "gs")): + pytest.skip( + "Requires at least one of `az`, `s3`, `gs` in `ALL_FILESYSTEM_DRIVERS`.", + allow_module_level=True, + ) + + +FS_CREDS: Dict[str, Any] = dlt.secrets.get("destination.filesystem.credentials") +assert ( + FS_CREDS is not None +), "`destination.filesystem.credentials` must be configured for these tests." + + +def can_connect(bucket_url: str, object_store_rs_credentials: Dict[str, str]) -> bool: + """Returns True if client can connect to object store, False otherwise. + + Uses `deltalake` library as Python interface to `object_store` Rust crate. 
+ """ + try: + DeltaTable( + bucket_url, + storage_options=object_store_rs_credentials, + ) + except TableNotFoundError: + # this error implies the connection was succesful + # there is no Delta table at `bucket_url` + return True + return False + + +@pytest.mark.skipif( + "az" not in ALL_FILESYSTEM_DRIVERS, reason="`az` not in `ALL_FILESYSTEM_DRIVERS`" +) +def test_azure_object_store_rs_credentials() -> None: + creds: AnyAzureCredentials + + creds = AzureServicePrincipalCredentialsWithoutDefaults( + **dlt.secrets.get("destination.fsazureprincipal.credentials") + ) + assert can_connect(AZ_BUCKET, creds.to_object_store_rs_credentials()) + + # without SAS token + creds = AzureCredentialsWithoutDefaults( + azure_storage_account_name=FS_CREDS["azure_storage_account_name"], + azure_storage_account_key=FS_CREDS["azure_storage_account_key"], + ) + assert creds.azure_storage_sas_token is None + assert can_connect(AZ_BUCKET, creds.to_object_store_rs_credentials()) + + # with SAS token + creds = resolve_configuration(creds) + assert creds.azure_storage_sas_token is not None + assert can_connect(AZ_BUCKET, creds.to_object_store_rs_credentials()) + + +@pytest.mark.skipif( + "s3" not in ALL_FILESYSTEM_DRIVERS, reason="`s3` not in `ALL_FILESYSTEM_DRIVERS`" +) +def test_aws_object_store_rs_credentials() -> None: + creds: AwsCredentialsWithoutDefaults + + # AwsCredentials: no user-provided session token + creds = AwsCredentials( + aws_access_key_id=FS_CREDS["aws_access_key_id"], + aws_secret_access_key=FS_CREDS["aws_secret_access_key"], + region_name=FS_CREDS["region_name"], + ) + assert creds.aws_session_token is None + object_store_rs_creds = creds.to_object_store_rs_credentials() + assert object_store_rs_creds["aws_session_token"] is not None # auto-generated token + assert can_connect(AWS_BUCKET, object_store_rs_creds) + + # AwsCredentials: user-provided session token + # use previous credentials to create session token for new credentials + sess_creds = creds.to_session_credentials() + creds = AwsCredentials( + aws_access_key_id=sess_creds["aws_access_key_id"], + aws_secret_access_key=cast(TSecretStrValue, sess_creds["aws_secret_access_key"]), + aws_session_token=cast(TSecretStrValue, sess_creds["aws_session_token"]), + region_name=FS_CREDS["region_name"], + ) + assert creds.aws_session_token is not None + object_store_rs_creds = creds.to_object_store_rs_credentials() + assert object_store_rs_creds["aws_session_token"] is not None + assert can_connect(AWS_BUCKET, object_store_rs_creds) + + # AwsCredentialsWithoutDefaults: no user-provided session token + creds = AwsCredentialsWithoutDefaults( + aws_access_key_id=FS_CREDS["aws_access_key_id"], + aws_secret_access_key=FS_CREDS["aws_secret_access_key"], + region_name=FS_CREDS["region_name"], + ) + assert creds.aws_session_token is None + object_store_rs_creds = creds.to_object_store_rs_credentials() + assert "aws_session_token" not in object_store_rs_creds # no auto-generated token + assert can_connect(AWS_BUCKET, object_store_rs_creds) + + # AwsCredentialsWithoutDefaults: user-provided session token + creds = AwsCredentialsWithoutDefaults( + aws_access_key_id=sess_creds["aws_access_key_id"], + aws_secret_access_key=cast(TSecretStrValue, sess_creds["aws_secret_access_key"]), + aws_session_token=cast(TSecretStrValue, sess_creds["aws_session_token"]), + region_name=FS_CREDS["region_name"], + ) + assert creds.aws_session_token is not None + object_store_rs_creds = creds.to_object_store_rs_credentials() + assert object_store_rs_creds["aws_session_token"] is 
not None + assert can_connect(AWS_BUCKET, object_store_rs_creds) + + +@pytest.mark.skipif( + "gs" not in ALL_FILESYSTEM_DRIVERS, reason="`gs` not in `ALL_FILESYSTEM_DRIVERS`" +) +def test_gcp_object_store_rs_credentials() -> None: + creds = GcpServiceAccountCredentialsWithoutDefaults( + project_id=FS_CREDS["project_id"], + private_key=FS_CREDS["private_key"], + private_key_id=FS_CREDS["private_key_id"], + client_email=FS_CREDS["client_email"], + ) + assert can_connect(GCS_BUCKET, creds.to_object_store_rs_credentials()) + + # GcpOAuthCredentialsWithoutDefaults is currently not supported + with pytest.raises(NotImplementedError): + GcpOAuthCredentialsWithoutDefaults().to_object_store_rs_credentials() diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 623284d8a7..efbdc082f1 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -12,11 +12,11 @@ from dlt.common import pendulum from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.utils import uniq_id -from dlt.common.storages.load_storage import LoadJobInfo from dlt.destinations import filesystem from dlt.destinations.impl.filesystem.filesystem import FilesystemClient +from dlt.pipeline.exceptions import PipelineStepFailed -from tests.cases import arrow_table_all_data_types +from tests.cases import arrow_table_all_data_types, table_update_and_row, assert_all_data_types_row from tests.common.utils import load_json_case from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active from dlt.destinations.path_utils import create_path @@ -25,12 +25,18 @@ DestinationTestConfiguration, ) -from tests.pipeline.utils import load_table_counts +from tests.pipeline.utils import load_table_counts, assert_load_info, load_tables_to_dicts skip_if_not_active("filesystem") +@pytest.fixture +def local_filesystem_pipeline() -> dlt.Pipeline: + os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "_storage" + return dlt.pipeline(pipeline_name="fs_pipe", destination="filesystem", full_refresh=True) + + def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None: """Run pipeline twice with merge write disposition Regardless wether primary key is set or not, filesystem appends @@ -216,6 +222,244 @@ def some_source(): assert table.column("value").to_pylist() == [1, 2, 3, 4, 5] +@pytest.mark.essential +def test_delta_table_core( + default_buckets_env: str, + local_filesystem_pipeline: dlt.Pipeline, +) -> None: + """Tests core functionality for `delta` table format. + + Tests all data types, all filesystems, all write dispositions. 
+ """ + + from tests.pipeline.utils import _get_delta_table + + if default_buckets_env.startswith("memory://"): + pytest.skip( + "`deltalake` library does not support `memory` protocol (write works, read doesn't)" + ) + if default_buckets_env.startswith("s3://"): + # https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/ + os.environ["DESTINATION__FILESYSTEM__DELTALAKE_STORAGE_OPTIONS"] = ( + '{"AWS_S3_ALLOW_UNSAFE_RENAME": "true"}' + ) + + # create resource that yields rows with all data types + column_schemas, row = table_update_and_row() + + @dlt.resource(columns=column_schemas, table_format="delta") + def data_types(): + nonlocal row + yield [row] * 10 + + # run pipeline, this should create Delta table + info = local_filesystem_pipeline.run(data_types()) + assert_load_info(info) + + # `delta` table format should use `parquet` file format + completed_jobs = info.load_packages[0].jobs["completed_jobs"] + data_types_jobs = [ + job for job in completed_jobs if job.job_file_info.table_name == "data_types" + ] + assert all([job.file_path.endswith((".parquet", ".reference")) for job in data_types_jobs]) + + # 10 rows should be loaded to the Delta table and the content of the first + # row should match expected values + rows = load_tables_to_dicts(local_filesystem_pipeline, "data_types", exclude_system_cols=True)[ + "data_types" + ] + assert len(rows) == 10 + assert_all_data_types_row(rows[0], schema=column_schemas) + + # another run should append rows to the table + info = local_filesystem_pipeline.run(data_types()) + assert_load_info(info) + rows = load_tables_to_dicts(local_filesystem_pipeline, "data_types", exclude_system_cols=True)[ + "data_types" + ] + assert len(rows) == 20 + + # ensure "replace" write disposition is handled + # should do logical replace, increasing the table version + info = local_filesystem_pipeline.run(data_types(), write_disposition="replace") + assert_load_info(info) + client = cast(FilesystemClient, local_filesystem_pipeline.destination_client()) + assert _get_delta_table(client, "data_types").version() == 2 + rows = load_tables_to_dicts(local_filesystem_pipeline, "data_types", exclude_system_cols=True)[ + "data_types" + ] + assert len(rows) == 10 + + # `merge` resolves to `append` behavior + info = local_filesystem_pipeline.run(data_types(), write_disposition="merge") + assert_load_info(info) + rows = load_tables_to_dicts(local_filesystem_pipeline, "data_types", exclude_system_cols=True)[ + "data_types" + ] + assert len(rows) == 20 + + +def test_delta_table_multiple_files( + local_filesystem_pipeline: dlt.Pipeline, +) -> None: + """Tests loading multiple files into a Delta table. + + Files should be loaded into the Delta table in a single commit. 
+ """ + + from tests.pipeline.utils import _get_delta_table + + os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "2" # force multiple files + + @dlt.resource(table_format="delta") + def delta_table(): + yield [{"foo": True}] * 10 + + info = local_filesystem_pipeline.run(delta_table()) + assert_load_info(info) + + # multiple Parquet files should have been created + completed_jobs = info.load_packages[0].jobs["completed_jobs"] + delta_table_parquet_jobs = [ + job + for job in completed_jobs + if job.job_file_info.table_name == "delta_table" + and job.job_file_info.file_format == "parquet" + ] + assert len(delta_table_parquet_jobs) == 5 # 10 records, max 2 per file + + # all 10 records should have been loaded into a Delta table in a single commit + client = cast(FilesystemClient, local_filesystem_pipeline.destination_client()) + assert _get_delta_table(client, "delta_table").version() == 0 + rows = load_tables_to_dicts(local_filesystem_pipeline, "delta_table", exclude_system_cols=True)[ + "delta_table" + ] + assert len(rows) == 10 + + +def test_delta_table_child_tables( + local_filesystem_pipeline: dlt.Pipeline, +) -> None: + """Tests child table handling for `delta` table format.""" + + @dlt.resource(table_format="delta") + def complex_table(): + yield [ + { + "foo": 1, + "child": [{"bar": True, "grandchild": [1, 2]}, {"bar": True, "grandchild": [1]}], + }, + { + "foo": 2, + "child": [ + {"bar": False, "grandchild": [1, 3]}, + ], + }, + ] + + info = local_filesystem_pipeline.run(complex_table()) + assert_load_info(info) + rows_dict = load_tables_to_dicts( + local_filesystem_pipeline, + "complex_table", + "complex_table__child", + "complex_table__child__grandchild", + exclude_system_cols=True, + ) + # assert row counts + assert len(rows_dict["complex_table"]) == 2 + assert len(rows_dict["complex_table__child"]) == 3 + assert len(rows_dict["complex_table__child__grandchild"]) == 5 + # assert column names + assert rows_dict["complex_table"][0].keys() == {"foo"} + assert rows_dict["complex_table__child"][0].keys() == {"bar"} + assert rows_dict["complex_table__child__grandchild"][0].keys() == {"value"} + + # test write disposition handling with child tables + info = local_filesystem_pipeline.run(complex_table()) + assert_load_info(info) + rows_dict = load_tables_to_dicts( + local_filesystem_pipeline, + "complex_table", + "complex_table__child", + "complex_table__child__grandchild", + exclude_system_cols=True, + ) + assert len(rows_dict["complex_table"]) == 2 * 2 + assert len(rows_dict["complex_table__child"]) == 3 * 2 + assert len(rows_dict["complex_table__child__grandchild"]) == 5 * 2 + + info = local_filesystem_pipeline.run(complex_table(), write_disposition="replace") + assert_load_info(info) + rows_dict = load_tables_to_dicts( + local_filesystem_pipeline, + "complex_table", + "complex_table__child", + "complex_table__child__grandchild", + exclude_system_cols=True, + ) + assert len(rows_dict["complex_table"]) == 2 + assert len(rows_dict["complex_table__child"]) == 3 + assert len(rows_dict["complex_table__child__grandchild"]) == 5 + + +def test_delta_table_mixed_source( + local_filesystem_pipeline: dlt.Pipeline, +) -> None: + """Tests file format handling in mixed source. + + One resource uses `delta` table format, the other doesn't. 
+ """ + + @dlt.resource(table_format="delta") + def delta_table(): + yield [{"foo": True}] + + @dlt.resource() + def non_delta_table(): + yield [1, 2, 3] + + @dlt.source + def s(): + return [delta_table(), non_delta_table()] + + info = local_filesystem_pipeline.run( + s(), loader_file_format="jsonl" + ) # set file format at pipeline level + assert_load_info(info) + completed_jobs = info.load_packages[0].jobs["completed_jobs"] + + # `jsonl` file format should be overridden for `delta_table` resource + # because it's not supported for `delta` table format + delta_table_jobs = [ + job for job in completed_jobs if job.job_file_info.table_name == "delta_table" + ] + assert all([job.file_path.endswith((".parquet", ".reference")) for job in delta_table_jobs]) + + # `jsonl` file format should be respected for `non_delta_table` resource + non_delta_table_job = [ + job for job in completed_jobs if job.job_file_info.table_name == "non_delta_table" + ][0] + assert non_delta_table_job.file_path.endswith(".jsonl") + + +def test_delta_table_dynamic_dispatch( + local_filesystem_pipeline: dlt.Pipeline, +) -> None: + @dlt.resource(primary_key="id", table_name=lambda i: i["type"], table_format="delta") + def github_events(): + with open( + "tests/normalize/cases/github.events.load_page_1_duck.json", "r", encoding="utf-8" + ) as f: + yield json.load(f) + + info = local_filesystem_pipeline.run(github_events()) + assert_load_info(info) + completed_jobs = info.load_packages[0].jobs["completed_jobs"] + # 20 event types, two jobs per table (.parquet and .reference), 1 job for _dlt_pipeline_state + assert len(completed_jobs) == 2 * 20 + 1 + + TEST_LAYOUTS = ( "{schema_name}/{table_name}/{load_id}.{file_id}.{ext}", "{schema_name}.{table_name}.{load_id}.{file_id}.{ext}", diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index b4dae919f8..7affcc5a81 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -1,5 +1,4 @@ -import posixpath -from typing import Any, Dict, List, Tuple, Callable, Sequence +from typing import Any, Dict, List, Callable, Sequence import pytest import random from os import environ @@ -8,7 +7,7 @@ import dlt from dlt.common import json, sleep from dlt.common.pipeline import LoadInfo -from dlt.common.schema.typing import LOADS_TABLE_NAME +from dlt.common.schema.utils import get_table_format from dlt.common.typing import DictStrAny from dlt.destinations.impl.filesystem.filesystem import FilesystemClient from dlt.destinations.fs_client import FSClientBase @@ -16,7 +15,6 @@ from dlt.common.storages import FileStorage from dlt.destinations.exceptions import DatabaseUndefinedRelation -from tests.utils import TEST_STORAGE_ROOT PIPELINE_TEST_CASES_PATH = "./tests/pipeline/cases/" @@ -156,19 +154,39 @@ def _load_file(client: FSClientBase, filepath) -> List[Dict[str, Any]]: # # Load table dicts # +def _get_delta_table(client: FilesystemClient, table_name: str) -> "DeltaTable": # type: ignore[name-defined] # noqa: F821 + from deltalake import DeltaTable + from dlt.common.libs.deltalake import _deltalake_storage_options + + table_dir = client.get_table_dir(table_name) + remote_table_dir = f"{client.config.protocol}://{table_dir}" + return DeltaTable( + remote_table_dir, + storage_options=_deltalake_storage_options(client.config), + ) + + def _load_tables_to_dicts_fs(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, Any]]]: """For now this will expect the standard layout in the filesystem destination, if changed the results will not be correct""" client = p._fs_client() 
result: Dict[str, Any] = {} for table_name in table_names: - table_files = client.list_table_files(table_name) - for file in table_files: - items = _load_file(client, file) - if table_name in result: - result[table_name] = result[table_name] + items - else: - result[table_name] = items + if ( + table_name in p.default_schema.data_table_names() + and get_table_format(p.default_schema.tables, table_name) == "delta" + ): + assert isinstance(client, FilesystemClient) + dt = _get_delta_table(client, table_name) + result[table_name] = dt.to_pyarrow_table().to_pylist() + else: + table_files = client.list_table_files(table_name) + for file in table_files: + items = _load_file(client, file) + if table_name in result: + result[table_name] = result[table_name] + items + else: + result[table_name] = items return result @@ -194,11 +212,29 @@ def _load_tables_to_dicts_sql( def load_tables_to_dicts( - p: dlt.Pipeline, *table_names: str, schema_name: str = None + p: dlt.Pipeline, + *table_names: str, + schema_name: str = None, + exclude_system_cols: bool = False, + sortkey: str = None, ) -> Dict[str, List[Dict[str, Any]]]: + def _exclude_system_cols(dict_: Dict[str, Any]) -> Dict[str, Any]: + return {k: v for k, v in dict_.items() if not k.startswith("_dlt")} + + def _sort_list_of_dicts(list_: List[Dict[str, Any]], sortkey: str) -> List[Dict[str, Any]]: + """Sort list of dictionaries by dictionary key.""" + return sorted(list_, key=lambda d: d[sortkey]) + if _is_filesystem(p): - return _load_tables_to_dicts_fs(p, *table_names) - return _load_tables_to_dicts_sql(p, *table_names, schema_name=schema_name) + result = _load_tables_to_dicts_fs(p, *table_names) + else: + result = _load_tables_to_dicts_sql(p, *table_names, schema_name=schema_name) + + if exclude_system_cols: + result = {k: [_exclude_system_cols(d) for d in v] for k, v in result.items()} + if sortkey is not None: + result = {k: _sort_list_of_dicts(v, sortkey) for k, v in result.items()} + return result def assert_only_table_columns(