Skip to content

Commit

Permalink
Replace environment variable with a project flag to gate microbatch f…
Browse files Browse the repository at this point in the history
…unctionality (#10799)

* first pass: replace os env with project flag

* Fix `TestMicrobatchMultipleRetries` to not use `os.env`

* Turn off microbatch project flag for `TestMicrobatchCustomUserStrategyDefault` as it was prior to a9df50f

* Update `BaseMicrobatchTest` to turn on microbatch via project flags

* Add changie doc

* Fix functional tests after merging in main

* Add function to that determines whether the new microbatch functionality should be used

The new microbatch functionality is, unfortunately, potentially dangerous. That is
it adds a new materalization strategy `microbatch` which an end user could have
defined as a custom strategy previously. Additionally we added config keys to nodes,
and as `config` is just a Dict[str, Any], it could contain anything, thus meaning
people could already be using the configs we're adding for different purposes. Thus
we need some intellegent gating. Specifically something that adheres to the following:

cms = Custom Microbatch Strategy
abms = Adapter Builtin Microbatch Strategy
bf = Behavior flag
umb = Use Microbatch Batching
t/f/e = True/False/Error

| cms | abms | bf | umb |
| t   | t    | t  | t   |
| f   | t    | t  | t   |
| t   | f    | t  | t   |
| f   | f    | t  | e   |
| t   | t    | f  | f   |
| f   | t    | f  | t   |
| t   | f    | f  | f   |
| f   | f    | f  | e   |

(The above table assumes that there is a microbatch model present in the project)

In order to achieve this we need to check that either the microbatch behavior
flag is set to true OR microbatch materializaion being used is the _root_ microbatch
materialization (i.e. not custom). The function we added in this commit,
`use_microbatch_batches`, does just that.

* Gate microbatch functionality by `use_microbatch_batches` manifest function

* Rename microbatch behavior flag to `require_batched_execution_for_custom_microbatch_strategy`

* Extract logic of `find_macro_by_name` to `find_macro_candiate_by_name`

In 0349968 I had done this for the function
`find_materialization_macro_by_name`, but that wasn't the right function to
do it to, and will be reverted shortly. `find_materialization_macro_by_name`
is used for finding the general materialization macro, whereas `find_macro_by_name`
is more general. For the work we're doing, we need to find the microbatch
macro, which is not a materialization macro.

* Use `find_macro_candidate_by_name` to find the microbatch macro

* Fix microbatch macro locality check to search for `core` locality instead of `root`

Previously were were checking for a locality of `root`. However, a locality
of `root` means it was provided by a `package`. We wnt to check for locality
of `core` which basically means `builtin via dbt-core/adapters`. There is
another locality `imported` which I beleive means it comes from another
package.

* Move the evaluation of `use_microbatch_batches` to the last position in boolean checks

The method `use_microbatch_batches` is always invoked to evaluate an `if`
statement. In most instances, it is part of a logic chain (i.e. there are
multiple things being evaluated in the `if` statement). In `if` statements
where there are multiple things being evaulated, `use_microbatch_batches`
should come _last_ (or as late as possible). This is because it is likely
the most costly thing to evaluate in the logic chain, and thus any shortcuts
cuts via other evaluations in the if statement failing (and thus avoiding
invoking `use_microbatch_batches) is desirable.

* Drop behavior flag setting for BaseMicrobatchTest tests

* Rename 'env_var' to 'project_flag' in test_microbatch.py

* Update microbatch tests to assert when we are/aren't running with batches

* Update `test_resolve_event_time_filter` to use `use_microbatch_batches`

* Fire deprecation warning for custom microbatch macros

* Add microbatch deprecation events to test_events.py

---------

Co-authored-by: Quigley Malcolm <[email protected]>
  • Loading branch information
MichelleArk and QMalcolm authored Nov 11, 2024
1 parent 30b8a92 commit 89caa33
Show file tree
Hide file tree
Showing 14 changed files with 754 additions and 637 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241001-161422.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Change gating of microbatch feature to be behind project flag / behavior flag
time: 2024-10-01T16:14:22.267253-05:00
custom:
Author: MichelleArk QMalcolm
Issue: "10798"
4 changes: 2 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,12 @@ def resolve_limit(self) -> Optional[int]:
def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
(isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and isinstance(self.model, ModelNode)
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
and self.manifest.use_microbatch_batches(project_name=self.config.project_name)
):
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")
Expand Down
34 changes: 30 additions & 4 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,10 @@ def __init__(self):
self._macros_by_name = {}
self._macros_by_package = {}

def find_macro_by_name(
def find_macro_candidate_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[Macro]:
"""Find a macro in the graph by its name and package name, or None for
) -> Optional[MacroCandidate]:
"""Find a MacroCandidate in the graph by its name and package name, or None for
any package. The root project name is used to determine priority:
- locally defined macros come first
- then imported macros
Expand All @@ -735,7 +735,15 @@ def filter(candidate: MacroCandidate) -> bool:
filter=filter,
)

return candidates.last()
return candidates.last_candidate()

def find_macro_by_name(
self, name: str, root_project_name: str, package: Optional[str]
) -> Optional[Macro]:
macro_candidate = self.find_macro_candidate_by_name(
name=name, root_project_name=root_project_name, package=package
)
return macro_candidate.macro if macro_candidate else None

def find_generate_macro_by_name(
self, component: str, root_project_name: str, imported_package: Optional[str] = None
Expand Down Expand Up @@ -1747,6 +1755,24 @@ def __reduce_ex__(self, protocol):
)
return self.__class__, args

def _microbatch_macro_is_core(self, project_name: str) -> bool:
microbatch_is_core = False
candidate = self.find_macro_candidate_by_name(
name="get_incremental_microbatch_sql", root_project_name=project_name, package=None
)

# We want to check for "Core", because "Core" basically means "builtin"
if candidate is not None and candidate.locality == Locality.Core:
microbatch_is_core = True

return microbatch_is_core

def use_microbatch_batches(self, project_name: str) -> bool:
return (
get_flags().require_batched_execution_for_custom_microbatch_strategy
or self._microbatch_macro_is_core(project_name=project_name)
)


class MacroManifest(MacroMethods):
def __init__(self, macros) -> None:
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
write_json: Optional[bool] = None

# legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md
require_batched_execution_for_custom_microbatch_strategy: bool = False
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
Expand All @@ -350,6 +351,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
@property
def project_only_flags(self) -> Dict[str, Any]:
return {
"require_batched_execution_for_custom_microbatch_strategy": self.require_batched_execution_for_custom_microbatch_strategy,
"require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations,
"require_resource_names_without_spaces": self.require_resource_names_without_spaces,
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
Expand Down
6 changes: 6 additions & 0 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ class MFCumulativeTypeParamsDeprecation(DBTDeprecation):
_event = "MFCumulativeTypeParamsDeprecation"


class MicrobatchMacroOutsideOfBatchesDeprecation(DBTDeprecation):
_name = "microbatch-macro-outside-of-batches-deprecation"
_event = "MicrobatchMacroOutsideOfBatchesDeprecation"


def renamed_env_var(old_name: str, new_name: str):
class EnvironmentVariableRenamed(DBTDeprecation):
_name = f"environment-variable-renamed:{old_name}"
Expand Down Expand Up @@ -178,6 +183,7 @@ def show_callback():
SourceFreshnessProjectHooksNotRun(),
MFTimespineWithoutYamlConfigurationDeprecation(),
MFCumulativeTypeParamsDeprecation(),
MicrobatchMacroOutsideOfBatchesDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {d.name: d for d in deprecations_list}
Expand Down
8 changes: 8 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ message MFCumulativeTypeParamsDeprecationMsg {
MFCumulativeTypeParamsDeprecation data = 2;
}

// D020
message MicrobatchMacroOutsideOfBatchesDeprecation {}

message MicrobatchMacroOutsideOfBatchesDeprecationMsg {
CoreEventInfo info = 1;
MicrobatchMacroOutsideOfBatchesDeprecation data = 2;
}

// I065
message DeprecatedModel {
string model_name = 1;
Expand Down
1,182 changes: 593 additions & 589 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ def message(self) -> str:
return line_wrap_message(warning_tag(description))


class MicrobatchMacroOutsideOfBatchesDeprecation(WarnLevel):
def code(self) -> str:
return "D020"

def message(self) -> str:
description = "The use of a custom microbatch macro outside of batched execution is deprecated. To use it with batched execution, set `flags.require_batched_execution_for_custom_microbatch_strategy` to `True` in `dbt_project.yml`. In the future this will be the default behavior."

return line_wrap_message(warning_tag(description))


# =======================================================
# I - Project parsing
# =======================================================
Expand Down
20 changes: 19 additions & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def load(self) -> Manifest:

self.check_for_model_deprecations()
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()

return self.manifest

Expand Down Expand Up @@ -649,6 +650,23 @@ def check_for_spaces_in_resource_names(self):
else: # ERROR level
raise DbtValidationError("Resource names cannot contain spaces")

def check_for_microbatch_deprecations(self) -> None:
if not get_flags().require_batched_execution_for_custom_microbatch_strategy:
has_microbatch_model = False
for _, node in self.manifest.nodes.items():
if (
isinstance(node, ModelNode)
and node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
has_microbatch_model = True
break

if has_microbatch_model and self.manifest._microbatch_macro_is_core(
self.root_project.project_name
):
dbt.deprecations.warn("microbatch-macro-outside-of-batches-deprecation")

def load_and_parse_macros(self, project_parser_files):
for project in self.all_projects.values():
if project.project_name not in project_parser_files:
Expand Down Expand Up @@ -1390,7 +1408,7 @@ def check_valid_snapshot_config(self):
node.config.final_validate()

def check_valid_microbatch_config(self):
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
Expand Down
6 changes: 2 additions & 4 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import os
import threading
import time
from copy import deepcopy
Expand Down Expand Up @@ -482,11 +481,10 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)

if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
and model.config.materialized == "incremental"
model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
and manifest.use_microbatch_batches(project_name=self.config.project_name)
):
return self._execute_microbatch_model(
hook_ctx, context_config, model, manifest, context, materialization_macro
Expand Down
Loading

0 comments on commit 89caa33

Please sign in to comment.