Skip to content

Commit

Permalink
support on configuration change (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Dec 26, 2023
1 parent ab1874b commit 61aada5
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 10 deletions.
53 changes: 51 additions & 2 deletions dbt/include/risingwave/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
({{ comma_separated_columns }});
{%- endmacro %}

{%- macro risingwave__get_drop_index_sql(relation, index_name) -%}
drop index if exists "{{ relation.schema }}"."{{ index_name }}"
{%- endmacro -%}

{% macro risingwave__drop_relation(relation) -%}
{% call statement('drop_relation') -%}
{% if relation.type == 'view' %}
Expand All @@ -79,8 +83,6 @@
drop source if exists {{ relation }} cascade
{% elif relation.type == 'sink' %}
drop sink if exists {{ relation }} cascade
{% elif relation.type == 'index' %}
drop index if exists {{ relation }} cascade
{% endif %}
{%- endcall %}
{% endmacro %}
Expand Down Expand Up @@ -125,3 +127,50 @@
{%- endif %}
{{ sql }};
{%- endmacro %}

{%- macro risingwave__update_indexes_on_materialized_view(relation, index_changes) -%}
{{- log("Applying UPDATE INDEXES to: " ~ relation) -}}

{%- for _index_change in index_changes -%}
{%- set _index = _index_change.context -%}

{%- if _index_change.action == "drop" -%}

{{ risingwave__get_drop_index_sql(relation, _index.name) }};

{%- elif _index_change.action == "create" -%}

{{ risingwave__get_create_index_sql(relation, _index.as_node_config) }}

{%- endif -%}

{%- endfor -%}

{%- endmacro -%}

{% macro risingwave__get_show_indexes_sql(relation) %}
with index_info as (
select
i.relname as name,
'btree' as method,
ix.indisunique as "unique",
a.attname as attname,
array_position(ix.indkey, a.attnum) as ord
from pg_index ix
join pg_class i
on i.oid = ix.indexrelid
join pg_class t
on t.oid = ix.indrelid
join pg_namespace n
on n.oid = t.relnamespace
join pg_attribute a
on a.attrelid = t.oid
and a.attnum = ANY(ix.indkey)
where t.relname = '{{ relation.identifier }}'
and n.nspname = '{{ relation.schema }}'
and t.relkind in ('r', 'm')
)
select name, method, "unique", array_to_string(array_agg(attname order by ord), ',') as column_names from index_info
group by 1, 2, 3
order by 1, 2, 3;
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,37 @@
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% call statement('main') -%}
{{ risingwave__create_materialized_view_as(target_relation, sql) }}
{%- endcall %}
{% if old_relation is none or (full_refresh_mode and old_relation) %}
{% call statement('main') -%}
{{ risingwave__create_materialized_view_as(target_relation, sql) }}
{%- endcall %}

{{ create_indexes(target_relation) }}
{% else %}
-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_materialized_view_configuration_changes(old_relation, config) %}

{% if configuration_changes is none %}
-- do nothing
{{ materialized_view_execute_no_op(target_relation) }}
{% elif on_configuration_change == 'apply' %}
{% call statement('main') -%}
{{ risingwave__update_indexes_on_materialized_view(target_relation, configuration_changes.indexes) }}
{%- endcall %}
{% elif on_configuration_change == 'continue' %}
-- do nothing but a warn
{{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }}
{{ materialized_view_execute_no_op(target_relation) }}
{% elif on_configuration_change == 'fail' %}
{{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }}
{% else %}
-- this only happens if the user provides a value other than `apply`, 'continue', 'fail'
{{ exceptions.raise_compiler_error("Unexpected configuration scenario") }}

{% endif %}
{% endif %}

{{ create_indexes(target_relation) }}
{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,37 @@
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% call statement('main') -%}
{{ risingwave__create_materialized_view_as(target_relation, sql) }}
{%- endcall %}
{% if old_relation is none or (full_refresh_mode and old_relation) %}
{% call statement('main') -%}
{{ risingwave__create_materialized_view_as(target_relation, sql) }}
{%- endcall %}

{{ create_indexes(target_relation) }}
{% else %}
-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_materialized_view_configuration_changes(old_relation, config) %}

{% if configuration_changes is none %}
-- do nothing
{{ materialized_view_execute_no_op(target_relation) }}
{% elif on_configuration_change == 'apply' %}
{% call statement('main') -%}
{{ risingwave__update_indexes_on_materialized_view(target_relation, configuration_changes.indexes) }}
{%- endcall %}
{% elif on_configuration_change == 'continue' %}
-- do nothing but a warn
{{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }}
{{ materialized_view_execute_no_op(target_relation) }}
{% elif on_configuration_change == 'fail' %}
{{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }}
{% else %}
-- this only happens if the user provides a value other than `apply`, 'continue', 'fail'
{{ exceptions.raise_compiler_error("Unexpected configuration scenario") }}

{% endif %}
{% endif %}

{{ create_indexes(target_relation) }}
{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
Expand Down

0 comments on commit 61aada5

Please sign in to comment.