From e49c05f52bcbb0752d689016c4053ecb4d5e448e Mon Sep 17 00:00:00 2001
From: Dave
Date: Mon, 4 Mar 2024 16:38:13 +0100
Subject: [PATCH] skip deleting of records in merge job

---
 dlt/destinations/sql_jobs.py                  | 96 ++++++++++---------
 dlt/extract/incremental/__init__.py           |  1 -
 tests/load/pipeline/test_merge_disposition.py | 38 ++++++++
 3 files changed, 88 insertions(+), 47 deletions(-)

diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index 215bcf9fe5..e34c6e0392 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -362,66 +362,70 @@ def gen_merge_sql(
         )
         key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)
 
+        # if we do not have merge or primary keys, we fall back to append and do not execute any delete statements
+        fallback_to_append = len(primary_keys) == 0 and len(merge_keys) == 0
+
         unique_column: str = None
         root_key_column: str = None
 
-        if len(table_chain) == 1:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=True
-            )
-            # if no child tables, just delete data from top table
-            for clause in key_table_clauses:
-                sql.append(f"DELETE {clause};")
-        else:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=False
-            )
-            # use unique hint to create temp table with all identifiers to delete
-            unique_columns = get_columns_names_with_prop(root_table, "unique")
-            if not unique_columns:
-                raise MergeDispositionException(
-                    sql_client.fully_qualified_dataset_name(),
-                    staging_root_table_name,
-                    [t["name"] for t in table_chain],
-                    f"There is no unique column (ie _dlt_id) in top table {root_table['name']} so"
-                    " it is not possible to link child tables to it.",
+        if not fallback_to_append:
+            if len(table_chain) == 1:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=True
                 )
-            # get first unique column
-            unique_column = escape_id(unique_columns[0])
-            # create temp table with unique identifier
-            create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(
-                unique_column, key_table_clauses
-            )
-            sql.extend(create_delete_temp_table_sql)
-
-            # delete from child tables first. This is important for databricks which does not support temporary tables,
-            # but uses temporary views instead
-            for table in table_chain[1:]:
-                table_name = sql_client.make_qualified_table_name(table["name"])
-                root_key_columns = get_columns_names_with_prop(table, "root_key")
-                if not root_key_columns:
+                # if no child tables, just delete data from top table
+                for clause in key_table_clauses:
+                    sql.append(f"DELETE {clause};")
+            else:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=False
+                )
+                # use unique hint to create temp table with all identifiers to delete
+                unique_columns = get_columns_names_with_prop(root_table, "unique")
+                if not unique_columns:
                     raise MergeDispositionException(
                         sql_client.fully_qualified_dataset_name(),
                         staging_root_table_name,
                         [t["name"] for t in table_chain],
-                        "There is no root foreign key (ie _dlt_root_id) in child table"
-                        f" {table['name']} so it is not possible to refer to top level table"
-                        f" {root_table['name']} unique column {unique_column}",
+                        "There is no unique column (ie _dlt_id) in top table"
+                        f" {root_table['name']} so it is not possible to link child tables to it.",
                     )
-                root_key_column = escape_id(root_key_columns[0])
+                # get first unique column
+                unique_column = escape_id(unique_columns[0])
+                # create temp table with unique identifier
+                create_delete_temp_table_sql, delete_temp_table_name = (
+                    cls.gen_delete_temp_table_sql(unique_column, key_table_clauses)
+                )
+                sql.extend(create_delete_temp_table_sql)
+
+                # delete from child tables first. This is important for Databricks, which does not support temporary tables
+                # but uses temporary views instead
+                for table in table_chain[1:]:
+                    table_name = sql_client.make_qualified_table_name(table["name"])
+                    root_key_columns = get_columns_names_with_prop(table, "root_key")
+                    if not root_key_columns:
+                        raise MergeDispositionException(
+                            sql_client.fully_qualified_dataset_name(),
+                            staging_root_table_name,
+                            [t["name"] for t in table_chain],
+                            "There is no root foreign key (ie _dlt_root_id) in child table"
+                            f" {table['name']} so it is not possible to refer to top level table"
+                            f" {root_table['name']} unique column {unique_column}",
+                        )
+                    root_key_column = escape_id(root_key_columns[0])
+                    sql.append(
+                        cls.gen_delete_from_sql(
+                            table_name, root_key_column, delete_temp_table_name, unique_column
+                        )
+                    )
+
+                # delete from top table now that child tables have been processed
                 sql.append(
                     cls.gen_delete_from_sql(
-                        table_name, root_key_column, delete_temp_table_name, unique_column
+                        root_table_name, unique_column, delete_temp_table_name, unique_column
                     )
                 )
 
-            # delete from top table now that child tables have been prcessed
-            sql.append(
-                cls.gen_delete_from_sql(
-                    root_table_name, unique_column, delete_temp_table_name, unique_column
-                )
-            )
-
         # get name of column with hard_delete hint, if specified
         not_deleted_cond: str = None
         hard_delete_col = get_first_column_name_with_prop(root_table, "hard_delete")
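The crux of the change is the new fallback_to_append flag: when a resource defines neither primary nor merge keys there is nothing to match staged rows against, so the merge job skips all delete statements and effectively degrades to a plain append. A rough sketch of that control flow, with hypothetical table and column names (this is not dlt's actual SqlMergeJob API, only the shape of the statements it emits):

# Sketch of the fallback; "events" / "staging.events" and the key columns
# are hypothetical, not taken from the patch.
from typing import List, Sequence


def sketch_merge_sql(
    primary_keys: Sequence[str],
    merge_keys: Sequence[str],
    root_table: str = "events",
    staging_table: str = "staging.events",
) -> List[str]:
    sql: List[str] = []
    # no merge or primary keys: behave like append and skip all DELETEs
    fallback_to_append = len(primary_keys) == 0 and len(merge_keys) == 0
    if not fallback_to_append:
        keys = [*primary_keys, *merge_keys]
        cond = " AND ".join(f"d.{k} = s.{k}" for k in keys)
        sql.append(
            f"DELETE FROM {root_table} AS d WHERE EXISTS"
            f" (SELECT 1 FROM {staging_table} AS s WHERE {cond});"
        )
    # the insert from staging happens in both branches
    sql.append(f"INSERT INTO {root_table} SELECT * FROM {staging_table};")
    return sql


# with keys: delete-then-insert; without keys: plain append
assert len(sketch_merge_sql(["id"], [])) == 2
assert len(sketch_merge_sql([], [])) == 1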
diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index d1a5a05c34..24495ccb19 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -7,7 +7,6 @@
 from functools import wraps
 
-
 import dlt
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common import pendulum, logger
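For table chains, the merge job above first materializes the set of root-row identifiers to replace, then deletes from the child tables and only afterwards from the root table. Roughly, with hypothetical table names and a simplified staging join (the _dlt_id and _dlt_root_id hint columns are dlt's real ones, everything else is made up):

# Sketch of the delete ordering for table chains. Children are deleted
# before the root; this ordering matters on Databricks, which substitutes
# temporary views for temporary tables.
from typing import List, Tuple


def sketch_chain_delete_sql(
    root_table: str,
    staging_table: str,
    child_tables: List[Tuple[str, str]],  # (table name, root_key column)
    unique_column: str = "_dlt_id",
    temp_table: str = "delete_ids",
) -> List[str]:
    sql: List[str] = [
        # collect the unique ids of all root rows that will be replaced
        # (the join on "id" stands in for the generated key clauses)
        f"CREATE TEMP TABLE {temp_table} AS SELECT d.{unique_column}"
        f" FROM {root_table} AS d JOIN {staging_table} AS s ON d.id = s.id;"
    ]
    # child tables first, each linked to the root via its root_key column
    for table_name, root_key in child_tables:
        sql.append(
            f"DELETE FROM {table_name} WHERE {root_key} IN"
            f" (SELECT {unique_column} FROM {temp_table});"
        )
    # the root table last, once no child rows reference its ids
    sql.append(
        f"DELETE FROM {root_table} WHERE {unique_column} IN"
        f" (SELECT {unique_column} FROM {temp_table});"
    )
    return sql


for stmt in sketch_chain_delete_sql(
    "nested", "staging.nested", [("nested__nested", "_dlt_root_id")]
):
    print(stmt)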
diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index 19ee9a34c8..feaffef21c 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -943,3 +943,41 @@ def r():
         )
         with pytest.raises(PipelineStepFailed):
             info = p.run(r(), loader_file_format=destination_config.file_format)
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+def test_fallback_to_append(destination_config: DestinationTestConfiguration) -> None:
+    """
+    Test fallback to append for merge disposition if no merge or primary keys are defined.
+    We test both nested and non-nested tables.
+    """
+    p = destination_config.setup_pipeline("fallback", full_refresh=True)
+
+    @dlt.resource(
+        name="not_nested",
+        write_disposition="merge",
+    )
+    def not_nested():
+        yield {"id": 1, "val": "foo"}
+
+    @dlt.resource(
+        name="nested",
+        write_disposition="merge",
+    )
+    def nested():
+        yield {"id": 1, "val": "foo", "nested": [{"id": 1}, {"id": 2}]}
+
+    # run both resources twice
+    p.run([not_nested(), nested()], loader_file_format=destination_config.file_format)
+    p.run([not_nested(), nested()], loader_file_format=destination_config.file_format)
+
+    # we should have 2 records in not_nested, 2 in nested, and 4 in the nested child table
+    assert load_table_counts(p, "not_nested", "nested", "nested__nested") == {
+        "not_nested": 2,
+        "nested": 2,
+        "nested__nested": 4,
+    }
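The behavior the new test pins down can also be reproduced by hand. A hypothetical end-to-end check against a local duckdb destination (assumes the duckdb extra is installed; the pipeline and resource names are made up, and table counts are read back through dlt's sql_client):

import dlt


@dlt.resource(write_disposition="merge")  # note: no primary_key or merge_key
def events():
    yield {"id": 1, "val": "foo"}


pipeline = dlt.pipeline("fallback_demo", destination="duckdb", full_refresh=True)
pipeline.run(events())
pipeline.run(events())

# with no keys the merge job skips its delete statements,
# so the rows from both runs remain, as in append mode
with pipeline.sql_client() as client:
    assert client.execute_sql("SELECT count(*) FROM events")[0][0] == 2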