Skip to content

Commit

Permalink
fix sql merge syntax for iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Oct 6, 2023
1 parent 8682350 commit 1560768
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: st
A list of clauses may be returned for engines that do not support OR in subqueries. Like BigQuery
"""
return [f"FROM {root_table_name} as d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d='d',s='s') for c in key_clauses])})"]
return [f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d=root_table_name,s='s') for c in key_clauses])})"]

@classmethod
def gen_delete_temp_table_sql(cls, unique_column: str, key_table_clauses: Sequence[str]) -> Tuple[List[str], str]:
def gen_delete_temp_table_sql(cls, unique_column: str, key_table_clauses: Sequence[str], root_table_name: str) -> Tuple[List[str], str]:
"""Generate sql that creates delete temp table and inserts `unique_column` from root table for all records to delete. May return several statements.
Returns temp table name for cases where special names are required like SQLServer.
"""
sql: List[str] = []
temp_table_name = cls._new_temp_table_name("delete")
select_statement = f"SELECT d.{unique_column} {key_table_clauses[0]}"
select_statement = f"SELECT {root_table_name}.{unique_column} {key_table_clauses[0]}"
sql.append(cls._to_temp_table(select_statement, temp_table_name))
for clause in key_table_clauses[1:]:
sql.append(f"INSERT INTO {temp_table_name} SELECT {unique_column} {clause};")
Expand Down Expand Up @@ -184,7 +184,7 @@ def gen_merge_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClien
# get first unique column
unique_column = sql_client.capabilities.escape_identifier(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(unique_column, key_table_clauses)
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, root_table_name)
sql.extend(create_delete_temp_table_sql)
# delete top table
sql.append(f"DELETE FROM {root_table_name} WHERE {unique_column} IN (SELECT * FROM {delete_temp_table_name});")
Expand Down

0 comments on commit 1560768

Please sign in to comment.