Introduce hard_delete and dedup_sort column hints for merge #960

Merged: 32 commits, merged Feb 24, 2024

Changes from 6 commits

Commits (32)
82c3634
black formatting
Feb 12, 2024
97c5512
remove unused exception
Feb 12, 2024
400d84b
add initial support for replicate write disposition
Feb 12, 2024
24f362e
add hard_delete hint and sorted deduplication for merge
Feb 14, 2024
f3a4878
undo config change
Feb 14, 2024
deb816f
undo unintentional changes
Feb 14, 2024
4a38d56
refactor hard_delete handling and introduce dedup_sort hint
Feb 15, 2024
0d1c977
update docstring
Feb 15, 2024
474d8bc
replace dialect-specific SQL
Feb 16, 2024
568ef26
add parentheses to ensure proper clause evaluation order
Feb 16, 2024
81ea426
add escape defaults and temp tables for non-primary key case
Feb 16, 2024
a04a238
exclude destinations that don't support merge from test
Feb 17, 2024
8ac0f9c
correct typo
Feb 20, 2024
ec115e9
extend docstring
Feb 20, 2024
a1afeb8
remove redundant copies for immutable strings
Feb 20, 2024
f07205d
simplify boolean logic
Feb 20, 2024
a64580d
add more test cases for hard_delete and dedup_sort hints
Feb 20, 2024
3308549
refactor table chain resolution
Feb 21, 2024
189c2fb
marks tables that seen data in normalizer, skips empty jobs if never …
rudolfix Feb 22, 2024
a649b0e
ignores tables that didn't seen data when loading, tests edge cases
rudolfix Feb 22, 2024
9778f0e
Merge branch 'devel' into 947-core-extensions-to-support-database-rep…
rudolfix Feb 22, 2024
4b3c59b
add sort order configuration option
Feb 22, 2024
c984c4e
bumps schema engine to v9, adds migrations
rudolfix Feb 22, 2024
935748a
filters tables without data properly in load
rudolfix Feb 22, 2024
d125556
converts seen-data to boolean, fixes tests
rudolfix Feb 22, 2024
ecaf6ef
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 22, 2024
af0b344
disables filesystem tests config due to merge present
rudolfix Feb 22, 2024
262018b
add docs for hard_delete and dedup_sort column hints
Feb 22, 2024
0814bb0
Merge branch '947-core-extensions-to-support-database-replication' of…
Feb 22, 2024
44a9ff2
fixes extending table chains in load
rudolfix Feb 23, 2024
9384148
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 23, 2024
9921b89
refactors load and adds unit tests with dummy
rudolfix Feb 24, 2024
dlt/common/schema/typing.py (1 addition, 0 deletions)
@@ -112,6 +112,7 @@ class TColumnSchema(TColumnSchemaBase, total=False):
root_key: Optional[bool]
merge_key: Optional[bool]
variant: Optional[bool]
hard_delete: Optional[bool]


TTableSchemaColumns = Dict[str, TColumnSchema]
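For context, the new property is meant to be attached through dlt's regular column-hint mechanism. A minimal sketch of how a resource might declare it, assuming the columns argument of dlt.resource accepts the new hint like any other column hint; the resource, column names, and data are illustrative:

import dlt

@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    columns={"deleted": {"hard_delete": True}},  # assumed hint placement
)
def change_feed():
    # illustrative change events; the row with deleted=True is the one the
    # merge job should remove from the destination
    yield [
        {"id": 1, "value": "a", "deleted": False},
        {"id": 2, "value": "b", "deleted": True},
    ]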
dlt/common/schema/utils.py (7 additions, 0 deletions)
@@ -573,6 +573,13 @@ def get_columns_names_with_prop(
]


def has_column_with_prop(
table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
) -> bool:
"""Checks if `table` schema contains column with property `column_prop`."""
return len(get_columns_names_with_prop(table, column_prop, include_incomplete)) > 0


def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
aggregated_update: TSchemaTables = {}
for schema_update in schema_updates:
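A quick sketch of how the new helper composes with get_columns_names_with_prop, using a hand-built table dict trimmed to the keys the helpers read (the schema fragment is an assumption for illustration, not a real dlt schema):

from dlt.common.schema.utils import get_columns_names_with_prop, has_column_with_prop

table = {
    "name": "users",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "primary_key": True},
        "deleted": {"name": "deleted", "data_type": "bool", "hard_delete": True},
    },
}

if has_column_with_prop(table, "hard_delete"):
    # get_columns_names_with_prop returns ["deleted"] here
    hard_delete_column = get_columns_names_with_prop(table, "hard_delete")[0]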
dlt/destinations/impl/athena/athena.py (7 additions, 9 deletions)
@@ -351,7 +351,9 @@ def _from_db_type(
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
@@ -376,19 +378,15 @@ def _get_table_update_sql(
# use qualified table names
qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name)
if is_iceberg and not generate_alter:
sql.append(
f"""CREATE TABLE {qualified_table_name}
sql.append(f"""CREATE TABLE {qualified_table_name}
({columns})
LOCATION '{location}'
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');"""
)
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
elif not generate_alter:
sql.append(
f"""CREATE EXTERNAL TABLE {qualified_table_name}
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
STORED AS PARQUET
LOCATION '{location}';"""
)
LOCATION '{location}';""")
# alter table to add new columns at the end
else:
sql.append(f"""ALTER TABLE {qualified_table_name} ADD COLUMNS ({columns});""")
dlt/destinations/impl/bigquery/bigquery.py (6 additions, 4 deletions)
@@ -252,9 +252,9 @@ def _get_table_update_sql(
elif (c := partition_list[0])["data_type"] == "date":
sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}"
elif (c := partition_list[0])["data_type"] == "timestamp":
sql[
0
] = f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
sql[0] = (
f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
)
# Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp.
# This is due to the bounds requirement of GENERATE_ARRAY function for partitioning.
# The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range.
@@ -272,7 +272,9 @@

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(c["name"])
return f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
return (
f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
)

def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
schema_table: TTableSchemaColumns = {}
dlt/destinations/impl/databricks/databricks.py (18 additions, 10 deletions)
@@ -166,12 +166,14 @@ def __init__(
else:
raise LoadJobTerminalException(
file_path,
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and azure buckets are supported",
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
" azure buckets are supported",
)
else:
raise LoadJobTerminalException(
file_path,
"Cannot load from local file. Databricks does not support loading from local files. Configure staging with an s3 or azure storage bucket.",
"Cannot load from local file. Databricks does not support loading from local files."
" Configure staging with an s3 or azure storage bucket.",
)

# decide on source format, stage_file_path will either be a local file or a bucket path
@@ -181,27 +183,33 @@
if not config.get("data_writer.disable_compression"):
raise LoadJobTerminalException(
file_path,
"Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
"Databricks loader does not support gzip compressed JSON files. Please disable"
" compression in the data writer configuration:"
" https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
)
if table_schema_has_type(table, "decimal"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.",
"Databricks loader cannot load DECIMAL type columns from json files. Switch to"
" parquet format to load decimals.",
)
if table_schema_has_type(table, "binary"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.",
"Databricks loader cannot load BINARY type columns from json files. Switch to"
" parquet format to load byte values.",
)
if table_schema_has_type(table, "complex"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.",
"Databricks loader cannot load complex columns (lists and dicts) from json"
" files. Switch to parquet format to load complex types.",
)
if table_schema_has_type(table, "date"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.",
"Databricks loader cannot load DATE type columns from json files. Switch to"
" parquet format to load dates.",
)

source_format = "JSON"
@@ -311,7 +319,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non

def _get_storage_table_query_columns(self) -> List[str]:
fields = super()._get_storage_table_query_columns()
fields[
1
] = "full_data_type" # Override because this is the only way to get data type with precision
fields[1] = ( # Override because this is the only way to get data type with precision
"full_data_type"
)
return fields
dlt/destinations/impl/snowflake/snowflake.py (2 additions, 4 deletions)
@@ -175,15 +175,13 @@ def __init__(
f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
" AUTO_COMPRESS = FALSE"
)
client.execute_sql(
f"""COPY INTO {qualified_table_name}
client.execute_sql(f"""COPY INTO {qualified_table_name}
{from_clause}
{files_clause}
{credentials_clause}
FILE_FORMAT = {source_format}
MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
"""
)
""")
if stage_file_path and not keep_staged_files:
client.execute_sql(f"REMOVE {stage_file_path}")

dlt/destinations/job_client_impl.py (13 additions, 0 deletions)
@@ -35,6 +35,7 @@
)
from dlt.common.storages import FileStorage
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables
from dlt.common.schema.utils import get_columns_names_with_prop, has_column_with_prop
from dlt.common.destination.reference import (
StateInfo,
StorageSchemaInfo,
@@ -588,3 +589,15 @@ def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
):
return True
return False

def _create_table_update(
self, table_name: str, storage_columns: TTableSchemaColumns
) -> Sequence[TColumnSchema]:
updates = super()._create_table_update(table_name, storage_columns)
table = self.schema.get_table(table_name)
if has_column_with_prop(table, "hard_delete"):
Collaborator:
This is a really good point, but my take would be to have identical schemas in the staging and destination datasets. Also: what about append and replace? This data won't be dropped from the parquet/json files, so just dropping it from the schema won't help.

I'd say let's remove it, along with all the code in the merge job that skips deleted columns.

Collaborator (Author):
Removed. Staging and destination datasets now have identical schemas.

# hard_delete column should only be present in staging table, not in final table
if not self.in_staging_mode:
hard_delete_column = get_columns_names_with_prop(table, "hard_delete")[0]
updates = [d for d in updates if d["name"] != hard_delete_column]
return updates
dlt/destinations/sql_jobs.py (54 additions, 9 deletions)
@@ -4,7 +4,7 @@
from dlt.common.runtime.logger import pretty_format_exception

from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.utils import get_columns_names_with_prop
from dlt.common.schema.utils import get_columns_names_with_prop, has_column_with_prop
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import uniq_id
from dlt.destinations.exceptions import MergeDispositionException
@@ -147,6 +147,8 @@ def generate_sql(

First we store the root_keys of root table elements to be deleted in the temp table. Then we use the temp table to delete records from root and all child tables in the destination dataset.
At the end we copy the data from the staging dataset into destination dataset.

If sort and/or hard_delete column hints are provided, records are deleted from the staging dataset before its data is copied to the destination dataset.
"""
return cls.gen_merge_sql(table_chain, sql_client)

@@ -252,6 +254,8 @@ def gen_merge_sql(
) -> List[str]:
sql: List[str] = []
root_table = table_chain[0]
escape_identifier = sql_client.capabilities.escape_identifier
escape_literal = sql_client.capabilities.escape_literal

# get top level table full identifiers
root_table_name = sql_client.make_qualified_table_name(root_table["name"])
@@ -260,13 +264,13 @@
# get merge and primary keys from top level
primary_keys = list(
map(
sql_client.capabilities.escape_identifier,
escape_identifier,
get_columns_names_with_prop(root_table, "primary_key"),
)
)
merge_keys = list(
map(
sql_client.capabilities.escape_identifier,
escape_identifier,
get_columns_names_with_prop(root_table, "merge_key"),
)
)
@@ -298,7 +302,7 @@ def gen_merge_sql(
" it is not possible to link child tables to it.",
)
# get first unique column
unique_column = sql_client.capabilities.escape_identifier(unique_columns[0])
unique_column = escape_identifier(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(
unique_column, key_table_clauses
@@ -319,7 +323,7 @@
f" {table['name']} so it is not possible to refer to top level table"
f" {root_table['name']} unique column {unique_column}",
)
root_key_column = sql_client.capabilities.escape_identifier(root_key_columns[0])
root_key_column = escape_identifier(root_key_columns[0])
sql.append(
cls.gen_delete_from_sql(
table_name, root_key_column, delete_temp_table_name, unique_column
@@ -333,6 +337,44 @@
)
)

# remove "non-latest" records from staging table (deduplicate) if a sort column is provided
if len(primary_keys) > 0:
if has_column_with_prop(root_table, "sort"):
Collaborator:
As you point out, this does deduplication on top of the dedup done when generating temp tables (or when inserting at the end when there are no child tables). My take: use the sort column in those clauses below if a sort column is present, otherwise ORDER BY (SELECT NULL).

Collaborator (Author):
Done.

sort_column = escape_identifier(get_columns_names_with_prop(root_table, "sort")[0])
sql.append(f"""
DELETE FROM {staging_root_table_name}
WHERE {sort_column} IN (
SELECT {sort_column} FROM (
SELECT {sort_column}, ROW_NUMBER() OVER (partition BY {", ".join(primary_keys)} ORDER BY {sort_column} DESC) AS _rn
FROM {staging_root_table_name}
) AS a
WHERE a._rn > 1
);
""")

# remove deleted records from staging tables if a hard_delete column is provided
if has_column_with_prop(root_table, "hard_delete"):
Collaborator:
I think (I hope) there's a simpler way to handle hard deletes. The code below does not need any modifications. It will delete all rows from the destination dataset (using primary and merge keys) that are present in the staging dataset. It does not matter whether the hard delete flag is set or not; we must delete those rows anyway.

We only need to change how we insert, from here:

# insert from staging to dataset, truncate staging table
        for table in table_chain:

The only thing you need to do is filter out rows that have the deleted flag set, so this is another clause in the WHERE.

Collaborator:
Overall it should be way less code, we do not interfere with any edge cases by deleting and deduplicating the staging dataset, and it looks like fewer row reads.

Collaborator (Author):
I have changed the approach to extending the WHERE clause in the insert stage, rather than deleting from the staging dataset. It didn't turn out to be less code, but it makes more sense nonetheless.

hard_delete_column = escape_identifier(
get_columns_names_with_prop(root_table, "hard_delete")[0]
)
# first delete from root staging table
sql.append(f"""
DELETE FROM {staging_root_table_name}
WHERE {hard_delete_column} IS NOT DISTINCT FROM {escape_literal(True)};
Collaborator:
OK, so you assume that the hard delete column is boolean, which probably makes the most sense, but then you must check the type somewhere. My take: delete if the value IS NOT NULL, or (only in the case of a boolean) when it is true, as above. Maybe someone wants to have the deleted flag as a timestamp?

Collaborator (Author):
Good point, I implemented your suggestion.

""")
# then delete from child staging tables
for table in table_chain[1:]:
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
sql.append(f"""
DELETE FROM {staging_table_name}
WHERE NOT EXISTS (
SELECT 1 FROM {staging_root_table_name} AS p
WHERE {staging_table_name}.{root_key_column} = p.{unique_column}
);
""")

if len(table_chain) > 1:
# create temp table used to deduplicate, only when we have primary keys
if primary_keys:
(
@@ -343,15 +385,19 @@
)
sql.extend(create_insert_temp_table_sql)

# insert from staging to dataset, truncate staging table
# insert from staging to dataset
for table in table_chain:
table_name = sql_client.make_qualified_table_name(table["name"])
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
columns = ", ".join(
map(
sql_client.capabilities.escape_identifier,
get_columns_names_with_prop(table, "name"),
escape_identifier,
[
c
for c in get_columns_names_with_prop(table, "name")
if c not in get_columns_names_with_prop(table, "hard_delete")
],
)
)
insert_sql = (
@@ -374,6 +420,5 @@
if insert_sql.strip()[-1] != ";":
insert_sql += ";"
sql.append(insert_sql)
# -- DELETE FROM {staging_table_name} WHERE 1=1;

return sql
dlt/extract/exceptions.py (0 additions, 7 deletions)
@@ -300,13 +300,6 @@ def __init__(self, resource_name: str, msg: str) -> None:
super().__init__(resource_name, f"This resource is not a transformer: {msg}")


class TableNameMissing(DltSourceException):
def __init__(self) -> None:
super().__init__(
"""Table name is missing in table template. Please provide a string or a function that takes a data item as an argument"""
)


class InconsistentTableTemplate(DltSourceException):
def __init__(self, reason: str) -> None:
msg = f"A set of table hints provided to the resource is inconsistent: {reason}"
dlt/load/load.py (9 additions, 2 deletions)
@@ -21,6 +21,7 @@
)
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import TTableSchema, TWriteDisposition
from dlt.common.schema.utils import has_column_with_prop
from dlt.common.storages import LoadStorage
from dlt.common.destination.reference import (
DestinationClientDwhConfiguration,
@@ -246,8 +247,14 @@ def get_completed_table_chain(
for job in table_jobs
):
return None
# if there are no jobs for the table, skip it, unless the write disposition is replace, as we need to create and clear the child tables
if not table_jobs and top_merged_table["write_disposition"] != "replace":
# if there are no jobs for the table, skip it, unless child tables need to be replaced
needs_replacement = False
if top_merged_table["write_disposition"] == "replace" or (
Collaborator:
Why is this changed?

Collaborator (Author):
This is needed to propagate deletes to child tables. If we provide only a primary key and the hard_delete column for a nested table, such as happens on lines 584 and 599 of test_merge_disposition.py, the child tables wouldn't get included in the table chain, and those deletes would only be executed on the root table.

Collaborator:
I still do not get it. We have jobs for this table because in both those lines we declare some data. The exception for replace is only for the case where there is no data at all, which does not happen here. IMO you should try to remove it and find the problem elsewhere, or ping me on Slack to discuss it.

top_merged_table["write_disposition"] == "merge"
and has_column_with_prop(top_merged_table, "hard_delete")
):
needs_replacement = True
if not table_jobs and not needs_replacement:
continue
table_chain.append(table)
# there must be at least table
dlt/pipeline/pipeline.py (3 additions, 3 deletions)
@@ -1163,9 +1163,9 @@ def _set_context(self, is_active: bool) -> None:
# set destination context on activation
if self.destination:
# inject capabilities context
self._container[
DestinationCapabilitiesContext
] = self._get_destination_capabilities()
self._container[DestinationCapabilitiesContext] = (
self._get_destination_capabilities()
)
else:
# remove destination context on deactivation
if DestinationCapabilitiesContext in self._container: