Skip to content

Commit

Permalink
set constants once
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Sep 19, 2024
1 parent 16e52cd commit c826afc
Showing 1 changed file with 49 additions and 57 deletions.
106 changes: 49 additions & 57 deletions tests/load/pipeline/test_scd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from tests.utils import TPythonTableFormat

get_row_hash = DataItemNormalizer.get_row_hash
FROM, TO = DEFAULT_VALIDITY_COLUMN_NAMES


def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) -> datetime:
Expand Down Expand Up @@ -233,9 +234,6 @@ def test_child_table(destination_config: DestinationTestConfiguration, simple: b
def r(data):
yield data

# get validity column names
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES

# load 1 — initial load
dim_snap: List[Dict[str, Any]] = [
l1_1 := {"nk": 1, "c1": "foo", "c2": [1] if simple else [{"cc1": 1}]},
Expand All @@ -245,8 +243,8 @@ def r(data):
ts_1 = get_load_package_created_at(p, info)
assert_load_info(info)
assert get_table(p, "dim_test", "c1") == [
{from_: ts_1, to: None, "nk": 2, "c1": "bar"},
{from_: ts_1, to: None, "nk": 1, "c1": "foo"},
{FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
{FROM: ts_1, TO: None, "nk": 1, "c1": "foo"},
]
cname = "value" if simple else "cc1"
assert get_table(p, "dim_test__c2", cname) == [
Expand All @@ -264,9 +262,9 @@ def r(data):
ts_2 = get_load_package_created_at(p, info)
assert_load_info(info)
assert get_table(p, "dim_test", "c1") == [
{from_: ts_1, to: None, "nk": 2, "c1": "bar"},
{from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, # updated
{from_: ts_2, to: None, "nk": 1, "c1": "foo_updated"}, # new
{FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
{FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"}, # updated
{FROM: ts_2, TO: None, "nk": 1, "c1": "foo_updated"}, # new
]
assert_records_as_set(
get_table(p, "dim_test__c2"),
Expand All @@ -293,10 +291,10 @@ def r(data):
assert_records_as_set(
get_table(p, "dim_test"),
[
{from_: ts_1, to: None, "nk": 2, "c1": "bar"},
{from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
{from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, # updated
{from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, # new
{FROM: ts_1, TO: None, "nk": 2, "c1": "bar"},
{FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
{FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"}, # updated
{FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"}, # new
],
)
exp_3 = [
Expand All @@ -319,10 +317,10 @@ def r(data):
assert_records_as_set(
get_table(p, "dim_test"),
[
{from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, # updated
{from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
{from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"},
{from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"},
{FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"}, # updated
{FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
{FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"},
{FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"},
],
)
assert_records_as_set(
Expand All @@ -340,11 +338,11 @@ def r(data):
assert_records_as_set(
get_table(p, "dim_test"),
[
{from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"},
{from_: ts_5, to: None, "nk": 3, "c1": "baz"}, # new
{from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"},
{from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"},
{from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"},
{FROM: ts_1, TO: ts_4, "nk": 2, "c1": "bar"},
{FROM: ts_5, TO: None, "nk": 3, "c1": "baz"}, # new
{FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo"},
{FROM: ts_2, TO: ts_3, "nk": 1, "c1": "foo_updated"},
{FROM: ts_3, TO: None, "nk": 1, "c1": "foo_updated"},
],
)
assert_records_as_set(
Expand Down Expand Up @@ -497,13 +495,12 @@ def r(data):
ts_3 = get_load_package_created_at(p, info)

# assert parent records
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
r1_no_child = {k: v for k, v in r1.items() if k != "child"}
r2_no_child = {k: v for k, v in r2.items() if k != "child"}
expected = [
{**{from_: ts_1, to: ts_2}, **r1_no_child},
{**{from_: ts_3, to: None}, **r1_no_child},
{**{from_: ts_1, to: None}, **r2_no_child},
{**{FROM: ts_1, TO: ts_2}, **r1_no_child},
{**{FROM: ts_3, TO: None}, **r1_no_child},
{**{FROM: ts_1, TO: None}, **r2_no_child},
]
assert_records_as_set(get_table(p, "dim_test"), expected)

Expand Down Expand Up @@ -631,10 +628,9 @@ def r(data):
info = p.run(r(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
expected = [
{**{from_: strip_timezone(ts1), to: None}, **l1_1},
{**{from_: strip_timezone(ts1), to: None}, **l1_2},
{**{FROM: strip_timezone(ts1), TO: None}, **l1_1},
{**{FROM: strip_timezone(ts1), TO: None}, **l1_2},
]
assert get_table(p, "dim_test", "nk") == expected

Expand All @@ -655,10 +651,10 @@ def r(data):
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 4
expected = [
{**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1}, # retired
{**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2}, # retired
{**{from_: strip_timezone(ts2), to: None}, **l2_1}, # new
{**{from_: strip_timezone(ts2), to: None}, **l2_3}, # new
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # retired
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # retired
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # new
{**{FROM: strip_timezone(ts2), TO: None}, **l2_3}, # new
]
assert_records_as_set(get_table(p, "dim_test"), expected)

Expand All @@ -677,10 +673,10 @@ def r(data):
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 4
expected = [
{**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_1}, # unchanged
{**{from_: strip_timezone(ts1), to: strip_timezone(ts2)}, **l1_2}, # unchanged
{**{from_: strip_timezone(ts2), to: None}, **l2_1}, # unchanged
{**{from_: strip_timezone(ts2), to: strip_timezone(ts3)}, **l2_3}, # retired
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # unchanged
{**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)}, **l2_3}, # retired
]
assert_records_as_set(get_table(p, "dim_test"), expected)

Expand Down Expand Up @@ -725,9 +721,8 @@ def dim_test(data):
info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
_, to = DEFAULT_VALIDITY_COLUMN_NAMES
# both records should be active (i.e. not retired)
assert [row[to] for row in get_table(p, "dim_test")] == [None, None]
assert [row[TO] for row in get_table(p, "dim_test")] == [None, None]

# load 2 — natural key 2 is absent, natural key 1 is unchanged
dim_snap = [
Expand All @@ -737,7 +732,7 @@ def dim_test(data):
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
# both records should still be active
assert [row[to] for row in get_table(p, "dim_test")] == [None, None]
assert [row[TO] for row in get_table(p, "dim_test")] == [None, None]

# load 3 — natural key 2 is absent, natural key 1 has changed
dim_snap = [
Expand All @@ -748,8 +743,8 @@ def dim_test(data):
assert load_table_counts(p, "dim_test")["dim_test"] == 3
ts3 = get_load_package_created_at(p, info)
# natural key 1 should now have two records (one retired, one active)
actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")]
expected = [{"nk": 1, to: ts3}, {"nk": 1, to: None}, {"nk": 2, to: None}]
actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")]
expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: None}, {"nk": 2, TO: None}]
assert_records_as_set(actual, expected) # type: ignore[arg-type]

# load 4 — natural key 2 is absent, natural key 1 has changed back to
Expand All @@ -762,8 +757,8 @@ def dim_test(data):
assert load_table_counts(p, "dim_test")["dim_test"] == 4
ts4 = get_load_package_created_at(p, info)
# natural key 1 should now have three records (two retired, one active)
actual = [{k: v for k, v in row.items() if k in ("nk", to)} for row in get_table(p, "dim_test")]
expected = [{"nk": 1, to: ts3}, {"nk": 1, to: ts4}, {"nk": 1, to: None}, {"nk": 2, to: None}]
actual = [{k: v for k, v in row.items() if k in ("nk", TO)} for row in get_table(p, "dim_test")]
expected = [{"nk": 1, TO: ts3}, {"nk": 1, TO: ts4}, {"nk": 1, TO: None}, {"nk": 2, TO: None}]
assert_records_as_set(actual, expected) # type: ignore[arg-type]

# now test various configs
Expand Down Expand Up @@ -828,8 +823,7 @@ def dim_test_compound(data):
assert_load_info(info)
assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2
# both records should be active (i.e. not retired)
_, to = DEFAULT_VALIDITY_COLUMN_NAMES
assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None]
assert [row[TO] for row in get_table(p, "dim_test_compound")] == [None, None]

# load 2 — "Dodo" is absent, "Doe" has changed
dim_snap = [
Expand All @@ -841,13 +835,13 @@ def dim_test_compound(data):
ts3 = get_load_package_created_at(p, info)
# "Doe" should now have two records (one retired, one active)
actual = [
{k: v for k, v in row.items() if k in ("first_name", "last_name", to)}
{k: v for k, v in row.items() if k in ("first_name", "last_name", TO)}
for row in get_table(p, "dim_test_compound")
]
expected = [
{"first_name": first_name, "last_name": "Doe", to: ts3},
{"first_name": first_name, "last_name": "Doe", to: None},
{"first_name": first_name, "last_name": "Dodo", to: None},
{"first_name": first_name, "last_name": "Doe", TO: ts3},
{"first_name": first_name, "last_name": "Doe", TO: None},
{"first_name": first_name, "last_name": "Dodo", TO: None},
]
assert_records_as_set(actual, expected) # type: ignore[arg-type]

Expand Down Expand Up @@ -885,9 +879,8 @@ def _make_scd2_r(table_: Any) -> DltResource:
# make sure we have scd2 columns in schema
table_schema = p.default_schema.get_table("tabular")
assert table_schema["x-merge-strategy"] == "scd2" # type: ignore[typeddict-item]
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
assert table_schema["columns"][from_]["x-valid-from"] # type: ignore[typeddict-item]
assert table_schema["columns"][to]["x-valid-to"] # type: ignore[typeddict-item]
assert table_schema["columns"][FROM]["x-valid-from"] # type: ignore[typeddict-item]
assert table_schema["columns"][TO]["x-valid-to"] # type: ignore[typeddict-item]
assert table_schema["columns"]["row_hash"]["x-row-version"] # type: ignore[typeddict-item]
# 100 items in destination
assert load_table_counts(p, "tabular")["tabular"] == 100
Expand Down Expand Up @@ -951,13 +944,12 @@ def r(data):
ts_2 = get_load_package_created_at(p, info)

# assert load results
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
assert get_table(p, "dim_test", "c1") == [
{from_: ts_1, to: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"},
{from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"},
{FROM: ts_1, TO: ts_2, "nk": 2, "c1": "bar", "row_hash": "mocked_hash_2"},
{FROM: ts_1, TO: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"},
{
from_: ts_2,
to: None,
FROM: ts_2,
TO: None,
"nk": 1,
"c1": "foo_upd",
"row_hash": "mocked_hash_1_upd",
Expand Down

0 comments on commit c826afc

Please sign in to comment.