Skip to content

Commit

Permalink
add new schema update mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Aug 30, 2023
1 parent 2471cdb commit f9f96b1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion dlt/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage
row.pop(item)

# if there is a schema update and we froze schema and discard additional rows, just continue
elif schema_has_columns and partial_table and config.schema_update_mode == "freeze-and-filter":
elif schema_has_columns and partial_table and config.schema_update_mode == "freeze-and-discard":
continue

# if there is a schema update and we disallow any data not fitting the schema, raise!
Expand Down
26 changes: 12 additions & 14 deletions tests/load/test_freeze_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def load_items_with_subitems():
assert table_counts["items"] == 10
schema_hash = utils.generate_version_hash(pipeline.default_schema.to_dict())


# on freeze and raise we expect an exception
if update_mode == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
Expand All @@ -59,20 +58,19 @@ def load_items_with_subitems():
else:
pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)

# check data
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20 if update_mode not in ["freeze-and-raise", "freeze-and-discard"] else 10


# check schema has not changed for frozen modes
# frozen schemas should not have changed
if update_mode != "update-schema":
assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict())

return

# check data
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert "items__sub_items" not in table_counts
# schema was not migrated to contain new subtable
assert "items__sub_items" not in pipeline.default_schema.tables
# schema was not migrated to contain new attribute
assert "new_attribute" not in pipeline.default_schema.tables["items"]["columns"]
assert "items__sub_items" not in table_counts
# schema was not migrated to contain new attribute
assert "new_attribute" not in pipeline.default_schema.tables["items"]["columns"]
# regular mode evolves the schema
else:
assert table_counts["items__sub_items"] == 20
# schema was not migrated to contain new attribute
assert "new_attribute" in pipeline.default_schema.tables["items"]["columns"]

0 comments on commit f9f96b1

Please sign in to comment.