Skip to content

Commit

Permalink
better tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Sep 6, 2023
1 parent e05855c commit 3f07127
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 43 deletions.
5 changes: 2 additions & 3 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,8 @@ def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPa
if table.get('parent') is None and (resource := partial_table.get('resource')):
table['resource'] = resource

partial_e_s = partial_table.get("schema_evolution_settings")
if partial_e_s:
table["schema_evolution_settings"] = partial_e_s
# always update evolution settings
table["schema_evolution_settings"] = partial_table.get("schema_evolution_settings")

return diff_table

Expand Down
172 changes: 132 additions & 40 deletions tests/load/test_freeze_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
from dlt.common.schema.exceptions import SchemaFrozenException
from dlt.common.schema import utils

from tests.utils import skip_if_not_active

skip_if_not_active("duckdb")

SCHEMA_EVOLUTION_SETTINGS = ["evolve", "freeze-and-trim", "freeze-and-discard", "freeze-and-raise"]
LOCATIONS = ["source", "resource", "override"]
SCHEMA_ELEMENTS = ["table", "column", "column_variant"]

def items(settings: TSchemaEvolutionSettings) -> Any:

Expand Down Expand Up @@ -89,48 +95,61 @@ def load_items():
NEW_ITEMS_TABLE = "new_items"


def run_resource(pipeline, resource_fun, settings, settings_location: str) -> DltSource:
def run_resource(pipeline, resource_fun, settings) -> DltSource:

for item in settings.keys():
assert item in LOCATIONS
ev_settings = settings[item]
if ev_settings in SCHEMA_EVOLUTION_SETTINGS:
continue
for key, val in ev_settings.items():
assert val in SCHEMA_EVOLUTION_SETTINGS
assert key in SCHEMA_ELEMENTS

@dlt.source(name="freeze_tests", schema_evolution_settings=settings if settings_location == "source" else None)
@dlt.source(name="freeze_tests", schema_evolution_settings=settings.get("source"))
def source() -> DltResource:
return resource_fun(settings if settings_location == "resource" else None)
return resource_fun(settings.get("resource"))

pipeline.run(source(), schema_evolution_settings=settings if settings_location == "global_override" else None)
# run pipeline
pipeline.run(source(), schema_evolution_settings=settings.get("override"))

# check updated schema
# assert pipeline.default_schema._settings["schema_evolution_settings"] == (settings if settings_location == "source" else None)

# check items table settings
# assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == (settings if settings_location == "resource" else None)

@pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS)
@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"])
@pytest.mark.parametrize("setting_location", LOCATIONS)
def test_freeze_new_tables(evolution_setting: str, setting_location: str) -> None:

full_settings = {
setting_location: {
"table": evolution_setting
}
}}
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'), full_refresh=True)
run_resource(pipeline, items, full_settings, setting_location)
run_resource(pipeline, items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10
assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
# assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == {
# "table": evolution_setting
# }

run_resource(pipeline, items_with_new_column, full_settings, setting_location)
run_resource(pipeline, items_with_new_column, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]

run_resource(pipeline, items_with_variant, full_settings, setting_location)
run_resource(pipeline, items_with_variant, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 30
assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]

# test adding new subtable
if evolution_setting == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
run_resource(pipeline, items_with_subtable, full_settings, setting_location)
run_resource(pipeline, items_with_subtable, full_settings)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
run_resource(pipeline, items_with_subtable, full_settings, setting_location)
run_resource(pipeline, items_with_subtable, full_settings)

table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 30 if evolution_setting in ["freeze-and-raise"] else 40
Expand All @@ -139,49 +158,48 @@ def test_freeze_new_tables(evolution_setting: str, setting_location: str) -> Non
# test adding new table
if evolution_setting == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
run_resource(pipeline, new_items, full_settings, setting_location)
run_resource(pipeline, new_items, full_settings)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
run_resource(pipeline, new_items, full_settings, setting_location)
run_resource(pipeline, new_items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts.get("new_items", 0) == (10 if evolution_setting in ["evolve"] else 0)


@pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS)
@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"])
@pytest.mark.parametrize("setting_location", LOCATIONS)
def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> None:

full_settings = {
setting_location: {
"column": evolution_setting
}
}}

pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))
run_resource(pipeline, items, full_settings, setting_location)
run_resource(pipeline, items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10
assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
# assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == {
# "column": evolution_setting
# }

# subtable should work
run_resource(pipeline, items_with_subtable, full_settings, setting_location)
run_resource(pipeline, items_with_subtable, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert table_counts[SUBITEMS_TABLE] == 10

# new should work
run_resource(pipeline, new_items, full_settings, setting_location)
run_resource(pipeline, new_items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert table_counts[NEW_ITEMS_TABLE] == 10

# test adding new column
if evolution_setting == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
run_resource(pipeline, items_with_new_column, full_settings, setting_location)
run_resource(pipeline, items_with_new_column, full_settings)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
run_resource(pipeline, items_with_new_column, full_settings, setting_location)
run_resource(pipeline, items_with_new_column, full_settings)

if evolution_setting == "evolve":
assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
Expand All @@ -190,14 +208,13 @@ def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> No
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == (30 if evolution_setting in ["evolve", "freeze-and-trim"] else 20)


# test adding variant column
if evolution_setting == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
run_resource(pipeline, items_with_variant, full_settings, setting_location)
run_resource(pipeline, items_with_variant, full_settings)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
run_resource(pipeline, items_with_variant, full_settings, setting_location)
run_resource(pipeline, items_with_variant, full_settings)

if evolution_setting == "evolve":
assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
Expand All @@ -208,46 +225,44 @@ def test_freeze_new_columns(evolution_setting: str, setting_location: str) -> No


@pytest.mark.parametrize("evolution_setting", SCHEMA_EVOLUTION_SETTINGS)
@pytest.mark.parametrize("setting_location", ["resource", "source", "global_override"])
@pytest.mark.parametrize("setting_location", LOCATIONS)
def test_freeze_variants(evolution_setting: str, setting_location: str) -> None:

full_settings = {
setting_location: {
"column_variant": evolution_setting
}
}}
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))
run_resource(pipeline, items, full_settings, setting_location)
run_resource(pipeline, items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10
assert OLD_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
# assert pipeline.default_schema.tables["items"]["schema_evolution_settings"] == {
# "column_variant": evolution_setting
# }

# subtable should work
run_resource(pipeline, items_with_subtable, full_settings, setting_location)
run_resource(pipeline, items_with_subtable, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert table_counts[SUBITEMS_TABLE] == 10

# new should work
run_resource(pipeline, new_items, full_settings, setting_location)
run_resource(pipeline, new_items, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20
assert table_counts[NEW_ITEMS_TABLE] == 10

# test adding new column
run_resource(pipeline, items_with_new_column, full_settings, setting_location)
run_resource(pipeline, items_with_new_column, full_settings)
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 30
assert NEW_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]

# test adding variant column
if evolution_setting == "freeze-and-raise":
with pytest.raises(PipelineStepFailed) as py_ex:
run_resource(pipeline, items_with_variant, full_settings, setting_location)
run_resource(pipeline, items_with_variant, full_settings)
assert isinstance(py_ex.value.__context__, SchemaFrozenException)
else:
run_resource(pipeline, items_with_variant, full_settings, setting_location)
run_resource(pipeline, items_with_variant, full_settings)

if evolution_setting == "evolve":
assert VARIANT_COLUMN_NAME in pipeline.default_schema.tables["items"]["columns"]
Expand All @@ -256,3 +271,80 @@ def test_freeze_variants(evolution_setting: str, setting_location: str) -> None:
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == (40 if evolution_setting in ["evolve", "freeze-and-trim"] else 30)


def test_settings_precedence() -> None:
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))

# load some data
run_resource(pipeline, items, {})

# trying to add new column when forbidden on resource will fail
run_resource(pipeline, items_with_new_column, {"resource": {
"column": "freeze-and-discard"
}})

# when allowed on override it will work
run_resource(pipeline, items_with_new_column, {
"resource": {"column": "freeze-and-raise"},
"override": {"column": "evolve"}
})


def test_settings_precedence_2() -> None:
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))

# load some data
run_resource(pipeline, items, {"source": {
"column_variant": "freeze-and-discard"
}})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10

# trying to add variant when forbidden on source will fail
run_resource(pipeline, items_with_variant, {"source": {
"column_variant": "freeze-and-discard"
}})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10

# if allowed on resource it will pass
run_resource(pipeline, items_with_variant, {
"resource": {"column_variant": "evolve"},
"source": {"column_variant": "freeze-and-discard"}
})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20

# if allowed on override it will also pass
run_resource(pipeline, items_with_variant, {
"resource": {"column_variant": "freeze-and-discard"},
"source": {"column_variant": "freeze-and-discard"},
"override": {"column_variant": "evolve"},
})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 30

@pytest.mark.parametrize("setting_location", LOCATIONS)
def test_change_mode(setting_location: str) -> None:
pipeline = dlt.pipeline(pipeline_name=uniq_id(), destination='duckdb', credentials=duckdb.connect(':memory:'))

# load some data
run_resource(pipeline, items, {})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10

# trying to add variant when forbidden on source will fail
run_resource(pipeline, items_with_variant, {setting_location: {
"column_variant": "freeze-and-discard"
}})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 10

# now allow
run_resource(pipeline, items_with_variant, {setting_location: {
"column_variant": "evolve"
}})
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema.data_tables()])
assert table_counts["items"] == 20


0 comments on commit 3f07127

Please sign in to comment.