rewrite natural key presence check
jorritsandbrink committed Sep 19, 2024
1 parent a0aa99c commit 65838e4
Showing 4 changed files with 45 additions and 13 deletions.
6 changes: 6 additions & 0 deletions dlt/destinations/impl/athena/athena.py
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
return sql, temp_table_name

@classmethod
def gen_concat_sql(cls, columns: Sequence[str]) -> str:
# Athena requires explicit casting
columns = [f"CAST({c} AS VARCHAR)" for c in columns]
return f"CONCAT({', '.join(columns)})"

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
return True
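
For illustration, a rough sketch (not part of the commit) of what this override emits for a hypothetical two-column natural key. The explicit casts are the point of the override: Athena's CONCAT only accepts VARCHAR arguments, so a non-text key column has to be cast before it can participate in the concatenated key.

columns = ["first_name", "last_name"]  # hypothetical escaped key columns
casted = [f"CAST({c} AS VARCHAR)" for c in columns]
concat_sql = f"CONCAT({', '.join(casted)})"
# -> CONCAT(CAST(first_name AS VARCHAR), CAST(last_name AS VARCHAR))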
2 changes: 1 addition & 1 deletion dlt/destinations/impl/clickhouse/clickhouse.py
@@ -195,7 +195,7 @@ def gen_key_table_clauses(

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"ALTER TABLE {table_name} UPDATE AS d"
return f"ALTER TABLE {table_name} UPDATE"

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
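A plausible reading of this one-line change (inferred from the rest of the commit, not stated in it): the d alias existed only so the old correlated EXISTS condition in sql_jobs.py could refer back to the updated table; the rewritten IN-based presence check further down never references the updated table, so the alias can be dropped from the ClickHouse mutation prefix. A sketch with a made-up table name:

table_name = "dim_test_compound"  # hypothetical
old_prefix = f"ALTER TABLE {table_name} UPDATE AS d"  # alias consumed by the old EXISTS check
new_prefix = f"ALTER TABLE {table_name} UPDATE"       # nothing references an alias anymore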
13 changes: 10 additions & 3 deletions dlt/destinations/sql_jobs.py
@@ -339,6 +339,10 @@ def gen_delete_from_sql(
);
"""

@classmethod
def gen_concat_sql(cls, columns: Sequence[str]) -> str:
return f"CONCAT({', '.join(columns)})"

@classmethod
def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
"""Trims identifier to max length supported by sql_client. Used for dynamically constructed table names"""
@@ -367,7 +371,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"UPDATE {table_name} AS d SET"
return f"UPDATE {table_name} SET"

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
@@ -777,8 +781,11 @@ def gen_scd2_sql(
get_columns_names_with_prop(root_table, "merge_key"),
escape_column_id,
)
keys_equal = cls._gen_key_table_clauses([], merge_keys)[0].format(d="d", s="s")
nk_present = f"EXISTS (SELECT 1 FROM {staging_root_table_name} AS s WHERE {keys_equal})"
if len(merge_keys) == 1:
nk = merge_keys[0]
else:
nk = cls.gen_concat_sql(merge_keys) # compound key
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
retire_sql += f" AND {nk_present};"
sql.append(retire_sql)

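To make the rewritten check concrete, a small sketch (not part of the commit) of the predicate the new code assembles, with made-up identifiers. A single merge key is used directly; a compound key is concatenated on both sides of the IN via gen_concat_sql (inlined here to keep the sketch self-contained; Athena swaps in the casting variant).

staging_root_table_name = "staging.dim_test_compound"  # hypothetical
merge_keys = ["first_name", "last_name"]

if len(merge_keys) == 1:
    nk = merge_keys[0]
else:
    nk = f"CONCAT({', '.join(merge_keys)})"  # base gen_concat_sql
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
# -> CONCAT(first_name, last_name) IN
#    (SELECT CONCAT(first_name, last_name) FROM staging.dim_test_compound)

Unlike the old EXISTS form, this predicate is self-contained: it never refers to the updated table, which is presumably what allows the AS d aliases to disappear from the update prefixes elsewhere in this commit.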
37 changes: 28 additions & 9 deletions tests/load/pipeline/test_scd2.py
@@ -9,6 +9,7 @@
from dlt.common.typing import TAnyDateTime
from dlt.common.pendulum import pendulum
from dlt.common.pipeline import LoadInfo
from dlt.common.data_types.typing import TDataType
from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES
from dlt.common.normalizers.json.relational import DataItemNormalizer
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
@@ -809,7 +810,19 @@ def dim_test(data):
)
assert dim_test.compute_table_schema()["x-retire-absent-rows"] # type: ignore[typeddict-item]

# test compound `merge_key`

@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, supports_merge=True),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("key_type", ("text", "bigint"))
def test_retire_absent_rows_compound_key(
destination_config: DestinationTestConfiguration,
key_type: TDataType,
) -> None:
p = destination_config.setup_pipeline("abstract", dev_mode=True)

@dlt.resource(
merge_key=["first_name", "last_name"],
@@ -822,34 +835,40 @@ def dim_test(data):
def dim_test_compound(data):
yield data

# vary `first_name` type to test mixed compound `merge_key`
if key_type == "text":
first_name = "John"
elif key_type == "bigint":
first_name = 1 # type: ignore[assignment]
# load 1 — initial load
dim_snap = [
{"first_name": "John", "last_name": "Doe", "age": 20},
{"first_name": "John", "last_name": "Dodo", "age": 20},
{"first_name": first_name, "last_name": "Doe", "age": 20},
{"first_name": first_name, "last_name": "Dodo", "age": 20},
]
info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2
# both records should be active (i.e. not retired)
_, to = DEFAULT_VALIDITY_COLUMN_NAMES
assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None]

# load 2 — natural key "John" + "Dodo" is absent, natural key "John" + "Doe" has changed
# load 2 — "Dodo" is absent, "Doe" has changed
dim_snap = [
{"first_name": "John", "last_name": "Doe", "age": 30},
{"first_name": first_name, "last_name": "Doe", "age": 30},
]
info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
assert_load_info(info)
assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3
ts3 = get_load_package_created_at(p, info)
# natural key "John" + "Doe" should now have two records (one retired, one active)
# "Doe" should now have two records (one retired, one active)
actual = [
{k: v for k, v in row.items() if k in ("first_name", "last_name", to)}
for row in get_table(p, "dim_test_compound")
]
expected = [
{"first_name": "John", "last_name": "Doe", to: ts3},
{"first_name": "John", "last_name": "Doe", to: None},
{"first_name": "John", "last_name": "Dodo", to: None},
{"first_name": first_name, "last_name": "Doe", to: ts3},
{"first_name": first_name, "last_name": "Doe", to: None},
{"first_name": first_name, "last_name": "Dodo", to: None},
]
assert_records_as_set(actual, expected) # type: ignore[arg-type]
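
For reference, one way to run only the new test is sketched below (illustrative; destination credentials still have to be configured as for the rest of the load test suite). The bigint parametrization is the interesting case: it yields a mixed bigint/text compound key, which is exactly what the Athena CAST override earlier in the commit is for.

# illustrative invocation, selecting only this test by keyword
import pytest

pytest.main([
    "tests/load/pipeline/test_scd2.py",
    "-k", "test_retire_absent_rows_compound_key",
])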
