From 61aada54c17e4ad2f6074a2df62e1ef29681d744 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 26 Dec 2023 20:22:47 +0800 Subject: [PATCH] support on configuration change (#29) --- dbt/include/risingwave/macros/adapters.sql | 53 ++++++++++++++++++- .../materializations/materialized_view.sql | 34 ++++++++++-- .../materializations/materializedview.sql | 34 ++++++++++-- 3 files changed, 111 insertions(+), 10 deletions(-) diff --git a/dbt/include/risingwave/macros/adapters.sql b/dbt/include/risingwave/macros/adapters.sql index a7b17f1..0a42a05 100644 --- a/dbt/include/risingwave/macros/adapters.sql +++ b/dbt/include/risingwave/macros/adapters.sql @@ -65,6 +65,10 @@ ({{ comma_separated_columns }}); {%- endmacro %} +{%- macro risingwave__get_drop_index_sql(relation, index_name) -%} + drop index if exists "{{ relation.schema }}"."{{ index_name }}" +{%- endmacro -%} + {% macro risingwave__drop_relation(relation) -%} {% call statement('drop_relation') -%} {% if relation.type == 'view' %} @@ -79,8 +83,6 @@ drop source if exists {{ relation }} cascade {% elif relation.type == 'sink' %} drop sink if exists {{ relation }} cascade - {% elif relation.type == 'index' %} - drop index if exists {{ relation }} cascade {% endif %} {%- endcall %} {% endmacro %} @@ -125,3 +127,50 @@ {%- endif %} {{ sql }}; {%- endmacro %} + +{%- macro risingwave__update_indexes_on_materialized_view(relation, index_changes) -%} + {{- log("Applying UPDATE INDEXES to: " ~ relation) -}} + + {%- for _index_change in index_changes -%} + {%- set _index = _index_change.context -%} + + {%- if _index_change.action == "drop" -%} + + {{ risingwave__get_drop_index_sql(relation, _index.name) }}; + + {%- elif _index_change.action == "create" -%} + + {{ risingwave__get_create_index_sql(relation, _index.as_node_config) }} + + {%- endif -%} + + {%- endfor -%} + +{%- endmacro -%} + +{% macro risingwave__get_show_indexes_sql(relation) %} + with index_info as ( + select + i.relname as name, + 'btree' as method, + ix.indisunique as "unique", + a.attname as attname, + array_position(ix.indkey, a.attnum) as ord + from pg_index ix + join pg_class i + on i.oid = ix.indexrelid + join pg_class t + on t.oid = ix.indrelid + join pg_namespace n + on n.oid = t.relnamespace + join pg_attribute a + on a.attrelid = t.oid + and a.attnum = ANY(ix.indkey) + where t.relname = '{{ relation.identifier }}' + and n.nspname = '{{ relation.schema }}' + and t.relkind in ('r', 'm') + ) + select name, method, "unique", array_to_string(array_agg(attname order by ord), ',') as column_names from index_info + group by 1, 2, 3 + order by 1, 2, 3; +{% endmacro %} diff --git a/dbt/include/risingwave/macros/materializations/materialized_view.sql b/dbt/include/risingwave/macros/materializations/materialized_view.sql index 757590c..33230fe 100644 --- a/dbt/include/risingwave/macros/materializations/materialized_view.sql +++ b/dbt/include/risingwave/macros/materializations/materialized_view.sql @@ -16,11 +16,37 @@ {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% call statement('main') -%} - {{ risingwave__create_materialized_view_as(target_relation, sql) }} - {%- endcall %} + {% if old_relation is none or (full_refresh_mode and old_relation) %} + {% call statement('main') -%} + {{ risingwave__create_materialized_view_as(target_relation, sql) }} + {%- endcall %} + + {{ create_indexes(target_relation) }} + {% else %} + -- get config options + {% set on_configuration_change = config.get('on_configuration_change') %} + {% set configuration_changes = get_materialized_view_configuration_changes(old_relation, config) %} + + {% if configuration_changes is none %} + -- do nothing + {{ materialized_view_execute_no_op(target_relation) }} + {% elif on_configuration_change == 'apply' %} + {% call statement('main') -%} + {{ risingwave__update_indexes_on_materialized_view(target_relation, configuration_changes.indexes) }} + {%- endcall %} + {% elif on_configuration_change == 'continue' %} + -- do nothing but a warn + {{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }} + {{ materialized_view_execute_no_op(target_relation) }} + {% elif on_configuration_change == 'fail' %} + {{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }} + {% else %} + -- this only happens if the user provides a value other than `apply`, 'continue', 'fail' + {{ exceptions.raise_compiler_error("Unexpected configuration scenario") }} + + {% endif %} + {% endif %} - {{ create_indexes(target_relation) }} {% do persist_docs(target_relation, model) %} {{ run_hooks(post_hooks, inside_transaction=False) }} diff --git a/dbt/include/risingwave/macros/materializations/materializedview.sql b/dbt/include/risingwave/macros/materializations/materializedview.sql index fb66e09..be222ea 100644 --- a/dbt/include/risingwave/macros/materializations/materializedview.sql +++ b/dbt/include/risingwave/macros/materializations/materializedview.sql @@ -16,11 +16,37 @@ {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% call statement('main') -%} - {{ risingwave__create_materialized_view_as(target_relation, sql) }} - {%- endcall %} + {% if old_relation is none or (full_refresh_mode and old_relation) %} + {% call statement('main') -%} + {{ risingwave__create_materialized_view_as(target_relation, sql) }} + {%- endcall %} + + {{ create_indexes(target_relation) }} + {% else %} + -- get config options + {% set on_configuration_change = config.get('on_configuration_change') %} + {% set configuration_changes = get_materialized_view_configuration_changes(old_relation, config) %} + + {% if configuration_changes is none %} + -- do nothing + {{ materialized_view_execute_no_op(target_relation) }} + {% elif on_configuration_change == 'apply' %} + {% call statement('main') -%} + {{ risingwave__update_indexes_on_materialized_view(target_relation, configuration_changes.indexes) }} + {%- endcall %} + {% elif on_configuration_change == 'continue' %} + -- do nothing but a warn + {{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }} + {{ materialized_view_execute_no_op(target_relation) }} + {% elif on_configuration_change == 'fail' %} + {{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }} + {% else %} + -- this only happens if the user provides a value other than `apply`, 'continue', 'fail' + {{ exceptions.raise_compiler_error("Unexpected configuration scenario") }} + + {% endif %} + {% endif %} - {{ create_indexes(target_relation) }} {% do persist_docs(target_relation, model) %} {{ run_hooks(post_hooks, inside_transaction=False) }}