From 242400f701f8527977a770123dfaad8ffa2a0654 Mon Sep 17 00:00:00 2001 From: Jeremynadal33 <36603814+Jeremynadal33@users.noreply.github.com> Date: Mon, 26 Aug 2024 08:38:56 +0200 Subject: [PATCH] added on_schema_change possibility (#426) Co-authored-by: Noritaka Sekiyama --- CHANGELOG.md | 1 + .../macros/materializations/incremental/incremental.sql | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4869b4d3..0050c875 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## New version - Fix session provisioning timeout and delay handling +- Add on_schema_change possibility - Fix table materialization for Delta models ## v1.8.1 diff --git a/dbt/include/glue/macros/materializations/incremental/incremental.sql b/dbt/include/glue/macros/materializations/incremental/incremental.sql index 7c0da215..194e2e1d 100644 --- a/dbt/include/glue/macros/materializations/incremental/incremental.sql +++ b/dbt/include/glue/macros/materializations/incremental/incremental.sql @@ -11,21 +11,26 @@ {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} {% endif %} + {%- set unique_key = config.get('unique_key', none) -%} {% if unique_key is none and file_format == 'hudi' %} {{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }} {% endif %} + {%- set partition_by = config.get('partition_by', none) -%} {%- set custom_location = config.get('custom_location', default='empty') -%} {%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%} {%- set table_properties = config.get('table_properties', default='empty') -%} {% set target_relation = this %} + {%- set existing_relation = load_relation(this) -%} {% set existing_relation_type = adapter.get_table_type(target_relation) %} {% set tmp_relation = make_temp_relation(target_relation, '_tmp') %} {% set is_incremental = 'False' %} {% set lf_tags_config = config.get('lf_tags_config') %} {% set lf_grants = config.get('lf_grants') %} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + {% call statement() %} set spark.sql.autoBroadcastJoinThreshold=-1 {% endcall %} @@ -74,6 +79,8 @@ {{ glue__create_tmp_table_as(tmp_relation, sql) }} {% set is_incremental = 'True' %} {% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} + + {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%} {% endif %} {% endif %}