Skip to content

Commit

Permalink
create final table from staging tables only for autodetect tables on …
Browse files Browse the repository at this point in the history
…bigquery
  • Loading branch information
sh-rp committed Nov 12, 2024
1 parent 92deee7 commit 99f75da
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
13 changes: 13 additions & 0 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.sql_jobs import SqlMergeFollowupJob
from dlt.destinations.sql_client import SqlClientBase


class BigQueryLoadJob(RunnableLoadJob, HasFollowupJobs):
Expand Down Expand Up @@ -142,6 +143,18 @@ def get_job_id_from_file_path(file_path: str) -> str:


class BigQueryMergeJob(SqlMergeFollowupJob):
@classmethod
def _gen_table_setup_clauses(
cls, table_chain: Sequence[PreparedTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""generate final tables from staging table schema for autodetect tables"""
sql: List[str] = []
for table in table_chain:
if should_autodetect_schema(table):
table_name, staging_table_name = sql_client.get_qualified_table_names(table["name"])
sql.append(f"CREATE TABLE IF NOT EXISTS {table_name} LIKE {staging_table_name};")
return sql

@classmethod
def gen_key_table_clauses(
cls,
Expand Down
24 changes: 15 additions & 9 deletions dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,24 @@ def generate_sql(
merge_strategy = resolve_merge_strategy(
{root_table["name"]: root_table}, root_table, sql_client.capabilities
)

merge_sql = None
if merge_strategy == "delete-insert":
return cls.gen_merge_sql(table_chain, sql_client)
merge_sql = cls.gen_merge_sql(table_chain, sql_client)
elif merge_strategy == "upsert":
return cls.gen_upsert_sql(table_chain, sql_client)
merge_sql = cls.gen_upsert_sql(table_chain, sql_client)
elif merge_strategy == "scd2":
return cls.gen_scd2_sql(table_chain, sql_client)
merge_sql = cls.gen_scd2_sql(table_chain, sql_client)

# prepend setup code
return cls._gen_table_setup_clauses(table_chain, sql_client) + merge_sql

@classmethod
def _gen_table_setup_clauses(
cls, table_chain: Sequence[PreparedTableSchema], sql_client: SqlClientBase[Any]
) -> List[str]:
"""Subclasses may override this method to generate additional sql statements to run before the merge"""
return []

@classmethod
def _gen_key_table_clauses(
Expand Down Expand Up @@ -501,12 +513,6 @@ def gen_merge_sql(
root_table["name"]
)

# NOTE: this is bigquery specific code! Move to bigquery merge job
# NOTE: we also need to create all child tables
# NOTE: this will not work if the schema of the staging table changes in the next run..
# in some cases we need to create final tables here
sql.append(f"CREATE TABLE IF NOT EXISTS {root_table_name} LIKE {staging_root_table_name};")

# get merge and primary keys from top level
primary_keys = cls._escape_list(
get_columns_names_with_prop(root_table, "primary_key"),
Expand Down

0 comments on commit 99f75da

Please sign in to comment.