
Commit

Arrow fix
steinitzu committed Dec 19, 2024
1 parent 66a54af · commit 39ac90f
Showing 1 changed file with 46 additions and 43 deletions.
89 changes: 46 additions & 43 deletions dlt/extract/extractors.py
@@ -411,56 +411,59 @@ def _write_item(
 
     def _compute_table(
         self, resource: DltResource, items: TDataItems, meta: Any
-    ) -> TPartialTableSchema:
-        arrow_table: TTableSchema = None
+    ) -> List[TPartialTableSchema]:
+        # arrow_table: TTableSchema = None
+        arrow_tables: Dict[str, TTableSchema] = {}
 
         # several arrow tables will update the pipeline schema and we want that earlier
         # arrow tables override the latter so the resultant schema is the same as if
         # they are sent separately
         for item in reversed(items):
-            computed_table = super()._compute_table(resource, item, Any)
-            # Merge the columns to include primary_key and other hints that may be set on the resource
-            if arrow_table:
-                utils.merge_table(self.schema.name, computed_table, arrow_table)
-            else:
-                arrow_table = copy(computed_table)
-                arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema)
-
-                # Add load_id column if needed
-                dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID)
-                if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]:
-                    # will be normalized line below
-                    arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column()
-
-                # normalize arrow table before merging
-                arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming)
-                # issue warnings when overriding computed with arrow
-                override_warn: bool = False
-                for col_name, column in arrow_table["columns"].items():
-                    if src_column := computed_table["columns"].get(col_name):
-                        for hint_name, hint in column.items():
-                            if (src_hint := src_column.get(hint_name)) is not None:
-                                if src_hint != hint:
-                                    override_warn = True
-                                    logger.info(
-                                        f"In resource: {resource.name}, when merging arrow schema on"
-                                        f" column {col_name}. The hint {hint_name} value"
-                                        f" {src_hint} defined in resource will overwrite arrow hint"
-                                        f" with value {hint}."
-                                    )
-                if override_warn:
-                    logger.warning(
-                        f"In resource: {resource.name}, when merging arrow schema with dlt schema,"
-                        " several column hints were different. dlt schema hints were kept and arrow"
-                        " schema and data were unmodified. It is up to destination to coerce the"
-                        " differences when loading. Change log level to INFO for more details."
-                    )
-
-                utils.merge_columns(
-                    arrow_table["columns"], computed_table["columns"], merge_columns=True
-                )
-
-        return arrow_table
+            computed_tables = super()._compute_table(resource, item, Any)
+            for computed_table in computed_tables:
+                arrow_table = arrow_tables.get(computed_table['name'])
+                # Merge the columns to include primary_key and other hints that may be set on the resource
+                if arrow_table:
+                    utils.merge_table(self.schema.name, computed_table, arrow_table)
+                else:
+                    arrow_table = arrow_tables[computed_table['name']] = copy(computed_table)
+                    arrow_table["columns"] = pyarrow.py_arrow_to_table_schema_columns(item.schema)
+
+                    # Add load_id column if needed
+                    dlt_load_id = self.naming.normalize_identifier(C_DLT_LOAD_ID)
+                    if self._normalize_config.add_dlt_load_id and dlt_load_id not in arrow_table["columns"]:
+                        # will be normalized line below
+                        arrow_table["columns"][C_DLT_LOAD_ID] = utils.dlt_load_id_column()
+
+                    # normalize arrow table before merging
+                    arrow_table = utils.normalize_table_identifiers(arrow_table, self.schema.naming)
+                    # issue warnings when overriding computed with arrow
+                    override_warn: bool = False
+                    for col_name, column in arrow_table["columns"].items():
+                        if src_column := computed_table["columns"].get(col_name):
+                            for hint_name, hint in column.items():
+                                if (src_hint := src_column.get(hint_name)) is not None:
+                                    if src_hint != hint:
+                                        override_warn = True
+                                        logger.info(
+                                            f"In resource: {resource.name}, when merging arrow schema on"
+                                            f" column {col_name}. The hint {hint_name} value"
+                                            f" {src_hint} defined in resource will overwrite arrow hint"
+                                            f" with value {hint}."
+                                        )
+                    if override_warn:
+                        logger.warning(
+                            f"In resource: {resource.name}, when merging arrow schema with dlt schema,"
+                            " several column hints were different. dlt schema hints were kept and arrow"
+                            " schema and data were unmodified. It is up to destination to coerce the"
+                            " differences when loading. Change log level to INFO for more details."
+                        )
+
+                    utils.merge_columns(
+                        arrow_table["columns"], computed_table["columns"], merge_columns=True
+                    )
+
+        return list(arrow_tables.values())
 
     def _compute_and_update_table(
         self, resource: DltResource, table_name: str, items: TDataItems, meta: Any
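
The gist of the change: `_compute_table` used to fold every Arrow batch into a single partial table schema and return it; it now keys the partial schemas by table name in a dict and returns one merged schema per table, so a batch of items targeting several tables no longer collapses into one. Below is a minimal sketch of that grouping pattern, assuming a simplified `{"name": ..., "columns": ...}` dict in place of dlt's `TPartialTableSchema` and a hypothetical `merge_partial` helper standing in for `utils.merge_table`/`utils.merge_columns`; it is an illustration of the pattern, not dlt's actual implementation.

from typing import Any, Dict, List

# Simplified stand-in for dlt's TPartialTableSchema: a table name plus per-column hints.
PartialTable = Dict[str, Any]


def merge_partial(into: PartialTable, other: PartialTable) -> None:
    # fold ``other`` into ``into``; hints already present in ``into`` win
    for col_name, hints in other["columns"].items():
        merged = dict(hints)
        merged.update(into["columns"].get(col_name, {}))
        into["columns"][col_name] = merged


def compute_tables(partials: List[PartialTable]) -> List[PartialTable]:
    # group partial schemas by table name and return one merged schema per table
    tables: Dict[str, PartialTable] = {}
    # walk newest-to-oldest, mirroring the reversed(items) loop in the commit
    for partial in reversed(partials):
        existing = tables.get(partial["name"])
        if existing:
            merge_partial(existing, partial)
        else:
            tables[partial["name"]] = {
                "name": partial["name"],
                "columns": {col: dict(hints) for col, hints in partial["columns"].items()},
            }
    return list(tables.values())


if __name__ == "__main__":
    batches = [
        {"name": "events", "columns": {"id": {"data_type": "bigint"}}},
        {"name": "users", "columns": {"id": {"data_type": "text"}}},
        {"name": "events", "columns": {"id": {"nullable": False}, "ts": {"data_type": "timestamp"}}},
    ]
    # prints one merged partial schema for "events" (id: bigint, not nullable; ts) and one for "users"
    for table in compute_tables(batches):
        print(table)

Walking the items newest-to-oldest mirrors the `reversed(items)` loop in the commit: the most recent partial for a table seeds the entry, and earlier partials only contribute hints that are not already set, so the resulting schema matches what you would get if the batches had been sent separately.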
