Skip to content

Commit

Permalink
Add an option to use INFORMATION_SCHEMA for partition info retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Aug 7, 2023
1 parent b06d230 commit eed1562
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230807-235539.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add an option to use INFORMATION_SCHEMA for partition info retrieval
time: 2023-08-07T23:55:39.31409+02:00
custom:
Author: Kayrnt
Issue: "867"
10 changes: 6 additions & 4 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,15 +598,17 @@ def dry_run(self, sql: str) -> BigQueryAdapterResponse:
def _bq_job_link(location, project_id, job_id) -> str:
return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"

def get_partitions_metadata(self, table):
def get_partitions_metadata(self, table, use_legacy_sql=False):
def standard_to_legacy(table):
return table.project + ":" + table.dataset + "." + table.identifier

legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
if use_legacy_sql:
sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
else:
sql = f"SELECT * FROM `{table.project}.{table.dataset}.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '{table.identifier}'"

sql = self._add_query_comment(legacy_sql)
# auto_begin is ignored on bigquery, and only included for consistency
_, iterator = self.raw_execute(sql, use_legacy_sql=True)
_, iterator = self.raw_execute(self._add_query_comment(sql), use_legacy_sql=use_legacy_sql)
return self.get_table_from_response(iterator)

def copy_bq_table(self, source, destination, write_disposition):
Expand Down
12 changes: 10 additions & 2 deletions dbt/include/bigquery/macros/etc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
{% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
{% endmacro %}

{%- macro get_partitions_metadata(table) -%}
{#
This macro returns the partition matadata for provided table.
The expected input is a table object (ie through a `source` or `ref`).
The output contains the result from partitions informations for your input table.
The details of the retrieved columns can be found on https://cloud.google.com/bigquery/docs/managing-partitioned-tables
if use_legacy_sql is set to True, the query will be executed using legacy sql and access the data from __PARTITIONS_SUMMARY__ meta-table
else it will leverage the INFORMATION_SCHEMA.PARTITIONS table.
#}
{%- macro get_partitions_metadata(table, use_legacy_sql = True) -%}
{%- if execute -%}
{%- set res = adapter.get_partitions_metadata(table) -%}
{%- set res = adapter.get_partitions_metadata(table, use_legacy_sql) -%}
{{- return(res) -}}
{%- endif -%}
{{- return(None) -}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,49 @@

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}
{%- if partition_by.partition_information = "information_schema" -%}
{{ dbt_max_partition_from_information_schema_data_sql(relation, partition_by) }}
{%- else -%}
{{ dbt_max_partition_from_table_data_sql(relation, partition_by) }}
{%- endif -%}

declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
{%- endif -%}

{% endmacro %}

{% macro dbt_max_partition_from_table_data_sql(relation, partition_by) %}

declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);
);

{% endmacro %}

{% macro dbt_max_partition_from_information_schema_data_sql(relation, partition_by) %}

{%- if data_type is none -%}
{%- set data_type = partition_by.data_type -%}
{%- set granularity = partition_by.granularity -%}
{%- endif -%}

{# Format partition_id to match the declared variable type #}
{%- if data_type | lower in ('date', 'timestamp', 'datetime') -%}
{%- if granularity = "day" -%}
{%- set format = "%Y%m%d" -%}
{%- else -%}
{%- set format = "%Y%m%d%H" -%}
{%- endif -%}
{%- set field = "parse_" ~data_type ~ "('" ~ format ~ "', partition_id)" -%}
{%- else -%}
{%- set field = "CAST(partition_id AS INT64)" -%}
{%- endif -%}

declare _dbt_max_partition {{ partition_by.data_type_for_partition() }} default (
SELECT MAX({{ field }}) AS max_partition
FROM `{relation.project}.{relation.dataset}.INFORMATION_SCHEMA.PARTITIONS`
WHERE TABLE_NAME = '{relation.identifier}'
AND NOT(STARTS_WITH(partition_id, "__"))
);

{% endmacro %}

0 comments on commit eed1562

Please sign in to comment.