schema_changelog incremental materialization #35

Open · wants to merge 7 commits into base: main
11 changes: 6 additions & 5 deletions integration_tests/dbt_project.yml
@@ -43,25 +43,26 @@ seeds:
    destination:
      +column_types:
        created_at: timestamp
-       id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+       id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
    destination_membership:
      +column_types:
        activated_at: timestamp
        joined_at: timestamp
    log:
      +column_types:
        time_stamp: timestamp
-       transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+       transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
+       message_event: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
    transformation:
      +column_types:
        created_at: timestamp
-       destination_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
-       id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+       destination_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
+       id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
    trigger_table:
      +quote_columns: "{{ true if target.type in ('redshift', 'postgres') else false }}"
      +enabled: "{{ true if target.type != 'snowflake' else false }}"
      +column_types:
-       transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+       transformation_id: "{{ 'string' if target.type == 'bigquery' else 'varchar' }}"
    trigger_table_snowflake:
      +enabled: "{{ true if target.type == 'snowflake' else false }}"
    user:
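The seed config edits above swap `target.name` for `target.type`: in dbt, `target.name` is the label of the active target in profiles.yml (for example `dev` or `prod`), while `target.type` is the warehouse adapter (`bigquery`, `snowflake`, `redshift`, `postgres`), so only the latter reliably switches the seed columns to BigQuery's `string` type. A minimal scratch model to see the difference when rendered, assuming a BigQuery target labelled `dev` (the model name and column aliases are illustrative only, not part of this PR):

```sql
-- scratch_target_check.sql (hypothetical): run `dbt compile` and inspect the rendered values
select
    '{{ target.name }}' as profile_target_label,                                           -- renders e.g. 'dev'
    '{{ target.type }}' as warehouse_adapter_type,                                         -- renders 'bigquery' on BigQuery
    '{{ "string" if target.type == "bigquery" else "varchar" }}' as seed_id_column_type    -- renders 'string' on BigQuery
```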
6 changes: 5 additions & 1 deletion integration_tests/seeds/log.csv
@@ -22,4 +22,8 @@ intrinsic_departed,2021-12-09 14:26:29.860,2021-12-09 20:30:53.904,intrinsic_dep
intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,
-intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,
+intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,
+intrinsic_departed,2021-12-10 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""name"":""activity_add_to_fivetran_user""}",create_table,
+intrinsic_departed,2021-12-19 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""type"":""ADD_COLUMN"",""table"":""lead""}",alter_table,
+intrinsic_departed,2021-12-11 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""connectorId"":""pg"",""properties"":{""REMOVAL"":[{""schema"":""public"",""tables"":[""active_rows_estimate""]}]}}",change_schema_config,
+intrinsic_departed,2021-12-10 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,"{""schema"":""fivetran_log_2""}",create_schema,
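The four added rows give the test seed one fixture for each event_subtype the new schema-changelog model picks up (create_table, alter_table, change_schema_config, create_schema), each carrying a representative message_data payload. A rough spot-check query one could run against the staging model while developing, reusing the same fivetran_utils.json_extract call the model below relies on (the staging model name and the 'schema' path come from the diff; the 'table' path and the query itself are illustrative):

```sql
-- scratch query: confirm each new fixture row exposes the JSON fields the changelog model extracts
select
    event_subtype,
    {{ fivetran_utils.json_extract(string='message_data', string_path='schema') }} as schema_in_payload,
    {{ fivetran_utils.json_extract(string='message_data', string_path='table') }}  as table_in_payload
from {{ ref('stg_fivetran_log__log') }}
where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')
```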
41 changes: 37 additions & 4 deletions models/fivetran_log__schema_changelog.sql
@@ -1,9 +1,39 @@
{{ config(
materialized='incremental',
unique_key='unique_schema_change_key',
partition_by={
'field': 'created_at',
'data_type': 'timestamp',
'granularity': 'day'
} if target.type == 'bigquery' else none,
incremental_strategy = 'merge',
file_format = 'delta'
) }}

with schema_changes as (

select *
from {{ ref('stg_fivetran_log__log') }}

where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')

{% if is_incremental() %}

-- Capture the latest timestamp in a call statement instead of a subquery to optimize BQ costs on incremental runs
{%- call statement('max_schema_change', fetch_result=True) -%}
select date(max(created_at)) from {{ this }}
{%- endcall -%}

-- load the result from the above query into a new variable
{%- set query_result = load_result('max_schema_change') -%}

-- the query result comes back as a table of rows, so pull out the single value we need
{%- set max_schema_change = query_result['data'][0][0] -%}

-- compare the new batch of data to the latest schema change already stored in this model
and date(created_at) >= '{{ max_schema_change }}'

{% endif %}
),

connector as (
@@ -20,7 +50,8 @@ add_connector_info as (
connector.destination_id,
connector.destination_name

-from schema_changes join connector using(connector_id)
+from schema_changes join
Contributor Author: just changed this because using can be volatile sometimes (mostly on left joins in Snowflake, but let's be consistent).

+connector on schema_changes.connector_id = connector.connector_id
),

final as (
@@ -41,10 +72,12 @@ final as (

case
when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_extract(string='message_data', string_path='schema') }}
-else null end as schema_name
+else null end as schema_name,
+
+{{ dbt_utils.surrogate_key(['connector_id', 'destination_id', 'created_at']) }} as unique_schema_change_key
+

from add_connector_info
)

-select * from final
-order by created_at desc, connector_id
+select * from final
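On an incremental run, the statement/load_result block above resolves before the model compiles, so the filter that actually reaches the warehouse is a plain date comparison rather than a correlated subquery. A rough sketch of what the schema_changes CTE compiles to, assuming the call statement returned 2021-12-10 and that the staging model lives in an analytics schema (both values are illustrative, not taken from this PR):

```sql
-- approximate compiled SQL for the schema_changes CTE on an incremental run (schema name and date are illustrative)
select *
from analytics.stg_fivetran_log__log
where event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config')
  and date(created_at) >= '2021-12-10'
```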