
Commit d513d2c

Init from_db_type
steinitzu committed Sep 21, 2023
1 parent 4dd6184 commit d513d2c
Showing 10 changed files with 174 additions and 232 deletions.
11 changes: 7 additions & 4 deletions dlt/common/schema/typing.py
@@ -34,10 +34,15 @@
WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))


class TColumnSchemaBase(TypedDict, total=False):
class TColumnType(TypedDict, total=False):
data_type: Optional[TDataType]
precision: Optional[int]
scale: Optional[int]


class TColumnSchemaBase(TColumnType, total=False):
"""TypedDict that defines basic properties of a column: name, data type and nullable"""
name: Optional[str]
data_type: Optional[TDataType]
nullable: Optional[bool]


@@ -53,8 +58,6 @@ class TColumnSchema(TColumnSchemaBase, total=False):
root_key: Optional[bool]
merge_key: Optional[bool]
variant: Optional[bool]
precision: Optional[int]
scale: Optional[int]


TTableSchemaColumns = Dict[str, TColumnSchema]
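
The refactor above moves data_type, precision and scale out of TColumnSchema into a reusable TColumnType TypedDict that TColumnSchemaBase now inherits from. A minimal sketch of how the new shape composes (simplified: a plain str stands in for TDataType, and the example values are invented):

from typing import Optional, TypedDict


class TColumnType(TypedDict, total=False):
    data_type: Optional[str]  # TDataType in dlt proper
    precision: Optional[int]
    scale: Optional[int]


class TColumnSchemaBase(TColumnType, total=False):
    name: Optional[str]
    nullable: Optional[bool]


# mappers can now return type, precision and scale as one value...
col_type: TColumnType = {"data_type": "decimal", "precision": 38, "scale": 9}

# ...which callers splice into a column schema instead of assigning
# data_type alone (static checkers may want a type: ignore on the splat)
column: TColumnSchemaBase = {"name": "amount", "nullable": True, **col_type}
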
42 changes: 21 additions & 21 deletions dlt/destinations/athena/athena.py
@@ -15,7 +15,7 @@
from dlt.common import logger
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob
@@ -35,20 +35,6 @@
from dlt.destinations import path_utils



HIVET_TO_SCT: Dict[str, TDataType] = {
"varchar": "text",
"double": "double",
"boolean": "bool",
"date": "date",
"timestamp": "timestamp",
"bigint": "bigint",
"binary": "binary",
"varbinary": "binary",
"decimal": "decimal",
}


class AthenaTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
"complex": "string",
@@ -67,6 +53,24 @@ class AthenaTypeMapper(TypeMapper):
"wei": "decimal(%i,%i)"
}

dbt_to_sct = {
"varchar": "text",
"double": "double",
"boolean": "bool",
"date": "date",
"timestamp": "timestamp",
"bigint": "bigint",
"binary": "binary",
"varbinary": "binary",
"decimal": "decimal",
}

def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
for key, val in self.dbt_to_sct.items():
if db_type.startswith(key):
return dict(data_type=val, precision=precision, scale=scale)
return dict(data_type=None)


# add a formatter for pendulum to be used by pyathen dbapi
def _format_pendulum_datetime(formatter: Formatter, escaper: Callable[[str], str], val: Any) -> Any:
@@ -280,12 +284,8 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# never truncate tables in athena
super().initialize_storage([])

@classmethod
def _from_db_type(cls, hive_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
for key, val in HIVET_TO_SCT.items():
if hive_t.startswith(key):
return val
return None
def _from_db_type(self, hive_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema) -> str:
return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c)}"
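
AthenaTypeMapper.from_db_type above resolves Hive types by prefix match, so parametrized forms like decimal(38,0) still hit the decimal entry without any parsing. A standalone sketch of that lookup (dict abbreviated; untested against a live Athena catalog):

from typing import Optional

DBT_TO_SCT = {
    "varchar": "text",
    "bigint": "bigint",
    "decimal": "decimal",
}

def from_db_type(db_type: str, precision: Optional[int], scale: Optional[int]) -> dict:
    # prefix match: "decimal(38,0)" starts with "decimal"
    for key, val in DBT_TO_SCT.items():
        if db_type.startswith(key):
            return dict(data_type=val, precision=precision, scale=scale)
    # unknown db types surface as data_type=None rather than raising
    return dict(data_type=None)

assert from_db_type("decimal(38,0)", 38, 0) == dict(data_type="decimal", precision=38, scale=0)
assert from_db_type("varchar(255)", None, None)["data_type"] == "text"
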
69 changes: 27 additions & 42 deletions dlt/destinations/bigquery/bigquery.py
@@ -11,7 +11,7 @@
from dlt.common.data_types import TDataType
from dlt.common.storages.file_storage import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType

from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate, DestinationTransientException, LoadJobNotExistsException, LoadJobTerminalException, LoadJobUnknownTableException
@@ -26,32 +26,8 @@

from dlt.common.schema.utils import table_schema_has_type

SCT_TO_BQT: Dict[TDataType, str] = {
"complex": "JSON",
"text": "STRING",
"double": "FLOAT64",
"bool": "BOOLEAN",
"date": "DATE",
"timestamp": "TIMESTAMP",
"bigint": "INTEGER",
"binary": "BYTES",
"decimal": "NUMERIC(%i,%i)",
"wei": "BIGNUMERIC", # non parametrized should hold wei values
"time": "TIME",
}

BQT_TO_SCT: Dict[str, TDataType] = {
"STRING": "text",
"FLOAT": "double",
"BOOLEAN": "bool",
"DATE": "date",
"TIMESTAMP": "timestamp",
"INTEGER": "bigint",
"BYTES": "binary",
"NUMERIC": "decimal",
"BIGNUMERIC": "decimal",
"JSON": "complex",
"TIME": "time",

}


@@ -75,6 +51,27 @@ class BigQueryTypeMapper(TypeMapper):
"decimal": "NUMERIC(%i,%i)",
}

dbt_to_sct = {
"STRING": "text",
"FLOAT": "double",
"BOOLEAN": "bool",
"DATE": "date",
"TIMESTAMP": "timestamp",
"INTEGER": "bigint",
"BYTES": "binary",
"NUMERIC": "decimal",
"BIGNUMERIC": "decimal",
"JSON": "complex",
"TIME": "time",
}

def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
if db_type == "BIGNUMERIC":
if precision is None: # biggest numeric possible
return dict(data_type="wei")
return super().from_db_type(db_type, precision, scale)


class BigQueryLoadJob(LoadJob, FollowupJob):
def __init__(
self,
@@ -267,13 +264,13 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]
schema_c: TColumnSchema = {
"name": c.name,
"nullable": c.is_nullable,
"data_type": self._from_db_type(c.field_type, c.precision, c.scale),
"unique": False,
"sort": False,
"primary_key": False,
"foreign_key": False,
"cluster": c.name in (table.clustering_fields or []),
"partition": c.name == partition_field
"partition": c.name == partition_field,
**self._from_db_type(c.field_type, c.precision, c.scale) # type: ignore[misc]
}
schema_table[c.name] = schema_c
return True, schema_table
@@ -334,17 +331,5 @@ def _retrieve_load_job(self, file_path: str) -> bigquery.LoadJob:
job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
return cast(bigquery.LoadJob, self.sql_client.native_connection.get_job(job_id))

# @classmethod
# def _to_db_type(cls, sc_t: TDataType) -> str:
# if sc_t == "decimal":
# return SCT_TO_BQT["decimal"] % cls.capabilities.decimal_precision
# return SCT_TO_BQT[sc_t]

@classmethod
def _from_db_type(cls, bq_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
if bq_t == "BIGNUMERIC":
if precision is None: # biggest numeric possible
return "wei"
return BQT_TO_SCT.get(bq_t, "text")


def _from_db_type(self, bq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)
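
The BigQuery mapper adds one special case on top of the table lookup: an unparametrized BIGNUMERIC is treated as wei. Assuming the base TypeMapper.from_db_type (not shown in this diff) is a plain dbt_to_sct lookup, the behavior is roughly:

from typing import Optional

DBT_TO_SCT = {"NUMERIC": "decimal", "BIGNUMERIC": "decimal", "STRING": "text"}

def from_db_type(db_type: str, precision: Optional[int], scale: Optional[int]) -> dict:
    # a BIGNUMERIC with no precision is the widest numeric BigQuery has,
    # which dlt reserves for wei values
    if db_type == "BIGNUMERIC" and precision is None:
        return dict(data_type="wei")
    return dict(data_type=DBT_TO_SCT.get(db_type), precision=precision, scale=scale)

assert from_db_type("BIGNUMERIC", None, None) == dict(data_type="wei")
assert from_db_type("BIGNUMERIC", 40, 2)["data_type"] == "decimal"
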
55 changes: 25 additions & 30 deletions dlt/destinations/duckdb/duck.py
@@ -5,7 +5,7 @@
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.destination.reference import LoadJob, FollowupJob, TLoadJobState
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType
from dlt.common.storages.file_storage import FileStorage
from dlt.common.utils import maybe_context

@@ -17,19 +17,6 @@
from dlt.destinations.type_mapping import TypeMapper


# SCT_TO_PGT: Dict[TDataType, str] = {
# "complex": "JSON",
# "text": "VARCHAR",
# "double": "DOUBLE",
# "bool": "BOOLEAN",
# "date": "DATE",
# "timestamp": "TIMESTAMP WITH TIME ZONE",
# "bigint": "BIGINT",
# "binary": "BLOB",
# "decimal": "DECIMAL(%i,%i)",
# "time": "TIME"
# }

PGT_TO_SCT: Dict[str, TDataType] = {
"VARCHAR": "text",
"JSON": "complex",
@@ -73,6 +60,28 @@ class DuckDbTypeMapper(TypeMapper):
"wei": "DECIMAL(%i,%i)",
}

dbt_to_sct = {
"VARCHAR": "text",
"JSON": "complex",
"DOUBLE": "double",
"BOOLEAN": "bool",
"DATE": "date",
"TIMESTAMP WITH TIME ZONE": "timestamp",
"BIGINT": "bigint",
"BLOB": "binary",
"DECIMAL": "decimal",
"TIME": "time"
}

def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
# duckdb provides the types with scale and precision
db_type = db_type.split("(")[0].upper()
if db_type == "DECIMAL":
if precision == 38 and scale == 0:
return dict(data_type="wei", precision=precision, scale=scale)
return super().from_db_type(db_type, precision, scale)


class DuckDbCopyJob(LoadJob, FollowupJob):
def __init__(self, table_name: str, file_path: str, sql_client: DuckDbSqlClient) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))
@@ -130,19 +139,5 @@ def _get_column_def_sql(self, c: TColumnSchema) -> str:
column_name = self.capabilities.escape_identifier(c["name"])
return f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"

# @classmethod
# def _to_db_type(cls, sc_t: TDataType) -> str:
# if sc_t == "wei":
# return SCT_TO_PGT["decimal"] % cls.capabilities.wei_precision
# if sc_t == "decimal":
# return SCT_TO_PGT["decimal"] % cls.capabilities.decimal_precision
# return SCT_TO_PGT[sc_t]

@classmethod
def _from_db_type(cls, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
# duckdb provides the types with scale and precision
pq_t = pq_t.split("(")[0].upper()
if pq_t == "DECIMAL":
if precision == 38 and scale == 0:
return "wei"
return PGT_TO_SCT[pq_t]
def _from_db_type(self, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
return self.type_mapper.from_db_type(pq_t, precision, scale)
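
DuckDB reports column types with their parameters (e.g. DECIMAL(38,0)), so the mapper first strips the parameter list, then promotes DECIMAL(38,0) specifically to wei. The normalization step in isolation (helper names invented for the sketch):

from typing import Optional

def normalize(db_type: str) -> str:
    # duckdb returns e.g. "DECIMAL(38,0)"; keep only the base name
    return db_type.split("(")[0].upper()

def is_wei(db_type: str, precision: Optional[int], scale: Optional[int]) -> bool:
    # DECIMAL(38,0) is the duckdb representation dlt uses for wei
    return normalize(db_type) == "DECIMAL" and (precision, scale) == (38, 0)

assert normalize("DECIMAL(38,0)") == "DECIMAL"
assert normalize("timestamp with time zone") == "TIMESTAMP WITH TIME ZONE"
assert is_wei("DECIMAL(38,0)", 38, 0) and not is_wei("DECIMAL(38,0)", 38, 9)
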
7 changes: 3 additions & 4 deletions dlt/destinations/job_client_impl.py
@@ -12,7 +12,7 @@

from dlt.common import json, pendulum, logger
from dlt.common.data_types import TDataType
from dlt.common.schema.typing import COLUMN_HINTS, TColumnSchemaBase, TTableSchema, TWriteDisposition
from dlt.common.schema.typing import COLUMN_HINTS, TColumnType, TColumnSchemaBase, TTableSchema, TWriteDisposition
from dlt.common.storages import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables
from dlt.common.destination.reference import StateInfo, StorageSchemaInfo, WithStateSync, DestinationClientConfiguration, DestinationClientDwhConfiguration, DestinationClientDwhWithStagingConfiguration, NewLoadJob, WithStagingDataset, TLoadJobState, LoadJob, JobClientBase, FollowupJob, CredentialsConfiguration
@@ -242,14 +242,13 @@ def _null_to_bool(v: str) -> bool:
schema_c: TColumnSchemaBase = {
"name": c[0],
"nullable": _null_to_bool(c[2]),
"data_type": self._from_db_type(c[1], numeric_precision, numeric_scale),
**self._from_db_type(c[1], numeric_precision, numeric_scale), # type: ignore[misc]
}
schema_table[c[0]] = schema_c # type: ignore
return True, schema_table

@classmethod
@abstractmethod
def _from_db_type(cls, db_type: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
def _from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
pass

def get_stored_schema(self) -> StorageSchemaInfo:
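
The base job client change is the heart of the commit: _from_db_type becomes an abstract instance method returning a TColumnType, and the reflected column schema is built by splicing that dict in with ** instead of assigning data_type alone. A hypothetical minimal pair showing the delegation pattern (all names other than _from_db_type and from_db_type are invented):

from typing import Optional


class DemoTypeMapper:  # stands in for a destination's TypeMapper subclass
    dbt_to_sct = {"varchar": "text", "numeric": "decimal"}

    def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> dict:
        return dict(data_type=self.dbt_to_sct.get(db_type), precision=precision, scale=scale)


class DemoJobClient:  # stands in for a SqlJobClientBase subclass
    def __init__(self) -> None:
        self.type_mapper = DemoTypeMapper()

    def _from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> dict:
        # instance method (previously a classmethod) so it can reach the
        # client's configured type mapper
        return self.type_mapper.from_db_type(db_type, precision, scale)


client = DemoJobClient()
schema_c = {
    "name": "amount",
    "nullable": True,
    **client._from_db_type("numeric", 38, 9),  # splices data_type/precision/scale
}
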
41 changes: 10 additions & 31 deletions dlt/destinations/mssql/mssql.py
@@ -5,7 +5,7 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType
from dlt.common.utils import uniq_id

from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob
@@ -19,19 +19,6 @@
from dlt.destinations.type_mapping import TypeMapper


# SCT_TO_PGT: Dict[TDataType, str] = {
# "complex": "nvarchar(max)",
# "text": "nvarchar(max)",
# "double": "float",
# "bool": "bit",
# "timestamp": "datetimeoffset",
# "date": "date",
# "bigint": "bigint",
# "binary": "varbinary(max)",
# "decimal": "decimal(%i,%i)",
# "time": "time"
# }

PGT_TO_SCT: Dict[str, TDataType] = {
"nvarchar": "text",
"float": "double",
@@ -70,6 +57,12 @@ class MsSqlTypeMapper(TypeMapper):
"wei": "decimal(%i,%i)"
}

def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
if db_type == "numeric":
if (precision, scale) == self.capabilities.wei_precision:
return dict(data_type="wei", precision=precision, scale=scale)
return super().from_db_type(db_type, precision, scale)


class MsSqlStagingCopyJob(SqlStagingCopyJob):

@@ -108,6 +101,7 @@ def _new_temp_table_name(cls, name_prefix: str) -> str:
name = SqlMergeJob._new_temp_table_name(name_prefix)
return '#' + name


class MsSqlClient(InsertValuesJobClient):

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -145,20 +139,5 @@ def _get_column_def_sql(self, c: TColumnSchema) -> str:
def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
return MsSqlStagingCopyJob.from_table_chain(table_chain, self.sql_client)

# @classmethod
# def _to_db_type(cls, sc_t: TDataType) -> str:
# if sc_t == "wei":
# return SCT_TO_PGT["decimal"] % cls.capabilities.wei_precision
# if sc_t == "decimal":
# return SCT_TO_PGT["decimal"] % cls.capabilities.decimal_precision

# if sc_t == "wei":
# return f"numeric({2*EVM_DECIMAL_PRECISION},{EVM_DECIMAL_PRECISION})"
# return SCT_TO_PGT[sc_t]

@classmethod
def _from_db_type(cls, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TDataType:
if pq_t == "numeric":
if (precision, scale) == cls.capabilities.wei_precision:
return "wei"
return PGT_TO_SCT[pq_t]
def _from_db_type(self, pq_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
return self.type_mapper.from_db_type(pq_t, precision, scale)
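
The MS SQL mapper takes the same wei shortcut as duckdb but compares against the destination's declared wei precision rather than a literal pair. Roughly (the real pair comes from capabilities.wei_precision; the value below is a placeholder):

from typing import Optional, Tuple

WEI_PRECISION: Tuple[int, int] = (38, 0)  # placeholder for capabilities.wei_precision

def is_wei_numeric(db_type: str, precision: Optional[int], scale: Optional[int]) -> bool:
    # only the exact (precision, scale) pair reserved for wei qualifies
    return db_type == "numeric" and (precision, scale) == WEI_PRECISION
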
(Diff for the remaining 4 of the 10 changed files not shown.)