Skip to content

Commit

Permalink
Microbatch: event_time ref + source filtering (#10594)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Sep 12, 2024
1 parent ab500a9 commit cc8541c
Show file tree
Hide file tree
Showing 15 changed files with 911 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-121029.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add basic functionality for creating microbatch incremental models
time: 2024-09-11T12:10:29.822189-05:00
custom:
Author: MichelleArk QMalcolm
Issue: 9490 10635 10637 10638 10636 10662 10639
7 changes: 7 additions & 0 deletions core/dbt/artifacts/resources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,10 @@ class TimePeriod(StrEnum):

def plural(self) -> str:
    """Return the pluralized period name, e.g. "day" -> "days"."""
    return "".join([str(self), "s"])


class BatchSize(StrEnum):
    """Granularity of one batch of a microbatch incremental model.

    Values are compared against a model's `batch_size` config when computing
    the event-time window for each batch.
    """

    hour = "hour"
    day = "day"
    month = "month"
    year = "year"
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class NodeConfig(NodeAndTestConfig):
# 'mergebehavior' dictionary
materialized: str = "view"
incremental_strategy: Optional[str] = None
batch_size: Any = None
lookback: Any = 0
persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
default_factory=list,
Expand Down Expand Up @@ -122,6 +124,7 @@ class NodeConfig(NodeAndTestConfig):
default_factory=ContractConfig,
metadata=MergeBehavior.Update.meta(),
)
event_time: Any = None

def __post_init__(self):
# we validate that node_color has a suitable value to prevent dbt-docs from crashing
Expand Down
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
@dataclass
class SourceConfig(BaseConfig):
    """Resource-level configuration for a source definition."""

    enabled: bool = True
    # Column used to filter this source's rows by event time in microbatch runs.
    # Typed `Any` (not Optional[str]) — presumably validated elsewhere; TODO confirm.
    event_time: Any = None


@dataclass
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def cli(ctx, **kwargs):
@click.pass_context
@global_flags
@p.empty
@p.event_time_start
@p.event_time_end
@p.exclude
@p.export_saved_queries
@p.full_refresh
Expand Down Expand Up @@ -537,6 +539,8 @@ def parse(ctx, **kwargs):
@p.profiles_dir
@p.project_dir
@p.empty
@p.event_time_start
@p.event_time_end
@p.select
@p.selector
@p.target_path
Expand Down
16 changes: 16 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@
is_flag=True,
)

# Exclusive upper bound of the event-time window used to filter microbatch
# model inputs; parsed by click as a naive datetime (later treated as UTC).
event_time_end = click.option(
    "--event-time-end",
    envvar="DBT_EVENT_TIME_END",
    help="If specified, the end datetime dbt uses to filter microbatch model inputs (exclusive).",
    type=click.DateTime(),
    default=None,
)

# Inclusive lower bound of the event-time window used to filter microbatch
# model inputs; parsed by click as a naive datetime (later treated as UTC).
event_time_start = click.option(
    "--event-time-start",
    envvar="DBT_EVENT_TIME_START",
    help="If specified, the start datetime dbt uses to filter microbatch model inputs (inclusive).",
    type=click.DateTime(),
    default=None,
)

exclude = click.option(
"--exclude",
envvar=None,
Expand Down
120 changes: 115 additions & 5 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import os
from copy import deepcopy
from datetime import datetime, timedelta
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -16,18 +17,21 @@
Union,
)

import pytz
from typing_extensions import Protocol

