Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microbatch: event_time ref + source filtering #10594

Merged
merged 41 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
c4930e9
initial rough-in with CLI flags
MichelleArk Aug 22, 2024
4c8528b
dbt-adapters testing against event-time-ref-filtering
MichelleArk Aug 22, 2024
3bb6807
Merge branch 'main' into event-time-ref-filtering
MichelleArk Sep 3, 2024
f5d5bb6
fix TestList
MichelleArk Sep 3, 2024
19ad7c6
Checkpoint
QMalcolm Sep 3, 2024
a57481f
fix tests
MichelleArk Sep 3, 2024
25c10f7
add event_time_start params to build
MichelleArk Sep 3, 2024
699179f
rename configs
MichelleArk Sep 3, 2024
2d19d1c
Gate resolve_event_time_filter via micro batch strategy and fix strpt…
QMalcolm Sep 5, 2024
57b1353
Add unit test for resolve_event_time_filter
QMalcolm Sep 5, 2024
e0bae27
Additional unit tests for `resolve_event_time_filter` to ensure lookb…
QMalcolm Sep 5, 2024
1313aff
Remove extraneous comments and print statements from resolve_event_ti…
QMalcolm Sep 5, 2024
7307c02
Fixup microbatch functional tests to use microbatch strategy
QMalcolm Sep 5, 2024
838a0aa
Gate microbatch functionality behind env_var while in beta
QMalcolm Sep 5, 2024
43715de
Add comment about how _is_incremental should be removed
QMalcolm Sep 6, 2024
e38ff47
Improve `event_time_start/end` cli parameters to auto convert to date…
QMalcolm Sep 6, 2024
457698c
for testing: dbt-postgres 'microbatch' strategy
MichelleArk Sep 6, 2024
3a4fa7f
Make event_time model configs `Any` so as to not break people already…
QMalcolm Sep 9, 2024
8cc1c7d
Rename `PartitionGrain` to `BatchSize`
QMalcolm Sep 9, 2024
1b0a904
fix error message about batch size
QMalcolm Sep 9, 2024
7522b9b
Create unit test to check values which gate off event_time usage
QMalcolm Sep 9, 2024
5f72534
Add unit test case in regard to event_tiem filtering when `is_increme…
QMalcolm Sep 9, 2024
a8b9d64
Cleanup unit test for test_resolve_event_time_filter_batch_calculation
QMalcolm Sep 9, 2024
174db0a
Merge branch 'main' into event-time-ref-filtering
QMalcolm Sep 10, 2024
b85d50e
Update lower bound of dbt-adapters req to 1.5.0
QMalcolm Sep 10, 2024
ec01129
Update dev-reqs now that the changes we need have been merged to main…
QMalcolm Sep 10, 2024
8f8e7e3
Remove `is_incremental` check from `_build_end_time`
QMalcolm Sep 10, 2024
2b80f75
Improve help messages for `event_time_start/end` CLI flags
QMalcolm Sep 10, 2024
0eb8e2f
Improve error handling messages for event time filtering generation
QMalcolm Sep 10, 2024
3a6c739
Merge branch 'main' into event-time-ref-filtering
QMalcolm Sep 11, 2024
1690d81
Delete pairing.md file
QMalcolm Sep 11, 2024
de87286
Add changie doc for initial microbatch implementation
QMalcolm Sep 11, 2024
12bfec9
Update v12.json manifest schema for NodeConfig changes
QMalcolm Sep 11, 2024
9b47966
Add `event_time` to `SourceConfig` and allow event time filtering for…
QMalcolm Sep 12, 2024
6767948
Add test that asserts direct upstrean inputs without event_time aren'…
QMalcolm Sep 12, 2024
61123cb
Add test asserting that calling `.render()` on a ref skips event time…
QMalcolm Sep 12, 2024
43866db
Update v12 manifest schema with new SourceConfig key for event_time
QMalcolm Sep 12, 2024
5426e66
fix test_list for source config event_time
MichelleArk Sep 12, 2024
ee29fed
restore dbt-postgres dev requirement
MichelleArk Sep 12, 2024
3020aa4
Update dbt-adapters minimum version to 1.6.0
QMalcolm Sep 12, 2024
f53204f
fix test_artifacts; add event_time to expected_manifest
MichelleArk Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-121029.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add basic functionality for creating microbatch incremental models
time: 2024-09-11T12:10:29.822189-05:00
custom:
Author: MichelleArk QMalcolm
Issue: 9490 10635 10637 10638 10636 10662 10639
7 changes: 7 additions & 0 deletions core/dbt/artifacts/resources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ class TimePeriod(StrEnum):

def plural(self) -> str:
return str(self) + "s"


class BatchSize(StrEnum):
hour = "hour"
day = "day"
month = "month"
year = "year"
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class NodeConfig(NodeAndTestConfig):
# 'mergebehavior' dictionary
materialized: str = "view"
incremental_strategy: Optional[str] = None
batch_size: Any = None
lookback: Any = 0
persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
default_factory=list,
Expand Down Expand Up @@ -122,6 +124,7 @@ class NodeConfig(NodeAndTestConfig):
default_factory=ContractConfig,
metadata=MergeBehavior.Update.meta(),
)
event_time: Any = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the event_time, batch_size, and lookback fields as Any to avoid any regressions (e.g. parsing failures for projects that make use of these configs).

We will be adding parse-time validation for microbatch models in a downstream PR.


def __post_init__(self):
# we validate that node_color has a suitable value to prevent dbt-docs from crashing
Expand Down
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
@dataclass
class SourceConfig(BaseConfig):
enabled: bool = True
event_time: Any = None


@dataclass
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def cli(ctx, **kwargs):
@click.pass_context
@global_flags
@p.empty
@p.event_time_start
@p.event_time_end
@p.exclude
@p.export_saved_queries
@p.full_refresh
Expand Down Expand Up @@ -537,6 +539,8 @@ def parse(ctx, **kwargs):
@p.profiles_dir
@p.project_dir
@p.empty
@p.event_time_start
@p.event_time_end
@p.select
@p.selector
@p.target_path
Expand Down
16 changes: 16 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@
is_flag=True,
)

event_time_end = click.option(
"--event-time-end",
envvar="DBT_EVENT_TIME_END",
help="If specified, the end datetime dbt uses to filter microbatch model inputs (exclusive).",
type=click.DateTime(),
default=None,
)

event_time_start = click.option(
"--event-time-start",
envvar="DBT_EVENT_TIME_START",
help="If specified, the start datetime dbt uses to filter microbatch model inputs (inclusive).",
type=click.DateTime(),
default=None,
)

exclude = click.option(
"--exclude",
envvar=None,
Expand Down
120 changes: 115 additions & 5 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import os
from copy import deepcopy
from datetime import datetime, timedelta
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -16,18 +17,21 @@
Union,
)

import pytz
from typing_extensions import Protocol

from dbt import selected_resources
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.exceptions import MissingConfigError
from dbt.adapters.factory import (
get_adapter,
get_adapter_package_names,
get_adapter_type_names,
)
from dbt.artifacts.resources import NodeVersion, RefArgs
from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig
from dbt.artifacts.resources.types import BatchSize
from dbt.clients.jinja import (
MacroGenerator,
MacroStack,
Expand Down Expand Up @@ -230,6 +234,95 @@
def resolve_limit(self) -> Optional[int]:
return 0 if getattr(self.config.args, "EMPTY", False) else None

def _build_end_time(self) -> Optional[datetime]:
return datetime.now(tz=pytz.utc)

def _build_start_time(
self, checkpoint: Optional[datetime], is_incremental: bool
) -> Optional[datetime]:
if not is_incremental or checkpoint is None:
return None

assert isinstance(self.model.config, NodeConfig)
batch_size = self.model.config.batch_size
if batch_size is None:
raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`")

