diff --git a/.changes/unreleased/Features-20240903-155618.yaml b/.changes/unreleased/Features-20240903-155618.yaml new file mode 100644 index 00000000..fdafceb3 --- /dev/null +++ b/.changes/unreleased/Features-20240903-155618.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Allow configuring of snapshot column names +time: 2024-09-03T15:56:18.211492-04:00 +custom: + Author: gshank + Issue: "289" diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 07b65c40..f3788fe3 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -83,7 +83,6 @@ QuoteConfigTypeError, RelationReturnedMultipleResultsError, RenameToNoneAttemptedError, - SnapshotTargetIncompleteError, SnapshotTargetNotSnapshotTableError, UnexpectedNonTimestampError, ) @@ -764,7 +763,9 @@ def get_missing_columns( return [col for (col_name, col) in from_columns.items() if col_name in missing_columns] @available.parse_none - def valid_snapshot_target(self, relation: BaseRelation) -> None: + def valid_snapshot_target( + self, relation: BaseRelation, column_names: Optional[Dict[str, str]] = None + ) -> None: """Ensure that the target relation is valid, by making sure it has the expected columns. @@ -782,21 +783,16 @@ def valid_snapshot_target(self, relation: BaseRelation) -> None: columns = self.get_columns_in_relation(relation) names = set(c.name.lower() for c in columns) - expanded_keys = ("scd_id", "valid_from", "valid_to") - extra = [] missing = [] - for legacy in expanded_keys: - desired = "dbt_" + legacy + # Note: we're not checking dbt_updated_at here because it's not + # always present. + for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"): + desired = column_names[column] if column_names else column if desired not in names: missing.append(desired) - if legacy in names: - extra.append(legacy) if missing: - if extra: - raise SnapshotTargetIncompleteError(extra, missing) - else: - raise SnapshotTargetNotSnapshotTableError(missing) + raise SnapshotTargetNotSnapshotTableError(missing) @available.parse_none def expand_target_column_types( diff --git a/dbt/adapters/exceptions/compilation.py b/dbt/adapters/exceptions/compilation.py index 46ca5219..d82924e3 100644 --- a/dbt/adapters/exceptions/compilation.py +++ b/dbt/adapters/exceptions/compilation.py @@ -150,8 +150,10 @@ def __init__(self, missing: List): super().__init__(msg=self.get_message()) def get_message(self) -> str: - msg = 'Snapshot target is not a snapshot table (missing "{}")'.format( - '", "'.join(self.missing) + missing = '", "'.join(self.missing) + msg = ( + f'Snapshot target is missing configured columns (missing "{missing}"). ' + "See https://docs.getdbt.com/docs/build/snapshots#snapshot-meta-fields for more information." ) return msg diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 9e8575e9..8d982855 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -34,7 +34,12 @@ {{ adapter.dispatch('snapshot_staging_table', 'dbt')(strategy, source_sql, target_relation) }} {% endmacro %} +{% macro get_snapshot_table_column_names() %} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} +{% endmacro %} + {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} with snapshot_query as ( @@ -48,7 +53,7 @@ {{ strategy.unique_key }} as dbt_unique_key from {{ target_relation }} - where dbt_valid_to is null + where {{ columns.dbt_valid_to }} is null ), @@ -57,10 +62,10 @@ select *, {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), @@ -70,9 +75,9 @@ select *, {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), @@ -111,7 +116,7 @@ select 'update' as dbt_change_type, source_data.*, - snapshotted_data.dbt_scd_id + snapshotted_data.{{ columns.dbt_scd_id }} from updates_source_data as source_data join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key @@ -128,10 +133,10 @@ select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} from snapshotted_data left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key @@ -155,12 +160,13 @@ {% endmacro %} {% macro default__build_snapshot_table(strategy, sql) %} + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select *, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }} from ( {{ sql }} ) sbq diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 7be94701..5daead4c 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -1,5 +1,4 @@ {% materialization snapshot, default %} - {%- set config = model['config'] -%} {%- set target_table = model.get('alias', model.get('name')) -%} @@ -24,7 +23,9 @@ {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + {# The model['config'] parameter below is no longer used, but passing anyway for compatibility #} + {# It was a dictionary of config, instead of the config object from the context #} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %} {% if not target_relation_exists %} @@ -34,7 +35,9 @@ {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} + + {{ adapter.valid_snapshot_target(target_relation, columns) }} {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql index 56798811..74494ed2 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql @@ -7,15 +7,17 @@ {% macro default__snapshot_merge_sql(target, source, insert_cols) -%} {%- set insert_cols_csv = insert_cols | join(', ') -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} + merge into {{ target.render() }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id + on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} when matched - and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') then update - set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} when not matched and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index d22cc336..8c086182 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -49,10 +49,13 @@ {# Core strategy definitions #} -{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} - {% set updated_at = config['updated_at'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + +{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} + {% set primary_key = config.get('unique_key') %} + {% set updated_at = config.get('updated_at') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* The snapshot relation might not have an {{ updated_at }} value if the @@ -64,7 +67,7 @@ See https://github.com/dbt-labs/dbt-core/issues/2350 */ #} {% set row_changed_expr -%} - ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) + ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} @@ -133,11 +136,12 @@ {%- endmacro %} -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} - {% set updated_at = config.get('updated_at', snapshot_get_time()) %} +{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %} + {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} + {% set check_cols_config = config.get('check_cols') %} + {% set primary_key = config.get('unique_key') %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %}