diff --git a/.github/workflows/test_dbt_runner.yml b/.github/workflows/test_dbt_runner.yml index 13810fbc0d..ad29909d9a 100644 --- a/.github/workflows/test_dbt_runner.yml +++ b/.github/workflows/test_dbt_runner.yml @@ -60,7 +60,7 @@ jobs: - name: Install dependencies # install dlt with postgres support - run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt + run: poetry install --no-interaction -E postgres -E postgis --with sentry-sdk,dbt - name: create secrets.toml run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml diff --git a/.github/workflows/test_destinations.yml b/.github/workflows/test_destinations.yml index a784d25794..f6204ab0aa 100644 --- a/.github/workflows/test_destinations.yml +++ b/.github/workflows/test_destinations.yml @@ -78,7 +78,7 @@ jobs: - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' - run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake -E pyiceberg + run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake -E pyiceberg - name: Upgrade sqlalchemy run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg` diff --git a/.github/workflows/test_local_destinations.yml b/.github/workflows/test_local_destinations.yml index 7efa071320..652644fef6 100644 --- a/.github/workflows/test_local_destinations.yml +++ b/.github/workflows/test_local_destinations.yml @@ -48,7 +48,7 @@ jobs: # Label used to access the service container postgres: # Docker Hub image - image: postgres + image: postgis/postgis # Provide the password for postgres env: POSTGRES_DB: dlt_data @@ -95,7 +95,7 @@ jobs: key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations - name: Install dependencies - run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake -E pyiceberg + run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake -E pyiceberg - name: Upgrade sqlalchemy run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg` diff --git a/.github/workflows/test_local_sources.yml b/.github/workflows/test_local_sources.yml index 8a3ba2a670..39689f5c85 100644 --- a/.github/workflows/test_local_sources.yml +++ b/.github/workflows/test_local_sources.yml @@ -43,7 +43,7 @@ jobs: # Label used to access the service container postgres: # Docker Hub image - image: postgres + image: postgis/postgis # Provide the password for postgres env: POSTGRES_DB: dlt_data @@ -83,7 +83,7 @@ jobs: # TODO: which deps should we enable? 
- name: Install dependencies - run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources + run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources # run sources tests in load against configured destinations - run: poetry run pytest tests/load/sources diff --git a/dlt/common/destination/typing.py b/dlt/common/destination/typing.py index 8cc08756cd..c79a2b0adc 100644 --- a/dlt/common/destination/typing.py +++ b/dlt/common/destination/typing.py @@ -1,6 +1,10 @@ from typing import Optional -from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam +from dlt.common.schema.typing import ( + _TTableSchemaBase, + TWriteDisposition, + TTableReferenceParam, +) class PreparedTableSchema(_TTableSchemaBase, total=False): diff --git a/dlt/common/incremental/__init__.py b/dlt/common/incremental/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/extract/incremental/typing.py b/dlt/common/incremental/typing.py similarity index 66% rename from dlt/extract/incremental/typing.py rename to dlt/common/incremental/typing.py index 7b7786b529..460e2f234b 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/common/incremental/typing.py @@ -2,9 +2,7 @@ from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union -from dlt.common.schema.typing import TColumnNames -from dlt.common.typing import TSortOrder -from dlt.extract.items import TTableHintTemplate +from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict): class IncrementalArgs(TypedDict, total=False): cursor_path: str - initial_value: Optional[str] - last_value_func: Optional[LastValueFunc[str]] + initial_value: Optional[Any] + last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]] + """Last value callable or name of built in function""" primary_key: Optional[TTableHintTemplate[TColumnNames]] - end_value: Optional[str] + end_value: Optional[Any] row_order: Optional[TSortOrder] allow_external_schedulers: Optional[bool] lag: Optional[Union[float, int]] + on_cursor_value_missing: Optional[OnCursorValueMissing] diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index dba1036f85..9d3d5792ea 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -48,7 +48,6 @@ ) from dlt.common.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TColumnSchema, TWriteDispositionConfig, TSchemaContract, @@ -56,7 +55,7 @@ from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.storages.load_storage import LoadPackageInfo from dlt.common.time import ensure_pendulum_datetime, precise_time -from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize +from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames from dlt.common.jsonpath import delete_matches, TAnyJsonPath from dlt.common.data_writers.writers import TLoaderFileFormat from dlt.common.utils import RowCounts, merge_row_counts diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ed6c1c6d78..c8f5de03ed 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -19,7 +19,7 @@ from 
dlt.common.data_types import TDataType from dlt.common.normalizers.typing import TNormalizersConfig -from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat +from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat, TColumnNames try: from pydantic import BaseModel as _PydanticBaseModel @@ -132,8 +132,6 @@ class TColumnPropInfo(NamedTuple): "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double" ] TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] -TColumnNames = Union[str, Sequence[str]] -"""A string representing a column name or a list of""" class TColumnType(TypedDict, total=False): @@ -166,6 +164,7 @@ class TColumnSchema(TColumnSchemaBase, total=False): variant: Optional[bool] hard_delete: Optional[bool] dedup_sort: Optional[TSortOrder] + incremental: Optional[bool] TTableSchemaColumns = Dict[str, TColumnSchema] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index e2e1f959dc..038abdc4d0 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -547,6 +547,17 @@ def merge_diff(table: TTableSchema, table_diff: TPartialTableSchema) -> TPartial * table hints are added or replaced from diff * nothing gets deleted """ + + incremental_a_col = get_first_column_name_with_prop( + table, "incremental", include_incomplete=True + ) + if incremental_a_col: + incremental_b_col = get_first_column_name_with_prop( + table_diff, "incremental", include_incomplete=True + ) + if incremental_b_col: + table["columns"][incremental_a_col].pop("incremental") + # add new columns when all checks passed updated_columns = merge_columns(table["columns"], table_diff["columns"]) table.update(table_diff) diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 771f1fd59b..8986d753f3 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -29,6 +29,7 @@ Iterator, Generator, NamedTuple, + Sequence, ) from typing_extensions import ( @@ -112,6 +113,8 @@ class SecretSentinel: TSecretStrValue = Annotated[str, SecretSentinel] +TColumnNames = Union[str, Sequence[str]] +"""A string representing a column name or a list of""" TDataItem: TypeAlias = Any """A single data item as extracted from data source""" TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]] @@ -126,6 +129,10 @@ class SecretSentinel: TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"] """known loader file formats""" +TDynHintType = TypeVar("TDynHintType") +TFunHintTemplate = Callable[[TDataItem], TDynHintType] +TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] + class ConfigValueSentinel(NamedTuple): """Class to create singleton sentinel for config and secret injected value""" diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 5f6a1fab85..05b26530d9 100644 --- a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -4,10 +4,8 @@ from dlt.common.destination import PreparedTableSchema from dlt.common.pendulum import timezone -from dlt.common.schema.typing import ( - TColumnNames, - TTableSchemaColumns, -) +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/lancedb/lancedb_adapter.py 
b/dlt/destinations/impl/lancedb/lancedb_adapter.py index 4314dd703f..d192168d0a 100644 --- a/dlt/destinations/impl/lancedb/lancedb_adapter.py +++ b/dlt/destinations/impl/lancedb/lancedb_adapter.py @@ -1,6 +1,7 @@ from typing import Any, Dict -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/postgres/factory.py b/dlt/destinations/impl/postgres/factory.py index bde0e35f3d..e0dc2836eb 100644 --- a/dlt/destinations/impl/postgres/factory.py +++ b/dlt/destinations/impl/postgres/factory.py @@ -1,19 +1,19 @@ import typing as t +from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.data_writers.configuration import CsvFormatConfiguration -from dlt.common.destination import Destination, DestinationCapabilitiesContext from dlt.common.data_writers.escape import escape_postgres_identifier, escape_postgres_literal -from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE +from dlt.common.destination import Destination, DestinationCapabilitiesContext from dlt.common.destination.typing import PreparedTableSchema from dlt.common.exceptions import TerminalValueError from dlt.common.schema.typing import TColumnSchema, TColumnType from dlt.common.wei import EVM_DECIMAL_PRECISION - -from dlt.destinations.type_mapping import TypeMapperImpl from dlt.destinations.impl.postgres.configuration import ( PostgresCredentials, PostgresClientConfiguration, ) +from dlt.destinations.impl.postgres.postgres_adapter import GEOMETRY_HINT, SRID_HINT +from dlt.destinations.type_mapping import TypeMapperImpl if t.TYPE_CHECKING: from dlt.destinations.impl.postgres.postgres import PostgresClient @@ -55,6 +55,7 @@ class PostgresTypeMapper(TypeMapperImpl): "character varying": "text", "smallint": "bigint", "integer": "bigint", + "geometry": "text", } def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str: @@ -108,11 +109,18 @@ def to_db_datetime_type( def from_destination_type( self, db_type: str, precision: t.Optional[int] = None, scale: t.Optional[int] = None ) -> TColumnType: - if db_type == "numeric": - if (precision, scale) == self.capabilities.wei_precision: - return dict(data_type="wei") + if db_type == "numeric" and (precision, scale) == self.capabilities.wei_precision: + return dict(data_type="wei") + if db_type.startswith("geometry"): + return dict(data_type="text") return super().from_destination_type(db_type, precision, scale) + def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str: + if column.get(GEOMETRY_HINT): + srid = column.get(SRID_HINT, 4326) + return f"geometry(Geometry, {srid})" + return super().to_destination_type(column, table) + class postgres(Destination[PostgresClientConfiguration, "PostgresClient"]): spec = PostgresClientConfiguration diff --git a/dlt/destinations/impl/postgres/postgres.py b/dlt/destinations/impl/postgres/postgres.py index 682f70da04..2459ee1dbe 100644 --- a/dlt/destinations/impl/postgres/postgres.py +++ b/dlt/destinations/impl/postgres/postgres.py @@ -2,9 +2,9 @@ from dlt.common import logger from dlt.common.data_writers.configuration import CsvFormatConfiguration +from dlt.common.destination import DestinationCapabilitiesContext from 
dlt.common.destination.exceptions import ( DestinationInvalidFileFormat, - DestinationTerminalException, ) from dlt.common.destination.reference import ( HasFollowupJobs, @@ -12,20 +12,16 @@ RunnableLoadJob, FollowupJobRequest, LoadJob, - TLoadJobState, ) -from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.exceptions import TerminalValueError from dlt.common.schema import TColumnSchema, TColumnHint, Schema -from dlt.common.schema.typing import TColumnType, TTableFormat +from dlt.common.schema.typing import TColumnType from dlt.common.schema.utils import is_nullable_column from dlt.common.storages.file_storage import FileStorage - -from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams -from dlt.destinations.insert_job_client import InsertValuesJobClient -from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration +from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient +from dlt.destinations.insert_job_client import InsertValuesJobClient from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams HINT_TO_POSTGRES_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"} @@ -43,15 +39,16 @@ def generate_sql( with sql_client.with_staging_dataset(): staging_table_name = sql_client.make_qualified_table_name(table["name"]) table_name = sql_client.make_qualified_table_name(table["name"]) - # drop destination table - sql.append(f"DROP TABLE IF EXISTS {table_name};") - # moving staging table to destination schema - sql.append( - f"ALTER TABLE {staging_table_name} SET SCHEMA" - f" {sql_client.fully_qualified_dataset_name()};" + sql.extend( + ( + f"DROP TABLE IF EXISTS {table_name};", + ( + f"ALTER TABLE {staging_table_name} SET SCHEMA" + f" {sql_client.fully_qualified_dataset_name()};" + ), + f"CREATE TABLE {staging_table_name} (like {table_name} including all);", + ) ) - # recreate staging table - sql.append(f"CREATE TABLE {staging_table_name} (like {table_name} including all);") return sql @@ -111,8 +108,7 @@ def run(self) -> None: split_columns.append(norm_col) if norm_col in split_headers and is_nullable_column(col): split_null_headers.append(norm_col) - split_unknown_headers = set(split_headers).difference(split_columns) - if split_unknown_headers: + if split_unknown_headers := set(split_headers).difference(split_columns): raise DestinationInvalidFileFormat( "postgres", "csv", @@ -130,15 +126,8 @@ def run(self) -> None: qualified_table_name = sql_client.make_qualified_table_name(table_name) copy_sql = ( - "COPY %s (%s) FROM STDIN WITH (FORMAT CSV, DELIMITER '%s', NULL ''," - " %s ENCODING '%s')" - % ( - qualified_table_name, - headers, - sep, - null_headers, - csv_format.encoding, - ) + f"COPY {qualified_table_name} ({headers}) FROM STDIN WITH (FORMAT CSV, DELIMITER" + f" '{sep}', NULL '', {null_headers} ENCODING '{csv_format.encoding}')" ) with sql_client.begin_transaction(): with sql_client.native_connection.cursor() as cursor: @@ -173,15 +162,16 @@ def create_load_job( return job def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str: - hints_str = " ".join( + hints_ = " ".join( self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True ) column_name = self.sql_client.escape_column_name(c["name"]) - return ( - f"{column_name} {self.type_mapper.to_destination_type(c,table)} {hints_str} 
{self._gen_not_null(c.get('nullable', True))}" - ) + nullability = self._gen_not_null(c.get("nullable", True)) + column_type = self.type_mapper.to_destination_type(c, table) + + return f"{column_name} {column_type} {hints_} {nullability}" def _create_replace_followup_jobs( self, table_chain: Sequence[PreparedTableSchema] diff --git a/dlt/destinations/impl/postgres/postgres_adapter.py b/dlt/destinations/impl/postgres/postgres_adapter.py new file mode 100644 index 0000000000..11e86ec525 --- /dev/null +++ b/dlt/destinations/impl/postgres/postgres_adapter.py @@ -0,0 +1,63 @@ +from typing import Any, Optional + +from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.destinations.utils import get_resource_for_adapter +from dlt.extract import DltResource + +GEOMETRY_HINT = "x-postgres-geometry" +SRID_HINT = "x-postgres-srid" + + +def postgres_adapter( + data: Any, + geometry: TColumnNames = None, + srid: Optional[int] = 4326, +) -> DltResource: + """Prepares data for the postgres destination by specifying which columns should + be cast to PostGIS geometry types. + + Args: + data (Any): The data to be transformed. It can be raw data or an instance + of DltResource. If raw data, the function wraps it into a DltResource + object. + geometry (TColumnNames, optional): Specify columns to cast to geometries. + It can be a single column name as a string, or a list of column names. + srid (int, optional): The Spatial Reference System Identifier (SRID) to be + used for the geometry columns. If not provided, SRID 4326 will be used. + + Returns: + DltResource: A resource with applied postgres-specific hints. + + Raises: + ValueError: If input for `geometry` is invalid, or if no geometry columns are specified. + + Examples: + >>> data = [{"town": "Null Island", "loc": "POINT(0 0)"}] + >>> postgres_adapter(data, geometry="loc", srid=4326) + [DltResource with hints applied] + """ + resource = get_resource_for_adapter(data) + + column_hints: TTableSchemaColumns = {} + + if geometry: + if isinstance(geometry, str): + geometry = [geometry] + if not isinstance(geometry, list): + raise ValueError( + "'geometry' must be a list of column names or a single column name as a string." 
+ ) + + for column_name in geometry: + column_hints[column_name] = { + "name": column_name, + GEOMETRY_HINT: True, # type: ignore[misc] + } + if srid is not None: + column_hints[column_name][SRID_HINT] = srid # type: ignore + + if not column_hints: + raise ValueError("A value for 'geometry' must be specified.") + else: + resource.apply_hints(columns=column_hints) + return resource diff --git a/dlt/destinations/impl/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py index bbc2d719a8..5a5a44965c 100644 --- a/dlt/destinations/impl/qdrant/qdrant_adapter.py +++ b/dlt/destinations/impl/qdrant/qdrant_adapter.py @@ -1,6 +1,7 @@ from typing import Any -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/destinations/impl/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py index 0ca9047528..329d13c493 100644 --- a/dlt/destinations/impl/weaviate/weaviate_adapter.py +++ b/dlt/destinations/impl/weaviate/weaviate_adapter.py @@ -1,6 +1,7 @@ from typing import Dict, Any, Literal, Set, get_args -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource, resource as make_resource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 63140e8f78..f8703e1452 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -32,7 +32,6 @@ from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION from dlt.common.schema.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TFileFormat, TWriteDisposition, TWriteDispositionConfig, @@ -43,7 +42,8 @@ ) from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage -from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems +from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems, TColumnNames + from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable from dlt.extract.hints import make_hints @@ -70,6 +70,7 @@ TSourceFunParams, ) from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl +from dlt.extract.incremental import TIncrementalConfig @configspec @@ -446,6 +447,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... @@ -468,6 +470,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ... 
@@ -490,6 +493,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: Literal[True] = True, ) -> Callable[ @@ -515,6 +519,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... @@ -536,6 +541,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: bool = False, data_from: TUnboundDltResource = None, @@ -632,6 +638,7 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl: table_format=table_format, file_format=file_format, references=references, + incremental=incremental, ) resource = _impl_cls.from_data( @@ -643,6 +650,10 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl: cast(DltResource, data_from), True, ) + + if incremental: + # Reset the flag to allow overriding by incremental argument + resource.incremental._from_hints = False # If custom nesting level was specified then # we need to add it to table hints so that # later in normalizer dlt/common/normalizers/json/relational.py @@ -681,7 +692,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl: return _wrap def decorator( - f: Callable[TResourceFunParams, Any] + f: Callable[TResourceFunParams, Any], ) -> Callable[TResourceFunParams, TDltResourceImpl]: if not callable(f): if data_from: @@ -1023,7 +1034,7 @@ def get_source() -> DltSource: def defer( - f: Callable[TDeferredFunParams, TBoundItems] + f: Callable[TDeferredFunParams, TBoundItems], ) -> Callable[TDeferredFunParams, TDeferred[TBoundItems]]: @wraps(f) def _wrap(*args: Any, **kwargs: Any) -> TDeferred[TBoundItems]: diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index e65f6cf0d0..25c3a0dbae 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -2,7 +2,7 @@ from collections.abc import Sequence as C_Sequence from copy import copy import itertools -from typing import Iterator, List, Dict, Any, Optional +from typing import Iterator, List, Dict, Any, Optional, Mapping import yaml from dlt.common.configuration.container import Container @@ -17,13 +17,12 @@ WithStepInfo, reset_resource_state, ) -from dlt.common.typing import DictStrAny +from dlt.common.typing import DictStrAny, TColumnNames from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema import Schema, utils from dlt.common.schema.typing import ( TAnySchemaColumns, - TColumnNames, TSchemaContract, TTableFormat, TWriteDispositionConfig, @@ -39,7 +38,7 @@ from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints -from dlt.extract.incremental import IncrementalResourceWrapper +from dlt.extract.incremental import IncrementalResourceWrapper, Incremental from dlt.extract.pipe_iterator import PipeIterator from dlt.extract.source import DltSource from dlt.extract.resource import DltResource diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 5daabd0c6a..000e5c4cdb 100644 --- 
a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -1,10 +1,9 @@ -from typing import TypedDict, cast, Any, Optional, Dict, Sequence, Mapping +from typing import TypedDict, cast, Any, Optional, Dict, Sequence, Mapping, Union from typing_extensions import Self from dlt.common import logger from dlt.common.schema.typing import ( C_DLT_ID, - TColumnNames, TColumnProp, TFileFormat, TPartialTableSchema, @@ -28,7 +27,7 @@ new_column, new_table, ) -from dlt.common.typing import TDataItem +from dlt.common.typing import TDataItem, TColumnNames from dlt.common.time import ensure_pendulum_datetime from dlt.common.utils import clone_dict_nested from dlt.common.normalizers.json.relational import DataItemNormalizer @@ -37,7 +36,7 @@ DataItemRequiredForDynamicTableHints, InconsistentTableTemplate, ) -from dlt.extract.incremental import Incremental +from dlt.extract.incremental import Incremental, TIncrementalConfig from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator @@ -86,6 +85,7 @@ def make_hints( table_format: TTableHintTemplate[TTableFormat] = None, file_format: TTableHintTemplate[TFileFormat] = None, references: TTableHintTemplate[TTableReferenceParam] = None, + incremental: TIncrementalConfig = None, ) -> TResourceHints: """A convenience function to create resource hints. Accepts both static and dynamic hints based on data. @@ -119,6 +119,8 @@ def make_hints( if validator: new_template["validator"] = validator DltResourceHints.validate_dynamic_hints(new_template) + if incremental is not None: # TODO: Validate + new_template["incremental"] = Incremental.ensure_instance(incremental) return new_template @@ -204,6 +206,10 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab for k, v in table_template.items() if k not in NATURAL_CALLABLES } # type: ignore + if "incremental" in table_template: + incremental = table_template["incremental"] + if isinstance(incremental, Incremental) and incremental is not Incremental.EMPTY: + resolved_template["incremental"] = incremental table_schema = self._create_table_schema(resolved_template, self.name) migrate_complex_types(table_schema, warn=True) validate_dict_ignoring_xkeys( @@ -221,7 +227,7 @@ def apply_hints( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, - incremental: Incremental[Any] = None, + incremental: TIncrementalConfig = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None, table_format: TTableHintTemplate[TTableFormat] = None, @@ -360,7 +366,7 @@ def apply_hints( # set properties that can't be passed to make_hints if incremental is not None: - t["incremental"] = incremental + t["incremental"] = Incremental.ensure_instance(incremental) self._set_hints(t, create_table_variant) return self @@ -506,6 +512,22 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: "row_key": False, } + @staticmethod + def _merge_incremental_column_hint(dict_: Dict[str, Any]) -> None: + incremental = dict_.pop("incremental") + if incremental is None: + return + col_name = incremental.get_cursor_column_name() + if not col_name: + # cursor cannot resolve to a single column, no hint added + return + incremental_col = 
dict_["columns"].get(col_name) + if not incremental_col: + incremental_col = {"name": col_name} + + incremental_col["incremental"] = True + dict_["columns"][col_name] = incremental_col + @staticmethod def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> TTableSchema: """Creates table schema from resource hints and resource name. Resource hints are resolved @@ -518,6 +540,8 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> "disposition": resource_hints["write_disposition"] } # wrap in dict DltResourceHints._merge_write_disposition_dict(resource_hints) # type: ignore[arg-type] + if "incremental" in resource_hints: + DltResourceHints._merge_incremental_column_hint(resource_hints) # type: ignore[arg-type] dict_ = cast(TTableSchema, resource_hints) dict_["resource"] = resource_name return dict_ diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 69af0d68a6..28d33bb71f 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -1,6 +1,6 @@ import os from datetime import datetime # noqa: I251 -from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union +from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple from typing_extensions import get_args import inspect @@ -9,7 +9,7 @@ from dlt.common import logger from dlt.common.exceptions import MissingDependencyException from dlt.common.pendulum import pendulum -from dlt.common.jsonpath import compile_path +from dlt.common.jsonpath import compile_path, extract_simple_field_name from dlt.common.typing import ( TDataItem, TDataItems, @@ -19,8 +19,8 @@ get_generic_type_argument_from_instance, is_optional_type, is_subclass, + TColumnNames, ) -from dlt.common.schema.typing import TColumnNames from dlt.common.configuration import configspec, ConfigurationValueError from dlt.common.configuration.specs import BaseConfiguration from dlt.common.pipeline import resource_state @@ -29,17 +29,19 @@ coerce_value, py_type_to_sc_type, ) +from dlt.common.utils import without_none from dlt.extract.exceptions import IncrementalUnboundError from dlt.extract.incremental.exceptions import ( IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) -from dlt.extract.incremental.typing import ( +from dlt.common.incremental.typing import ( IncrementalColumnState, TCursorValue, LastValueFunc, OnCursorValueMissing, + IncrementalArgs, ) from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -123,7 +125,7 @@ def __init__( self, cursor_path: str = None, initial_value: Optional[TCursorValue] = None, - last_value_func: Optional[LastValueFunc[TCursorValue]] = max, + last_value_func: Optional[Union[LastValueFunc[TCursorValue], Literal["min", "max"]]] = max, primary_key: Optional[TTableHintTemplate[TColumnNames]] = None, end_value: Optional[TCursorValue] = None, row_order: Optional[TSortOrder] = None, @@ -135,6 +137,16 @@ def __init__( if cursor_path: compile_path(cursor_path) self.cursor_path = cursor_path + if isinstance(last_value_func, str): + if last_value_func == "min": + last_value_func = min + elif last_value_func == "max": + last_value_func = max + else: + raise ValueError( + f"Unknown last_value_func '{last_value_func}' passed as string. Provide a" + " callable to use a custom function." 
+ ) self.last_value_func = last_value_func self.initial_value = initial_value """Initial value of last_value""" @@ -247,6 +259,10 @@ def copy(self) -> "Incremental[TCursorValue]": # merge creates a copy return self.merge(self) + def get_cursor_column_name(self) -> Optional[str]: + """Return the name of the cursor column if the cursor path resolves to a single column""" + return extract_simple_field_name(self.cursor_path) + def on_resolved(self) -> None: compile_path(self.cursor_path) if self.end_value is not None and self.initial_value is None: @@ -491,6 +507,12 @@ def can_close(self) -> bool: and self.start_out_of_range ) + @classmethod + def ensure_instance(cls, value: "TIncrementalConfig") -> "Incremental[TCursorValue]": + if isinstance(value, Incremental): + return value + return cls(**value) + def __str__(self) -> str: return ( f"Incremental at 0x{id(self):x} for resource {self.resource_name} with cursor path:" @@ -511,7 +533,6 @@ def _get_transformer(self, items: TDataItems) -> IncrementalTransform: def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]: if rows is None: return rows - transformer = self._get_transformer(rows) if isinstance(rows, list): rows = [ @@ -556,6 +577,8 @@ def _check_duplicate_cursor_threshold( Incremental.EMPTY = Incremental[Any]() Incremental.EMPTY.__is_resolved__ = True +TIncrementalConfig = Union[Incremental[Any], IncrementalArgs] + class IncrementalResourceWrapper(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1 # stick to end @@ -595,6 +618,34 @@ def get_incremental_arg(sig: inspect.Signature) -> Optional[inspect.Parameter]: break return incremental_param + @staticmethod + def inject_implicit_incremental_arg( + incremental: Optional[Union[Incremental[Any], "IncrementalResourceWrapper"]], + sig: inspect.Signature, + func_args: Tuple[Any], + func_kwargs: Dict[str, Any], + fallback: Optional[Incremental[Any]] = None, + ) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]: + """Inject the incremental instance into function arguments + if the function has an incremental argument without default in its signature and it is not already set in the arguments. + + Returns: + Tuple of the new args, kwargs and the incremental instance that was injected (if any) + """ + if isinstance(incremental, IncrementalResourceWrapper): + incremental = incremental.incremental + if not incremental: + if not fallback: + return func_args, func_kwargs, None + incremental = fallback + incremental_param = IncrementalResourceWrapper.get_incremental_arg(sig) + if incremental_param: + bound_args = sig.bind_partial(*func_args, **func_kwargs) + if not bound_args.arguments.get(incremental_param.name): + bound_args.arguments[incremental_param.name] = incremental + return bound_args.args, bound_args.kwargs, incremental + return func_args, func_kwargs, None + def wrap(self, sig: inspect.Signature, func: TFun) -> TFun: """Wrap the callable to inject an `Incremental` object configured for the resource.""" incremental_param = self.get_incremental_arg(sig) @@ -666,12 +717,14 @@ def incremental(self) -> Optional[Incremental[Any]]: return self._incremental def set_incremental( - self, incremental: Optional[Incremental[Any]], from_hints: bool = False + self, incremental: Optional[TIncrementalConfig], from_hints: bool = False ) -> None: """Sets the incremental. 
If incremental was set from_hints, it can only be changed in the same manner"""
         if self._from_hints and not from_hints:
             # do not accept incremental if apply hints were used
             return
+        if incremental is not None:
+            incremental = Incremental.ensure_instance(incremental)
         self._from_hints = from_hints
         self._incremental = incremental
@@ -710,6 +763,12 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
         return self._incremental(item, meta)
 
 
+def incremental_config_to_instance(cfg: TIncrementalConfig) -> Incremental[Any]:
+    if isinstance(cfg, Incremental):
+        return cfg
+    return Incremental(**cfg)
+
+
 __all__ = [
     "Incremental",
     "IncrementalResourceWrapper",
@@ -717,6 +776,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
     "IncrementalCursorPathMissing",
     "IncrementalPrimaryKeyMissing",
     "IncrementalUnboundError",
+    "TIncrementalConfig",
     "LastValueFunc",
     "TCursorValue",
 ]
diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py
index 842c8aebe8..22b1194b51 100644
--- a/dlt/extract/incremental/transform.py
+++ b/dlt/extract/incremental/transform.py
@@ -5,7 +5,7 @@
 from dlt.common.utils import digest128
 from dlt.common.json import json
 from dlt.common.pendulum import pendulum
-from dlt.common.typing import TDataItem
+from dlt.common.typing import TDataItem, TColumnNames
 from dlt.common.jsonpath import find_values, compile_path, extract_simple_field_name
 from dlt.extract.incremental.exceptions import (
     IncrementalCursorInvalidCoercion,
@@ -13,10 +13,9 @@
     IncrementalPrimaryKeyMissing,
     IncrementalCursorPathHasValueNone,
 )
-from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing
+from dlt.common.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing
 from dlt.extract.utils import resolve_column_value
 from dlt.extract.items import TTableHintTemplate
-from dlt.common.schema.typing import TColumnNames
 
 try:
     from dlt.common.libs import pyarrow
diff --git a/dlt/extract/items.py b/dlt/extract/items.py
index d721e8094e..888787e6b7 100644
--- a/dlt/extract/items.py
+++ b/dlt/extract/items.py
@@ -19,7 +19,14 @@
 )
 from concurrent.futures import Future
 
-from dlt.common.typing import TAny, TDataItem, TDataItems
+from dlt.common.typing import (
+    TAny,
+    TDataItem,
+    TDataItems,
+    TTableHintTemplate,
+    TFunHintTemplate,
+    TDynHintType,
+)
 
 
 TDecompositionStrategy = Literal["none", "scc"]
@@ -27,9 +34,6 @@
 TAwaitableDataItems = Awaitable[TDataItems]
 TPipedDataItems = Union[TDataItems, TDeferredDataItems, TAwaitableDataItems]
 
-TDynHintType = TypeVar("TDynHintType")
-TFunHintTemplate = Callable[[TDataItem], TDynHintType]
-TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]]
 
 if TYPE_CHECKING:
     TItemFuture = Future[TPipedDataItems]
diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index c6ca1660f4..42e3905162 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -11,6 +11,7 @@
     Union,
     Any,
     Optional,
+    Mapping,
 )
 
 from typing_extensions import TypeVar, Self
@@ -28,6 +29,7 @@
     pipeline_state,
 )
 from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id
+from dlt.common.schema.typing import TTableSchema
 from dlt.extract.utils import wrap_async_iterator, wrap_parallel_iterator
 
 from dlt.extract.items import (
@@ -42,7 +44,7 @@
 )
 from dlt.extract.pipe_iterator import ManagedPipeIterator
 from dlt.extract.pipe import Pipe, TPipeStep
-from dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints
+from
dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints, make_hints from dlt.extract.incremental import Incremental, IncrementalResourceWrapper from dlt.extract.exceptions import ( InvalidTransformerDataTypeGeneratorFunctionRequired, @@ -442,35 +444,60 @@ def add_step( self._pipe.insert_step(item_transform, insert_at) return self + def _remove_incremental_step(self) -> None: + step_no = self._pipe.find(Incremental, IncrementalResourceWrapper) + if step_no >= 0: + self._pipe.remove_step(step_no) + + def set_incremental( + self, + new_incremental: Union[Incremental[Any], IncrementalResourceWrapper], + from_hints: bool = False, + ) -> Optional[Union[Incremental[Any], IncrementalResourceWrapper]]: + """Set/replace the incremental transform for the resource. + + Args: + new_incremental: The Incremental instance/hint to set or replace + from_hints: If the incremental is set from hints. Defaults to False. + """ + if new_incremental is Incremental.EMPTY: + new_incremental = None + incremental = self.incremental + if incremental is not None: + # if isinstance(new_incremental, Mapping): + # new_incremental = Incremental.ensure_instance(new_incremental) + + if isinstance(new_incremental, IncrementalResourceWrapper): + # Completely replace the wrapper + self._remove_incremental_step() + self.add_step(new_incremental) + elif isinstance(incremental, IncrementalResourceWrapper): + incremental.set_incremental(new_incremental, from_hints=from_hints) + else: + self._remove_incremental_step() + # re-add the step + incremental = None + if incremental is None: + # if there's no wrapper add incremental as a transform + if new_incremental: + if not isinstance(new_incremental, IncrementalResourceWrapper): + new_incremental = Incremental.ensure_instance(new_incremental) + self.add_step(new_incremental) + return new_incremental + def _set_hints( self, table_schema_template: TResourceHints, create_table_variant: bool = False ) -> None: super()._set_hints(table_schema_template, create_table_variant) # validators and incremental apply only to resource hints if not create_table_variant: - incremental = self.incremental # try to late assign incremental if table_schema_template.get("incremental") is not None: - new_incremental = table_schema_template["incremental"] - # remove incremental if empty - if new_incremental is Incremental.EMPTY: - new_incremental = None - - if incremental is not None: - if isinstance(incremental, IncrementalResourceWrapper): - # replace in wrapper - incremental.set_incremental(new_incremental, from_hints=True) - else: - step_no = self._pipe.find(Incremental) - self._pipe.remove_step(step_no) - # re-add the step - incremental = None - - if incremental is None: - # if there's no wrapper add incremental as a transform - incremental = new_incremental # type: ignore - if new_incremental: - self.add_step(new_incremental) + incremental = self.set_incremental( + table_schema_template["incremental"], from_hints=True + ) + else: + incremental = self.incremental if incremental: primary_key = table_schema_template.get("primary_key", incremental.primary_key) @@ -480,10 +507,25 @@ def _set_hints( if table_schema_template.get("validator") is not None: self.validator = table_schema_template["validator"] + def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: + incremental: Optional[Union[Incremental[Any], IncrementalResourceWrapper]] = ( + self.incremental + ) + if incremental and "incremental" not in self._hints: + if isinstance(incremental, 
IncrementalResourceWrapper): + incremental = incremental.incremental + if incremental: + self._hints["incremental"] = incremental + + table_schema = super().compute_table_schema(item, meta) + + return table_schema + def bind(self: TDltResourceImpl, *args: Any, **kwargs: Any) -> TDltResourceImpl: """Binds the parametrized resource to passed arguments. Modifies resource pipe in place. Does not evaluate generators or iterators.""" if self._args_bound: raise TypeError(f"Parametrized resource {self.name} is not callable") + orig_gen = self._pipe.gen gen = self._pipe.bind_gen(*args, **kwargs) if isinstance(gen, DltResource): @@ -599,14 +641,14 @@ def _eject_config(self) -> bool: if not self._pipe.is_empty and not self._args_bound: orig_gen = getattr(self._pipe.gen, "__GEN__", None) if orig_gen: - step_no = self._pipe.find(IncrementalResourceWrapper) - if step_no >= 0: - self._pipe.remove_step(step_no) + self._remove_incremental_step() self._pipe.replace_gen(orig_gen) return True return False - def _inject_config(self) -> "DltResource": + def _inject_config( + self, incremental_from_hints_override: Optional[bool] = None + ) -> "DltResource": """Wraps the pipe generation step in incremental and config injection wrappers and adds pipe step with Incremental transform. """ @@ -618,8 +660,17 @@ def _inject_config(self) -> "DltResource": sig = inspect.signature(gen) if IncrementalResourceWrapper.should_wrap(sig): incremental = IncrementalResourceWrapper(self._hints.get("primary_key")) + if incr_hint := self._hints.get("incremental"): + incremental.set_incremental( + incr_hint, + from_hints=( + incremental_from_hints_override + if incremental_from_hints_override is not None + else True + ), + ) incr_f = incremental.wrap(sig, gen) - self.add_step(incremental) + self.set_incremental(incremental) else: incr_f = gen resource_sections = (known_sections.SOURCES, self.section, self.name) @@ -649,6 +700,12 @@ def _clone( if self._pipe and not self._pipe.is_empty: pipe = pipe._clone(new_name=new_name, with_parent=with_parent) # incremental and parent are already in the pipe (if any) + + incremental = self.incremental + if isinstance(incremental, IncrementalResourceWrapper): + incremental_from_hints: Optional[bool] = incremental._from_hints + else: + incremental_from_hints = None r_ = self.__class__( pipe, self._clone_hints(self._hints), @@ -661,7 +718,7 @@ def _clone( # this makes sure that a take config values from a right section and wrapper has a separated # instance in the pipeline if r_._eject_config(): - r_._inject_config() + r_._inject_config(incremental_from_hints_override=incremental_from_hints) return r_ def _get_config_section_context(self) -> ConfigSectionContext: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 55a8b0b8c4..68570d0995 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -22,8 +22,15 @@ from dlt.common.data_writers import TDataItemFormat from dlt.common.exceptions import MissingDependencyException from dlt.common.pipeline import reset_resource_state -from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns -from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems, TAnyFunOrGenerator +from dlt.common.schema.typing import TAnySchemaColumns, TTableSchemaColumns +from dlt.common.typing import ( + AnyFun, + DictStrAny, + TDataItem, + TDataItems, + TAnyFunOrGenerator, + TColumnNames, +) from dlt.common.utils import get_callable_name from dlt.extract.exceptions import ( diff --git a/dlt/pipeline/pipeline.py 
b/dlt/pipeline/pipeline.py index a9f07d417e..70d160ea67 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -38,7 +38,6 @@ from dlt.common.exceptions import MissingDependencyException from dlt.common.runtime import signals, apply_runtime_config from dlt.common.schema.typing import ( - TColumnNames, TSchemaTables, TTableFormat, TWriteDispositionConfig, @@ -47,7 +46,7 @@ ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound -from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type +from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type, TColumnNames from dlt.common.runners import pool_runner as runner from dlt.common.storages import ( LiveSchemaStorage, diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index ccef828b1a..c48e54de4a 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -15,7 +15,7 @@ from dlt.common.schema.typing import ( TAnySchemaColumns, ) -from dlt.extract.incremental.typing import IncrementalArgs +from dlt.common.incremental.typing import IncrementalArgs from dlt.extract.items import TTableHintTemplate from dlt.extract.hints import TResourceHintsBase from dlt.sources.helpers.rest_client.auth import AuthConfigBase, TApiKeyLocation @@ -23,9 +23,8 @@ from dataclasses import dataclass, field from dlt.common import jsonpath -from dlt.common.typing import TSortOrder +from dlt.common.typing import TSortOrder, TColumnNames from dlt.common.schema.typing import ( - TColumnNames, TTableFormat, TAnySchemaColumns, TWriteDispositionConfig, @@ -33,7 +32,7 @@ ) from dlt.extract.items import TTableHintTemplate -from dlt.extract.incremental.typing import LastValueFunc +from dlt.common.incremental.typing import LastValueFunc from dlt.extract.resource import DltResource from requests import Session diff --git a/docs/website/docs/dlt-ecosystem/destinations/postgres.md b/docs/website/docs/dlt-ecosystem/destinations/postgres.md index bb9aba9051..922b187a7e 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/postgres.md +++ b/docs/website/docs/dlt-ecosystem/destinations/postgres.md @@ -117,7 +117,57 @@ In the example above, `arrow_table` will be converted to CSV with **pyarrow** an ## Supported column hints `postgres` will create unique indexes for all columns with `unique` hints. This behavior **may be disabled**. -### Table and column identifiers +### Spatial Types + +To enable GIS capabilities in your Postgres destination, use the `x-postgres-geometry` and `x-postgres-srid` hints for columns containing geometric data. +The `postgres_adapter` facilitates applying these hints conveniently, with a default SRID of `4326`. + +**Supported Geometry Types:** + +- WKT (Well-Known Text) +- Hex Representation + +If you have geometry data in binary format, you will need to convert it to hexadecimal representation before loading. 
+
+**Example:** Using `postgres_adapter` with Different Geometry Types
+
+```py
+from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter
+
+# Sample data with various geometry types
+data_wkt = [
+    {"type": "Point_wkt", "geom": "POINT (1 1)"},
+    {"type": "Polygon_wkt", "geom": "POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))"},
+]
+
+data_wkb_hex = [
+    {"type": "Point_wkb_hex", "geom": "0101000000000000000000F03F000000000000F03F"},
+    {"type": "LineString_wkb_hex", "geom": "01020000000300000000000000000000000000000000000000000000000000F03F000000000000F03F00000000000000400000000000000040"},
+]
+
+
+
+# Apply postgres_adapter to the 'geom' column with default SRID 4326
+resource_wkt = postgres_adapter(data_wkt, geometry="geom")
+resource_wkb_hex = postgres_adapter(data_wkb_hex, geometry="geom")
+
+# If you need a different SRID
+resource_wkt = postgres_adapter(data_wkt, geometry="geom", srid=3242)
+```
+
+Ensure that the PostGIS extension is enabled in your Postgres database:
+
+```sql
+CREATE EXTENSION postgis;
+```
+
+This configuration allows `dlt` to map the `geom` column to the PostGIS `geometry` type for spatial queries and analyses.
+
+:::warning
+`LinearRing` geometry type isn't supported.
+:::
+
+## Table and column identifiers
 Postgres supports both case-sensitive and case-insensitive identifiers. All unquoted and lowercase identifiers resolve case-insensitively in SQL statements. Case insensitive [naming conventions](../../general-usage/naming-convention.md#case-sensitive-and-insensitive-destinations) like the default **snake_case** will generate case-insensitive identifiers. Case sensitive (like **sql_cs_v1**) will generate case-sensitive identifiers that must be quoted in SQL statements.
 
 ## Additional destination options
diff --git a/poetry.lock b/poetry.lock
index 25e568f9e7..7d5e236be2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -13,13 +13,13 @@ files = [
 
 [[package]]
 name = "adlfs"
-version = "2024.7.0"
+version = "2024.4.1"
 description = "Access Azure Datalake Gen1 with fsspec and dask"
 optional = true
 python-versions = ">=3.8"
 files = [
-    {file = "adlfs-2024.7.0-py3-none-any.whl", hash = "sha256:2005c8e124fda3948f2a6abb2dbebb2c936d2d821acaca6afd61932edfa9bc07"},
-    {file = "adlfs-2024.7.0.tar.gz", hash = "sha256:106995b91f0eb5e775bcd5957d180d9a14faef3271a063b1f65c66fd5ab05ddf"},
+    {file = "adlfs-2024.4.1-py3-none-any.whl", hash = "sha256:acea94612ddacaa34ea8c6babcc95b8da6982f930cdade7a86fbd17382403e16"},
+    {file = "adlfs-2024.4.1.tar.gz", hash = "sha256:75530a45447f358ae53c5c39c298b8d966dae684be84db899f63b94cd96fc000"},
 ]
 
 [package.dependencies]
@@ -4504,13 +4504,13 @@ files = [
 
 [[package]]
 name = "ibis-framework"
-version = "10.0.0.dev231"
+version = "10.0.0.dev256"
 description = "The portable Python dataframe library"
 optional = true
 python-versions = "<4.0,>=3.10"
 files = [
-    {file = "ibis_framework-10.0.0.dev231-py3-none-any.whl", hash = "sha256:8689cbcd55c3680bdb5fd51ff0d2a10260372c1b15661c123b0460087cfdbda2"},
-    {file = "ibis_framework-10.0.0.dev231.tar.gz", hash = "sha256:199142243d1a6a0eba3bbbe0debba910fc8087dffe4eac9e3d61823f6988f421"},
+    {file = "ibis_framework-10.0.0.dev256-py3-none-any.whl", hash = "sha256:d6f21278e6fd78920bbe986df2c871921142635cc4f7d5d2048cae26e307a3df"},
+    {file = "ibis_framework-10.0.0.dev256.tar.gz", hash = "sha256:e9f97d8177fd88f4a3578be20519c1da79a6a7ffac678b46b790bfde67405930"},
 ]
 
 [package.dependencies]
@@ -4520,26 +4520,27 @@ db-dtypes = {version = ">=0.3,<2", optional = true, markers = "extra == \"bigque
 duckdb = {version
= ">=0.10,<1.2", optional = true, markers = "extra == \"duckdb\""} google-cloud-bigquery = {version = ">=3,<4", optional = true, markers = "extra == \"bigquery\""} google-cloud-bigquery-storage = {version = ">=2,<3", optional = true, markers = "extra == \"bigquery\""} -numpy = {version = ">=1.23.2,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +numpy = {version = ">=1.23.2,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} packaging = {version = ">=21.3,<25", optional = true, markers = "extra == \"duckdb\" or extra == \"oracle\" or extra == \"polars\" or extra == \"pyspark\""} -pandas = {version = ">=1.5.3,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +pandas = {version = ">=1.5.3,<3", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} parsy = ">=2,<3" psycopg2 = {version = ">=2.8.4,<3", optional = true, markers = "extra == \"postgres\" or extra == \"risingwave\""} -pyarrow = {version = ">=10.0.1,<19", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} -pyarrow-hotfix = {version = ">=0.4,<1", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == 
\"trino\""} +pyarrow = {version = ">=10.0.1,<19", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +pyarrow-hotfix = {version = ">=0.4,<1", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} pydata-google-auth = {version = ">=1.4.0,<2", optional = true, markers = "extra == \"bigquery\""} pyodbc = {version = ">=4.0.39,<6", optional = true, markers = "extra == \"mssql\""} python-dateutil = ">=2.8.2,<3" pytz = ">=2022.7" -rich = {version = ">=12.4.4,<14", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} +rich = {version = ">=12.4.4,<14", optional = true, markers = "extra == \"bigquery\" or extra == \"clickhouse\" or extra == \"databricks\" or extra == \"datafusion\" or extra == \"druid\" or extra == \"duckdb\" or extra == \"exasol\" or extra == \"flink\" or extra == \"impala\" or extra == \"mssql\" or extra == \"mysql\" or extra == \"oracle\" or extra == \"polars\" or extra == \"postgres\" or extra == \"pyspark\" or extra == \"snowflake\" or extra == \"sqlite\" or extra == \"risingwave\" or extra == \"trino\""} snowflake-connector-python = {version = ">=3.0.2,<3.3.0b1 || >3.3.0b1,<4", optional = true, markers = "extra == \"snowflake\""} -sqlglot = ">=23.4,<25.29" +sqlglot = ">=23.4,<25.30" toolz = ">=0.11,<2" typing-extensions = ">=4.3.0,<5" [package.extras] bigquery = ["db-dtypes (>=0.3,<2)", "google-cloud-bigquery (>=3,<4)", "google-cloud-bigquery-storage (>=2,<3)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "pydata-google-auth (>=1.4.0,<2)", "rich (>=12.4.4,<14)"] clickhouse = ["clickhouse-connect[arrow,numpy,pandas] (>=0.5.23,<1)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] +databricks = ["databricks-sql-connector-core (>=4,<5)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] datafusion = ["datafusion (>=0.6,<43)", "numpy (>=1.23.2,<3)", "pandas (>=1.5.3,<3)", "pyarrow (>=10.0.1,<19)", "pyarrow-hotfix (>=0.4,<1)", "rich (>=12.4.4,<14)"] decompiler = ["black (>=22.1.0,<25)"] deltalake = ["deltalake (>=0.9.0,<1)"] @@ -7458,18 +7459,18 @@ typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" [[package]] name = "pydata-google-auth" 
-version = "1.8.2" +version = "1.9.0" description = "PyData helpers for authenticating to Google APIs" optional = true -python-versions = "*" +python-versions = ">=3.9" files = [ - {file = "pydata-google-auth-1.8.2.tar.gz", hash = "sha256:547b6c0fbea657dcecd50887c5db8640ebec062a59a2b88e8ff8e53a04818303"}, - {file = "pydata_google_auth-1.8.2-py2.py3-none-any.whl", hash = "sha256:a9dce59af4a170ea60c4b2ebbc83ee1f74d34255a4f97b2469ae9a4a0dc98e99"}, + {file = "pydata-google-auth-1.9.0.tar.gz", hash = "sha256:2f546e88f007dfdb050087556eb46d6008e351386a7b368096797fae5df374f2"}, + {file = "pydata_google_auth-1.9.0-py2.py3-none-any.whl", hash = "sha256:e17a44ce8de5b48883667357c03595b85d80938bf1fb714d65bfac9a9f9c8add"}, ] [package.dependencies] -google-auth = {version = ">=1.25.0,<3.0dev", markers = "python_version >= \"3.6\""} -google-auth-oauthlib = {version = ">=0.4.0", markers = "python_version >= \"3.6\""} +google-auth = ">=1.25.0,<3.0dev" +google-auth-oauthlib = ">=0.4.0" setuptools = "*" [[package]] @@ -8903,6 +8904,64 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] +[[package]] +name = "shapely" +version = "2.0.6" +description = "Manipulation and analysis of geometric objects" +optional = false +python-versions = ">=3.7" +files = [ + {file = "shapely-2.0.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:29a34e068da2d321e926b5073539fd2a1d4429a2c656bd63f0bd4c8f5b236d0b"}, + {file = "shapely-2.0.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e1c84c3f53144febf6af909d6b581bc05e8785d57e27f35ebaa5c1ab9baba13b"}, + {file = "shapely-2.0.6-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2ad2fae12dca8d2b727fa12b007e46fbc522148a584f5d6546c539f3464dccde"}, + {file = "shapely-2.0.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b3304883bd82d44be1b27a9d17f1167fda8c7f5a02a897958d86c59ec69b705e"}, + {file = "shapely-2.0.6-cp310-cp310-win32.whl", hash = "sha256:3ec3a0eab496b5e04633a39fa3d5eb5454628228201fb24903d38174ee34565e"}, + {file = "shapely-2.0.6-cp310-cp310-win_amd64.whl", hash = "sha256:28f87cdf5308a514763a5c38de295544cb27429cfa655d50ed8431a4796090c4"}, + {file = "shapely-2.0.6-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5aeb0f51a9db176da9a30cb2f4329b6fbd1e26d359012bb0ac3d3c7781667a9e"}, + {file = "shapely-2.0.6-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9a7a78b0d51257a367ee115f4d41ca4d46edbd0dd280f697a8092dd3989867b2"}, + {file = "shapely-2.0.6-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f32c23d2f43d54029f986479f7c1f6e09c6b3a19353a3833c2ffb226fb63a855"}, + {file = "shapely-2.0.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b3dc9fb0eb56498912025f5eb352b5126f04801ed0e8bdbd867d21bdbfd7cbd0"}, + {file = "shapely-2.0.6-cp311-cp311-win32.whl", hash = 
"sha256:d93b7e0e71c9f095e09454bf18dad5ea716fb6ced5df3cb044564a00723f339d"}, + {file = "shapely-2.0.6-cp311-cp311-win_amd64.whl", hash = "sha256:c02eb6bf4cfb9fe6568502e85bb2647921ee49171bcd2d4116c7b3109724ef9b"}, + {file = "shapely-2.0.6-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:cec9193519940e9d1b86a3b4f5af9eb6910197d24af02f247afbfb47bcb3fab0"}, + {file = "shapely-2.0.6-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:83b94a44ab04a90e88be69e7ddcc6f332da7c0a0ebb1156e1c4f568bbec983c3"}, + {file = "shapely-2.0.6-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:537c4b2716d22c92036d00b34aac9d3775e3691f80c7aa517c2c290351f42cd8"}, + {file = "shapely-2.0.6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:98fea108334be345c283ce74bf064fa00cfdd718048a8af7343c59eb40f59726"}, + {file = "shapely-2.0.6-cp312-cp312-win32.whl", hash = "sha256:42fd4cd4834747e4990227e4cbafb02242c0cffe9ce7ef9971f53ac52d80d55f"}, + {file = "shapely-2.0.6-cp312-cp312-win_amd64.whl", hash = "sha256:665990c84aece05efb68a21b3523a6b2057e84a1afbef426ad287f0796ef8a48"}, + {file = "shapely-2.0.6-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:42805ef90783ce689a4dde2b6b2f261e2c52609226a0438d882e3ced40bb3013"}, + {file = "shapely-2.0.6-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:6d2cb146191a47bd0cee8ff5f90b47547b82b6345c0d02dd8b25b88b68af62d7"}, + {file = "shapely-2.0.6-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e3fdef0a1794a8fe70dc1f514440aa34426cc0ae98d9a1027fb299d45741c381"}, + {file = "shapely-2.0.6-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2c665a0301c645615a107ff7f52adafa2153beab51daf34587170d85e8ba6805"}, + {file = "shapely-2.0.6-cp313-cp313-win32.whl", hash = "sha256:0334bd51828f68cd54b87d80b3e7cee93f249d82ae55a0faf3ea21c9be7b323a"}, + {file = "shapely-2.0.6-cp313-cp313-win_amd64.whl", hash = "sha256:d37d070da9e0e0f0a530a621e17c0b8c3c9d04105655132a87cfff8bd77cc4c2"}, + {file = "shapely-2.0.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:fa7468e4f5b92049c0f36d63c3e309f85f2775752e076378e36c6387245c5462"}, + {file = "shapely-2.0.6-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ed5867e598a9e8ac3291da6cc9baa62ca25706eea186117034e8ec0ea4355653"}, + {file = "shapely-2.0.6-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81d9dfe155f371f78c8d895a7b7f323bb241fb148d848a2bf2244f79213123fe"}, + {file = "shapely-2.0.6-cp37-cp37m-win32.whl", hash = "sha256:fbb7bf02a7542dba55129062570211cfb0defa05386409b3e306c39612e7fbcc"}, + {file = "shapely-2.0.6-cp37-cp37m-win_amd64.whl", hash = "sha256:837d395fac58aa01aa544495b97940995211e3e25f9aaf87bc3ba5b3a8cd1ac7"}, + {file = "shapely-2.0.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c6d88ade96bf02f6bfd667ddd3626913098e243e419a0325ebef2bbd481d1eb6"}, + {file = "shapely-2.0.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8b3b818c4407eaa0b4cb376fd2305e20ff6df757bf1356651589eadc14aab41b"}, + {file = "shapely-2.0.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1bbc783529a21f2bd50c79cef90761f72d41c45622b3e57acf78d984c50a5d13"}, + {file = "shapely-2.0.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2423f6c0903ebe5df6d32e0066b3d94029aab18425ad4b07bf98c3972a6e25a1"}, + {file = "shapely-2.0.6-cp38-cp38-win32.whl", hash = "sha256:2de00c3bfa80d6750832bde1d9487e302a6dd21d90cb2f210515cefdb616e5f5"}, + {file = 
"shapely-2.0.6-cp38-cp38-win_amd64.whl", hash = "sha256:3a82d58a1134d5e975f19268710e53bddd9c473743356c90d97ce04b73e101ee"}, + {file = "shapely-2.0.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:392f66f458a0a2c706254f473290418236e52aa4c9b476a072539d63a2460595"}, + {file = "shapely-2.0.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:eba5bae271d523c938274c61658ebc34de6c4b33fdf43ef7e938b5776388c1be"}, + {file = "shapely-2.0.6-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7060566bc4888b0c8ed14b5d57df8a0ead5c28f9b69fb6bed4476df31c51b0af"}, + {file = "shapely-2.0.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b02154b3e9d076a29a8513dffcb80f047a5ea63c897c0cd3d3679f29363cf7e5"}, + {file = "shapely-2.0.6-cp39-cp39-win32.whl", hash = "sha256:44246d30124a4f1a638a7d5419149959532b99dfa25b54393512e6acc9c211ac"}, + {file = "shapely-2.0.6-cp39-cp39-win_amd64.whl", hash = "sha256:2b542d7f1dbb89192d3512c52b679c822ba916f93479fa5d4fc2fe4fa0b3c9e8"}, + {file = "shapely-2.0.6.tar.gz", hash = "sha256:997f6159b1484059ec239cacaa53467fd8b5564dabe186cd84ac2944663b0bf6"}, +] + +[package.dependencies] +numpy = ">=1.14,<3" + +[package.extras] +docs = ["matplotlib", "numpydoc (==1.1.*)", "sphinx", "sphinx-book-theme", "sphinx-remove-toctrees"] +test = ["pytest", "pytest-cov"] + [[package]] name = "shellingham" version = "1.5.4" @@ -10641,6 +10700,7 @@ lancedb = ["lancedb", "pyarrow", "tantivy"] motherduck = ["duckdb", "pyarrow"] mssql = ["pyodbc"] parquet = ["pyarrow"] +postgis = ["psycopg2-binary", "psycopg2cffi"] postgres = ["psycopg2-binary", "psycopg2cffi"] pyiceberg = ["pyarrow", "pyiceberg", "sqlalchemy"] qdrant = ["qdrant-client"] @@ -10656,4 +10716,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "bf6bbd4a02be9fbec49f26710879c9194bf36707d7a586cfa723aafb9dc04167" +content-hash = "3f57364040f4f97a1422a6bd6ce72a70c655f379c77f65d091ac0417d44c0a31" diff --git a/pyproject.toml b/pyproject.toml index 2208ae83e5..17c7af99d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,7 @@ deltalake = ["deltalake", "pyarrow"] sql_database = ["sqlalchemy"] sqlalchemy = ["sqlalchemy", "alembic"] pyiceberg = ["pyiceberg", "pyarrow", "sqlalchemy"] +postgis = ["psycopg2-binary", "psycopg2cffi"] [tool.poetry.scripts] dlt = "dlt.cli._dlt:_main" @@ -175,6 +176,7 @@ types-regex = "^2024.5.15.20240519" flake8-print = "^5.0.0" mimesis = "^7.0.0" ibis-framework = { version = ">=9.0.0", markers = "python_version >= '3.10'", optional = true, extras = ["duckdb", "postgres", "bigquery", "snowflake", "mssql", "clickhouse"]} +shapely = ">=2.0.6" [tool.poetry.group.sources] optional = true diff --git a/tests/common/test_jsonpath.py b/tests/common/test_jsonpath.py new file mode 100644 index 0000000000..c4e9fbc664 --- /dev/null +++ b/tests/common/test_jsonpath.py @@ -0,0 +1,43 @@ +import pytest + +from dlt.common import jsonpath as jp + + +@pytest.mark.parametrize("compiled", [True, False]) +@pytest.mark.parametrize( + "path, expected", + [ + ("col_a", "col_a"), + ("'col.a'", "col.a"), + ("'$col_a'", "$col_a"), + ("'col|a'", "col|a"), + ], +) +def test_extract_simple_field_name_positive(path, expected, compiled): + if compiled: + path = jp.compile_path(path) + + result = jp.extract_simple_field_name(path) + assert result == expected + + +@pytest.mark.parametrize("compiled", [True, False]) +@pytest.mark.parametrize( + "path", + [ + "$.col_a", + "$.col_a.items", + "$.col_a.items[0]", + "$.col_a.items[*]", + 
"col_a|col_b", + ], +) +def test_extract_simple_field_name_negative(path, compiled): + if compiled: + path = jp.compile_path(path) + + result = jp.extract_simple_field_name(path) + assert result is None + + +# TODO: Test all jsonpath utils diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 3f8ccfc20f..f3ebb02b46 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -19,13 +19,13 @@ from dlt.common import Decimal, jsonpath from dlt.common.exceptions import DictValidationException from dlt.common.schema.typing import ( - TColumnNames, TStoredSchema, TColumnSchema, TWriteDispositionConfig, ) from dlt.common.schema.utils import simple_regex_validator -from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder +from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder, TColumnNames + from dlt.common.validation import validate_dict, validate_dict_ignoring_xkeys diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index dbec417f97..9343449aed 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -213,6 +213,48 @@ def with_table_hints(): extract_step.extract(source, 20, 1) +def test_extract_hints_mark_incremental(extract_step: Extract) -> None: + os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" + + @dlt.resource(columns=[{"name": "id", "data_type": "bigint"}], primary_key="id") + def with_table_hints(): + # yield a regular dataset first, simulate backfil + yield [{"id": id_, "pk": "A"} for id_ in range(1, 10)] + + # get the resource + resource = dlt.current.source().resources[dlt.current.resource_name()] + table = resource.compute_table_schema() + # also there we see the hints + assert table["columns"]["id"]["primary_key"] is True + assert table["columns"]["id"]["data_type"] == "bigint" + + # start emitting incremental + yield dlt.mark.with_hints( + [{"id": id_, "pk": "A", "created_at": id_ + 10} for id_ in range(100, 110)], + make_hints(incremental=dlt.sources.incremental("created_at", initial_value=105)), + ) + + # get the resource + resource = dlt.current.source().resources[dlt.current.resource_name()] + assert resource.incremental.cursor_path == "created_at" # type: ignore[attr-defined] + assert resource.incremental.primary_key == "id" + # we are able to add the incremental to the pipe. 
but it won't + # join actually executing pipe which is a clone of a (partial) pipe of the resource + assert isinstance(resource._pipe._steps[1], dlt.sources.incremental) + # NOTE: this results in unbounded exception + # assert resource.incremental.last_value == 299 + table = resource.compute_table_schema() + assert table["columns"]["created_at"]["incremental"] is not None + + yield [{"id": id_, "pk": "A", "created_at": id_ + 10} for id_ in range(110, 120)] + + source = DltSource(dlt.Schema("hintable"), "module", [with_table_hints]) + extract_step.extract(source, 20, 1) + # make sure incremental is in the source schema + table = source.schema.get_table("with_table_hints") + assert table["columns"]["created_at"]["incremental"] is not None + + def test_extract_metrics_on_exception_no_flush(extract_step: Extract) -> None: @dlt.resource def letters(): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 7ce4228b6c..30df12ae17 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -5,7 +5,7 @@ from datetime import datetime, date # noqa: I251 from itertools import chain, count from time import sleep -from typing import Any, Optional +from typing import Any, Optional, Literal, Sequence, Dict from unittest import mock import duckdb @@ -1468,10 +1468,13 @@ def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None: data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}] source_items = data_to_item_format(item_type, data) + should_have_arg = True + @dlt.resource def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): # make sure that incremental from apply_hints is here - if created_at is not None: + if should_have_arg: + assert created_at is not None assert created_at.cursor_path == "created_at" assert created_at.last_value_func is max yield source_items @@ -1505,6 +1508,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): assert list(r) == [] # remove incremental + should_have_arg = False r.apply_hints(incremental=dlt.sources.incremental.EMPTY) assert r.incremental is not None assert r.incremental.incremental is None @@ -1515,6 +1519,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): # as above but we provide explicit incremental when creating resource p = p.drop() + should_have_arg = True r = some_data(created_at=dlt.sources.incremental("created_at", last_value_func=min)) # hints have precedence, as expected r.apply_hints(incremental=dlt.sources.incremental("created_at", last_value_func=max)) @@ -3568,3 +3573,223 @@ def some_data( call for call in logger_spy.call_args_list if "Large number of records" in call.args[0] ] assert len(warning_calls) == 1 + + +def _resource_for_table_hint( + hint_type: Literal[ + "default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator" + ], + data: Sequence[Dict[str, Any]], + incremental_arg: dlt.sources.incremental[Any], + incremental_arg_default: dlt.sources.incremental[Any] = None, +) -> DltResource: + if incremental_arg is None and incremental_arg_default is None: + raise ValueError("One of the incremental arguments must be provided.") + + decorator_arg = None + if hint_type == "default_arg": + default_arg = incremental_arg_default + override_arg = None + elif hint_type == "default_arg_override": + default_arg = incremental_arg_default + override_arg = incremental_arg + elif hint_type == "decorator": + default_arg = None + override_arg = None + decorator_arg = 
incremental_arg_default + else: + default_arg = None + override_arg = incremental_arg + + @dlt.resource(incremental=decorator_arg) + def some_data( + updated_at: dlt.sources.incremental[Any] = default_arg, + ) -> Any: + yield data_to_item_format("object", data) + + if override_arg is None: + return some_data() + + if hint_type == "apply_hints": + rs = some_data() + rs.apply_hints(incremental=override_arg) + return rs + + return some_data(updated_at=override_arg) + + +@pytest.mark.parametrize( + "hint_type", ["default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator"] +) +@pytest.mark.parametrize( + "incremental_settings", + [ + { + "last_value_func": "min", + "row_order": "desc", + "on_cursor_value_missing": "include", + }, + {"last_value_func": "max", "on_cursor_value_missing": "raise"}, + ], +) +def test_incremental_table_hint_datetime_column( + hint_type: Literal[ + "default_arg", + "explicit_arg", + "default_arg_override", + "apply_hints", + "decorator", + ], + incremental_settings: Dict[str, Any], +) -> None: + initial_value_override = pendulum.now() + initial_value_default = pendulum.now().subtract(seconds=10) + rs = _resource_for_table_hint( + hint_type, + [{"updated_at": pendulum.now().add(seconds=i)} for i in range(1, 12)], + dlt.sources.incremental( + "updated_at", initial_value=initial_value_override, **incremental_settings + ), + dlt.sources.incremental( + "updated_at", initial_value=initial_value_default, **incremental_settings + ), + ) + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + pipeline.extract(rs) + + table_schema = pipeline.default_schema.tables["some_data"] + + assert table_schema["columns"]["updated_at"]["incremental"] is True + + +def incremental_instance_or_dict(use_dict: bool, **kwargs): + if use_dict: + return kwargs + return dlt.sources.incremental(**kwargs) + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_in_resource_decorator(use_dict: bool) -> None: + # Incremental set in decorator, without any arguments + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def no_incremental_arg(): + yield [{"value": i} for i in range(10)] + + result = list(no_incremental_arg()) + # filtering is applied + assert result == [{"value": i} for i in range(0, 6)] + + # Apply hints overrides the decorator settings + rs = no_incremental_arg() + rs.apply_hints( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=3, last_value_func=max + ) + ) + result = list(rs) + assert result == [{"value": i} for i in range(3, 10)] + + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_optional_incremental_arg(incremental: Optional[dlt.sources.incremental[int]] = None): + assert incremental is not None + yield [{"value": i} for i in range(10)] + + # Decorator settings are used + result = list(with_optional_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_in_resource_decorator_default_arg(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_default_incremental_arg( + incremental: dlt.sources.incremental[int] = dlt.sources.incremental( + "value", initial_value=3, last_value_func=min + ) + ): + assert 
incremental.last_value == initial_value + assert incremental.last_value_func == last_value_func + yield [{"value": i} for i in range(10)] + + last_value_func = max + initial_value = 4 + # Explicit argument overrides the default and decorator argument + result = list( + with_default_incremental_arg( + incremental=dlt.sources.incremental( + "value", initial_value=initial_value, last_value_func=last_value_func + ) + ) + ) + assert result == [{"value": i} for i in range(4, 10)] + + # Decorator param overrides function default arg + last_value_func = min + initial_value = 5 + result = list(with_default_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_table_hint_merged_columns(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_a", initial_value=3, last_value_func=min + ) + ) + def some_data(): + yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)] + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + pipeline.extract(some_data()) + + table_schema = pipeline.default_schema.tables["some_data"] + assert table_schema["columns"]["col_a"]["incremental"] is True + + rs = some_data() + rs.apply_hints( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_b", initial_value=5, last_value_func=max + ) + ) + + pipeline.extract(rs) + + table_schema_2 = pipeline.default_schema.tables["some_data"] + + # Only one column should have the hint + assert "incremental" not in table_schema_2["columns"]["col_a"] + assert table_schema_2["columns"]["col_b"]["incremental"] is True + + +@pytest.mark.parametrize("use_dict", [True, False]) +def test_incremental_column_hint_cursor_is_not_column(use_dict: bool): + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_a|col_b", initial_value=3, last_value_func=min + ) + ) + def some_data(): + yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)] + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + + pipeline.extract(some_data()) + + table_schema = pipeline.default_schema.tables["some_data"] + + for col in table_schema["columns"].values(): + assert "incremental" not in col diff --git a/tests/load/postgres/test_postgres_table_builder.py b/tests/load/postgres/test_postgres_table_builder.py index 4dac400f2a..e2ed0f0b2e 100644 --- a/tests/load/postgres/test_postgres_table_builder.py +++ b/tests/load/postgres/test_postgres_table_builder.py @@ -1,24 +1,35 @@ -import pytest from copy import deepcopy +from typing import Generator, Any, List + +import pytest import sqlfluff +import dlt from dlt.common.exceptions import TerminalValueError -from dlt.common.utils import uniq_id from dlt.common.schema import Schema, utils - +from dlt.common.typing import DictStrStr +from dlt.common.utils import uniq_id from dlt.destinations import postgres -from dlt.destinations.impl.postgres.postgres import PostgresClient from dlt.destinations.impl.postgres.configuration import ( PostgresClientConfiguration, PostgresCredentials, ) - +from dlt.destinations.impl.postgres.postgres import ( + PostgresClient, +) +from dlt.destinations.impl.postgres.postgres_adapter import ( + postgres_adapter, + SRID_HINT, + GEOMETRY_HINT, +) +from dlt.extract import DltResource from tests.cases import ( TABLE_UPDATE, TABLE_UPDATE_ALL_INT_PRECISIONS, - TABLE_UPDATE_ALL_TIMESTAMP_PRECISIONS, ) -from tests.load.utils import empty_schema +from 
tests.load.postgres.utils import generate_sample_geometry_records +from tests.load.utils import destinations_configs, DestinationTestConfiguration, sequence_generator +from tests.utils import assert_load_info # mark all tests as essential, do not remove pytestmark = pytest.mark.essential @@ -182,3 +193,217 @@ def test_create_dlt_table(client: PostgresClient) -> None: sqlfluff.parse(sql, dialect="postgres") qualified_name = client.sql_client.make_qualified_table_name("_dlt_version") assert f"CREATE TABLE IF NOT EXISTS {qualified_name}" in sql + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["postgres"]), + ids=lambda x: x.name, +) +def test_adapter_geometry_hint_config( + destination_config: DestinationTestConfiguration, +) -> None: + @dlt.resource(columns=[{"name": "content", "data_type": "text"}]) + def some_data() -> Generator[DictStrStr, Any, None]: + yield from next(sequence_generator()) + + assert some_data.columns["content"] == {"name": "content", "data_type": "text"} # type: ignore[index] + + # Default SRID. + postgres_adapter(some_data, geometry=["content"]) + + assert some_data.columns["content"] == { # type: ignore + "name": "content", + "data_type": "text", + GEOMETRY_HINT: True, + SRID_HINT: 4326, + } + + # Nonstandard SRID. + postgres_adapter(some_data, geometry="content", srid=8232) + + assert some_data.columns["content"] == { # type: ignore + "name": "content", + "data_type": "text", + GEOMETRY_HINT: True, + SRID_HINT: 8232, + } + + +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["postgres"]), + ids=lambda x: x.name, +) +def test_geometry_types( + destination_config: DestinationTestConfiguration, +) -> None: + from shapely import wkt, wkb, LinearRing, Polygon # type: ignore + + @dlt.resource + def geodata_default_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource + def geodata_3857_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource + def geodata_2163_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource + def geodata_default_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource + def geodata_3857_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource + def geodata_2163_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource(file_format="csv") + def geodata_default_csv_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource(file_format="csv") + def geodata_3857_csv_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource(file_format="csv") + def geodata_2163_csv_wkt(): + yield from generate_sample_geometry_records("wkt") + + @dlt.resource(file_format="csv") + def geodata_default_csv_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource(file_format="csv") + def geodata_3857_csv_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource(file_format="csv") + def geodata_2163_csv_wkb_hex(): + yield from generate_sample_geometry_records("wkb_hex") + + @dlt.resource + def no_geodata(): + yield from [{"a": 1}, {"a": 2}] + + postgres_adapter(geodata_default_wkt, geometry=["geom"]) + postgres_adapter(geodata_3857_wkt, geometry=["geom"], srid=3857) + postgres_adapter(geodata_2163_wkt, geometry=["geom"], srid=2163) + postgres_adapter(geodata_default_wkb_hex, geometry=["geom"]) + 
postgres_adapter(geodata_3857_wkb_hex, geometry=["geom"], srid=3857) + postgres_adapter(geodata_2163_wkb_hex, geometry=["geom"], srid=2163) + postgres_adapter(geodata_default_csv_wkt, geometry=["geom"]) + postgres_adapter(geodata_3857_csv_wkt, geometry=["geom"], srid=3857) + postgres_adapter(geodata_2163_csv_wkt, geometry=["geom"], srid=2163) + postgres_adapter(geodata_default_csv_wkb_hex, geometry=["geom"]) + postgres_adapter(geodata_3857_csv_wkb_hex, geometry=["geom"], srid=3857) + postgres_adapter(geodata_2163_csv_wkb_hex, geometry=["geom"], srid=2163) + + @dlt.source + def geodata() -> List[DltResource]: + return [ + geodata_default_wkt, + geodata_3857_wkt, + geodata_2163_wkt, + geodata_default_wkb_hex, + geodata_3857_wkb_hex, + geodata_2163_wkb_hex, + no_geodata, + geodata_default_csv_wkt, + geodata_3857_csv_wkt, + geodata_2163_csv_wkt, + geodata_default_csv_wkb_hex, + geodata_3857_csv_wkb_hex, + geodata_2163_csv_wkb_hex, + ] + + pipeline = destination_config.setup_pipeline("test_geometry_types", dev_mode=True) + info = pipeline.run( + geodata(), + ) + assert_load_info(info) + + # Assert that types were read in as PostGIS geometry types + with pipeline.sql_client() as c: + with c.execute_query(f"""SELECT f_geometry_column +FROM geometry_columns +WHERE f_table_name in + ('geodata_default_wkb', 'geodata_3857_wkb', 'geodata_2163_wkb', 'geodata_default_wkt', 'geodata_3857_wkt', + 'geodata_2163_wkt', 'geodata_default_wkb_hex', 'geodata_3857_wkb_hex', 'geodata_2163_wkb_hex', + 'geodata_default_csv_wkt', 'geodata_3857_csv_wkt', 'geodata_2163_csv_wkt', 'geodata_default_csv_wkb_hex', + 'geodata_3857_csv_wkb_hex', 'geodata_2163_csv_wkb_hex' + ) + AND f_table_schema = '{c.fully_qualified_dataset_name(escape=False)}'""") as cur: + records = cur.fetchall() + assert records + assert {record[0] for record in records} == {"geom"} + + # Verify round-trip integrity + for resource in [ + "geodata_default_wkt", + "geodata_3857_wkt", + "geodata_2163_wkt", + "geodata_default_wkb_hex", + "geodata_3857_wkb_hex", + "geodata_2163_wkb_hex", + "geodata_default_csv_wkt", + "geodata_3857_csv_wkt", + "geodata_2163_csv_wkt", + "geodata_default_csv_wkb_hex", + "geodata_3857_csv_wkb_hex", + "geodata_2163_csv_wkb_hex", + ]: + srid = 4326 if resource.startswith("geodata_default") else int(resource.split("_")[1]) + + query = f""" + SELECT type, ST_AsText(geom) as wkt, ST_SRID(geom) as srid, ST_AsBinary(geom) as wkb + FROM {c.make_qualified_table_name(resource)} + """ + + with c.execute_query(query) as cur: + results = cur.fetchall() + + def get_format(column_name): + if column_name.endswith("wkb_hex"): + return "wkb_hex" + return column_name.split("_")[-1] + + original_geometries = generate_sample_geometry_records(get_format(resource)) + + for result in results: + db_type, db_wkt, db_srid, db_wkb = result + orig_geom = next((g for g in original_geometries if g["type"] == db_type), None) + + assert orig_geom is not None, f"No matching original geometry found for {db_type}" + + assert ( + db_srid == srid + ), f"SRID mismatch for {db_type}: expected {srid}, got {db_srid}" + + if "Empty" in db_type: + assert wkt.loads(db_wkt).is_empty, f"Expected empty geometry for {db_type}" + else: + if "_wkt" in db_type: + orig_geom = wkt.loads(orig_geom["geom"]) + db_geom = wkt.loads(db_wkt) + elif "_wkb_hex" in db_type: + orig_geom = wkb.loads(bytes.fromhex(orig_geom["geom"])) + db_geom = wkb.loads(bytes(db_wkb)) + + tolerance = 1e-8 + if isinstance(orig_geom, LinearRing): + # LinearRing geometries are converted to Polygons for PostGIS 
compatibility. + db_geom = Polygon(orig_geom) + assert LinearRing(db_geom.exterior.coords).equals_exact( + orig_geom, tolerance + ), f"Geometry mismatch for {db_type}" + else: + assert orig_geom.equals_exact( # type: ignore[attr-defined] + db_geom, tolerance + ), f"Geometry mismatch for {db_type}" diff --git a/tests/load/postgres/utils.py b/tests/load/postgres/utils.py new file mode 100644 index 0000000000..b03a6b5096 --- /dev/null +++ b/tests/load/postgres/utils.py @@ -0,0 +1,68 @@ +from typing import List + +from shapely import ( # type: ignore + Point, + LineString, + Polygon, + MultiPoint, + MultiLineString, + MultiPolygon, + GeometryCollection, + LinearRing, +) +from shapely.wkb import dumps as wkb_dumps # type: ignore + +from dlt.common.typing import DictStrStr + + +def generate_sample_geometry_records(geometry_type: str) -> List[DictStrStr]: + """ + Generate sample geometry records including WKT and WKB representations. + + Returns: + A list of dictionaries, each containing a geometry type, + its Well-Known Text (WKT), and Well-Known Binary (WKB) representation. + """ + geometries = [ + ("Point", Point(1, 1)), + ("LineString", LineString([(0, 0), (1, 1), (2, 2)])), + ("Polygon", Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)])), + ("MultiPoint", MultiPoint([(0, 0), (1, 1), (2, 2)])), + ("MultiLineString", MultiLineString([((0, 0), (1, 1)), ((2, 2), (3, 3))])), + ( + "MultiPolygon", + MultiPolygon( + [ + Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]), + Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)]), + ] + ), + ), + ( + "GeometryCollection", + GeometryCollection([Point(1, 1), LineString([(0, 0), (1, 1), (2, 2)])]), + ), + ( + "ComplexPolygon", + Polygon( + [(0, 0), (10, 0), (10, 10), (0, 10), (0, 0)], + [[(4, 4), (6, 4), (6, 6), (4, 6), (4, 4)]], + ), + ), + ("EmptyPoint", Point()), + ("EmptyLineString", LineString()), + ("EmptyPolygon", Polygon()), + ("EmptyMultiPoint", MultiPoint()), + ("EmptyMultiLineString", MultiLineString()), + ("EmptyMultiPolygon", MultiPolygon()), + ("EmptyGeometryCollection", GeometryCollection()), + ] + + # LinearRing only works with wkb types + if geometry_type == "wkb": + geometries += [("LinearRing", LinearRing([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]))] + + return [ + {"type": f"{name}_{geometry_type}", "geom": getattr(geom, geometry_type)} + for name, geom in geometries + ]
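
A short usage sketch of the dlt.mark.with_hints pattern exercised by test_extract_hints_mark_incremental above: a resource emits plain items first and then switches itself to incremental extraction from inside the generator. The resource name, pipeline name, and data are illustrative; the test imports make_hints directly, while this sketch assumes the dlt.mark.make_hints alias.

import dlt

@dlt.resource(primary_key="id")
def events():
    # plain items first, e.g. a backfill pass
    yield [{"id": i} for i in range(1, 10)]
    # then attach an incremental hint to the items that follow
    yield dlt.mark.with_hints(
        [{"id": i, "created_at": i + 10} for i in range(100, 110)],
        dlt.mark.make_hints(
            incremental=dlt.sources.incremental("created_at", initial_value=105)
        ),
    )

pipeline = dlt.pipeline(pipeline_name="mark_incremental_demo")
pipeline.extract(events)
# mirroring the assertions in the test, the cursor column should carry the incremental hint
assert pipeline.default_schema.tables["events"]["columns"]["created_at"]["incremental"] is not None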
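
A minimal sketch of the incremental argument on the resource decorator covered by test_incremental_in_resource_decorator above. With this form no incremental parameter is needed in the function signature, and apply_hints still takes precedence; the resource name and values are illustrative, and the same settings can be passed as a plain dict, as the use_dict test variants show.

import dlt

@dlt.resource(
    incremental=dlt.sources.incremental("value", initial_value=5, last_value_func=min)
)
def numbers():
    yield [{"value": i} for i in range(10)]

# with last_value_func=min and initial_value=5 only values up to 5 pass the filter
assert list(numbers()) == [{"value": i} for i in range(0, 6)]

# apply_hints overrides the decorator setting
rs = numbers()
rs.apply_hints(
    incremental=dlt.sources.incremental("value", initial_value=3, last_value_func=max)
)
assert list(rs) == [{"value": i} for i in range(3, 10)]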
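
A minimal end-to-end sketch of the postgres_adapter geometry support tested above, assuming a PostGIS-enabled Postgres destination is already configured via credentials (the new postgis extra installs the same psycopg2 drivers as postgres). The resource contents, SRID, and pipeline name are illustrative; geometry values may be WKT or hex-encoded WKB strings, as produced by generate_sample_geometry_records.

import dlt
from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter

@dlt.resource
def cities():
    # WKT geometry; hex-encoded WKB strings work the same way
    yield {"name": "null_island", "location": "POINT (0 0)"}

# mark "location" as a geometry column; srid defaults to 4326 when omitted
postgres_adapter(cities, geometry=["location"], srid=4326)

pipeline = dlt.pipeline(pipeline_name="geo_demo", destination="postgres")
load_info = pipeline.run(cities)
print(load_info)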