Skip to content

Commit

Permalink
Use BIGNUMERIC for large decimals in bigquery (#984)
Browse files Browse the repository at this point in the history
* Use BIGNUMERIC for large decimals in bigquery

* Handle dest without decimal type
  • Loading branch information
steinitzu authored Feb 21, 2024
1 parent b4e5272 commit eaf3c05
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
8 changes: 7 additions & 1 deletion dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class BigQueryTypeMapper(TypeMapper):
sct_to_dbt = {
"text": "STRING(%i)",
"binary": "BYTES(%i)",
"decimal": "NUMERIC(%i,%i)",
}

dbt_to_sct = {
Expand All @@ -79,6 +78,13 @@ class BigQueryTypeMapper(TypeMapper):
"TIME": "time",
}

def to_db_decimal_type(self, precision: Optional[int], scale: Optional[int]) -> str:
    """Return the BigQuery column type for a decimal column.

    BigQuery's ``NUMERIC`` supports at most precision 38 and scale 9;
    anything beyond either limit is mapped to ``BIGNUMERIC`` instead.

    Args:
        precision: Requested decimal precision, or None for the default.
        scale: Requested decimal scale, or None for the default.

    Returns:
        A parameterized ``NUMERIC(p,s)`` or ``BIGNUMERIC(p,s)`` type string.
    """
    resolved_precision, resolved_scale = self.decimal_precision(precision, scale)
    # Pick the template first, then format once — keeps %i semantics identical
    # for both branches.
    if resolved_precision > 38 or resolved_scale > 9:
        template = "BIGNUMERIC(%i,%i)"
    else:
        template = "NUMERIC(%i,%i)"
    return template % (resolved_precision, resolved_scale)

# noinspection PyTypeChecker,PydanticTypeChecker
def from_db_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
Expand Down
8 changes: 8 additions & 0 deletions dlt/destinations/type_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def to_db_time_type(self, precision: Optional[int], table_format: TTableFormat =
# Override in subclass if db supports other time types (e.g. with different time resolutions)
return None

def to_db_decimal_type(self, precision: Optional[int], scale: Optional[int]) -> str:
    """Map a schema decimal type to the destination database type string.

    Uses the destination's parameterized decimal template when both a
    concrete (precision, scale) pair can be resolved and the destination
    declares a ``decimal`` entry in ``sct_to_dbt``; otherwise falls back
    to the unbound decimal type.

    Args:
        precision: Requested decimal precision, or None for the default.
        scale: Requested decimal scale, or None for the default.

    Returns:
        The destination-specific decimal column type string.
    """
    resolved = self.decimal_precision(precision, scale)
    if resolved and "decimal" in self.sct_to_dbt:
        # resolved is a (precision, scale) tuple — feed it to the template directly.
        return self.sct_to_dbt["decimal"] % resolved
    return self.sct_to_unbound_dbt["decimal"]

def to_db_type(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
precision, scale = column.get("precision"), column.get("scale")
sc_t = column["data_type"]
Expand All @@ -45,6 +51,8 @@ def to_db_type(self, column: TColumnSchema, table_format: TTableFormat = None) -
db_t = self.to_db_datetime_type(precision, table_format)
elif sc_t == "time":
db_t = self.to_db_time_type(precision, table_format)
elif sc_t == "decimal":
db_t = self.to_db_decimal_type(precision, scale)
else:
db_t = None
if db_t:
Expand Down
21 changes: 20 additions & 1 deletion tests/load/bigquery/test_bigquery_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,24 @@ def gcp_client(schema: Schema) -> BigQueryClient:

def test_create_table(gcp_client: BigQueryClient) -> None:
# non existing table
sql = gcp_client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)[0]
# Add BIGNUMERIC column
table_update = TABLE_UPDATE + [
{
"name": "col_high_p_decimal",
"data_type": "decimal",
"precision": 76,
"scale": 0,
"nullable": False,
},
{
"name": "col_high_s_decimal",
"data_type": "decimal",
"precision": 38,
"scale": 24,
"nullable": False,
},
]
sql = gcp_client._get_table_update_sql("event_test_table", table_update, False)[0]
sqlfluff.parse(sql, dialect="bigquery")
assert sql.startswith("CREATE TABLE")
assert "event_test_table" in sql
Expand All @@ -92,6 +109,8 @@ def test_create_table(gcp_client: BigQueryClient) -> None:
assert "`col6_precision` NUMERIC(6,2) NOT NULL" in sql
assert "`col7_precision` BYTES(19)" in sql
assert "`col11_precision` TIME NOT NULL" in sql
assert "`col_high_p_decimal` BIGNUMERIC(76,0) NOT NULL" in sql
assert "`col_high_s_decimal` BIGNUMERIC(38,24) NOT NULL" in sql
assert "CLUSTER BY" not in sql
assert "PARTITION BY" not in sql

Expand Down
38 changes: 38 additions & 0 deletions tests/load/pipeline/test_bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

from dlt.common import Decimal

from tests.pipeline.utils import assert_load_info
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
from tests.load.utils import delete_dataset


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_bigquery_numeric_types(destination_config: DestinationTestConfiguration) -> None:
    """Round-trip decimal values through BigQuery NUMERIC and BIGNUMERIC columns.

    precision 47 exceeds NUMERIC's limit of 38, so the first column must be
    created as BIGNUMERIC; the second fits NUMERIC exactly.
    """
    pipeline = destination_config.setup_pipeline("test_bigquery_numeric_types")

    column_schemas = [
        {"name": "col_big_numeric", "data_type": "decimal", "precision": 47, "scale": 9},
        {"name": "col_numeric", "data_type": "decimal", "precision": 38, "scale": 9},
    ]

    # Single row holding the largest representative values for each column type.
    expected_row = {
        "col_big_numeric": Decimal("12345678901234567890123456789012345678.123456789"),
        "col_numeric": Decimal("12345678901234567890123456789.123456789"),
    }

    load_info = pipeline.run(
        iter([expected_row]),
        table_name="big_numeric",
        columns=column_schemas,  # type: ignore[arg-type]
    )
    assert_load_info(load_info)

    # Read the row back and verify the decimals survived without loss.
    with pipeline.sql_client() as client:
        with client.execute_query("SELECT col_big_numeric, col_numeric FROM big_numeric;") as q:
            fetched = q.fetchone()
            assert fetched[0] == expected_row["col_big_numeric"]
            assert fetched[1] == expected_row["col_numeric"]

0 comments on commit eaf3c05

Please sign in to comment.