From eed156297d851fbfa13ef14edb8c614582eef2f2 Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Mon, 7 Aug 2023 17:58:16 +0200 Subject: [PATCH] Add an option to use INFORMATION_SCHEMA for partition info retrieval --- .../unreleased/Features-20230807-235539.yaml | 6 +++ dbt/adapters/bigquery/connections.py | 10 +++-- dbt/include/bigquery/macros/etc.sql | 12 +++++- .../incremental_strategy/common.sql | 41 ++++++++++++++++++- 4 files changed, 61 insertions(+), 8 deletions(-) create mode 100644 .changes/unreleased/Features-20230807-235539.yaml diff --git a/.changes/unreleased/Features-20230807-235539.yaml b/.changes/unreleased/Features-20230807-235539.yaml new file mode 100644 index 0000000000..0fbde028f3 --- /dev/null +++ b/.changes/unreleased/Features-20230807-235539.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add an option to use INFORMATION_SCHEMA for partition info retrieval +time: 2023-08-07T23:55:39.31409+02:00 +custom: + Author: Kayrnt + Issue: "867" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index b466fee3b9..11548cda48 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -598,15 +598,17 @@ def dry_run(self, sql: str) -> BigQueryAdapterResponse: def _bq_job_link(location, project_id, job_id) -> str: return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults" - def get_partitions_metadata(self, table): + def get_partitions_metadata(self, table, use_legacy_sql=False): def standard_to_legacy(table): return table.project + ":" + table.dataset + "." 
+ table.identifier - legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]" + if use_legacy_sql: + sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]" + else: + sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'" - sql = self._add_query_comment(legacy_sql) # auto_begin is ignored on bigquery, and only included for consistency - _, iterator = self.raw_execute(sql, use_legacy_sql=True) + _, iterator = self.raw_execute(self._add_query_comment(sql), use_legacy_sql=use_legacy_sql) return self.get_table_from_response(iterator) def copy_bq_table(self, source, destination, write_disposition): diff --git a/dbt/include/bigquery/macros/etc.sql b/dbt/include/bigquery/macros/etc.sql index 59b61473ed..92cd091695 100644 --- a/dbt/include/bigquery/macros/etc.sql +++ b/dbt/include/bigquery/macros/etc.sql @@ -6,9 +6,17 @@ {% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %} {% endmacro %} -{%- macro get_partitions_metadata(table) -%} +{# + This macro returns the partition metadata for the provided table. + The expected input is a table object (i.e. through a `source` or `ref`). + The output contains the partition information for your input table. + The details of the retrieved columns can be found on https://cloud.google.com/bigquery/docs/managing-partitioned-tables + If use_legacy_sql is set to True, the query will be executed using legacy SQL and access the data from the __PARTITIONS_SUMMARY__ meta-table, + else it will leverage the INFORMATION_SCHEMA.PARTITIONS table. 
+#} +{%- macro get_partitions_metadata(table, use_legacy_sql = True) -%} {%- if execute -%} - {%- set res = adapter.get_partitions_metadata(table) -%} + {%- set res = adapter.get_partitions_metadata(table, use_legacy_sql) -%} {{- return(res) -}} {%- endif -%} {{- return(None) -}} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql index 9d71ba7c0a..f5846b2912 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql @@ -2,12 +2,49 @@ {#-- TODO: revisit partitioning with python models --#} {%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%} + {%- if partition_by.partition_information == "information_schema" -%} + {{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }} + {%- else -%} + {{ dbt_max_partition_from_table_data_sql(relation, partition_by) }} + {%- endif -%} - declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( + {%- endif -%} + +{% endmacro %} + +{% macro dbt_max_partition_from_table_data_sql(relation, partition_by) %} + + declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( select max({{ partition_by.field }}) from {{ this }} where {{ partition_by.field }} is not null - ); + ); +{% endmacro %} + +{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %} + + {%- if data_type is not defined -%} + {%- set data_type = partition_by.data_type -%} + {%- set granularity = partition_by.granularity -%} {%- endif -%} + {# Format partition_id to match the declared variable type #} + {%- if data_type | lower in ('date', 'timestamp', 'datetime') -%} + {%- if granularity == "day" -%} + {%- set format = "%Y%m%d" -%} + {%- else -%} + {%- set format = "%Y%m%d%H" -%} + {%- endif -%} + {%- set field = "parse_" ~ data_type ~ 
"('" ~ format ~ "', partition_id)" -%} + {%- else -%} + {%- set field = "CAST(partition_id AS INT64)" -%} + {%- endif -%} + + declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default ( + SELECT MAX({{ field }}) AS max_partition + FROM `{{ relation.project }}.{{ relation.dataset }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE TABLE_NAME = '{{ relation.identifier }}' + AND NOT(STARTS_WITH(partition_id, "__")) + ); + {% endmacro %}