Microbatch Strategy (#1334)
* poc: microbatch using merge

* update base tests

* use dynamic insert_overwrite under the hood for bigquery

* changelog entry

* clean up validation + add testing
MichelleArk authored Sep 26, 2024
1 parent 563633b commit 4932b96
Showing 5 changed files with 157 additions and 3 deletions.
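Summary: this commit adds a 'microbatch' incremental strategy to dbt-bigquery. A run is split into one batch per `event_time` window, and each batch is written with the adapter's existing dynamic insert_overwrite machinery, so a batch replaces exactly the partitions it covers. The five files below are the changelog entry, the strategy validation in incremental.sql, a new macro file implementing the strategy, shared test fixtures, and a functional test module.

To make the batching concrete, here is a rough sketch of the compiled SQL for one daily batch (illustrative only: the table names and exact compiled form are assumptions, not this commit's verbatim output; dbt injects the filter on the upstream model's configured event_time column):

    select * from (
        select * from `my-project`.`my_dataset`.`input_model`
        -- dbt constrains each batch to its event_time window
        where event_time >= timestamp '2020-01-01 00:00:00+00'
          and event_time <  timestamp '2020-01-02 00:00:00+00'
    )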
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240925-232238.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add Microbatch Strategy to dbt-spark

Comment from @christopherekfeldt (Sep 27, 2024), on the line above:

    dbt-spark -> dbt-bigquery ? :)

time: 2024-09-25T23:22:38.216277+01:00
custom:
Author: michelleark
Issue: "1354"
15 changes: 12 additions & 3 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -4,12 +4,16 @@

   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'merge', 'insert_overwrite'
+    Expected one of: 'merge', 'insert_overwrite', 'microbatch'
   {%- endset %}
-  {% if strategy not in ['merge', 'insert_overwrite'] %}
+  {% if strategy not in ['merge', 'insert_overwrite', 'microbatch'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}

+  {% if strategy == 'microbatch' %}
+    {% do bq_validate_microbatch_config(config) %}
+  {% endif %}
+
   {% do return(strategy) %}
 {% endmacro %}
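Because the checks call exceptions.raise_compiler_error, a misconfigured microbatch model fails during compilation, before any BigQuery job is submitted; the checks themselves live in the new bq_validate_microbatch_config macro shown below.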

@@ -48,8 +52,13 @@
       tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
     ) %}

-  {% else %} {# strategy == 'merge' #}
+  {% elif strategy == 'microbatch' %}
+
+    {% set build_sql = bq_generate_microbatch_build_sql(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+    ) %}
+
+  {% else %} {# strategy == 'merge' #}
     {% set build_sql = bq_generate_incremental_merge_build_sql(
       tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_predicates
     ) %}
@@ -0,0 +1,28 @@
{% macro bq_validate_microbatch_config(config) %}
  {% if config.get("partition_by") is none %}
    {% set missing_partition_msg -%}
    The 'microbatch' strategy requires a `partition_by` config.
    {%- endset %}
    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
  {% endif %}

  {% if config.get("partition_by").granularity != config.get('batch_size') %}
    {% set invalid_partition_by_granularity_msg -%}
    The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.
    Got:
      `batch_size`: {{ config.get('batch_size') }}
      `partition_by.granularity`: {{ config.get("partition_by").granularity }}
    {%- endset %}
    {% do exceptions.raise_compiler_error(invalid_partition_by_granularity_msg) %}
  {% endif %}
{% endmacro %}

{% macro bq_generate_microbatch_build_sql(
    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}
  {% set build_sql = bq_insert_overwrite_sql(
    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
  ) %}

  {{ return(build_sql) }}
{% endmacro %}
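For context on what bq_generate_microbatch_build_sql delegates to: with dynamic partition replacement, bq_insert_overwrite_sql emits a MERGE that deletes the target rows in the partitions touched by the batch and inserts the replacement rows. A rough sketch of that shape (illustrative only; the relation names, column list, and temp-table naming are assumptions rather than the macro's verbatim output):

    -- dbt_partitions_for_replacement is a script variable holding the
    -- distinct partitions present in the batch's temp table
    merge into `my-project`.`my_dataset`.`target_model` as DBT_INTERNAL_DEST
    using (
        select * from `my-project`.`my_dataset`.`target_model__dbt_tmp`
    ) as DBT_INTERNAL_SOURCE
    on FALSE
    when not matched by source
         and timestamp_trunc(DBT_INTERNAL_DEST.event_time, day) in unnest(dbt_partitions_for_replacement)
        then delete
    when not matched then insert (`id`, `event_time`)
        values (`id`, `event_time`)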
@@ -555,3 +555,59 @@
select * from data
""".lstrip()

microbatch_model_no_unique_id_sql = """
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    partition_by={
        'field': 'event_time',
        'data_type': 'timestamp',
        'granularity': 'day'
    },
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
    )
}}
select * from {{ ref('input_model') }}
"""

microbatch_input_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
union all
select 2 as id, TIMESTAMP '2020-01-02 00:00:00-0' as event_time
union all
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

microbatch_model_no_partition_by_sql = """
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
    )
}}
select * from {{ ref('input_model') }}
"""


microbatch_model_invalid_partition_by_sql = """
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
    partition_by={
        'field': 'event_time',
        'data_type': 'timestamp',
        'granularity': 'hour'
    }
    )
}}
select * from {{ ref('input_model') }}
"""
@@ -0,0 +1,55 @@
import os
import pytest
from unittest import mock

from dbt.tests.util import run_dbt_and_capture
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
    BaseMicrobatch,
    patch_microbatch_end_time,
)

from tests.functional.adapter.incremental.incremental_strategy_fixtures import (
    microbatch_model_no_unique_id_sql,
    microbatch_input_sql,
    microbatch_model_no_partition_by_sql,
    microbatch_model_invalid_partition_by_sql,
)


class TestBigQueryMicrobatch(BaseMicrobatch):
    @pytest.fixture(scope="class")
    def microbatch_model_sql(self) -> str:
        return microbatch_model_no_unique_id_sql


class TestBigQueryMicrobatchMissingPartitionBy:
    @pytest.fixture(scope="class")
    def models(self) -> dict:
        return {
            "microbatch.sql": microbatch_model_no_partition_by_sql,
            "input_model.sql": microbatch_input_sql,
        }

    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_execution_failure_no_partition_by(self, project):
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
        assert "The 'microbatch' strategy requires a `partition_by` config" in stdout


class TestBigQueryMicrobatchInvalidPartitionByGranularity:
    @pytest.fixture(scope="class")
    def models(self) -> dict:
        return {
            "microbatch.sql": microbatch_model_invalid_partition_by_sql,
            "input_model.sql": microbatch_input_sql,
        }

    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_execution_failure_invalid_partition_by_granularity(self, project):
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
        assert (
            "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`"
            in stdout
        )
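Both failure tests run with DBT_EXPERIMENTAL_MICROBATCH enabled via mock.patch.dict, and patch_microbatch_end_time pins the end of the batch window so the set of batches is deterministic across runs.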
