diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index c3954e0ae5..2b3927e7c9 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -49,6 +49,7 @@ from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob from dlt.destinations.job_impl import ReferenceFollowupJobRequest from dlt.destinations.sql_jobs import SqlMergeFollowupJob +from dlt.destinations.sql_client import SqlClientBase class BigQueryLoadJob(RunnableLoadJob, HasFollowupJobs): @@ -142,6 +143,18 @@ def get_job_id_from_file_path(file_path: str) -> str: class BigQueryMergeJob(SqlMergeFollowupJob): + @classmethod + def _gen_table_setup_clauses( + cls, table_chain: Sequence[PreparedTableSchema], sql_client: SqlClientBase[Any] + ) -> List[str]: + """generate final tables from staging table schema for autodetect tables""" + sql: List[str] = [] + for table in table_chain: + if should_autodetect_schema(table): + table_name, staging_table_name = sql_client.get_qualified_table_names(table["name"]) + sql.append(f"CREATE TABLE IF NOT EXISTS {table_name} LIKE {staging_table_name};") + return sql + @classmethod def gen_key_table_clauses( cls, diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index faa6b50531..2226dfeece 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -167,12 +167,24 @@ def generate_sql( merge_strategy = resolve_merge_strategy( {root_table["name"]: root_table}, root_table, sql_client.capabilities ) + + merge_sql = None if merge_strategy == "delete-insert": - return cls.gen_merge_sql(table_chain, sql_client) + merge_sql = cls.gen_merge_sql(table_chain, sql_client) elif merge_strategy == "upsert": - return cls.gen_upsert_sql(table_chain, sql_client) + merge_sql = cls.gen_upsert_sql(table_chain, sql_client) elif merge_strategy == "scd2": - return cls.gen_scd2_sql(table_chain, sql_client) + merge_sql = cls.gen_scd2_sql(table_chain, sql_client) + + # prepend setup code + return cls._gen_table_setup_clauses(table_chain, sql_client) + merge_sql + + @classmethod + def _gen_table_setup_clauses( + cls, table_chain: Sequence[PreparedTableSchema], sql_client: SqlClientBase[Any] + ) -> List[str]: + """Subclasses may override this method to generate additional sql statements to run before the merge""" + return [] @classmethod def _gen_key_table_clauses( @@ -501,12 +513,6 @@ def gen_merge_sql( root_table["name"] ) - # NOTE: this is bigquery specific code! Move to bigquery merge job - # NOTE: we also need to create all child tables - # NOTE: this will not work if the schema of the staging table changes in the next run.. - # in some cases we need to create final tables here - sql.append(f"CREATE TABLE IF NOT EXISTS {root_table_name} LIKE {staging_root_table_name};") - # get merge and primary keys from top level primary_keys = cls._escape_list( get_columns_names_with_prop(root_table, "primary_key"),