
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1996-iceberg-filesystem
jorritsandbrink committed Dec 1, 2024
2 parents a0fc017 + 61c2ed9 commit ba75445
Showing 37 changed files with 1,115 additions and 159 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -60,7 +60,7 @@ jobs:

- name: Install dependencies
# install dlt with postgres support
run: poetry install --no-interaction -E postgres --with sentry-sdk,dbt
run: poetry install --no-interaction -E postgres -E postgis --with sentry-sdk,dbt

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -78,7 +78,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake -E pyiceberg
run: poetry install --no-interaction -E redshift -E postgis -E postgres -E gs -E s3 -E az -E parquet -E duckdb -E cli -E filesystem --with sentry-sdk --with pipeline -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
@@ -48,7 +48,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
@@ -95,7 +95,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake -E pyiceberg
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E weaviate -E qdrant -E sftp --with sentry-sdk --with pipeline -E deltalake -E pyiceberg

- name: Upgrade sqlalchemy
run: poetry run pip install sqlalchemy==2.0.18 # minimum version required by `pyiceberg`
4 changes: 2 additions & 2 deletions .github/workflows/test_local_sources.yml
@@ -43,7 +43,7 @@ jobs:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
image: postgis/postgis
# Provide the password for postgres
env:
POSTGRES_DB: dlt_data
@@ -83,7 +83,7 @@ jobs:

# TODO: which deps should we enable?
- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources
run: poetry install --no-interaction -E postgres -E postgis -E duckdb -E parquet -E filesystem -E cli -E sql_database --with sentry-sdk,pipeline,sources

# run sources tests in load against configured destinations
- run: poetry run pytest tests/load/sources
6 changes: 5 additions & 1 deletion dlt/common/destination/typing.py
@@ -1,6 +1,10 @@
from typing import Optional

from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam
from dlt.common.schema.typing import (
_TTableSchemaBase,
TWriteDisposition,
TTableReferenceParam,
)


class PreparedTableSchema(_TTableSchemaBase, total=False):
Empty file.
@@ -2,9 +2,7 @@

from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union

from dlt.common.schema.typing import TColumnNames
from dlt.common.typing import TSortOrder
from dlt.extract.items import TTableHintTemplate
from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames

TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
@@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict):

class IncrementalArgs(TypedDict, total=False):
cursor_path: str
initial_value: Optional[str]
last_value_func: Optional[LastValueFunc[str]]
initial_value: Optional[Any]
last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]]
"""Last value callable or name of built in function"""
primary_key: Optional[TTableHintTemplate[TColumnNames]]
end_value: Optional[str]
end_value: Optional[Any]
row_order: Optional[TSortOrder]
allow_external_schedulers: Optional[bool]
lag: Optional[Union[float, int]]
on_cursor_value_missing: Optional[OnCursorValueMissing]
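
For reference, a minimal sketch of what the widened IncrementalArgs shape admits after this change: initial_value and end_value are now typed as Optional[Any] rather than Optional[str], and last_value_func may be the name of a built-in ("min"/"max") instead of a callable. The concrete field values below are illustrative assumptions, not taken from the diff.

incremental_args = {
    "cursor_path": "updated_at",
    "initial_value": "2024-01-01T00:00:00Z",  # any type is accepted now, not only str
    "end_value": None,
    "last_value_func": "max",  # built-in function name now allowed alongside a callable
    "row_order": "asc",
}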
3 changes: 1 addition & 2 deletions dlt/common/pipeline.py
@@ -48,15 +48,14 @@
)
from dlt.common.schema import Schema
from dlt.common.schema.typing import (
TColumnNames,
TColumnSchema,
TWriteDispositionConfig,
TSchemaContract,
)
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.storages.load_storage import LoadPackageInfo
from dlt.common.time import ensure_pendulum_datetime, precise_time
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
from dlt.common.data_writers.writers import TLoaderFileFormat
from dlt.common.utils import RowCounts, merge_row_counts
5 changes: 2 additions & 3 deletions dlt/common/schema/typing.py
@@ -19,7 +19,7 @@

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat
from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat, TColumnNames

try:
from pydantic import BaseModel as _PydanticBaseModel
@@ -132,8 +132,6 @@ class TColumnPropInfo(NamedTuple):
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""


class TColumnType(TypedDict, total=False):
@@ -166,6 +164,7 @@ class TColumnSchema(TColumnSchemaBase, total=False):
variant: Optional[bool]
hard_delete: Optional[bool]
dedup_sort: Optional[TSortOrder]
incremental: Optional[bool]


TTableSchemaColumns = Dict[str, TColumnSchema]
11 changes: 11 additions & 0 deletions dlt/common/schema/utils.py
@@ -547,6 +547,17 @@ def merge_diff(table: TTableSchema, table_diff: TPartialTableSchema) -> TPartial
* table hints are added or replaced from diff
* nothing gets deleted
"""

incremental_a_col = get_first_column_name_with_prop(
table, "incremental", include_incomplete=True
)
if incremental_a_col:
incremental_b_col = get_first_column_name_with_prop(
table_diff, "incremental", include_incomplete=True
)
if incremental_b_col:
table["columns"][incremental_a_col].pop("incremental")

# add new columns when all checks passed
updated_columns = merge_columns(table["columns"], table_diff["columns"])
table.update(table_diff)
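
A small sketch of the behavior this hunk adds to merge_diff, under assumed inputs: when both the existing table and the incoming diff mark a column with the incremental hint, the hint is popped from the existing column so that only the column coming from the diff keeps it.

table = {"columns": {"updated_at": {"name": "updated_at", "incremental": True}}}
table_diff = {"columns": {"modified_at": {"name": "modified_at", "incremental": True}}}
# after merge_diff(table, table_diff):
#   "incremental" is removed from table["columns"]["updated_at"]
#   table["columns"]["modified_at"] keeps incremental=True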
7 changes: 7 additions & 0 deletions dlt/common/typing.py
@@ -29,6 +29,7 @@
Iterator,
Generator,
NamedTuple,
Sequence,
)

from typing_extensions import (
@@ -112,6 +113,8 @@ class SecretSentinel:

TSecretStrValue = Annotated[str, SecretSentinel]

TColumnNames = Union[str, Sequence[str]]
"""A string representing a column name or a list of"""
TDataItem: TypeAlias = Any
"""A single data item as extracted from data source"""
TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]]
@@ -126,6 +129,10 @@ class SecretSentinel:
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"]
"""known loader file formats"""

TDynHintType = TypeVar("TDynHintType")
TFunHintTemplate = Callable[[TDataItem], TDynHintType]
TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]]


class ConfigValueSentinel(NamedTuple):
"""Class to create singleton sentinel for config and secret injected value"""
6 changes: 2 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
@@ -4,10 +4,8 @@

from dlt.common.destination import PreparedTableSchema
from dlt.common.pendulum import timezone
from dlt.common.schema.typing import (
TColumnNames,
TTableSchemaColumns,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
3 changes: 2 additions & 1 deletion dlt/destinations/impl/lancedb/lancedb_adapter.py
@@ -1,6 +1,7 @@
from typing import Any, Dict

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import TColumnNames
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource
from dlt.extract.items import TTableHintTemplate
22 changes: 15 additions & 7 deletions dlt/destinations/impl/postgres/factory.py
@@ -1,19 +1,19 @@
import typing as t

from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.configuration import CsvFormatConfiguration
from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.data_writers.escape import escape_postgres_identifier, escape_postgres_literal
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema.typing import TColumnSchema, TColumnType
from dlt.common.wei import EVM_DECIMAL_PRECISION

from dlt.destinations.type_mapping import TypeMapperImpl
from dlt.destinations.impl.postgres.configuration import (
PostgresCredentials,
PostgresClientConfiguration,
)
from dlt.destinations.impl.postgres.postgres_adapter import GEOMETRY_HINT, SRID_HINT
from dlt.destinations.type_mapping import TypeMapperImpl

if t.TYPE_CHECKING:
from dlt.destinations.impl.postgres.postgres import PostgresClient
@@ -55,6 +55,7 @@ class PostgresTypeMapper(TypeMapperImpl):
"character varying": "text",
"smallint": "bigint",
"integer": "bigint",
"geometry": "text",
}

def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
@@ -108,11 +109,18 @@ def to_db_datetime_type(
def from_destination_type(
self, db_type: str, precision: t.Optional[int] = None, scale: t.Optional[int] = None
) -> TColumnType:
if db_type == "numeric":
if (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
if db_type == "numeric" and (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei")
if db_type.startswith("geometry"):
return dict(data_type="text")
return super().from_destination_type(db_type, precision, scale)

def to_destination_type(self, column: TColumnSchema, table: PreparedTableSchema) -> str:
if column.get(GEOMETRY_HINT):
srid = column.get(SRID_HINT, 4326)
return f"geometry(Geometry, {srid})"
return super().to_destination_type(column, table)


class postgres(Destination[PostgresClientConfiguration, "PostgresClient"]):
spec = PostgresClientConfiguration
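
A minimal sketch of the DDL the new geometry branch in to_destination_type produces, assuming a plain column dict. The hint key strings below are placeholders for illustration; in the real code they are imported from postgres_adapter as GEOMETRY_HINT and SRID_HINT.

GEOMETRY_HINT = "x-postgres-geometry"  # assumed value for illustration
SRID_HINT = "x-postgres-srid"          # assumed value for illustration

def geometry_column_type(column: dict) -> str:
    # mirrors the branch added above: geometry columns get a PostGIS type,
    # with SRID 4326 as the default when no SRID hint is present
    if column.get(GEOMETRY_HINT):
        srid = column.get(SRID_HINT, 4326)
        return f"geometry(Geometry, {srid})"
    return "text"  # non-geometry columns fall through to the regular mapper

print(geometry_column_type({"name": "location", GEOMETRY_HINT: True, SRID_HINT: 2163}))
# -> geometry(Geometry, 2163)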
54 changes: 22 additions & 32 deletions dlt/destinations/impl/postgres/postgres.py
@@ -2,30 +2,26 @@

from dlt.common import logger
from dlt.common.data_writers.configuration import CsvFormatConfiguration
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
DestinationInvalidFileFormat,
DestinationTerminalException,
)
from dlt.common.destination.reference import (
HasFollowupJobs,
PreparedTableSchema,
RunnableLoadJob,
FollowupJobRequest,
LoadJob,
TLoadJobState,
)
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.schema.typing import TColumnType, TTableFormat
from dlt.common.schema.typing import TColumnType
from dlt.common.schema.utils import is_nullable_column
from dlt.common.storages.file_storage import FileStorage

from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams
from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration
from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams

HINT_TO_POSTGRES_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}

@@ -43,15 +39,16 @@ def generate_sql(
with sql_client.with_staging_dataset():
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
# drop destination table
sql.append(f"DROP TABLE IF EXISTS {table_name};")
# moving staging table to destination schema
sql.append(
f"ALTER TABLE {staging_table_name} SET SCHEMA"
f" {sql_client.fully_qualified_dataset_name()};"
sql.extend(
(
f"DROP TABLE IF EXISTS {table_name};",
(
f"ALTER TABLE {staging_table_name} SET SCHEMA"
f" {sql_client.fully_qualified_dataset_name()};"
),
f"CREATE TABLE {staging_table_name} (like {table_name} including all);",
)
)
# recreate staging table
sql.append(f"CREATE TABLE {staging_table_name} (like {table_name} including all);")
return sql


@@ -111,8 +108,7 @@ def run(self) -> None:
split_columns.append(norm_col)
if norm_col in split_headers and is_nullable_column(col):
split_null_headers.append(norm_col)
split_unknown_headers = set(split_headers).difference(split_columns)
if split_unknown_headers:
if split_unknown_headers := set(split_headers).difference(split_columns):
raise DestinationInvalidFileFormat(
"postgres",
"csv",
@@ -130,15 +126,8 @@

qualified_table_name = sql_client.make_qualified_table_name(table_name)
copy_sql = (
"COPY %s (%s) FROM STDIN WITH (FORMAT CSV, DELIMITER '%s', NULL '',"
" %s ENCODING '%s')"
% (
qualified_table_name,
headers,
sep,
null_headers,
csv_format.encoding,
)
f"COPY {qualified_table_name} ({headers}) FROM STDIN WITH (FORMAT CSV, DELIMITER"
f" '{sep}', NULL '', {null_headers} ENCODING '{csv_format.encoding}')"
)
with sql_client.begin_transaction():
with sql_client.native_connection.cursor() as cursor:
@@ -173,15 +162,16 @@ def create_load_job(
return job

def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str:
hints_str = " ".join(
hints_ = " ".join(
self.active_hints.get(h, "")
for h in self.active_hints.keys()
if c.get(h, False) is True
)
column_name = self.sql_client.escape_column_name(c["name"])
return (
f"{column_name} {self.type_mapper.to_destination_type(c,table)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
)
nullability = self._gen_not_null(c.get("nullable", True))
column_type = self.type_mapper.to_destination_type(c, table)

return f"{column_name} {column_type} {hints_} {nullability}"

def _create_replace_followup_jobs(
self, table_chain: Sequence[PreparedTableSchema]