Introduce hard_delete and dedup_sort columns hint for merge #960
Conversation
@rudolfix can you have a look at this and see if it aligns with your ideas?
we are going in a good direction. my biggest issue is with the way SqlMergeJob is hacked. IMO we can radically simplify this if:
- we really want to modify the existing SqlMergeJob
- we assume that updates are not partial but full

in that case the only thing we need is to define a hard delete column hint and give it special treatment.
the distinction between insert and update is superfluous if updates are full. you delete the records anyway, so everything is "i" or "d", and frankly this is what I'd do. we may even drop the separate write disposition and start interpreting the hard delete flag in "merge".
partial deletes:
if we have partial deletes we'll need a new write disposition and a completely separate merge job based on the SQL MERGE statement.
if we do not need this now - let's do it later.
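For illustration, a minimal sketch of the hint-based approach described above, assuming the hard delete flag is declared as a column hint via the resource's columns argument (names and hint shape are illustrative, not the final API at this point in the discussion):

```python
import dlt

# hypothetical example: a "deleted" column carries a hard delete hint that the
# regular "merge" write disposition interprets - no separate write disposition needed
@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    columns={"deleted": {"hard_delete": True}},
)
def change_feed():
    # every record is a full row image; rows flagged as deleted are removed downstream
    yield [
        {"id": 1, "value": "insert or full update", "deleted": False},
        {"id": 2, "value": None, "deleted": True},
    ]
```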
dlt/common/schema/utils.py (Outdated)

@@ -317,6 +318,19 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:
            if parent_table_name not in stored_schema["tables"]:
                raise ParentTableNotFoundException(table_name, parent_table_name)

    # check for "replicate" tables that miss a primary key or "cdc_config"
this makes sense but we should move it to
- the end of the normalize stage OR
- the beginning of the load stage

at this moment the schema may still be partial. not all columns may be present (100% after the extract stage).
also we should check the merge disposition.
also take a look at _verify_schema in JobClientBase - looks like our place :)
dlt/common/schema/typing.py (Outdated)

@@ -166,6 +198,7 @@ class TTableSchema(TypedDict, total=False):
    columns: TTableSchemaColumns
    resource: Optional[str]
    table_format: Optional[TTableFormat]
    cdc_config: Optional[TCdcConfig]
this is one way to go. but IMO a better way would be to define a column level hint, cdc_op, which could be an integer or a single char (u/d/i).
do we really need a sequence? if so we could reuse sort or add a new hint, i.e. cdc_seq. There are helper methods to find column(s) with hints.
it looks simpler to me.
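A rough sketch of how such a column-level hint could be located with the existing schema helpers (has_column_with_prop already appears in the diff below; get_columns_names_with_prop is its companion in dlt.common.schema.utils; cdc_op is the hypothetical hint name from the comment):

```python
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.utils import get_columns_names_with_prop, has_column_with_prop


def find_cdc_op_column(table: TTableSchema) -> str:
    # hypothetical: locate the single column carrying the "cdc_op" hint ("u"/"d"/"i")
    if not has_column_with_prop(table, "cdc_op"):
        raise ValueError(f"table {table['name']} has no column with the cdc_op hint")
    return get_columns_names_with_prop(table, "cdc_op")[0]
```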
dlt/destinations/sql_jobs.py (Outdated)

        insert_condition = "1 = 1"
        write_disposition = root_table["write_disposition"]
        if write_disposition == "replicate":
IMO SqlMergeJob should not be aware of the write disposition... Is it possible to create a base class and two subclasses for merge and replicate?
this is very hacky
see my comment above. we skip it for now. it is way more work.
OK so here we'd need an "i" and "u" distinction. IMO part of the advanced replication above
yeah! look at this maybe: #923 and #828 definitely the next thing we do
@rudolfix I undid a lot of the changes based on your feedback. What's new:

changed the title: replicate write disposition → hard_delete column hint and sorted deduplication in merge
on top of the review:
you must test data where there are child tables. look at the other merge tests. we need a test with one and two nesting levels (can be the same dataset)
dlt/destinations/sql_jobs.py (Outdated)

@@ -333,6 +337,44 @@ def gen_merge_sql(
                )
            )

        # remove "non-latest" records from staging table (deduplicate) if a sort column is provided
        if len(primary_keys) > 0:
            if has_column_with_prop(root_table, "sort"):
as you point out this does deduplication on top of the dedup done when generating temp tables (or inserting at the end when there are no child tables). my take: use the sorted column in those clauses below if a sorted column is present, otherwise ORDER BY (SELECT NULL)
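A sketch of what that could look like when generating the dedup clause (sort_column, primary_keys and staging_root_table_name are assumed from the surrounding merge job code):

```python
# fall back to a constant ordering when no sort column hint is present
order_by = f"{sort_column} DESC" if sort_column is not None else "(SELECT NULL)"
partition_by = ", ".join(primary_keys)

dedup_select = f"""
    SELECT * FROM (
        SELECT ROW_NUMBER() OVER (PARTITION BY {partition_by} ORDER BY {order_by}) AS _dlt_dedup_rn, *
        FROM {staging_root_table_name}
    ) AS t
    WHERE _dlt_dedup_rn = 1
"""
```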
Done.
dlt/destinations/sql_jobs.py (Outdated)

            """)

        # remove deleted records from staging tables if a hard_delete column is provided
        if has_column_with_prop(root_table, "hard_delete"):
I think (I hope) there's a simpler way to handle hard_delete. The code below does not need any modifications. It will delete all rows from the destination dataset (using primary and merge keys) that are present in the staging dataset. it does not matter if the hard delete flag is set or not. we must delete those rows anyway.
we only must change how we insert, from here:
# insert from staging to dataset, truncate staging table
for table in table_chain:
the only thing you need to do is to filter out rows that have the deleted flag set, so this is another clause in the WHERE
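A sketch of that suggestion: the filter becomes an extra predicate on the staging-to-destination INSERT (hard_delete_col, columns, table_name and staging_table_name are assumed from the surrounding code; NULL is taken to mean "not deleted"):

```python
# skip hard-deleted rows when copying from staging to the destination table
not_deleted_cond = f"{hard_delete_col} IS NULL"  # assumption: NULL means "not deleted"
col_str = ", ".join(columns)
insert_sql = f"""
    INSERT INTO {table_name}({col_str})
    SELECT {col_str} FROM {staging_table_name}
    WHERE {not_deleted_cond};
"""
sql.append(insert_sql)
```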
overall it should be way less code, we do not interfere with any edge cases by deleting and deduplicating the staging dataset, plus it looks like fewer row reads
I have changed the approach to extending the where clause in the insert stage, rather than deleting from the staging dataset. It didn't turn out to be less code but it makes more sense nonetheless.
dlt/destinations/sql_jobs.py (Outdated)

            # first delete from root staging table
            sql.append(f"""
                DELETE FROM {staging_root_table_name}
                WHERE {hard_delete_column} IS NOT DISTINCT FROM {escape_literal(True)};
ok so you assume that the hard delete column is boolean. probably makes the most sense. but then you must check the type somewhere. my take:
delete if the value IS NOT NULL OR (only in the case of boolean) when it is true, as above. maybe someone wants to have the deleted flag as a timestamp?
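A sketch of a type-aware predicate along those lines (helper name is made up; TColumnSchema exposes the column's data_type):

```python
# hypothetical helper: a boolean hard-delete column means "deleted when true",
# anything else (e.g. a deleted-at timestamp) means "deleted when not null"
def get_deleted_condition(column_name: str, data_type: str, escape_literal) -> str:
    if data_type == "bool":
        return f"{column_name} = {escape_literal(True)}"
    return f"{column_name} IS NOT NULL"
```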
Good point, I implemented your suggestion.
dlt/destinations/job_client_impl.py (Outdated)

    ) -> Sequence[TColumnSchema]:
        updates = super()._create_table_update(table_name, storage_columns)
        table = self.schema.get_table(table_name)
        if has_column_with_prop(table, "hard_delete"):
this is a really good point. but my take would be to have identical schemas in the staging and destination datasets. also: what about append and replace? this data won't be dropped from the parquet/json files so just dropping it from the schema won't help.
I'd say let's remove it. also all the code in the merge job that skips deleted columns
Removed. Staging and destination datasets now have identical schemas.
dlt/load/load.py (Outdated)

        if not table_jobs and top_merged_table["write_disposition"] != "replace":
            # if there are no jobs for the table, skip it, unless child tables need to be replaced
            needs_replacement = False
            if top_merged_table["write_disposition"] == "replace" or (
why is it changed?
This is needed to propagate deletes to child tables. If we provide only a primary key and the hard_delete column for a nested table, such as happens on lines 584 and 599 of test_merge_disposition.py, the child tables wouldn't get included in the table chain, and those deletes would only be executed on the root table.
I still do not get it. We have jobs for this table because in both those lines we declare some data. The exception for replace is only for the case that there is no data at all. does not happen here. IMO you should try to remove it and find the problem elsewhere or ping me on slack to discuss it
changed the title: hard_delete column hint and sorted deduplication in merge → hard_delete and dedup_sort columns hint for merge
@rudolfix I addressed your feedback and the PR is ready for another review! See my replies on your comments for details regarding the changes. On top of those replies:
OK! we are almost there. our sql jobs module is almost unreadable now (if it ever was readable)
we are missing a test where you have a merge key on a non-unique column and you have a hard delete flag (you should be able to delete a whole day of data with just one flag)
also a question: does the hard delete flag make sense if there's no primary key? if the answer is no and a primary key is required we can simplify the code even more
also maybe a test case where we have a dedup sort and two rows, one with the deleted flag and one without (could run on duckdb only to make it faster)
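A sketch of that last test case, runnable against duckdb only (resource and column names are made up; the assumption is that deduplication keeps the row with the highest dedup_sort value and only then applies the hard delete flag of the surviving row):

```python
import dlt


@dlt.resource(
    name="items",
    primary_key="id",
    write_disposition="merge",
    columns={"seq": {"dedup_sort": "desc"}, "deleted": {"hard_delete": True}},
)
def items(rows):
    yield rows


pipeline = dlt.pipeline(pipeline_name="dedup_hard_delete", destination="duckdb")
pipeline.run(items([{"id": 1, "seq": 1, "val": "a", "deleted": False}]))
# two rows for the same key: seq=3 (not deleted) wins over seq=2 (deleted),
# so the record is expected to survive with val="c"
pipeline.run(items([
    {"id": 1, "seq": 2, "val": "b", "deleted": True},
    {"id": 1, "seq": 3, "val": "c", "deleted": False},
]))
```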
@@ -253,28 +302,34 @@ def gen_merge_sql(
        sql: List[str] = []
        root_table = table_chain[0]

        escape_id = sql_client.capabilities.escape_identifier
        escape_lit = sql_client.capabilities.escape_literal
        if escape_id is None:
is this really possible? how could this code work before?
sql_client.capabilities.escape_literal is None for snowflake. sql_client.capabilities.escape_identifier always has a value (at least with the current set of destinations), but I included the if escape_id is None: check for consistency.
right, it is not defined on snowflake because we never process literals. we need sqlglot to generate those statements, but it does not have all dialects and does not support DDL very well (maybe that has changed)
dlt/destinations/sql_jobs.py (Outdated)

        if sort_column is None:
            order_by = "(SELECT NULL)"
        else:
            order_by = f"{sort_column} DESC"
is DESC what users expect? what is more typical?
Higher values typically indicate more recent (think timestamps, or the LSN in a Postgres WAL). So if we sort in descending order, we get the most recent value, which makes sense for most typical use cases.
I could also change the dedup_sort column hint from boolean to string and have it accept "asc" or "desc" values to make it configurable for the user. What do you think?
good idea! (if not too much work)
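A minimal sketch of what the configurable hint could look like on a resource, assuming the hint value becomes the literal "asc"/"desc" string:

```python
import dlt


@dlt.resource(
    primary_key="id",
    write_disposition="merge",
    # "desc" keeps the row with the highest lsn per primary key; "asc" would keep the lowest
    columns={"lsn": {"dedup_sort": "desc"}},
)
def wal_changes():
    yield [
        {"id": 1, "lsn": 7, "value": "old"},
        {"id": 1, "lsn": 9, "value": "new"},  # expected to survive deduplication
    ]
```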
dlt/destinations/sql_jobs.py (Outdated)

            insert_sql += ";"
            sql.append(insert_sql)
            # -- DELETE FROM {staging_table_name} WHERE 1=1;
        insert_cond = copy(not_deleted_cond) if hard_delete_col is not None else "1 = 1"
you do not need to copy. strings are immutable. you won't change not_deleted_cond
dlt/destinations/sql_jobs.py (Outdated)

        insert_temp_table_name: str = None
        if len(table_chain) > 1:
            if len(primary_keys) > 0 or (len(primary_keys) == 0 and hard_delete_col is not None):
if len(primary_keys) > 0 or hard_delete_col is not None:
should be sufficient.
dlt/destinations/sql_jobs.py (Outdated)

        if condition is None:
            condition = "1 = 1"
        col_str = ", ".join(columns)
        inner_col_str = copy(col_str)
do not need to copy! strings are immutable
dlt/destinations/sql_jobs.py (Outdated)

        insert_temp_table_name: str = None
        if len(table_chain) > 1:
            if len(primary_keys) > 0 or (len(primary_keys) == 0 and hard_delete_col is not None):
                condition_colummns = [hard_delete_col] if not_deleted_cond is not None else None
condition_colummns - typo
@rudolfix I addressed all your points. I added the test cases you mentioned. Only remaining point is the one about child table skipping we are discussing on Slack.
I think it makes as much sense as doing inserts/updates without a primary key. I would say deleting multiple records sharing the same key is a valid use case we should support.
looks like asc/desc on update is left.
I've found and fixed a lot of heresy in the code. you can see my two commits. there were edge cases where tables were not created even though they should have been, or vice versa.
now the normalizer is marking tables which have seen data so the loader knows better which tables to create.
dlt/load/load.py (Outdated)

                    continue
                result.add(table["name"])
            return result
        with self.get_destination_client(schema) as job_client:
we can't go to the destination to check if a table exists - this costs a lot. we avoid any unnecessary database reflection at all costs.
we should return the tables without this check. if we do it right, only tables that were created will be returned here.
dlt/load/load.py (Outdated)

        ):
            with job_client.with_staging_dataset():
                self._init_dataset_and_update_schema(
                    job_client,
                    expected_update,
                    staging_tables | {schema.version_table_name},
                    order_deduped(staging_tables + [schema.version_table_name]),
dlt tables are never included in staging tables. no need to dedup
bugs fixed & some tests added. LGTM!
Description

This PR introduces a new write disposition replicate that can be used to propagate captured change data (INSERTs, UPDATEs, and DELETEs) from the source to the target destination.
- requires a primary_key hint—raises SchemaException if not provided
- requires a cdc_config hint—raises SchemaException if not provided
- cdc_config holds information on how the change data is organized, such as which column holds the operation type and which values in that column correspond with which DML operation
- extended SqlMergeJob to implement replicate—SqlMergeJob now handles both merge and replicate write dispositions
- mechanism: first load the change data into a staging table, then propagate the changes to the final table using simple "delete-and-insert" logic (similar to how merge works, but here we have to filter out delete records before we insert from the staging table); see the sketch after this list
  - all primary key values present in the staging table (corresponding with insert, update, and delete operations) are deleted from the final table
  - records in the staging table corresponding with insert and update operations are inserted into the final table
- also works with child tables

The above no longer applies—see #960 (comment)
Potentially useful functionality not yet implemented:
Related Issues
Additional Context