Skip to content

Commit

Permalink
Update escape character and text mapping handling
Browse files Browse the repository at this point in the history
Resolves the 'str' mapping error.
  • Loading branch information
eryanRM committed Oct 24, 2023
1 parent 1271547 commit be41558
Showing 1 changed file with 55 additions and 53 deletions.
108 changes: 55 additions & 53 deletions dlt/destinations/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType, TTableFormat
from dlt.common.utils import uniq_id

from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob
Expand All @@ -17,37 +17,50 @@
from dlt.destinations.synapse.configuration import SynapseClientConfiguration
from dlt.destinations.sql_client import SqlClientBase
from dlt.common.schema.typing import COLUMN_HINTS
from dlt.destinations.type_mapping import TypeMapper

import logging # Import the logging module if it's not already imported

logger = logging.getLogger(__name__)


# Forward mapping: dlt schema column type -> Synapse SQL type name.
# "decimal" is a template whose %i placeholders take precision and scale.
SCT_TO_PGT: Dict[TDataType, str] = dict(
    complex="nvarchar(max)",
    text="nvarchar(max)",
    ntext="nvarchar(max)",
    double="float",
    bool="bit",
    timestamp="datetimeoffset",
    date="date",
    bigint="bigint",
    binary="varbinary(max)",
    decimal="decimal(%i,%i)",
    time="time",
)

# Reverse mapping: Synapse SQL base type name -> dlt schema column type.
PGT_TO_SCT: Dict[str, TDataType] = dict(
    nvarchar="text",
    float="double",
    bit="bool",
    datetimeoffset="timestamp",
    date="date",
    bigint="bigint",
    varbinary="binary",
    decimal="decimal",
    time="time",
)
class SynapseTypeMapper(TypeMapper):
    """Type mapping tables between dlt column types and Synapse SQL types.

    The three dictionaries below are read by the ``TypeMapper`` base class
    (defined in ``dlt.destinations.type_mapping``, not visible here).
    NOTE(review): the exact lookup/formatting semantics are assumed from the
    attribute names — confirm against the ``TypeMapper`` implementation.
    """

    # dlt type -> Synapse type used when no precision/scale is given.
    # NOTE(review): there is no unbound entry for "decimal" or "wei" —
    # presumably the base class always supplies precision for those via
    # sct_to_dbt below; confirm.
    sct_to_unbound_dbt = {
        "complex": "nvarchar(max)",
        "text": "nvarchar(max)",
        "double": "float",
        "bool": "bit",
        "bigint": "bigint",
        "binary": "varbinary(max)",
        "date": "date",
        "timestamp": "datetimeoffset",
        "time": "time",
    }

    # dlt type -> Synapse type template when precision (and scale) are set;
    # "%i" placeholders are filled with the column's precision/scale.
    sct_to_dbt = {
        "complex": "nvarchar(%i)",
        "text": "nvarchar(%i)",
        "timestamp": "datetimeoffset(%i)",
        "binary": "varbinary(%i)",
        "decimal": "decimal(%i,%i)",
        "time": "time(%i)",
        "wei": "decimal(%i,%i)"
    }

    # Reverse mapping: Synapse base type name -> dlt type.
    # tinyint/smallint/int all widen to dlt "bigint".
    dbt_to_sct = {
        "nvarchar": "text",
        "float": "double",
        "bit": "bool",
        "datetimeoffset": "timestamp",
        "date": "date",
        "bigint": "bigint",
        "varbinary": "binary",
        "decimal": "decimal",
        "time": "time",
        "tinyint": "bigint",
        "smallint": "bigint",
        "int": "bigint",
    }

HINT_TO_SYNAPSE_ATTR: Dict[TColumnHint, str] = {
"unique": "UNIQUE"
Expand Down Expand Up @@ -89,7 +102,6 @@ def _new_temp_table_name(cls, name_prefix: str) -> str:
return '#' + name

class SynapseClient(InsertValuesJobClient):
#TODO: Add a function to insert multiple values for several columns in insert sql to individual insert sql.
#Synapse does not support multi-row inserts using a single INSERT INTO statement

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
Expand All @@ -103,6 +115,7 @@ def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None:
self.config: SynapseClientConfiguration = config
self.sql_client = sql_client
self.active_hints = HINT_TO_SYNAPSE_ATTR if self.config.create_indexes else {}
self.type_mapper = SynapseTypeMapper(self.capabilities)

def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
    """Build the Synapse-specific merge job for the given table chain."""
    merge_job = SynapseMergeJob.from_table_chain(table_chain, self.sql_client)
    return merge_job
Expand Down Expand Up @@ -145,40 +158,29 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc
" Several hint types may not be added to existing tables.")
return sql_result

