Skip to content

Commit

Permalink
Merge pull request #104 from alanmcruickshank/ac/fix_cartesian_join_2
Browse files Browse the repository at this point in the history
Fix cartesian join bug
  • Loading branch information
NiallRees authored Mar 21, 2022
2 parents 42f481f + a664b11 commit ea64592
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 27 deletions.
2 changes: 1 addition & 1 deletion macros/dedupe_artifacts_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{% for artifact_table, table_key in [
(src_results, 'command_invocation_id'),
(src_results_nodes, 'command_invocation_id, node_id'),
(src_manifest_nodes, 'command_invocation_id, node_id')
(src_manifest_nodes, 'artifact_run_id, node_id')
] %}

{% set dedupe_results_query %}
Expand Down
4 changes: 3 additions & 1 deletion macros/upload_artifacts_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@
{{ flatten_manifest("raw_data") }}

) as new_data
on old_data.command_invocation_id = new_data.command_invocation_id and old_data.node_id = new_data.node_id
-- NB: We dedupe on artifact_run_id rather than command_invocation_id for manifest nodes
-- to avoid holding duplicate data.
on old_data.artifact_run_id = new_data.artifact_run_id and old_data.node_id = new_data.node_id
-- NB: No clause for "when matched" - as matching rows should be skipped.
when not matched then insert (
command_invocation_id,
Expand Down
5 changes: 1 addition & 4 deletions models/incremental/fct_dbt__model_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ model_executions_with_materialization as (
models.name
from model_executions_incremental
left join models on
(
model_executions_incremental.command_invocation_id = models.command_invocation_id
or model_executions_incremental.dbt_cloud_run_id = models.dbt_cloud_run_id
)
model_executions_incremental.artifact_run_id = models.artifact_run_id
and model_executions_incremental.node_id = models.node_id

),
Expand Down
6 changes: 1 addition & 5 deletions models/incremental/fct_dbt__seed_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ seed_executions_incremental as (
select *
from node_executions
where resource_type = 'seed'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and coalesce(artifact_generated_at > (select max(artifact_generated_at) from {{ this }}), true)
Expand All @@ -35,10 +34,7 @@ seed_executions_with_materialization as (
seeds.name
from seed_executions_incremental
left join seeds on
(
seed_executions_incremental.command_invocation_id = seeds.command_invocation_id
or seed_executions_incremental.dbt_cloud_run_id = seeds.dbt_cloud_run_id
)
seed_executions_incremental.artifact_run_id = seeds.artifact_run_id
and seed_executions_incremental.node_id = seeds.node_id

),
Expand Down
6 changes: 1 addition & 5 deletions models/incremental/fct_dbt__snapshot_executions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ snapshot_executions_incremental as (
select *
from node_executions
where resource_type = 'snapshot'

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
and coalesce(artifact_generated_at > (select max(artifact_generated_at) from {{ this }}), true)
Expand All @@ -35,10 +34,7 @@ snapshot_executions_with_materialization as (
snapshots.name
from snapshot_executions_incremental
left join snapshots on
(
snapshot_executions_incremental.command_invocation_id = snapshots.command_invocation_id
or snapshot_executions_incremental.dbt_cloud_run_id = snapshots.dbt_cloud_run_id
)
snapshot_executions_incremental.artifact_run_id = snapshots.artifact_run_id
and snapshot_executions_incremental.node_id = snapshots.node_id

),
Expand Down
25 changes: 18 additions & 7 deletions models/staging/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ sources:
- name: dbt_run_results
identifier: "{{ var('dbt_artifacts_results_table', 'dbt_run_results') }}"
description: |
The source table containing the loaded metadata from run_results.json loaded artifacts. This belongs
to the V2 upload. See the README for more info.
The source table containing the loaded metadata from run_results.json loaded artifacts. Granularity is
`command_invocation_id`, because each step of the a multi-step dbt cloud job will generate a seperate
run results file.
This belongs to the V2 upload. See the README for more info.
columns:
- name: command_invocation_id
description: The id of the command which resulted in the source artifact's generation.
Expand All @@ -44,8 +47,11 @@ sources:
- name: dbt_run_results_nodes
identifier: "{{ var('dbt_artifacts_result_nodes_table', 'dbt_run_results_nodes') }}"
description: |
The source table containing the loaded and flattened results from run_results.json loaded artifacts. This belongs
to the V2 upload. See the README for more info.
The source table containing the loaded and flattened results from run_results.json loaded artifacts.
Granularity is `command_invocation_id` & `node_id`, because each step of the a multi-step dbt cloud
job will generate a seperate run results file, which may have one or more nodes being run at that step.
This belongs to the V2 upload. See the README for more info.
columns:
- name: command_invocation_id
description: The id of the command which resulted in the source artifact's generation.
Expand All @@ -63,12 +69,17 @@ sources:
The source table containing the loaded and flattened nodes (including tests, seeds, sources, models and
exposures) from manifest.json loaded artifacts. Note that within the raw manifest file, exposures and sources
are stored seperately from seeds, models and tests, but for convenience they are all flattened to a single table
for this package.
for this package. Granularity is `artifact_run_id` & `node_id`, because each step of the a multi-step dbt cloud
job will generate a seperate manifest file but each will be equivalent. For this V2 upload we deduplicate that
on _load_, so any subsequent joins must be done on `artifact_run_id` and not `command_invocation_id`.
This belongs to the V2 upload. See the README for more info.
columns:
- name: command_invocation_id
description: The id of the command which resulted in the source artifact's generation.
description: The id of the command which resulted in the source artifact's generation. Note that because
manifests are generated for each step in a dbt cloud run, but the id here will be the one belonging
to the _first_ manifest generated. For most circumstances, users should use the `artifact_run_id` to
join on this table.
- name: artifact_run_id
description: A constructed ID to serve as a reliable identifier for a single run.
- name: artifact_generated_at
Expand Down
25 changes: 21 additions & 4 deletions models/staging/stg_dbt__nodes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,35 @@ base_v2 as (

),

manifests as (
manifests_v1 as (

select *
from base
where artifact_type = 'manifest.json'

),

flattened as (
flattened_v1 as (

{{ flatten_manifest("manifests_v1") }}

),

deduped_v1 as (

select *
from flattened_v1
-- Deduplicate the V1 issue of potential multiple manifest files.
-- This is a very likely occurance if using dbt-cloud as each artifact upload
-- will generate a new manifest.
qualify row_number() over (partition by artifact_run_id, node_id order by artifact_generated_at asc) = 1

),

unioned as (

-- V1 uploads
{{ flatten_manifest("manifests") }}
select * from deduped_v1

union all

Expand All @@ -49,7 +66,7 @@ surrogate_key as (
node_json:description::string as node_description,
name,
node_json
from flattened
from unioned

)

Expand Down

0 comments on commit ea64592

Please sign in to comment.