From 65838e498100f47c260d866b0b3f1f97bd3cb9bb Mon Sep 17 00:00:00 2001
From: Jorrit Sandbrink
Date: Thu, 19 Sep 2024 17:17:04 +0400
Subject: [PATCH] rewrite natural key presence check

---
 dlt/destinations/impl/athena/athena.py        |  6 +++
 .../impl/clickhouse/clickhouse.py             |  2 +-
 dlt/destinations/sql_jobs.py                  | 13 +++++--
 tests/load/pipeline/test_scd2.py              | 37 ++++++++++++++-----
 4 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py
index 04078dd510..72611a9568 100644
--- a/dlt/destinations/impl/athena/athena.py
+++ b/dlt/destinations/impl/athena/athena.py
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
         sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
         return sql, temp_table_name
 
+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        # Athena requires explicit casting
+        columns = [f"CAST({c} AS VARCHAR)" for c in columns]
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
         return True
diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index 04273919c6..b6f23ee221 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -195,7 +195,7 @@ def gen_key_table_clauses(
     @classmethod
     def gen_update_table_prefix(cls, table_name: str) -> str:
-        return f"ALTER TABLE {table_name} UPDATE AS d"
+        return f"ALTER TABLE {table_name} UPDATE"
 
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
         return True
diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index 7f4622e406..d40726b18e 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -339,6 +339,10 @@ def gen_delete_from_sql(
             );
         """
+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
         """Trims identifier to max length supported by sql_client.
         Used for dynamically constructed table names"""
@@ -367,7 +371,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
 
     @classmethod
     def gen_update_table_prefix(cls, table_name: str) -> str:
-        return f"UPDATE {table_name} AS d SET"
+        return f"UPDATE {table_name} SET"
 
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
@@ -777,8 +781,11 @@ def gen_scd2_sql(
                 get_columns_names_with_prop(root_table, "merge_key"),
                 escape_column_id,
             )
-            keys_equal = cls._gen_key_table_clauses([], merge_keys)[0].format(d="d", s="s")
-            nk_present = f"EXISTS (SELECT 1 FROM {staging_root_table_name} AS s WHERE {keys_equal})"
+            if len(merge_keys) == 1:
+                nk = merge_keys[0]
+            else:
+                nk = cls.gen_concat_sql(merge_keys)  # compound key
+            nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
             retire_sql += f" AND {nk_present};"
             sql.append(retire_sql)
 
diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py
index 8948c21653..37f3267f9a 100644
--- a/tests/load/pipeline/test_scd2.py
+++ b/tests/load/pipeline/test_scd2.py
@@ -9,6 +9,7 @@
 from dlt.common.typing import TAnyDateTime
 from dlt.common.pendulum import pendulum
 from dlt.common.pipeline import LoadInfo
+from dlt.common.data_types.typing import TDataType
 from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES
 from dlt.common.normalizers.json.relational import DataItemNormalizer
 from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
@@ -809,7 +810,19 @@ def dim_test(data):
     )
     assert dim_test.compute_table_schema()["x-retire-absent-rows"]  # type: ignore[typeddict-item]
 
-    # test compound `merge_key`
+
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+@pytest.mark.parametrize("key_type", ("text", "bigint"))
+def test_retire_absent_rows_compound_key(
+    destination_config: DestinationTestConfiguration,
+    key_type: TDataType,
+) -> None:
+    p = destination_config.setup_pipeline("abstract", dev_mode=True)
 
     @dlt.resource(
         merge_key=["first_name", "last_name"],
@@ -822,34 +835,40 @@ def dim_test_compound(data):
     def dim_test_compound(data):
         yield data
 
+    # vary `first_name` type to test mixed compound `merge_key`
+    if key_type == "text":
+        first_name = "John"
+    elif key_type == "bigint":
+        first_name = 1  # type: ignore[assignment]
     # load 1 — initial load
     dim_snap = [
-        {"first_name": "John", "last_name": "Doe", "age": 20},
-        {"first_name": "John", "last_name": "Dodo", "age": 20},
+        {"first_name": first_name, "last_name": "Doe", "age": 20},
+        {"first_name": first_name, "last_name": "Dodo", "age": 20},
     ]
     info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
     assert_load_info(info)
     assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 2
     # both records should be active (i.e. not retired)
+    _, to = DEFAULT_VALIDITY_COLUMN_NAMES
     assert [row[to] for row in get_table(p, "dim_test_compound")] == [None, None]
 
-    # load 2 — natural key "John" + "Dodo" is absent, natural key "John" + "Doe" has changed
+    # load 2 — "Dodo" is absent, "Doe" has changed
     dim_snap = [
-        {"first_name": "John", "last_name": "Doe", "age": 30},
+        {"first_name": first_name, "last_name": "Doe", "age": 30},
     ]
     info = p.run(dim_test_compound(dim_snap), **destination_config.run_kwargs)
     assert_load_info(info)
     assert load_table_counts(p, "dim_test_compound")["dim_test_compound"] == 3
     ts3 = get_load_package_created_at(p, info)
 
-    # natural key "John" + "Doe" should now have two records (one retired, one active)
+    # "Doe" should now have two records (one retired, one active)
     actual = [
         {k: v for k, v in row.items() if k in ("first_name", "last_name", to)}
         for row in get_table(p, "dim_test_compound")
     ]
     expected = [
-        {"first_name": "John", "last_name": "Doe", to: ts3},
-        {"first_name": "John", "last_name": "Doe", to: None},
-        {"first_name": "John", "last_name": "Dodo", to: None},
+        {"first_name": first_name, "last_name": "Doe", to: ts3},
+        {"first_name": first_name, "last_name": "Doe", to: None},
+        {"first_name": first_name, "last_name": "Dodo", to: None},
    ]
     assert_records_as_set(actual, expected)  # type: ignore[arg-type]
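
The rewrite above replaces the correlated EXISTS predicate (which needed the "d"/"s" aliases, hence the alias also being dropped from both UPDATE prefixes) with an uncorrelated IN subquery over the natural key, concatenated when the key is compound. The following is a minimal standalone sketch of the predicate that logic would render, not part of the patch; the escaped column names, the staging table name, and the module-level gen_concat_sql helper are hypothetical stand-ins for the classmethods changed above.

# Sketch only: mirrors the new natural-key presence check for a compound merge key.
from typing import Sequence


def gen_concat_sql(columns: Sequence[str]) -> str:
    # base dialect behaviour; the Athena override additionally CASTs each column to VARCHAR
    return f"CONCAT({', '.join(columns)})"


merge_keys = ['"first_name"', '"last_name"']  # already-escaped merge key columns (assumed)
staging_root_table_name = 'staging."dim_test_compound"'  # assumed staging table name

nk = merge_keys[0] if len(merge_keys) == 1 else gen_concat_sql(merge_keys)
nk_present = f"{nk} IN (SELECT {nk} FROM {staging_root_table_name})"
print(nk_present)
# prints (shown wrapped):
#   CONCAT("first_name", "last_name") IN
#   (SELECT CONCAT("first_name", "last_name") FROM staging."dim_test_compound")

Because the predicate no longer references rows of the target table, the generated retire statement works without a table alias, which is consistent with "AS d" being removed from the generic UPDATE prefix and from ClickHouse's ALTER TABLE ... UPDATE prefix in this patch.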