Feat/migrate sql gen to macro #461

Merged: 7 commits, Oct 25, 2024
Changes from 6 commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
## New version
- Allow loading big seed files
- Migrates the PySpark code for the Iceberg file format to macros, making impl.py more readable.
- Fixes the get_columns_in_relation function to work for both Iceberg and non-Iceberg tables without hard-coding the catalog name.
- Fixes the get_location function to work for both Iceberg and non-Iceberg tables on macOS and Windows.
- Adds a helper function to retrieve the Iceberg catalog namespace from the profiles.yml file.
- Adds merge_exclude_columns and incremental_predicates features.


## v1.8.6
- Fix session provisioning timeout and delay handling
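To make the last two changelog entries concrete, here is a minimal model sketch exercising merge_exclude_columns and incremental_predicates; the model, source, and column names are hypothetical, but the config keys match the macros added in this PR:

{{
    config(
        materialized='incremental',
        incremental_strategy='merge',
        file_format='iceberg',
        unique_key='order_id',
        merge_exclude_columns=['inserted_at'],
        incremental_predicates=["DBT_INTERNAL_DEST.order_date > '2024-01-01'"]
    )
}}

select order_id, order_date, amount, inserted_at
from {{ source('raw', 'orders') }}
{% if is_incremental() %}
where inserted_at > (select max(inserted_at) from {{ this }})
{% endif %}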
2 changes: 1 addition & 1 deletion dbt/adapters/glue/credentials.py
@@ -35,7 +35,7 @@ class GlueCredentials(Credentials):
datalake_formats: Optional[str] = None
enable_session_per_model: Optional[bool] = False
use_arrow: Optional[bool] = False

custom_iceberg_catalog_namespace: Optional[str] = "glue_catalog"

@property
def type(self):
259 changes: 51 additions & 208 deletions dbt/adapters/glue/impl.py

Large diffs are not rendered by default.

105 changes: 48 additions & 57 deletions dbt/include/glue/macros/adapters.sql
@@ -1,16 +1,12 @@
{% macro glue__location_clause(relation) %}
{% macro glue__location_clause() %}
{%- set custom_location = config.get('custom_location', validator=validation.any[basestring]) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set materialized = config.get('materialized') -%}

{%- if custom_location is not none %}
location '{{ custom_location }}'
{%- else -%}
{% if file_format == 'iceberg' %}
{{ adapter.get_iceberg_location(relation) }}
{%- else -%}
{{ adapter.get_location(relation) }}
{%- endif %}
{{ adapter.get_location(this) }}
{%- endif %}
{%- endmacro -%}

@@ -24,17 +20,27 @@
{{return('')}}
{%- endmacro %}

{% macro glue__make_target_relation(relation, file_format) %}
{%- set iceberg_catalog = adapter.get_custom_iceberg_catalog_namespace() -%}
{%- set first_iceberg_load = (file_format == 'iceberg') -%}
{%- set non_null_catalog = (iceberg_catalog is not none) -%}
{%- if non_null_catalog and first_iceberg_load %}
{# /* We add the iceberg catalog in the following cases */ #}
{%- do return(relation.incorporate(path={"schema": iceberg_catalog ~ '.' ~ relation.schema, "identifier": relation.identifier})) -%}
{%- else -%}
{# /* Otherwise we keep the relation as it is */ #}
{%- do return(relation) -%}
{%- endif %}
{% endmacro %}

{% macro glue__drop_relation(relation) -%}

{% call statement('drop_relation', auto_begin=False) -%}
{% set rel_type = adapter.get_table_type(relation) %}
{%- if rel_type is not none and rel_type != 'iceberg_table' %}
drop {{ rel_type }} if exists {{ relation }}
{%- elif rel_type is not none and rel_type == 'iceberg_table' %}
{%- set default_catalog = 'glue_catalog' -%}
drop table if exists {{ default_catalog }}.{{ relation }}
{%- else -%}
drop table if exists {{ relation }}
{%- endif %}
{%- if relation.type == 'view' %}
drop view if exists {{ this }}
{%- else -%}
drop table if exists {{ relation }}
{%- endif %}
{%- endcall %}
{% endmacro %}

@@ -59,50 +65,37 @@
{%- endmacro -%}

{% macro glue__create_table_as(temporary, relation, sql) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}

{%- set create_statement_string -%}
{% if file_format in ['delta', 'iceberg'] -%}
create or replace table
{%- else -%}
create table
{% endif %}
{%- endset %}

{% if temporary -%}
{{ create_temporary_view(relation, sql) }}
{%- else -%}
{{ create_statement_string }} {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{#-- This does not enforce constraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- set table_properties = config.get('table_properties', default={}) -%}

{%- set create_statement_string -%}
{% if file_format in ['delta', 'iceberg'] -%}
create or replace table
{%- else -%}
create table
{% endif %}
{{ glue__file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ set_table_properties(table_properties) }}
{{ glue__location_clause(relation) }}
{{ comment_clause() }}
as
{{ sql }}
{%- endif %}
{%- endmacro -%}

{% macro glue__create_tmp_table_as(relation, sql) -%}
{% call statement("create_tmp_table_as", fetch_result=false, auto_begin=false) %}
set spark.sql.legacy.allowNonEmptyLocationInCTAS=true
dbt_next_query
DROP TABLE IF EXISTS {{ relation }}
dbt_next_query
create table {{ relation }}
{{ adapter.get_location(relation) }}
{%- endset %}

{{ create_statement_string }} {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{#-- This does not enforce constraints and needs to be a TODO #}
{#-- We'll need to change up the query because with CREATE TABLE AS SELECT, #}
{#-- you do not specify the columns #}
{% endif %}
{{ glue__file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ set_table_properties(table_properties) }}
{{ glue__location_clause() }}
{{ comment_clause() }}
as
{{ sql }}
{% endcall %}
{{ sql }}
{%- endif %}
{%- endmacro -%}

{% macro glue__snapshot_get_time() -%}
@@ -143,9 +136,7 @@
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
{%- endif -%}
DROP VIEW IF EXISTS {{ relation }}
dbt_next_query
create view {{ relation }}
create or replace view {{ relation }}
as
{{ sql }}
{% endmacro %}
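A quick sketch of what the new glue__make_target_relation macro resolves to; the schema and identifier here are hypothetical, and the catalog prefix is whatever custom_iceberg_catalog_namespace is set to in the profile (the credentials.py change above defaults it to 'glue_catalog'):

{# Hypothetical inputs: this.schema = 'analytics', this.identifier = 'orders' #}
{%- set target = glue__make_target_relation(this, config.get('file_format')) -%}
{# file_format == 'iceberg' and a non-null catalog -> glue_catalog.analytics.orders #}
{# any other file_format                           -> analytics.orders (unchanged)  #}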
@@ -1,100 +1,94 @@
{% materialization incremental, adapter='glue' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{# /*-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --*/ #}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='insert_overwrite') -%}
{%- set file_format = dbt_glue_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_glue_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{% if raw_file_format == 'iceberg' %}
{%- set file_format = 'iceberg' -%}
{%- set strategy = raw_strategy -%}
{% else %}
{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{% endif %}

{# /*-- Set vars --*/ #}
{%- set language = model['language'] -%}
{%- set existing_relation_type = adapter.get_table_type(this) -%}
{%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = existing_relation or glue__make_target_relation(this, config.get('file_format')) -%}
{%- set tmp_relation = make_temp_relation(this, '_tmp').include(schema=false) -%}
{%- set unique_key = config.get('unique_key', none) -%}
{% if unique_key is none and file_format == 'hudi' %}
{{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }}
{% endif %}

{%- set partition_by = config.get('partition_by', none) -%}
{%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
{%- set custom_location = config.get('custom_location', default='empty') -%}
{%- set expire_snapshots = config.get('iceberg_expire_snapshots', 'True') -%}
{%- set table_properties = config.get('table_properties', default='empty') -%}
{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_grants = config.get('lf_grants') -%}
{%- set delta_create_table_write_options = config.get('write_options', default={}) -%}

{% set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}
{% set existing_relation_type = adapter.get_table_type(target_relation) %}
{% set tmp_relation = make_temp_relation(target_relation, '_tmp') %}
{% set is_incremental = 'False' %}
{% set lf_tags_config = config.get('lf_tags_config') %}
{% set lf_grants = config.get('lf_grants') %}
{%- set substitute_variables = config.get('substitute_variables', default=[]) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set is_incremental = 'False' -%}

{% call statement() %}
set spark.sql.autoBroadcastJoinThreshold=-1
{% endcall %}
{% if existing_relation_type is not none %}
{%- set target_relation = target_relation.incorporate(type=existing_relation_type if existing_relation_type != "iceberg_table" else "table") -%}
{% endif %}

{# /*-- Validate specific requirements for hudi --*/ #}
{% if unique_key is none and file_format == 'hudi' %}
{{ exceptions.raise_compiler_error("unique_key model configuration is required for HUDI incremental materializations.") }}
{% endif %}

{# /*-- Run pre-hooks --*/ #}
{{ run_hooks(pre_hooks) }}
{%- set substitute_variables = config.get('substitute_variables', default=[]) -%}

{# /*-- Incremental Process Logic --*/ #}
{% if file_format == 'hudi' %}
{%- set hudi_options = config.get('hudi_options', default={}) -%}
{{ adapter.hudi_merge_table(target_relation, sql, unique_key, partition_by, custom_location, hudi_options, substitute_variables) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% if strategy == 'insert_overwrite' and partition_by %}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}
{% if existing_relation_type is none %}
{# /*-- If the relation doesn't exist it needs to be created --*/ #}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% endif %}
{% elif existing_relation_type == 'view' or should_full_refresh() %}
{# /*-- Relation must be dropped and created --*/ #}
{{ drop_relation(target_relation) }}
{% if file_format == 'delta' %}
{{ adapter.delta_create_table(target_relation, sql, unique_key, partition_by, custom_location, delta_create_table_write_options) }}
{% set build_sql = "select * from " + target_relation.schema + "." + target_relation.identifier + " limit 1 " %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{% else %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% endif %}
{% elif file_format == 'iceberg' %}
{{ adapter.iceberg_write(target_relation, sql, unique_key, partition_by, custom_location, strategy, table_properties) }}
{% set build_sql = "select * from glue_catalog." + target_relation.schema + "." + target_relation.identifier + " limit 1 "%}
{%- if expire_snapshots == 'True' -%}
{%- set result = adapter.iceberg_expire_snapshots(target_relation) -%}
{%- endif -%}
{% else %}
{{ glue__create_tmp_table_as(tmp_relation, sql) }}
{# /*-- Relation must be merged --*/ #}
{%- call statement('create_tmp_view') -%}
{{ create_table_as(True, tmp_relation, sql) }}
{%- endcall -%}
{% set is_incremental = 'True' %}
{% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}

{%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{% set build_sql = dbt_glue_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, incremental_predicates) %}
{%- do process_schema_changes(on_schema_change, tmp_relation, target_relation) -%}
{% endif %}
{% endif %}

{# /*-- Execute the main statement --*/ #}
{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{# /*-- To not break existing workloads, but I think this doesn't need to be here since it can be added as a post_hook query --*/ #}
{%- if file_format == 'iceberg' and expire_snapshots == 'True' -%}
{%- set result = adapter.iceberg_expire_snapshots(target_relation) -%}
{%- endif -%}

{# /*-- Run post-hooks --*/ #}
{{ run_hooks(post_hooks) }}

{# /*-- setup lake formation tags --*/ #}
{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(target_relation, lf_tags_config) }}
{% endif %}

{# /*-- setup lake formation grants --*/ #}
{% if lf_grants is not none %}
{{ adapter.apply_lf_grants(target_relation, lf_grants) }}
{% endif %}
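Regarding the comment above about moving snapshot expiry out of the materialization: a hedged sketch of the post_hook alternative, using Iceberg's expire_snapshots Spark procedure (the catalog name follows the 'glue_catalog' default, and whether this runs cleanly in a Glue session is an assumption):

{{
    config(
        materialized='incremental',
        file_format='iceberg',
        iceberg_expire_snapshots='False',
        post_hook="CALL glue_catalog.system.expire_snapshots(table => '{{ this.schema }}.{{ this.identifier }}')"
    )
}}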
@@ -1,5 +1,5 @@
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
{%- set dest_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='name') | join(', ') -%}
set hive.exec.dynamic.partition.mode=nonstrict
dbt_next_query
@@ -16,16 +16,60 @@
select {{dest_cols_csv}} from {{ source_relation }}
{% endmacro %}

{% macro dbt_glue_get_incremental_sql(strategy, source, target, unique_key) %}

{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{# /* need dest_columns for merge_exclude_columns, default to use "*" */ #}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}

when matched then update set
{% if update_columns -%}{%- for column_name in update_columns %}
{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{%- else %} * {% endif %}

when not matched then insert *
{% endmacro %}


{% macro dbt_glue_get_incremental_sql(strategy, source, target, unique_key, incremental_predicates) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
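For reference, given a hypothetical unique_key='id', merge_exclude_columns=['order_date'], one incremental predicate on order_date, destination columns (id, order_date, amount), and hypothetical relation names, the get_merge_sql macro above renders roughly:

merge into analytics.orders as DBT_INTERNAL_DEST
using analytics.orders_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_DEST.order_date > '2024-01-01' and DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id

when matched then update set
    id = DBT_INTERNAL_SOURCE.id,
    amount = DBT_INTERNAL_SOURCE.amount

when not matched then insert *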