Skip to content

Commit

Permalink
replace natural_key with merge_key
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritsandbrink committed Sep 19, 2024
1 parent 7d1aad9 commit a0aa99c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 31 deletions.
2 changes: 1 addition & 1 deletion dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def gen_key_table_clauses(

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"ALTER TABLE {table_name} UPDATE"
return f"ALTER TABLE {table_name} UPDATE AS d"

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
Expand Down
11 changes: 8 additions & 3 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"UPDATE {table_name} SET"
return f"UPDATE {table_name} AS d SET"

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
Expand Down Expand Up @@ -772,8 +772,13 @@ def gen_scd2_sql(
retire_absent_rows = root_table.get("x-retire-absent-rows", True)
if not retire_absent_rows:
retire_sql = retire_sql.rstrip()[:-1] # remove semicolon
nk = escape_column_id(get_first_column_name_with_prop(root_table, "x-natural-key"))
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
# merge keys act as natural key
merge_keys = cls._escape_list(
get_columns_names_with_prop(root_table, "merge_key"),
escape_column_id,
)
keys_equal = cls._gen_key_table_clauses([], merge_keys)[0].format(d="d", s="s")
nk_present = f"EXISTS (SELECT 1 FROM {staging_root_table_name} AS s WHERE {keys_equal})"
retire_sql += f" AND {nk_present};"
sql.append(retire_sql)

Expand Down
9 changes: 5 additions & 4 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def _set_hints(
self, hints_template: TResourceHints, create_table_variant: bool = False
) -> None:
DltResourceHints.validate_dynamic_hints(hints_template)
DltResourceHints.validate_write_disposition_hint(hints_template.get("write_disposition"))
DltResourceHints.validate_write_disposition_hint(hints_template)
if create_table_variant:
table_name: str = hints_template["name"] # type: ignore[assignment]
# incremental cannot be specified in variant
Expand Down Expand Up @@ -524,7 +524,8 @@ def validate_dynamic_hints(template: TResourceHints) -> None:
)

@staticmethod
def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConfig]) -> None:
def validate_write_disposition_hint(template: TResourceHints) -> None:
wd = template.get("write_disposition")
if isinstance(wd, dict) and wd["disposition"] == "merge":
wd = cast(TMergeDispositionDict, wd)
if "strategy" in wd and wd["strategy"] not in MERGE_STRATEGIES:
Expand Down Expand Up @@ -552,6 +553,6 @@ def validate_write_disposition_hint(wd: TTableHintTemplate[TWriteDispositionConf
if (
"retire_absent_rows" in wd
and not wd["retire_absent_rows"]
and "natural_key" not in wd
and template.get("merge_key") is None
):
raise ValueError("`natural_key` is required when `retire_absent_rows=False`")
raise ValueError("`merge_key` is required when `retire_absent_rows=False`")
75 changes: 52 additions & 23 deletions tests/load/pipeline/test_scd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,10 @@ def r(data):
)


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
destinations_configs(default_sql_configs=True, supports_merge=True),
ids=lambda x: x.name,
)
def test_retire_absent_rows(
Expand All @@ -726,23 +727,22 @@ def test_retire_absent_rows(
p = destination_config.setup_pipeline("abstract", dev_mode=True)

@dlt.resource(
table_name="dim_test",
merge_key="nk",
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_absent_rows": False,
"natural_key": "nk",
},
)
def r(data):
def dim_test(data):
yield data

# load 1 — initial load
dim_snap = [
{"nk": 1, "foo": "foo"},
{"nk": 2, "foo": "foo"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
_, to = DEFAULT_VALIDITY_COLUMN_NAMES
Expand All @@ -753,7 +753,7 @@ def r(data):
dim_snap = [
{"nk": 1, "foo": "foo"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 2
# both records should still be active
Expand All @@ -763,7 +763,7 @@ def r(data):
dim_snap = [
{"nk": 1, "foo": "bar"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 3
ts3 = get_load_package_created_at(p, info)
Expand All @@ -777,7 +777,7 @@ def r(data):
dim_snap = [
{"nk": 1, "foo": "foo"},
]
info = p.run(r(dim_snap), **destination_config.run_kwargs)
info = p.run(dim_test(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test")["dim_test"] == 4
ts4 = get_load_package_created_at(p, info)
Expand All @@ -789,40 +789,69 @@ def r(data):
# now test various configs

with pytest.raises(ValueError):
# should raise because `natural_key` is required when `retire_absent_rows=False`
r.apply_hints(
# should raise because `merge_key` is required when `retire_absent_rows=False`
dim_test.apply_hints(
merge_key="",
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_absent_rows": False,
}
},
)

# `retire_absent_rows=True` does not require `natural_key`
r.apply_hints(
# `retire_absent_rows=True` does not require `merge_key`
dim_test.apply_hints(
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_absent_rows": True,
}
)
assert r.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item]
assert dim_test.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item]

# user-provided hints for `natural_key` column should be respected
r.apply_hints(
columns={"nk": {"x-foo": "foo"}}, # type: ignore[typeddict-unknown-key]
# test compound `merge_key`

@dlt.resource(
merge_key=["first_name", "last_name"],
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"retire_absent_rows": False,
"natural_key": "nk",
},
)
assert r.compute_table_schema()["columns"]["nk"] == {
"x-foo": "foo",
"name": "nk",
"x-natural-key": True,
}
def dim_test_compound(data):
yield data

# load 1 — initial load
dim_snap = [
{"first_name": "John", "last_name": "Doe", "age": 20},
{"first_name": "John", "last_name": "Dodo", "age": 20},
]
info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2
# both records should be active (i.e. not retired)
assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None]

# load 2 — natural key "John" + "Dodo" is absent, natural key "John" + "Doe" has changed
dim_snap = [
{"first_name": "John", "last_name": "Doe", "age": 30},
]
info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3
ts3 = get_load_package_created_at(p, info)
# natural key "John" + "Doe" should now have two records (one retired, one active)
actual = [
{k: v for k, v in row.items() if k in ("first_name", "last_name", to)}
for row in get_table(p, "dim_test_compound")
]
expected = [
{"first_name": "John", "last_name": "Doe", to: ts3},
{"first_name": "John", "last_name": "Doe", to: None},
{"first_name": "John", "last_name": "Dodo", to: None},
]
assert_records_as_set(actual, expected) # type: ignore[arg-type]


@pytest.mark.parametrize(
Expand Down

0 comments on commit a0aa99c

Please sign in to comment.