
Commit 2471cdb

temp
sh-rp committed Aug 30, 2023
1 parent 7a4b6fd commit 2471cdb
Showing 4 changed files with 45 additions and 22 deletions.
6 changes: 6 additions & 0 deletions dlt/common/schema/schema.py
@@ -323,6 +323,12 @@ def tables(self) -> TSchemaTables:
     @property
     def settings(self) -> TSchemaSettings:
         return self._settings
+
+    @property
+    def has_data_columns(self) -> bool:
+        for table in self.data_tables():
+            return True
+        return False
 
     def to_pretty_json(self, remove_defaults: bool = True) -> str:
         d = self.to_dict(remove_defaults=remove_defaults)
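The new property reports whether the schema already describes any data: as written it returns True as soon as data_tables() yields a first table, and False for a schema with no data tables yet. A minimal sketch of observing it on a pipeline (assumes dlt with the duckdb extra installed; the pipeline name is illustrative):

    import dlt

    pipeline = dlt.pipeline(pipeline_name="freeze_demo", destination="duckdb")
    pipeline.run([{"id": 1, "name": "one"}], table_name="items")

    # True once at least one data table exists in the schema
    print(pipeline.default_schema.has_data_columns)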
2 changes: 1 addition & 1 deletion dlt/normalize/configuration.py
@@ -5,7 +5,7 @@
 from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType
 from dlt.common.storages import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration
 
-TSchemaUpdateMode = Literal["update-schema", "freeze-and-discard", "freeze-and-raise"]
+TSchemaUpdateMode = Literal["update-schema", "freeze-and-filter", "freeze-and-raise", "freeze-and-discard"]
 
 
 @configspec
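The widened mode set is read by the normalize stage from its @configspec configuration, so a mode can be chosen per run through the environment, exactly as the test file below does. A minimal sketch:

    import os

    # one of: "update-schema", "freeze-and-filter", "freeze-and-raise", "freeze-and-discard"
    os.environ["NORMALIZE__SCHEMA_UPDATE_MODE"] = "freeze-and-discard"
    # any pipeline.run() after this point normalizes under the chosen mode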
20 changes: 13 additions & 7 deletions dlt/normalize/normalize.py
@@ -82,6 +82,8 @@ def w_normalize_files(
         load_storage = LoadStorage(False, destination_caps.preferred_loader_file_format, LoadStorage.ALL_SUPPORTED_FILE_FORMATS, loader_storage_config)
         normalize_storage = NormalizeStorage(False, normalize_storage_config)
 
+        schema_has_columns = schema.has_data_columns
+
         try:
             root_tables: Set[str] = set()
             populated_root_tables: Set[str] = set()
@@ -95,7 +97,7 @@
                     items_count = 0
                     for line_no, line in enumerate(f):
                         items: List[TDataItem] = json.loads(line)
-                        partial_update, items_count = Normalize._w_normalize_chunk(normalize_config, load_storage, schema, load_id, root_table_name, items)
+                        partial_update, items_count = Normalize._w_normalize_chunk(normalize_config, load_storage, schema, load_id, root_table_name, items, schema_has_columns)
                         schema_updates.append(partial_update)
                         total_items += items_count
                         logger.debug(f"Processed {line_no} items from file {extracted_items_file}, items {items_count} of total {total_items}")
@@ -121,7 +123,7 @@
         return schema_updates, total_items, load_storage.closed_files()
 
     @staticmethod
-    def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem]) -> Tuple[TSchemaUpdate, int]:
+    def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage, schema: Schema, load_id: str, root_table_name: str, items: List[TDataItem], schema_has_columns: bool) -> Tuple[TSchemaUpdate, int]:
         column_schemas: Dict[str, TTableSchemaColumns] = {}  # quick access to column schema for writers below
         schema_update: TSchemaUpdate = {}
         schema_name = schema.name
@@ -138,19 +140,23 @@ def _w_normalize_chunk(config: NormalizeConfiguration, load_storage: LoadStorage
                     row[k] = custom_pua_decode(v)  # type: ignore
                 # coerce row of values into schema table, generating partial table with new columns if any
                 row, partial_table = schema.coerce_row(table_name, parent_table, row)
 
-                # if there is a schema update and we froze schema and discard additional data, clean up
-                if partial_table and config.schema_update_mode == "freeze-and-discard":
+                # if there is a schema update, the schema is frozen and we filter additional data, clean the row up
+                if schema_has_columns and partial_table and config.schema_update_mode == "freeze-and-filter":
                     # do not create new tables
-                    if table_name not in schema.tables:
+                    if table_name not in schema.tables or not len(schema.tables[table_name].get("columns", {})):
                         continue
                     # pop unknown values
                     for item in list(row.keys()):
                         if item not in schema.tables[table_name]["columns"]:
                             row.pop(item)
+
+                # if there is a schema update, the schema is frozen and we discard additional rows, just continue
+                elif schema_has_columns and partial_table and config.schema_update_mode == "freeze-and-discard":
+                    continue
 
                 # if there is a schema update and we disallow any data not fitting the schema, raise!
-                elif partial_table and config.schema_update_mode == "freeze-and-raise":
+                elif schema_has_columns and partial_table and config.schema_update_mode == "freeze-and-raise":
                     raise SchemaFrozenException(f"Trying to modify table {table_name} but schema is frozen.")
 
                 # there's a new table or new columns in an existing table
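Summarizing the branching above: the frozen modes only engage when the schema already has data columns and coerce_row reports a partial table, i.e. the row would change the schema. "freeze-and-filter" skips rows for unknown (or still column-less) tables and pops unknown columns, "freeze-and-discard" skips the row entirely, and "freeze-and-raise" aborts the normalize step. A standalone sketch of that decision, with plain dicts and sets standing in for the real schema objects:

    from typing import Any, Dict, Optional, Set

    class SchemaFrozenException(Exception):
        pass

    def apply_frozen_mode(
        mode: str,
        table_name: str,
        row: Dict[str, Any],
        known_columns: Optional[Set[str]],  # None when the table is unknown to the schema
    ) -> Optional[Dict[str, Any]]:
        """Return the row to load, or None to skip it; called only when the
        row would otherwise change a schema that already has data columns."""
        if mode == "freeze-and-filter":
            if not known_columns:
                return None  # do not create new tables
            # pop unknown values
            return {k: v for k, v in row.items() if k in known_columns}
        if mode == "freeze-and-discard":
            return None  # drop any row that does not fit the existing schema
        if mode == "freeze-and-raise":
            raise SchemaFrozenException(f"Trying to modify table {table_name} but schema is frozen.")
        return row  # "update-schema": accept the row and let the schema evolve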
39 changes: 25 additions & 14 deletions tests/load/test_freeze_schema.py
@@ -7,10 +7,16 @@
 from dlt.normalize.exceptions import SchemaFrozenException
 from dlt.common.schema import utils
 
+SCHEMA_UPDATE_MODES = ["update-schema", "freeze-and-filter", "freeze-and-raise", "freeze-and-discard"]
+
 @pytest.mark.parametrize("destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), ids=lambda x: x.name)
-def test_freeze_schema(destination_config: DestinationTestConfiguration) -> None:
+@pytest.mark.parametrize("update_mode", SCHEMA_UPDATE_MODES)
+def test_freeze_schema(update_mode: str, destination_config: DestinationTestConfiguration) -> None:
 
-    pipeline = destination_config.setup_pipeline("test_freeze_schema", dataset_name="freeze" + uniq_id())
+    # freeze pipeline, drop additional values
+    # this allows the first run to create the schema; further updates are rejected after that
+    os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = update_mode
+    pipeline = destination_config.setup_pipeline("test_freeze_schema_2", dataset_name="freeze" + uniq_id())
 
     @dlt.resource(name="items", write_disposition="append")
     def load_items():
@@ -44,24 +50,29 @@ def load_items_with_subitems():
     assert table_counts["items"] == 10
     schema_hash = utils.generate_version_hash(pipeline.default_schema.to_dict())
 
-    # freeze pipeline, drop additional values
-    os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = "freeze-and-discard"
-    pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
-    table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
-    # check schema has not changed
-    assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict())
+    # on freeze-and-raise we expect an exception
+    if update_mode == "freeze-and-raise":
+        with pytest.raises(PipelineStepFailed) as py_ex:
+            pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
+        assert isinstance(py_ex.value.__context__, SchemaFrozenException)
+    else:
+        pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
+
+    # check schema has not changed for frozen modes
+    if update_mode != "update-schema":
+        assert schema_hash == utils.generate_version_hash(pipeline.default_schema.to_dict())
+
+    return
+
+    # check data
+    table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
     assert table_counts["items"] == 20
     assert "items__sub_items" not in table_counts
     # schema was not migrated to contain new subtable
     assert "items__sub_items" not in pipeline.default_schema.tables
+    # schema was not migrated to contain new attribute
+    assert "new_attribute" not in pipeline.default_schema.tables["items"]["columns"]
 
-    # now raise on migration
-    os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = "freeze-and-raise"
-    with pytest.raises(PipelineStepFailed) as py_ex:
-        pipeline.run([load_items_with_subitems], loader_file_format=destination_config.file_format)
-    assert isinstance(py_ex.value.__context__, SchemaFrozenException)
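For comparison with the parametrized test above, a hedged end-to-end sketch of the "freeze-and-filter" behavior on a plain pipeline (duckdb destination as in the test; pipeline, resource, and attribute names are illustrative):

    import os

    import dlt

    os.environ["NORMALIZE__SCHEMA_UPDATE_MODE"] = "freeze-and-filter"
    pipeline = dlt.pipeline(pipeline_name="freeze_filter_demo", destination="duckdb", dataset_name="freeze_filter_demo")

    @dlt.resource(name="items", write_disposition="append")
    def items():
        yield from ({"id": i, "name": f"item {i}"} for i in range(10))

    # first run: the schema is still empty, so tables and columns are created
    pipeline.run(items())

    @dlt.resource(name="items", write_disposition="append")
    def items_with_new_attribute():
        yield from ({"id": i, "name": f"item {i}", "new_attribute": "x"} for i in range(10))

    # second run: rows still load, but new_attribute is popped from each row
    pipeline.run(items_with_new_attribute())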
