skip deleting of messages in merge job
sh-rp committed Mar 4, 2024
1 parent fc34dd0 commit e49c05f
Showing 3 changed files with 88 additions and 47 deletions.
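
In effect, a resource loaded with write_disposition="merge" but with neither a primary_key nor a merge_key hint no longer issues DELETE statements; the load degrades to an append, so repeated runs accumulate rows. A minimal sketch of that situation, assuming only the public dlt API (the pipeline, destination, and names below are illustrative, not taken from this commit):

import dlt

@dlt.resource(write_disposition="merge")  # note: no primary_key or merge_key hint
def events():
    yield {"id": 1, "val": "foo"}

# hypothetical pipeline against duckdb, purely for illustration
p = dlt.pipeline(pipeline_name="fallback_demo", destination="duckdb", dataset_name="demo")
p.run(events())
p.run(events())  # no keys, so no DELETEs are generated and two rows remain
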
96 changes: 50 additions & 46 deletions dlt/destinations/sql_jobs.py
@@ -362,66 +362,70 @@ def gen_merge_sql(
         )
         key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)
 
+        # if we do not have merge or primary keys, we fall back to append, thus do not execute any delete statements
+        fallback_to_append = len(primary_keys) == 0 and len(merge_keys) == 0
+
         unique_column: str = None
         root_key_column: str = None
 
-        if len(table_chain) == 1:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=True
-            )
-            # if no child tables, just delete data from top table
-            for clause in key_table_clauses:
-                sql.append(f"DELETE {clause};")
-        else:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=False
-            )
-            # use unique hint to create temp table with all identifiers to delete
-            unique_columns = get_columns_names_with_prop(root_table, "unique")
-            if not unique_columns:
-                raise MergeDispositionException(
-                    sql_client.fully_qualified_dataset_name(),
-                    staging_root_table_name,
-                    [t["name"] for t in table_chain],
-                    f"There is no unique column (ie _dlt_id) in top table {root_table['name']} so"
-                    " it is not possible to link child tables to it.",
-                )
-            # get first unique column
-            unique_column = escape_id(unique_columns[0])
-            # create temp table with unique identifier
-            create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(
-                unique_column, key_table_clauses
-            )
-            sql.extend(create_delete_temp_table_sql)
-
-            # delete from child tables first. This is important for databricks which does not support temporary tables,
-            # but uses temporary views instead
-            for table in table_chain[1:]:
-                table_name = sql_client.make_qualified_table_name(table["name"])
-                root_key_columns = get_columns_names_with_prop(table, "root_key")
-                if not root_key_columns:
-                    raise MergeDispositionException(
-                        sql_client.fully_qualified_dataset_name(),
-                        staging_root_table_name,
-                        [t["name"] for t in table_chain],
-                        "There is no root foreign key (ie _dlt_root_id) in child table"
-                        f" {table['name']} so it is not possible to refer to top level table"
-                        f" {root_table['name']} unique column {unique_column}",
-                    )
-                root_key_column = escape_id(root_key_columns[0])
-                sql.append(
-                    cls.gen_delete_from_sql(
-                        table_name, root_key_column, delete_temp_table_name, unique_column
-                    )
-                )
-
-            # delete from top table now that child tables have been processed
-            sql.append(
-                cls.gen_delete_from_sql(
-                    root_table_name, unique_column, delete_temp_table_name, unique_column
-                )
-            )
+        if not fallback_to_append:
+            if len(table_chain) == 1:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=True
+                )
+                # if no child tables, just delete data from top table
+                for clause in key_table_clauses:
+                    sql.append(f"DELETE {clause};")
+            else:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=False
+                )
+                # use unique hint to create temp table with all identifiers to delete
+                unique_columns = get_columns_names_with_prop(root_table, "unique")
+                if not unique_columns:
+                    raise MergeDispositionException(
+                        sql_client.fully_qualified_dataset_name(),
+                        staging_root_table_name,
+                        [t["name"] for t in table_chain],
+                        "There is no unique column (ie _dlt_id) in top table"
+                        f" {root_table['name']} so it is not possible to link child tables to it.",
+                    )
+                # get first unique column
+                unique_column = escape_id(unique_columns[0])
+                # create temp table with unique identifier
+                create_delete_temp_table_sql, delete_temp_table_name = (
+                    cls.gen_delete_temp_table_sql(unique_column, key_table_clauses)
+                )
+                sql.extend(create_delete_temp_table_sql)
+
+                # delete from child tables first. This is important for databricks which does not support temporary tables,
+                # but uses temporary views instead
+                for table in table_chain[1:]:
+                    table_name = sql_client.make_qualified_table_name(table["name"])
+                    root_key_columns = get_columns_names_with_prop(table, "root_key")
+                    if not root_key_columns:
+                        raise MergeDispositionException(
+                            sql_client.fully_qualified_dataset_name(),
+                            staging_root_table_name,
+                            [t["name"] for t in table_chain],
+                            "There is no root foreign key (ie _dlt_root_id) in child table"
+                            f" {table['name']} so it is not possible to refer to top level table"
+                            f" {root_table['name']} unique column {unique_column}",
+                        )
+                    root_key_column = escape_id(root_key_columns[0])
+                    sql.append(
+                        cls.gen_delete_from_sql(
+                            table_name, root_key_column, delete_temp_table_name, unique_column
+                        )
+                    )
+
+                # delete from top table now that child tables have been processed
+                sql.append(
+                    cls.gen_delete_from_sql(
+                        root_table_name, unique_column, delete_temp_table_name, unique_column
+                    )
+                )
 
         # get name of column with hard_delete hint, if specified
         not_deleted_cond: str = None
         hard_delete_col = get_first_column_name_with_prop(root_table, "hard_delete")
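
For orientation: when merge or primary keys are present, the code above deletes in three steps. It materializes the _dlt_id of every root row matched by the staging data into a temp table, deletes from child tables first through their root key (Databricks gets a temporary view instead of a temp table), and deletes from the root table last. A rough sketch of the statement shapes, with hypothetical dataset, table, and column names; the real SQL is built per destination dialect by gen_delete_temp_table_sql and gen_delete_from_sql:

# hypothetical identifiers; real statements are generated per destination dialect
delete_temp_table = "delete_my_table"

statements = [
    # 1. collect the _dlt_id of every root row that the staging data replaces
    f"CREATE TEMPORARY TABLE {delete_temp_table} AS"
    " SELECT d._dlt_id FROM dataset.my_table AS d"
    " JOIN dataset.my_table__staging AS s ON d.id = s.id;",
    # 2. delete from child tables first, linked through the root key
    "DELETE FROM dataset.my_table__child WHERE _dlt_root_id IN"
    f" (SELECT _dlt_id FROM {delete_temp_table});",
    # 3. delete from the root table last
    "DELETE FROM dataset.my_table WHERE _dlt_id IN"
    f" (SELECT _dlt_id FROM {delete_temp_table});",
]
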
1 change: 0 additions & 1 deletion dlt/extract/incremental/__init__.py
@@ -7,7 +7,6 @@
 from functools import wraps
 
 
-
 import dlt
 from dlt.common.exceptions import MissingDependencyException
 from dlt.common import pendulum, logger
38 changes: 38 additions & 0 deletions tests/load/pipeline/test_merge_disposition.py
@@ -943,3 +943,41 @@ def r():
     )
     with pytest.raises(PipelineStepFailed):
         info = p.run(r(), loader_file_format=destination_config.file_format)
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(default_sql_configs=True, supports_merge=True),
+    ids=lambda x: x.name,
+)
+def test_fallback_to_append(destination_config: DestinationTestConfiguration) -> None:
+    """
+    Test fallback to append for merge disposition if no merge or primary keys are defined.
+    We need to test nested and not nested tables.
+    """
+    p = destination_config.setup_pipeline("fallback", full_refresh=True)
+
+    @dlt.resource(
+        name="not_nested",
+        write_disposition="merge",
+    )
+    def not_nested():
+        yield {"id": 1, "val": "foo"}
+
+    @dlt.resource(
+        name="nested",
+        write_disposition="merge",
+    )
+    def nested():
+        yield {"id": 1, "val": "foo", "nested": [{"id": 1}, {"id": 2}]}
+
+    # run both resources twice
+    p.run([not_nested(), nested()], loader_file_format=destination_config.file_format)
+    p.run([not_nested(), nested()], loader_file_format=destination_config.file_format)
+
+    # we should have 2 records in not_nested, 2 in nested, and 4 in the nested child table
+    assert load_table_counts(p, "not_nested", "nested", "nested__nested") == {
+        "not_nested": 2,
+        "nested": 2,
+        "nested__nested": 4,
+    }
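
For contrast, a hedged sketch (not part of this commit) that reuses the setup of test_fallback_to_append: with a primary_key hint the same resource stays on the delete-and-insert merge path, so two identical runs leave a single deduplicated row.

@dlt.resource(name="keyed", write_disposition="merge", primary_key="id")
def keyed():
    yield {"id": 1, "val": "foo"}

# with a key defined, the merge job keeps its DELETE statements
p.run([keyed()], loader_file_format=destination_config.file_format)
p.run([keyed()], loader_file_format=destination_config.file_format)
assert load_table_counts(p, "keyed") == {"keyed": 1}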
