From a855e397255e2f424e8c73d8987a19bbcc8835dd Mon Sep 17 00:00:00 2001 From: jnadal Date: Thu, 8 Aug 2024 11:15:22 +0200 Subject: [PATCH] fixed table materialization for delta format --- CHANGELOG.md | 2 +- dbt/adapters/glue/impl.py | 7 +++- dbt/include/glue/macros/adapters.sql | 10 ++++- .../table/delta_table_replace.sql | 42 ------------------- 4 files changed, 15 insertions(+), 46 deletions(-) delete mode 100644 dbt/include/glue/macros/materializations/table/delta_table_replace.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index e7267e5b..4869b4d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## New version - Fix session provisioning timeout and delay handling -- Add delta_table_replace materialization +- Fix table materialization for Delta models ## v1.8.1 - Fix typo in README.md diff --git a/dbt/adapters/glue/impl.py b/dbt/adapters/glue/impl.py index fac57744..2f9ef795 100644 --- a/dbt/adapters/glue/impl.py +++ b/dbt/adapters/glue/impl.py @@ -187,11 +187,14 @@ def get_relation(self, database, schema, identifier): DatabaseName=schema, Name=identifier ) + is_delta = response.get('Table').get("Parameters").get("spark.sql.sources.provider") == "delta" + relations = self.Relation.create( database=schema, schema=schema, identifier=identifier, - type=self.relation_type_map.get(response.get("Table", {}).get("TableType", "Table")) + type=self.relation_type_map.get(response.get("Table", {}).get("TableType", "Table")), + is_delta=is_delta ) logger.debug(f"""schema : {schema} identifier : {identifier} @@ -638,7 +641,7 @@ def delta_create_table(self, target_relation, request, primary_key, partition_ke location = custom_location create_table_query = f""" -CREATE TABLE {table_name} +CREATE OR REPLACE TABLE {table_name} USING delta LOCATION '{location}' """ diff --git a/dbt/include/glue/macros/adapters.sql b/dbt/include/glue/macros/adapters.sql index b1613442..b79f251c 100644 --- a/dbt/include/glue/macros/adapters.sql +++ b/dbt/include/glue/macros/adapters.sql @@ -62,10 +62,18 @@ {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%} {%- set table_properties = config.get('table_properties', default={}) -%} + {%- set create_statement_string -%} + {% if file_format in ['delta', 'iceberg'] -%} + create or replace table + {%- else -%} + create table + {% endif %} + {%- endset %} + {% if temporary -%} {{ create_temporary_view(relation, sql) }} {%- else -%} - create table {{ relation }} + {{ create_statement_string }} {{ relation }} {% set contract_config = config.get('contract') %} {% if contract_config.enforced %} {{ get_assert_columns_equivalent(sql) }} diff --git a/dbt/include/glue/macros/materializations/table/delta_table_replace.sql b/dbt/include/glue/macros/materializations/table/delta_table_replace.sql deleted file mode 100644 index 1b18e2ef..00000000 --- a/dbt/include/glue/macros/materializations/table/delta_table_replace.sql +++ /dev/null @@ -1,42 +0,0 @@ -{% materialization delta_table_replace, adapter='glue' %} - {%- set partition_by = config.get('partition_by', none) -%} - {%- set table_properties = config.get('table_properties', default={}) -%} - {% set lf_tags_config = config.get('lf_tags_config') -%} - {%- set lf_grants = config.get('lf_grants') -%} - {%- set target_relation = this -%} - {%- set build_sql = delta_create_or_replace(target_relation, table_properties) -%} - - {{ run_hooks(pre_hooks) }} - - {%- call statement('main') -%} - {{ build_sql }} - {%- endcall -%} - - {% do persist_docs(target_relation, model) %} - - {% do persist_constraints(target_relation, model) %} - - {% if lf_tags_config is not none %} - {{ adapter.add_lf_tags(target_relation, lf_tags_config) }} - {% endif %} - - {% if lf_grants is not none %} - {{ adapter.apply_lf_grants(target_relation, lf_grants) }} - {% endif %} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %} - -{% macro delta_create_or_replace(relation, table_properties) %} - create or replace table {{ relation }} - using delta - {{ set_table_properties(table_properties) }} - {{ partition_cols(label="partitioned by") }} - {{ glue__location_clause(relation) }} - {{ comment_clause() }} - as - {{ sql }} -{% endmacro %}