From 3f0712713bdee909c19378052cd75b8e5acf5d7e Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 6 Sep 2023 09:44:50 +0200 Subject: [PATCH] better tests --- dlt/common/schema/utils.py | 5 +- tests/load/test_freeze_schema.py | 172 ++++++++++++++++++++++++------- 2 files changed, 134 insertions(+), 43 deletions(-) diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 21c0142453..81332976cd 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -412,9 +412,8 @@ def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPa if table.get('parent') is None and (resource := partial_table.get('resource')): table['resource'] = resource - partial_e_s = partial_table.get("schema_evolution_settings") - if partial_e_s: - table["schema_evolution_settings"] = partial_e_s + # always update evolution settings + table["schema_evolution_settings"] = partial_table.get("schema_evolution_settings") return diff_table diff --git a/tests/load/test_freeze_schema.py b/tests/load/test_freeze_schema.py index e998f399dc..9d917cdaf1 100644 --- a/tests/load/test_freeze_schema.py +++ b/tests/load/test_freeze_schema.py @@ -11,7 +11,13 @@ from dlt.common.schema.exceptions import SchemaFrozenException from dlt.common.schema import utils +from tests.utils import skip_if_not_active + +skip_if_not_active("duckdb") + SCHEMA_EVOLUTION_SETTINGS = ["evolve", "freeze-and-trim", "freeze-and-discard", "freeze-and-raise"] +LOCATIONS = ["source", "resource", "override"] +SCHEMA_ELEMENTS = ["table", "column", "column_variant"] def items(settings: TSchemaEvolutionSettings) -> Any: @@ -89,37 +95,50 @@ def load_items(): NEW_ITEMS_TABLE = "new_items" -def run_resource(pipeline, resource_fun, settings, settings_location: str) -> DltSource: +def run_resource(pipeline, resource_fun, settings) -> DltSource: + + for item in settings.keys(): + assert item in LOCATIONS + ev_settings = settings[item] + if ev_settings in SCHEMA_EVOLUTION_SETTINGS: + continue + for key, val in ev_settings.items(): + assert val in SCHEMA_EVOLUTION_SETTINGS + assert key in SCHEMA_ELEMENTS - @dlt.source(name="freeze_tests", schema_evolution_settings=settings if settings_location == "source" else None) + @dlt.source(name="freeze_tests", schema_evolution_settings=settings.get("source")) def source() -> DltResource: - return resource_fun(settings if settings_location == "resource" else None) + return resource_fun(settings.get("resource")) - pipeline.run(source(), schema_evolution_settings=settings if settings_location == "global_override" else None) + # run pipeline + pipeline.run(source(), schema_evolution_settings=settings.get("override")) + # check updated schema + # assert pipeline.default_schema._settings["schema_evolution_settings"] == (settings if settings_location == "source" else None) + + # check items table settings + # assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == (settings if settings_location == "resource" else None) @pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS) -@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"]) +@pytest.mark.parametrize("setting_location", LOCATIONS) def test_freeze_new_tables(evolution_setting: str, setting_location: str) -> None: full_settings = { + setting_location: { "table": evolution_setting - } + }} pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'), full_refresh=True) - run_resource(pipeline, items, full_settings, setting_location) + run_resource(pipeline, items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 10 assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] - # assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == { - # "table": evolution_setting - # } - run_resource(pipeline, items_with_new_column, full_settings, setting_location) + run_resource(pipeline, items_with_new_column, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 20 assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] - run_resource(pipeline, items_with_variant, full_settings, setting_location) + run_resource(pipeline, items_with_variant, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 30 assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] @@ -127,10 +146,10 @@ def test_freeze_new_tables(evolution_setting: str, setting_location: str) -> Non # test adding new subtable if evolution_setting == "freeze-and-raise": with pytest.raises(PipelineStepFailed) as py_ex: - run_resource(pipeline, items_with_subtable, full_settings, setting_location) + run_resource(pipeline, items_with_subtable, full_settings) assert isinstance(py_ex.value.__context__, SchemaFrozenException) else: - run_resource(pipeline, items_with_subtable, full_settings, setting_location) + run_resource(pipeline, items_with_subtable, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 30 if evolution_setting in ["freeze-and-raise"] else 40 @@ -139,38 +158,37 @@ def test_freeze_new_tables(evolution_setting: str, setting_location: str) -> Non # test adding new table if evolution_setting == "freeze-and-raise": with pytest.raises(PipelineStepFailed) as py_ex: - run_resource(pipeline, new_items, full_settings, setting_location) + run_resource(pipeline, new_items, full_settings) assert isinstance(py_ex.value.__context__, SchemaFrozenException) else: - run_resource(pipeline, new_items, full_settings, setting_location) + run_resource(pipeline, new_items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts.get("new_items", 0) == (10 if evolution_setting in ["evolve"] else 0) @pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS) -@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"]) +@pytest.mark.parametrize("setting_location", LOCATIONS) def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> None: full_settings = { + setting_location: { "column": evolution_setting - } + }} + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:')) - run_resource(pipeline, items, full_settings, setting_location) + run_resource(pipeline, items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 10 assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] - # assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == { - # "column": evolution_setting - # } # subtable should work - run_resource(pipeline, items_with_subtable, full_settings, setting_location) + run_resource(pipeline, items_with_subtable, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 20 assert table_counts[SUBITEMS_TABLE] == 10 # new should work - run_resource(pipeline, new_items, full_settings, setting_location) + run_resource(pipeline, new_items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 20 assert table_counts[NEW_ITEMS_TABLE] == 10 @@ -178,10 +196,10 @@ def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> No # test adding new column if evolution_setting == "freeze-and-raise": with pytest.raises(PipelineStepFailed) as py_ex: - run_resource(pipeline, items_with_new_column, full_settings, setting_location) + run_resource(pipeline, items_with_new_column, full_settings) assert isinstance(py_ex.value.__context__, SchemaFrozenException) else: - run_resource(pipeline, items_with_new_column, full_settings, setting_location) + run_resource(pipeline, items_with_new_column, full_settings) if evolution_setting == "evolve": assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] @@ -190,14 +208,13 @@ def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> No table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == (30 if evolution_setting in ["evolve", "freeze-and-trim"] else 20) - # test adding variant column if evolution_setting == "freeze-and-raise": with pytest.raises(PipelineStepFailed) as py_ex: - run_resource(pipeline, items_with_variant, full_settings, setting_location) + run_resource(pipeline, items_with_variant, full_settings) assert isinstance(py_ex.value.__context__, SchemaFrozenException) else: - run_resource(pipeline, items_with_variant, full_settings, setting_location) + run_resource(pipeline, items_with_variant, full_settings) if evolution_setting == "evolve": assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] @@ -208,35 +225,33 @@ def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> No @pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS) -@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"]) +@pytest.mark.parametrize("setting_location", LOCATIONS) def test_freeze_variants(evolution_setting: str, setting_location: str) -> None: full_settings = { + setting_location: { "column_variant": evolution_setting - } + }} pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:')) - run_resource(pipeline, items, full_settings, setting_location) + run_resource(pipeline, items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 10 assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] - # assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == { - # "column_variant": evolution_setting - # } # subtable should work - run_resource(pipeline, items_with_subtable, full_settings, setting_location) + run_resource(pipeline, items_with_subtable, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 20 assert table_counts[SUBITEMS_TABLE] == 10 # new should work - run_resource(pipeline, new_items, full_settings, setting_location) + run_resource(pipeline, new_items, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 20 assert table_counts[NEW_ITEMS_TABLE] == 10 # test adding new column - run_resource(pipeline, items_with_new_column, full_settings, setting_location) + run_resource(pipeline, items_with_new_column, full_settings) table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == 30 assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] @@ -244,10 +259,10 @@ def test_freeze_variants(evolution_setting: str, setting_location: str) -> None: # test adding variant column if evolution_setting == "freeze-and-raise": with pytest.raises(PipelineStepFailed) as py_ex: - run_resource(pipeline, items_with_variant, full_settings, setting_location) + run_resource(pipeline, items_with_variant, full_settings) assert isinstance(py_ex.value.__context__, SchemaFrozenException) else: - run_resource(pipeline, items_with_variant, full_settings, setting_location) + run_resource(pipeline, items_with_variant, full_settings) if evolution_setting == "evolve": assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"] @@ -256,3 +271,80 @@ def test_freeze_variants(evolution_setting: str, setting_location: str) -> None: table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) assert table_counts["items"] == (40 if evolution_setting in ["evolve", "freeze-and-trim"] else 30) + +def test_settings_precedence() -> None: + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:')) + + # load some data + run_resource(pipeline, items, {}) + + # trying to add new column when forbidden on resource will fail + run_resource(pipeline, items_with_new_column, {"resource": { + "column": "freeze-and-discard" + }}) + + # when allowed on override it will work + run_resource(pipeline, items_with_new_column, { + "resource": {"column": "freeze-and-raise"}, + "override": {"column": "evolve"} + }) + + +def test_settings_precedence_2() -> None: + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:')) + + # load some data + run_resource(pipeline, items, {"source": { + "column_variant": "freeze-and-discard" + }}) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 10 + + # trying to add variant when forbidden on source will fail + run_resource(pipeline, items_with_variant, {"source": { + "column_variant": "freeze-and-discard" + }}) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 10 + + # if allowed on resource it will pass + run_resource(pipeline, items_with_variant, { + "resource": {"column_variant": "evolve"}, + "source": {"column_variant": "freeze-and-discard"} + }) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 20 + + # if allowed on override it will also pass + run_resource(pipeline, items_with_variant, { + "resource": {"column_variant": "freeze-and-discard"}, + "source": {"column_variant": "freeze-and-discard"}, + "override": {"column_variant": "evolve"}, + }) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 30 + +@pytest.mark.parametrize("setting_location", LOCATIONS) +def test_change_mode(setting_location: str) -> None: + pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:')) + + # load some data + run_resource(pipeline, items, {}) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 10 + + # trying to add variant when forbidden on source will fail + run_resource(pipeline, items_with_variant, {setting_location: { + "column_variant": "freeze-and-discard" + }}) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 10 + + # now allow + run_resource(pipeline, items_with_variant, {setting_location: { + "column_variant": "evolve" + }}) + table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()]) + assert table_counts["items"] == 20 + +