Skip to content

Commit

Permalink
Snapshot column names (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Sep 18, 2024
1 parent 2ba660f commit db11adc
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 47 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240903-155618.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Allow configuring snapshot column names
time: 2024-09-03T15:56:18.211492-04:00
custom:
Author: gshank
Issue: "289"
20 changes: 8 additions & 12 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
QuoteConfigTypeError,
RelationReturnedMultipleResultsError,
RenameToNoneAttemptedError,
SnapshotTargetIncompleteError,
SnapshotTargetNotSnapshotTableError,
UnexpectedNonTimestampError,
)
Expand Down Expand Up @@ -764,7 +763,9 @@ def get_missing_columns(
return [col for (col_name, col) in from_columns.items() if col_name in missing_columns]

@available.parse_none
def valid_snapshot_target(self, relation: BaseRelation) -> None:
def valid_snapshot_target(
self, relation: BaseRelation, column_names: Optional[Dict[str, str]] = None
) -> None:
"""Ensure that the target relation is valid, by making sure it has the
expected columns.
Expand All @@ -782,21 +783,16 @@ def valid_snapshot_target(self, relation: BaseRelation) -> None:

columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
expanded_keys = ("scd_id", "valid_from", "valid_to")
extra = []
missing = []
for legacy in expanded_keys:
desired = "dbt_" + legacy
# Note: we're not checking dbt_updated_at here because it's not
# always present.
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
desired = column_names[column] if column_names else column
if desired not in names:
missing.append(desired)
if legacy in names:
extra.append(legacy)

if missing:
if extra:
raise SnapshotTargetIncompleteError(extra, missing)
else:
raise SnapshotTargetNotSnapshotTableError(missing)
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def expand_target_column_types(
Expand Down
6 changes: 4 additions & 2 deletions dbt/adapters/exceptions/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ def __init__(self, missing: List):
super().__init__(msg=self.get_message())

def get_message(self) -> str:
msg = 'Snapshot target is not a snapshot table (missing "{}")'.format(
'", "'.join(self.missing)
missing = '", "'.join(self.missing)
msg = (
f'Snapshot target is missing configured columns (missing "{missing}"). '
"See https://docs.getdbt.com/docs/build/snapshots#snapshot-meta-fields for more information."
)
return msg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@
{{ adapter.dispatch('snapshot_staging_table', 'dbt')(strategy, source_sql, target_relation) }}
{% endmacro %}

{% macro get_snapshot_table_column_names() %}
    {#-
        Default names of the snapshot meta columns: every key maps to itself.
        Used as the fallback when `snapshot_table_column_names` is not set in config.
    -#}
    {%- set default_names = {
        'dbt_valid_to': 'dbt_valid_to',
        'dbt_valid_from': 'dbt_valid_from',
        'dbt_scd_id': 'dbt_scd_id',
        'dbt_updated_at': 'dbt_updated_at'
    } -%}
    {{ return(default_names) }}
{% endmacro %}

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

with snapshot_query as (

Expand All @@ -48,7 +53,7 @@
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}
where dbt_valid_to is null
where {{ columns.dbt_valid_to }} is null

),

Expand All @@ -57,10 +62,10 @@
select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}

from snapshot_query
),
Expand All @@ -70,9 +75,9 @@
select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}

from snapshot_query
),
Expand Down Expand Up @@ -111,7 +116,7 @@
select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id
snapshotted_data.{{ columns.dbt_scd_id }}

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
Expand All @@ -128,10 +133,10 @@
select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
Expand All @@ -155,12 +160,13 @@
{% endmacro %}

{% macro default__build_snapshot_table(strategy, sql) %}
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

select *,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }}
from (
{{ sql }}
) sbq
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{% materialization snapshot, default %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

Expand All @@ -24,7 +23,9 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{# The model['config'] argument below is no longer used, but is still passed for compatibility. #}
{# It is a plain dictionary of config, rather than the config object from the context. #}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

Expand All @@ -34,7 +35,9 @@

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{{ adapter.valid_snapshot_target(target_relation, columns) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
{% macro default__snapshot_merge_sql(target, source, insert_cols) -%}
{%- set insert_cols_csv = insert_cols | join(', ') -%}

{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}

merge into {{ target.render() }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}

when matched
and DBT_INTERNAL_DEST.dbt_valid_to is null
and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}

when not matched
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
{#
Core strategy definitions
#}
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %}
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set primary_key = config.get('unique_key') %}
{% set updated_at = config.get('updated_at') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{#/*
The snapshot relation might not have an {{ updated_at }} value if the
Expand All @@ -64,7 +67,7 @@
See https://github.com/dbt-labs/dbt-core/issues/2350
*/ #}
{% set row_changed_expr -%}
({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }})
({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }})
{%- endset %}

{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}
Expand Down Expand Up @@ -133,11 +136,12 @@
{%- endmacro %}


{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}
{% set updated_at = config.get('updated_at', snapshot_get_time()) %}
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, model_config, target_exists) %}
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set check_cols_config = config.get('check_cols') %}
{% set primary_key = config.get('unique_key') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set updated_at = config.get('updated_at') or snapshot_get_time() %}

{% set column_added = false %}

Expand Down

0 comments on commit db11adc

Please sign in to comment.