Skip to content

Commit

Permalink
Automatically reduce timestamp precision to max destination precision
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Oct 25, 2023
1 parent 90fbf24 commit 087f6fa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
26 changes: 23 additions & 3 deletions dlt/normalize/items_normalizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,29 @@ def _write_with_dlt_columns(
)
return [schema_update], items_count

def _fix_schema_precisions(self, root_table_name: str) -> List[TSchemaUpdate]:
    """Clamp timestamp/time column precision to what the destination supports.

    Looks up the destination's maximum timestamp precision and, for every
    ``timestamp``/``time`` column in ``root_table_name`` whose declared
    precision exceeds it, produces a copy of the column capped at that
    maximum. Returns the resulting schema update list, or an empty list
    when no column needed adjustment.
    """
    current_schema = self.schema
    cap = self.config.destination_capabilities.timestamp_precision
    columns = current_schema.tables[root_table_name]["columns"]

    # Build replacement columns only for those that actually exceed the cap;
    # a missing or zero precision is left untouched (nothing to reduce).
    adjusted: TTableSchemaColumns = {
        name: dict(col, precision=cap)  # type: ignore[assignment]
        for name, col in columns.items()
        if col["data_type"] in ("timestamp", "time")
        and (declared := col.get("precision"))
        and declared > cap
    }

    if not adjusted:
        return []

    partial = current_schema.update_table({"name": root_table_name, "columns": adjusted})
    return [{root_table_name: [partial]}]

def __call__(
self, extracted_items_file: str, root_table_name: str
self, extracted_items_file: str, root_table_name: str
) -> Tuple[List[TSchemaUpdate], int, TRowCount]:
base_schema_update = self._fix_schema_precisions(root_table_name)
import pyarrow as pa

add_dlt_id = self.config.parquet_normalizer_config.add_dlt_id
Expand All @@ -176,7 +196,7 @@ def __call__(
add_dlt_load_id,
add_dlt_id
)
return schema_update, items_count, {root_table_name: items_count}
return base_schema_update + schema_update, items_count, {root_table_name: items_count}

from dlt.common.libs.pyarrow import get_row_count
with self.normalize_storage.storage.open_file(extracted_items_file, "rb") as f:
Expand All @@ -188,4 +208,4 @@ def __call__(
self.normalize_storage.storage.make_full_path(extracted_items_file),
os.path.join(target_folder, new_file_name)
)
return [], items_count, {root_table_name: items_count}
return base_schema_update, items_count, {root_table_name: items_count}
3 changes: 2 additions & 1 deletion tests/load/pipeline/test_arrow_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def some_data():
if include_time:
assert some_table_columns["time"]["data_type"] == "time"

rows = [list(row) for row in select_data(pipeline, "SELECT * FROM some_data ORDER BY 1")]
qual_name = pipeline.sql_client().make_qualified_table_name("some_data")
rows = [list(row) for row in select_data(pipeline, f"SELECT * FROM {qual_name} ORDER BY 1")]

for row in rows:
for i in range(len(row)):
Expand Down

0 comments on commit 087f6fa

Please sign in to comment.