Check warning on line 249 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L249

Added line #L249 was not covered by tests

lookback = self.model.config.lookback
if batch_size == BatchSize.hour:
start = datetime(
checkpoint.year,
checkpoint.month,
checkpoint.day,
checkpoint.hour,
0,
0,
0,
pytz.utc,
) - timedelta(hours=lookback)
elif batch_size == BatchSize.day:
start = datetime(
checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc
) - timedelta(days=lookback)
elif batch_size == BatchSize.month:
start = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc)
for _ in range(lookback):
start = start - timedelta(days=1)
start = datetime(start.year, start.month, 1, 0, 0, 0, 0, pytz.utc)
elif batch_size == BatchSize.year:
start = datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc)
else:
raise DbtInternalError(

Check warning on line 275 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L275

Added line #L275 was not covered by tests
f"Batch size `{batch_size}` is not handled during batch calculation"
)

return start

def _is_incremental(self) -> bool:
# TODO: Remove. This is a temporary method. We're working with adapters on
# a strategy to ensure we can access the `is_incremental` logic without drift
relation_info = self.Relation.create_from(self.config, self.model)
relation = self.db_wrapper.get_relation(
relation_info.database, relation_info.schema, relation_info.name
)
return (
relation is not None
and relation.type == "table"
and self.model.config.materialized == "incremental"
and not (
getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh
)
)

def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
):
is_incremental = self._is_incremental()
end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None)
end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time()

start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None)
start = (
start.replace(tzinfo=pytz.UTC)
if start
else self._build_start_time(checkpoint=end, is_incremental=is_incremental)
)

if start is not None or end is not None:
event_time_filter = EventTimeFilter(
field_name=target.config.event_time,
start=start,
end=end,
)

return event_time_filter

@abc.abstractmethod
def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]:
pass
Expand Down Expand Up @@ -545,7 +638,11 @@
def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit)
return self.Relation.create_ephemeral_from(
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)
elif (
hasattr(target_model, "defer_relation")
and target_model.defer_relation
Expand All @@ -563,10 +660,18 @@
)
):
return self.Relation.create_from(
self.config, target_model.defer_relation, limit=self.resolve_limit
self.config,
target_model.defer_relation,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)
else:
return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)

def validate(
self,
Expand Down Expand Up @@ -633,7 +738,12 @@
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_source),
)


class RuntimeUnitTestSourceResolver(BaseSourceResolver):
Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"dbt-semantic-interfaces>=0.7.0,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.3.0,<2.0",
"dbt-adapters>=1.6.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
Loading