def _get_column_def_sql(self, c: TColumnSchema) -> str:
def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """Return the follow-up merge job(s) for the given table chain."""
    followup = SynapseMergeJob.from_table_chain(table_chain, self.sql_client)
    return [followup]

def _make_add_column_sql(self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None) -> List[str]:
    """Emit one ADD clause covering every new column.

    Override because mssql requires multiple columns to be listed in a
    single ADD COLUMN clause rather than one clause per column.
    """
    column_defs = [self._get_column_def_sql(col, table_format) for col in new_columns]
    return ["ADD \n" + ",\n".join(column_defs)]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
    """Render the DDL fragment (name, type, hints, nullability) for column *c*.

    The span previously contained superseded assignments left over from an
    earlier revision (a hard-coded ``nvarchar(900)`` and a call to the removed
    ``_to_db_type`` helper, plus commented-out debug logging); only the
    current logic is kept.
    """
    sc_type = c["data_type"]
    if sc_type == "text" and c.get("unique"):
        # MSSQL does not allow an index on large TEXT columns, so unique
        # text columns are capped at an indexable nvarchar length
        # (the column's precision, defaulting to 900).
        db_type = "nvarchar(%i)" % (c.get("precision") or 900)
    else:
        db_type = self.type_mapper.to_db_type(c)

    # Collect DDL attributes (e.g. UNIQUE) for hints enabled on this column.
    hints_str = " ".join(self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True)
    column_name = self.capabilities.escape_identifier(c["name"])
    return f"{column_name} {db_type} {hints_str} {self._gen_not_null(c['nullable'])}"

def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
    """Create the staging-copy job used by the optimized replace strategy."""
    staging_job = SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client)
    return staging_job

@classmethod
def _to_db_type(cls, sc_t: TDataType) -> str:
    """Map dlt data type *sc_t* to its Synapse SQL type string.

    "wei" and "decimal" are rendered from the SCT_TO_PGT "decimal"
    template using the capabilities' (precision, scale) pairs; all other
    types are a direct SCT_TO_PGT lookup (KeyError for unknown types).

    Fix: the original had a second ``if sc_t == "wei"`` branch after the
    decimal check — unreachable because the first wei branch returns —
    which also referenced the undefined name ``EVM_DECIMAL_PRECISION``.
    That dead branch is removed.
    """
    if sc_t == "ntext":
        return SCT_TO_PGT["ntext"]
    if sc_t == "wei":
        return SCT_TO_PGT["decimal"] % cls.capabilities.wei_precision
    if sc_t == "decimal":
        return SCT_TO_PGT["decimal"] % cls.capabilities.decimal_precision
    return SCT_TO_PGT[sc_t]
def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """Return replace follow-up jobs; staging-optimized uses a Synapse copy job."""
    if self.config.replace_strategy != "staging-optimized":
        # default behavior for every other replace strategy
        return super()._create_replace_followup_jobs(table_chain)
    return [SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client)]

@classmethod
def _from_db_type(cls, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
    """Map Synapse db type *pq_t* back to a dlt data type.

    A "numeric" column whose (precision, scale) equals the capabilities'
    wei precision is reported as "wei"; everything else goes through the
    PGT_TO_SCT lookup (KeyError for unknown type names).
    """
    if pq_t == "numeric" and (precision, scale) == cls.capabilities.wei_precision:
        return "wei"
    return PGT_TO_SCT[pq_t]
def _from_db_type(self, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
    """Delegate reverse type mapping to the shared type mapper."""
    mapped = self.type_mapper.from_db_type(pq_t, precision, scale)
    return mapped

0 comments on commit be41558

Please sign in to comment.