Skip to content

Commit

Permalink
use patch_microbatch_end_time (#307)
Browse files Browse the repository at this point in the history
Co-authored-by: Quigley Malcolm <[email protected]>
Co-authored-by: Mike Alfare <[email protected]>
Co-authored-by: Quigley Malcolm <[email protected]>
Co-authored-by: Colin Rogers <[email protected]>
  • Loading branch information
5 people authored Sep 16, 2024
1 parent c77fde5 commit 2ba660f
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
from unittest import mock

import pytest
from freezegun import freeze_time

from dbt.tests.util import relation_from_name, run_dbt

try:
# patch_microbatch_end_time introduced in dbt 1.9.0
from dbt.tests.util import patch_microbatch_end_time
except ImportError:
from freezegun import freeze_time as patch_microbatch_end_time

_input_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
Expand Down Expand Up @@ -61,12 +66,12 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project, insert_two_rows_sql):
# initial run -- backfills all data
with freeze_time("2020-01-03 13:57:00"):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

# our partition grain is "day" so running the same day without new data should produce the same results
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run"])
self.assert_row_count(project, "microbatch_model", 3)

Expand All @@ -76,16 +81,16 @@ def test_run_with_event_time(self, project, insert_two_rows_sql):
self.assert_row_count(project, "input_model", 5)

# re-run without changing current time => no insert
with freeze_time("2020-01-03 14:57:00"):
with patch_microbatch_end_time("2020-01-03 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 3)

# re-run by advancing time by one day changing current time => insert 1 row
with freeze_time("2020-01-04 14:57:00"):
with patch_microbatch_end_time("2020-01-04 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 4)

# re-run by advancing time by one more day changing current time => insert 1 more row
with freeze_time("2020-01-05 14:57:00"):
with patch_microbatch_end_time("2020-01-05 14:57:00"):
run_dbt(["run", "--select", "microbatch_model"])
self.assert_row_count(project, "microbatch_model", 5)

0 comments on commit 2ba660f

Please sign in to comment.