From be41558b33820b25f6596cac2707b733470ab8c7 Mon Sep 17 00:00:00 2001 From: eryanRM Date: Mon, 23 Oct 2023 23:06:29 -0400 Subject: [PATCH] update escape character, text mapping handling resolves 'str' mapping error --- dlt/destinations/synapse/synapse.py | 108 ++++++++++++++-------------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/dlt/destinations/synapse/synapse.py b/dlt/destinations/synapse/synapse.py index 6577e45546..5aacb7a45f 100644 --- a/dlt/destinations/synapse/synapse.py +++ b/dlt/destinations/synapse/synapse.py @@ -5,7 +5,7 @@ from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.data_types import TDataType from dlt.common.schema import TColumnSchema, TColumnHint, Schema -from dlt.common.schema.typing import TTableSchema +from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat from dlt.common.utils import uniq_id from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob @@ -17,37 +17,50 @@ from dlt.destinations.synapse.configuration import SynapseClientConfiguration from dlt.destinations.sql_client import SqlClientBase from dlt.common.schema.typing import COLUMN_HINTS +from dlt.destinations.type_mapping import TypeMapper import logging # Import the logging module if it's not already imported logger = logging.getLogger(__name__) -SCT_TO_PGT: Dict[TDataType, str] = { - "complex": "nvarchar(max)", - "text": "nvarchar(max)", - "ntext": "nvarchar(max)", - "double": "float", - "bool": "bit", - "timestamp": "datetimeoffset", - "date": "date", - "bigint": "bigint", - "binary": "varbinary(max)", - "decimal": "decimal(%i,%i)", - "time": "time" -} - -PGT_TO_SCT: Dict[str, TDataType] = { - "nvarchar": "text", - "float": "double", - "bit": "bool", - "datetimeoffset": "timestamp", - "date": "date", - "bigint": "bigint", - "varbinary": "binary", - "decimal": "decimal", - "time": "time" -} +class SynapseTypeMapper(TypeMapper): + sct_to_unbound_dbt = { + "complex": "nvarchar(max)", + "text": "nvarchar(max)", + "double": "float", + "bool": "bit", + "bigint": "bigint", + "binary": "varbinary(max)", + "date": "date", + "timestamp": "datetimeoffset", + "time": "time", + } + + sct_to_dbt = { + "complex": "nvarchar(%i)", + "text": "nvarchar(%i)", + "timestamp": "datetimeoffset(%i)", + "binary": "varbinary(%i)", + "decimal": "decimal(%i,%i)", + "time": "time(%i)", + "wei": "decimal(%i,%i)" + } + + dbt_to_sct = { + "nvarchar": "text", + "float": "double", + "bit": "bool", + "datetimeoffset": "timestamp", + "date": "date", + "bigint": "bigint", + "varbinary": "binary", + "decimal": "decimal", + "time": "time", + "tinyint": "bigint", + "smallint": "bigint", + "int": "bigint", + } HINT_TO_SYNAPSE_ATTR: Dict[TColumnHint, str] = { "unique": "UNIQUE" @@ -89,7 +102,6 @@ def _new_temp_table_name(cls, name_prefix: str) -> str: return '#' + name class SynapseClient(InsertValuesJobClient): - #TODO: Add a function to insert multiple values for several columns in insert sql to individual insert sql. #Synapse does not support multi-row inserts using a single INSERT INTO statement capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -103,6 +115,7 @@ def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None: self.config: SynapseClientConfiguration = config self.sql_client = sql_client self.active_hints = HINT_TO_SYNAPSE_ATTR if self.config.create_indexes else {} + self.type_mapper = SynapseTypeMapper(self.capabilities) def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob: return SynapseMergeJob.from_table_chain(table_chain, self.sql_client) @@ -145,40 +158,29 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc " Several hint types may not be added to existing tables.") return sql_result - def _get_column_def_sql(self, c: TColumnSchema) -> str: + def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: + return [SynapseMergeJob.from_table_chain(table_chain, self.sql_client)] + + def _make_add_column_sql(self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None) -> List[str]: + # Override because mssql requires multiple columns in a single ADD COLUMN clause + return ["ADD \n" + ",\n".join(self._get_column_def_sql(c, table_format) for c in new_columns)] + def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: sc_type = c["data_type"] if sc_type == "text" and c.get("unique"): # MSSQL does not allow index on large TEXT columns - db_type = "nvarchar(900)" + db_type = "nvarchar(%i)" % (c.get("precision") or 900) else: - # TODO remove logging - #logger.info(f'sc_type: {sc_type}') - db_type = self._to_db_type(sc_type) - #logger.info(f'db_type: {db_type}') + db_type = self.type_mapper.to_db_type(c) hints_str = " ".join(self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True) column_name = self.capabilities.escape_identifier(c["name"]) return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}" - def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob: - return SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client) - - @classmethod - def _to_db_type(cls, sc_t: TDataType) -> str: - if sc_t == "ntext": - return SCT_TO_PGT["ntext"] - if sc_t == "wei": - return SCT_TO_PGT["decimal"] % cls.capabilities.wei_precision - if sc_t == "decimal": - return SCT_TO_PGT["decimal"] % cls.capabilities.decimal_precision - if sc_t == "wei": - return f"numeric({2*EVM_DECIMAL_PRECISION},{EVM_DECIMAL_PRECISION})" - return SCT_TO_PGT[sc_t] + def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: + if self.config.replace_strategy == "staging-optimized": + return [SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client)] + return super()._create_replace_followup_jobs(table_chain) - @classmethod - def _from_db_type(cls, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType: - if pq_t == "numeric": - if (precision, scale) == cls.capabilities.wei_precision: - return "wei" - return PGT_TO_SCT[pq_t] + def _from_db_type(self, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType: + return self.type_mapper.from_db_type(pq_t, precision, scale)