Support Spatial Types for PostGIS #1927
First changed file: the postgres destination client module (its path is not preserved in this view).

@@ -2,30 +2,28 @@
 from dlt.common import logger
 from dlt.common.data_writers.configuration import CsvFormatConfiguration
+from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.exceptions import (
     DestinationInvalidFileFormat,
     DestinationTerminalException,
 )
 from dlt.common.destination.reference import (
     HasFollowupJobs,
     PreparedTableSchema,
     RunnableLoadJob,
     FollowupJobRequest,
     LoadJob,
     TLoadJobState,
 )
-from dlt.common.destination import DestinationCapabilitiesContext
+from dlt.common.exceptions import TerminalValueError
 from dlt.common.schema import TColumnSchema, TColumnHint, Schema
-from dlt.common.schema.typing import TColumnType, TTableFormat
+from dlt.common.schema.typing import TColumnType
 from dlt.common.schema.utils import is_nullable_column
 from dlt.common.storages.file_storage import FileStorage

-from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams
-from dlt.destinations.insert_job_client import InsertValuesJobClient
-from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
 from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration
+from dlt.destinations.impl.postgres.postgres_adapter import GEOMETRY_HINT
+from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
+from dlt.destinations.insert_job_client import InsertValuesJobClient
 from dlt.destinations.sql_client import SqlClientBase
+from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlJobParams

 HINT_TO_POSTGRES_ATTR: Dict[TColumnHint, str] = {"unique": "UNIQUE"}
@@ -43,15 +41,16 @@ def generate_sql(
         with sql_client.with_staging_dataset():
             staging_table_name = sql_client.make_qualified_table_name(table["name"])
         table_name = sql_client.make_qualified_table_name(table["name"])
-        # drop destination table
-        sql.append(f"DROP TABLE IF EXISTS {table_name};")
-        # moving staging table to destination schema
-        sql.append(
-            f"ALTER TABLE {staging_table_name} SET SCHEMA"
-            f" {sql_client.fully_qualified_dataset_name()};"
-        )
-        # recreate staging table
-        sql.append(f"CREATE TABLE {staging_table_name} (like {table_name} including all);")
+        sql.extend(
+            (
+                f"DROP TABLE IF EXISTS {table_name};",
+                (
+                    f"ALTER TABLE {staging_table_name} SET SCHEMA"
+                    f" {sql_client.fully_qualified_dataset_name()};"
+                ),
+                f"CREATE TABLE {staging_table_name} (like {table_name} including all);",
+            )
+        )
         return sql
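The refactor only collects the same three statements into one sql.extend call; behavior is unchanged. For a concrete sense of what the followup job renders, here is a minimal sketch, assuming illustrative names (my_dataset, my_dataset_staging, my_table); none of these come from the diff:

# Sketch of the statements the staging-copy followup job produces, with
# hypothetical qualified names standing in for what sql_client would return.
staging_table_name = '"my_dataset_staging"."my_table"'
table_name = '"my_dataset"."my_table"'
dataset_name = '"my_dataset"'

sql = []
sql.extend(
    (
        # drop the destination table
        f"DROP TABLE IF EXISTS {table_name};",
        # move the staging table into the destination schema
        f"ALTER TABLE {staging_table_name} SET SCHEMA {dataset_name};",
        # recreate an empty staging table with the destination table's structure
        f"CREATE TABLE {staging_table_name} (like {table_name} including all);",
    )
)
print("\n".join(sql))
# DROP TABLE IF EXISTS "my_dataset"."my_table";
# ALTER TABLE "my_dataset_staging"."my_table" SET SCHEMA "my_dataset";
# CREATE TABLE "my_dataset_staging"."my_table" (like "my_dataset"."my_table" including all);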
@@ -111,8 +110,7 @@ def run(self) -> None:
             split_columns.append(norm_col)
             if norm_col in split_headers and is_nullable_column(col):
                 split_null_headers.append(norm_col)
-        split_unknown_headers = set(split_headers).difference(split_columns)
-        if split_unknown_headers:
+        if split_unknown_headers := set(split_headers).difference(split_columns):
             raise DestinationInvalidFileFormat(
                 "postgres",
                 "csv",
@@ -130,15 +128,8 @@ def run(self) -> None:

             qualified_table_name = sql_client.make_qualified_table_name(table_name)
             copy_sql = (
-                "COPY %s (%s) FROM STDIN WITH (FORMAT CSV, DELIMITER '%s', NULL '',"
-                " %s ENCODING '%s')"
-                % (
-                    qualified_table_name,
-                    headers,
-                    sep,
-                    null_headers,
-                    csv_format.encoding,
-                )
+                f"COPY {qualified_table_name} ({headers}) FROM STDIN WITH (FORMAT CSV, DELIMITER"
+                f" '{sep}', NULL '', {null_headers} ENCODING '{csv_format.encoding}')"
             )
             with sql_client.begin_transaction():
                 with sql_client.native_connection.cursor() as cursor:
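Below is a standalone sketch of how such a COPY statement is typically streamed with psycopg2, the driver behind Psycopg2SqlClient. The DSN, table, column list, FORCE_NULL fragment, and file path are illustrative assumptions, not values from the diff:

import psycopg2

# Hypothetical rendered COPY statement; in the job, qualified_table_name,
# headers, sep, null_headers, and the encoding come from the schema and the
# CSV format configuration.
copy_sql = (
    'COPY "my_dataset"."towns" ("town", "loc") FROM STDIN WITH (FORMAT CSV,'
    " DELIMITER ',', NULL '', FORCE_NULL(\"loc\"), ENCODING 'utf-8')"
)

conn = psycopg2.connect("dbname=dlt_data user=loader")  # illustrative DSN
try:
    with conn:  # psycopg2 commits on success, rolls back on error
        with conn.cursor() as cursor, open("load.csv", "rb") as f:
            # copy_expert streams the file object through COPY ... FROM STDIN
            cursor.copy_expert(copy_sql, f)
finally:
    conn.close()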
@@ -167,21 +158,30 @@ def __init__(
     def create_load_job(
         self, table: PreparedTableSchema, file_path: str, load_id: str, restore: bool = False
     ) -> LoadJob:
+        if any(
+            column.get(GEOMETRY_HINT) for column in table["columns"].values()
+        ) and not file_path.endswith("insert_values"):
+            # Only insert_values load jobs supported for geom types.
+            # TODO: This isn't actually true, can make it work with geoarrow!
+            raise TerminalValueError(
+                "CSV bulk loading is not supported for tables with geometry columns."
+            )
         job = super().create_load_job(table, file_path, load_id, restore)
         if not job and file_path.endswith("csv"):
             job = PostgresCsvCopyJob(file_path)
         return job

     def _get_column_def_sql(self, c: TColumnSchema, table: PreparedTableSchema = None) -> str:
-        hints_str = " ".join(
+        hints_ = " ".join(
             self.active_hints.get(h, "")
             for h in self.active_hints.keys()
             if c.get(h, False) is True
         )
         column_name = self.sql_client.escape_column_name(c["name"])
-        return (
-            f"{column_name} {self.type_mapper.to_destination_type(c,table)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
-        )
+        nullability = self._gen_not_null(c.get("nullable", True))
+        column_type = self.type_mapper.to_destination_type(c, table)
+
+        return f"{column_name} {column_type} {hints_} {nullability}"

     def _create_replace_followup_jobs(
         self, table_chain: Sequence[PreparedTableSchema]

Review thread on the TerminalValueError raised above:

Reviewer: Is it really true? If the column is text and contains WKT data, it should get inserted. Did you try it?

Author: I added CSV loader cases to the geo test, and it all passes. Your intuition was entirely correct!
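To make the refactored _get_column_def_sql concrete, here is a small sketch of how the four fragments compose into a column definition. The escaped name, PostGIS type string, hint fragment, and nullability fragment are hypothetical stand-ins for what sql_client, type_mapper, and _gen_not_null would return:

# Hypothetical fragments standing in for sql_client.escape_column_name,
# type_mapper.to_destination_type, the joined active hints, and _gen_not_null.
column_name = '"loc"'
column_type = "geometry(Geometry, 4326)"  # assumed mapping for a geometry-hinted column
hints_ = ""  # no active hints (e.g. UNIQUE) set on this column
nullability = "NOT NULL"

# Same composition as the refactored return statement.
column_def = f"{column_name} {column_type} {hints_} {nullability}"
print(column_def)  # "loc" geometry(Geometry, 4326)  NOT NULL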
Second changed file: dlt/destinations/impl/postgres/postgres_adapter.py, new in this PR (the module imported above for GEOMETRY_HINT).

@@ -0,0 +1,63 @@ (entire file added)

from typing import Any, Optional

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.destinations.utils import get_resource_for_adapter
from dlt.extract import DltResource

GEOMETRY_HINT = "x-postgres-geometry"
SRID_HINT = "x-postgres-srid"


def postgres_adapter(
    data: Any,
    geometry: TColumnNames = None,
    srid: Optional[int] = 4326,
) -> DltResource:
    """Prepares data for the postgres destination by specifying which columns should
    be cast to PostGIS geometry types.

    Args:
        data (Any): The data to be transformed. It can be raw data or an instance
            of DltResource. If raw data, the function wraps it into a DltResource
            object.
        geometry (TColumnNames, optional): Specify columns to cast to geometries.
            It can be a single column name as a string, or a list of column names.
        srid (int, optional): The Spatial Reference System Identifier (SRID) to be
            used for the geometry columns. If not provided, SRID 4326 will be used.

    Returns:
        DltResource: A resource with applied postgres-specific hints.

    Raises:
        ValueError: If input for `geometry` is invalid, or if no geometry columns are specified.

    Examples:
        >>> data = [{"town": "Null Island", "loc": "POINT(0 0)"}]
        >>> postgres_adapter(data, geometry="loc", srid=4326)
        [DltResource with hints applied]
    """
    resource = get_resource_for_adapter(data)

    column_hints: TTableSchemaColumns = {}

    if geometry:
        if isinstance(geometry, str):
            geometry = [geometry]
        if not isinstance(geometry, list):
            raise ValueError(
                "'geometry' must be a list of column names or a single column name as a string."
            )

        for column_name in geometry:
            column_hints[column_name] = {
                "name": column_name,
                GEOMETRY_HINT: True,  # type: ignore[misc]
            }
            if srid is not None:
                column_hints[column_name][SRID_HINT] = srid  # type: ignore

    if not column_hints:
        raise ValueError("A value for 'geometry' must be specified.")
    else:
        resource.apply_hints(columns=column_hints)
    return resource
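A usage sketch of the new adapter end to end, based on the docstring's example; the pipeline, dataset, and table names are illustrative:

import dlt
from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter

# WKT values in the "loc" column will be hinted as PostGIS geometry, SRID 4326.
data = [
    {"town": "Null Island", "loc": "POINT(0 0)"},
    {"town": "Greenwich", "loc": "POINT(-0.0015 51.4778)"},
]

resource = postgres_adapter(data, geometry="loc", srid=4326)

pipeline = dlt.pipeline(
    pipeline_name="geo_demo",  # illustrative
    destination="postgres",
    dataset_name="spatial_data",  # illustrative
)
load_info = pipeline.run(resource, table_name="towns")
print(load_info)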
Review thread on WKB handling in the adapter:

Reviewer: This is very impractical :/ We have a dependence on shapely and there may be false positives (there are no magic numbers in WKB). Passing the expected type to escape...literal will slow this down. We could use a special binary type, but the user would need to wrap binary anyway. So my take is to drop wkb and just keep wkb_hex.

Author: Agreed, removed wkb support, will update docs too.
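To illustrate the trade-off being discussed: with shapely (the dependency the reviewer mentions), the same geometry serializes either as raw WKB bytes or as a hex-encoded string, and only the latter passes safely through text-based load paths such as SQL literals and CSV. A minimal sketch:

from shapely import wkt

geom = wkt.loads("POINT (0 0)")

# Raw WKB is bytes: no magic number marks it, so sniffing arbitrary binary
# values as geometries risks false positives, which is the reviewer's objection.
raw_wkb = geom.wkb

# wkb_hex is plain ASCII and survives text-based load paths unchanged.
hex_wkb = geom.wkb_hex
print(hex_wkb)  # 010100000000000000000000000000000000000000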