from dbt import selected_resources
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.exceptions import MissingConfigError
from dbt.adapters.factory import (
get_adapter,
get_adapter_package_names,
get_adapter_type_names,
)
from dbt.artifacts.resources import NodeVersion, RefArgs
from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig
from dbt.artifacts.resources.types import BatchSize
from dbt.clients.jinja import (
MacroGenerator,
MacroStack,
Expand Down Expand Up @@ -230,6 +234,95 @@ def Relation(self):
def resolve_limit(self) -> Optional[int]:
    """Row limit to apply to resolved relations: 0 under --empty, else no limit."""
    if getattr(self.config.args, "EMPTY", False):
        return 0
    return None

def _build_end_time(self) -> Optional[datetime]:
    """Default end of the event-time window: the current moment in UTC."""
    return datetime.now(pytz.utc)

def _build_start_time(
    self, checkpoint: Optional[datetime], is_incremental: bool
) -> Optional[datetime]:
    """Compute the UTC start of the batch window ending at *checkpoint*.

    The checkpoint is truncated down to the model's `batch_size` boundary and
    then shifted back by `lookback` whole batches. Returns None for a
    non-incremental run or a missing checkpoint (no lower bound applies).

    Raises:
        DbtRuntimeError: if the model has no `batch_size` configured.
        DbtInternalError: if the configured batch size is unrecognized.
    """
    if checkpoint is None or not is_incremental:
        return None

    # Narrow for the type checker; microbatch models always carry a NodeConfig.
    assert isinstance(self.model.config, NodeConfig)
    batch_size = self.model.config.batch_size
    if batch_size is None:
        raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`")

    lookback = self.model.config.lookback

    if batch_size == BatchSize.hour:
        # Truncate to the top of the hour, reinterpreted as UTC.
        hour_start = checkpoint.replace(minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
        return hour_start - timedelta(hours=lookback)

    if batch_size == BatchSize.day:
        day_start = checkpoint.replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc
        )
        return day_start - timedelta(days=lookback)

    if batch_size == BatchSize.month:
        month_start = checkpoint.replace(
            day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc
        )
        # Month lengths vary, so step back one month at a time: back up one day
        # past the 1st, then snap to that month's 1st again.
        for _ in range(lookback):
            month_start = (month_start - timedelta(days=1)).replace(day=1)
        return month_start

    if batch_size == BatchSize.year:
        return datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc)

    raise DbtInternalError(
        f"Batch size `{batch_size}` is not handled during batch calculation"
    )

def _is_incremental(self) -> bool:
    """Whether this run of the model will execute incrementally.

    True only when the target table already exists, the model is materialized
    as incremental, and no full refresh was requested via flag or config.
    """
    # TODO: Remove. This is a temporary method. We're working with adapters on
    # a strategy to ensure we can access the `is_incremental` logic without drift
    relation_info = self.Relation.create_from(self.config, self.model)
    existing = self.db_wrapper.get_relation(
        relation_info.database, relation_info.schema, relation_info.name
    )
    if existing is None or existing.type != "table":
        return False
    if self.model.config.materialized != "incremental":
        return False
    full_refresh_requested = (
        getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh
    )
    return not full_refresh_requested

def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
    """Build the event-time filter to apply when reading *target*, if any.

    A filter is produced only when all of the following hold: the
    DBT_EXPERIMENTAL_MICROBATCH env var is set, *target* has an `event_time`
    configured, and the calling model is an incremental model using the
    `microbatch` strategy. CLI-provided bounds (EVENT_TIME_START/END, naive
    datetimes) take precedence and are interpreted as UTC; otherwise the end
    defaults to "now" and the start is derived from batch_size/lookback.

    Returns None when microbatch filtering does not apply or no bound exists.
    """
    # NOTE(review): truthiness check — any non-empty value (even "false" or "0")
    # enables the experimental flag; only unset/empty disables it.
    if not os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
        return None
    if not isinstance(target.config, (NodeConfig, SourceConfig)) or not target.config.event_time:
        return None
    if (
        self.model.config.materialized != "incremental"
        or self.model.config.incremental_strategy != "microbatch"
    ):
        return None

    is_incremental = self._is_incremental()

    end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None)
    end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time()

    start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None)
    start = (
        start.replace(tzinfo=pytz.UTC)
        if start
        else self._build_start_time(checkpoint=end, is_incremental=is_incremental)
    )

    if start is None and end is None:
        return None
    return EventTimeFilter(
        field_name=target.config.event_time,
        start=start,
        end=end,
    )

@abc.abstractmethod
def __call__(self, *args: str) -> Union[str, RelationProxy, MetricReference]:
    """Resolve the ref/source/metric call made from Jinja; implemented by subclasses."""
    pass
Expand Down Expand Up @@ -545,7 +638,11 @@ def resolve(
def create_relation(self, target_model: ManifestNode) -> RelationProxy:
if target_model.is_ephemeral_model:
self.model.set_cte(target_model.unique_id, None)
return self.Relation.create_ephemeral_from(target_model, limit=self.resolve_limit)
return self.Relation.create_ephemeral_from(
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)
elif (
hasattr(target_model, "defer_relation")
and target_model.defer_relation
Expand All @@ -563,10 +660,18 @@ def create_relation(self, target_model: ManifestNode) -> RelationProxy:
)
):
return self.Relation.create_from(
self.config, target_model.defer_relation, limit=self.resolve_limit
self.config,
target_model.defer_relation,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)
else:
return self.Relation.create_from(self.config, target_model, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_model,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_model),
)

def validate(
self,
Expand Down Expand Up @@ -633,7 +738,12 @@ def resolve(self, source_name: str, table_name: str):
target_kind="source",
disabled=(isinstance(target_source, Disabled)),
)
return self.Relation.create_from(self.config, target_source, limit=self.resolve_limit)
return self.Relation.create_from(
self.config,
target_source,
limit=self.resolve_limit,
event_time_filter=self.resolve_event_time_filter(target_source),
)


class RuntimeUnitTestSourceResolver(BaseSourceResolver):
Expand Down
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"dbt-semantic-interfaces>=0.7.0,<0.8",
# Minor versions for these are expected to be backwards-compatible
"dbt-common>=1.6.0,<2.0",
"dbt-adapters>=1.3.0,<2.0",
"dbt-adapters>=1.6.0,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
"packaging>20.9",
Expand Down
Loading

0 comments on commit cc8541c

Please sign in to comment.