Skip to content

Commit

Permalink
creates a table with just root name to cache item normalizers properly
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Jun 5, 2024
1 parent e33af63 commit cdeefd2
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ def w_normalize_files(
) -> TWorkerRV:
destination_caps = config.destination_capabilities
schema_updates: List[TSchemaUpdate] = []
# normalizers are cached per table name
item_normalizers: Dict[str, ItemsNormalizer] = {}
# Use default storage if parquet is not supported to make normalizer fallback to read rows from the file

preferred_file_format = (
destination_caps.preferred_loader_file_format
or destination_caps.preferred_staging_file_format
Expand All @@ -129,9 +130,7 @@ def w_normalize_files(
def _get_items_normalizer(
item_format: TDataItemFormat, table_schema: Optional[TTableSchema]
) -> ItemsNormalizer:
table_schema = {} if table_schema is None else table_schema
table_name = table_schema.get("name")

table_name = table_schema["name"]
if table_name in item_normalizers:
return item_normalizers[table_name]

Expand Down Expand Up @@ -256,7 +255,7 @@ def _gather_metrics_and_close(
root_tables.add(root_table_name)
normalizer = _get_items_normalizer(
DataWriter.item_format_from_file_extension(parsed_file_name.file_format),
stored_schema["tables"].get(root_table_name),
stored_schema["tables"].get(root_table_name, {"name": root_table_name}),
)
logger.debug(
f"Processing extracted items in {extracted_items_file} in load_id"
Expand Down

0 comments on commit cdeefd2

Please sign in to comment.