From 23a0854a9e74b988e524e1365933caebf5e2375c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Guillermo=20Gonz=C3=A1lez=20de=20Ag=C3=BCero?=
Date: Wed, 1 May 2024 13:11:07 +0200
Subject: [PATCH] WIP - Partitioning support

---
 dbt/include/postgres/macros/adapters.sql | 177 ++++++++++++++++++++---
 1 file changed, 158 insertions(+), 19 deletions(-)

diff --git a/dbt/include/postgres/macros/adapters.sql b/dbt/include/postgres/macros/adapters.sql
index 294443be..4787ff24 100644
--- a/dbt/include/postgres/macros/adapters.sql
+++ b/dbt/include/postgres/macros/adapters.sql
@@ -4,29 +4,168 @@
 
   {{ sql_header if sql_header is not none }}
 
-  create {% if temporary -%}
-    temporary
-  {%- elif unlogged -%}
-    unlogged
-  {%- endif %} table {{ relation }}
-  {% set contract_config = config.get('contract') %}
-  {% if contract_config.enforced %}
-    {{ get_assert_columns_equivalent(sql) }}
-  {% endif -%}
-  {% if contract_config.enforced and (not temporary) -%}
-      {{ get_table_columns_and_constraints() }} ;
-    insert into {{ relation }} (
-      {{ adapter.dispatch('get_column_names', 'dbt')() }}
-    )
-    {%- set sql = get_select_subquery(sql) %}
+  {% set partition_by = config.get('partition_by') %}
+  {% if partition_by is not none %}
+    {# Use partitioning #}
+
+    {%- set partition_by_field = partition_by['field'] -%}
+    {%- set partition_by_granularity = partition_by['granularity'] -%}
+    {# Day-level suffixes collide for sub-day granularities, so those also carry the time of day. #}
+    {% if partition_by_granularity | lower in ['hour', 'minute', 'second'] %}
+      {% set partition_suffix_format = '_yyyymmdd_hh24miss' %}
+    {% else %}
+      {% set partition_suffix_format = '_yyyymmdd' %}
+    {% endif %}
+    {% set template_name = '"' ~ this.identifier ~ '__tt"' %}
+    {# NOTE(review): the model contract is not enforced on this branch; confirm whether that is intended. #}
+
+    -- We cannot create a partitioned table with "create as select".
+    -- Create a dummy temporary table just to be used as a template for the real one.
+    -- A leftover from an earlier call on the same connection is dropped first; qualifying with
+    -- pg_temp keeps the drop from ever touching a permanent table with the same name.
+    drop table if exists pg_temp.{{ template_name }};
+    create temporary table {{ template_name }} as
+      select *
+      from ({{ sql }}) model_subq
+      where 1 = 2;
+
+    -- Create partitioned table as a copy of the template
+    create {% if temporary -%}
+      temporary
+    {%- elif unlogged -%}
+      unlogged
+    {%- endif %} table {{ relation }} (like pg_temp.{{ template_name }})
+    partition by range ({{ partition_by_field }});
+
+    -- The template is not needed any more
+    drop table pg_temp.{{ template_name }};
+
+    {% set required_partitions_query %}
+      -- Partitions need to be manually created before inserting.
+      -- We execute the model SQL to get the first and last partitions and use the granularity to define what others need to be created.
+      --
+      -- Note that this executes the model SQL a first time just to get the partitions, and it will be executed a second time to actually get the data.
+      -- That might be very inefficient depending on the selected volume of data.
+      -- There's an alternative:
+      --   - Create as select a temporary non-partitioned table.
+      --   - Get the min/max partitions from that table.
+      --   - Move the data to the final table once it's partitioned.
+      --
+      -- That option would mean the data is stored twice during model execution.
+      with partitions as (
+        -- Generate a sequence with one row per required partition, based on the min and max dates
+        select
+          generate_series(
+            date_trunc('{{ partition_by_granularity }}', min({{ partition_by_field }})),
+            date_trunc('{{ partition_by_granularity }}', max({{ partition_by_field }})),
+            '1 {{ partition_by_granularity }}'::interval
+          ) as begin_date
+        from (
+          {{ sql }}
+        ) model_subq
+      )
+      select
+        -- Generate the begin-end date and name suffix for each partition
+        begin_date,
+        begin_date + '1 {{ partition_by_granularity }}'::interval as end_date,
+        to_char(begin_date, '{{ partition_suffix_format }}') as partition_suffix
+      from partitions;
+    {% endset %}
+    {# run_query only yields rows while dbt is executing, so no partitions are planned otherwise. #}
+    {% if execute %}
+      {% set required_partitions = run_query(required_partitions_query).rows %}
+    {% else %}
+      {% set required_partitions = [] %}
+    {% endif %}
+
+    -- Create the required partitions
+    {% for required_partition in required_partitions %}
+      create
+      {% if temporary -%}
+        temporary
+      {%- elif unlogged -%}
+        unlogged
+      {%- endif %}
+      table
+        {{ make_intermediate_relation(relation, required_partition['partition_suffix']) }}
+        partition of {{ relation }} for values from ('{{ required_partition["begin_date"] }}'::timestamp) to ('{{ required_partition["end_date"] }}'::timestamp);
+    {% endfor %}
+
+    -- Insert into the parent table
+    insert into {{ relation }}
+      {{ sql }};
   {% else %}
-    as
+    create {% if temporary -%}
+      temporary
+    {%- elif unlogged -%}
+      unlogged
+    {%- endif %} table {{ relation }}
+    {% set contract_config = config.get('contract') %}
+    {% if contract_config.enforced %}
+      {{ get_assert_columns_equivalent(sql) }}
+    {% endif -%}
+    {% if contract_config.enforced and (not temporary) -%}
+        {{ get_table_columns_and_constraints() }} ;
+      insert into {{ relation }} (
+        {{ adapter.dispatch('get_column_names', 'dbt')() }}
+      )
+      {%- set sql = get_select_subquery(sql) %}
+    {% else %}
+      as
+    {% endif %}
+    (
+      {{ sql }}
+    );
   {% endif %}
-  (
-    {{ sql }}
-  );
 {%- endmacro %}
 
+{% macro postgres__rename_relation(from_relation, to_relation) %}
+  {% set target_name = adapter.quote_as_configured(to_relation.identifier, 'identifier') %}
+
+  {# If the relation is partitioned, its partitions are renamed along with it. The names come from #}
+  {# the catalog columns: regclass text gains or loses its schema prefix depending on the search_path. #}
+  {% set existing_partitions_query %}
+    select
+      partition_namespace.nspname as partition_schema,
+      partition_class.relname as from_table_name,
+      replace(
+        partition_class.relname,
+        '{{ from_relation.identifier }}', -- Current parent name
+        '{{ to_relation.identifier }}' -- New parent name
+      ) as to_table_name
+    from pg_catalog.pg_inherits as inheritance
+    inner join pg_catalog.pg_class as parent_class
+      on parent_class.oid = inheritance.inhparent
+    inner join pg_catalog.pg_namespace as parent_namespace
+      on parent_namespace.oid = parent_class.relnamespace
+    inner join pg_catalog.pg_class as partition_class
+      on partition_class.oid = inheritance.inhrelid
+    inner join pg_catalog.pg_namespace as partition_namespace
+      on partition_namespace.oid = partition_class.relnamespace
+    where parent_namespace.nspname = '{{ from_relation.schema }}'
+      and parent_class.relname = '{{ from_relation.identifier }}'
+    order by partition_class.relname;
+  {% endset %}
+  {# run_query only yields rows while dbt is executing, so nothing is renamed otherwise. #}
+  {% if execute %}
+    {% set existing_partitions = run_query(existing_partitions_query).rows %}
+  {% else %}
+    {% set existing_partitions = [] %}
+  {% endif %}
+
+  {% call statement('rename_relation') -%}
+    alter table {{ from_relation }} rename to {{ target_name }};
+
+    -- Rename the existing partitions
+    {% for existing_partition in existing_partitions %}
+      {# Renaming a table to its current name is an error, so unchanged names are skipped. #}
+      {% if existing_partition["from_table_name"] != existing_partition["to_table_name"] %}
+        alter table {{ adapter.quote(existing_partition["partition_schema"]) }}.{{ adapter.quote(existing_partition["from_table_name"]) }} rename to {{ adapter.quote(existing_partition["to_table_name"]) }};
+      {% endif %}
+    {% endfor %}
+  {%- endcall %}
+{% endmacro %}
+
 {% macro postgres__get_create_index_sql(relation, index_dict) -%}
   {%- set index_config = adapter.parse_index(index_dict) -%}
   {%- set comma_separated_columns = ", ".join(index_config.columns) -%}