Skip to content

Commit

Permalink
Non-transitive sync status
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/non-transitive-sync]
  • Loading branch information
smackesey committed Jul 2, 2024
1 parent 6ab26d1 commit a739b4d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 189 deletions.
13 changes: 1 addition & 12 deletions python_modules/dagster/dagster/_core/definitions/data_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,18 +518,7 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato
partition_deps = self._get_partition_dependencies(key=key)
for dep_key in sorted(partition_deps):
dep_asset = self.asset_graph.get(dep_key.asset_key)
if (
self._instance.use_transitive_stale_causes
and self._get_status(key=dep_key) == StaleStatus.STALE
):
yield StaleCause(
key,
StaleCauseCategory.DATA,
"stale dependency",
dep_key,
self._get_stale_causes(key=dep_key),
)
elif provenance:
if provenance:
if not provenance.has_input_asset(dep_key.asset_key):
yield StaleCause(
key,
Expand Down
4 changes: 0 additions & 4 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,10 +961,6 @@ def auto_materialize_use_sensors(self) -> int:
def global_op_concurrency_default_limit(self) -> Optional[int]:
return self.get_settings("concurrency").get("default_op_concurrency_limit")

@property
def use_transitive_stale_causes(self) -> bool:
return False

# python logs

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,28 +294,7 @@ def asset2(asset1): ...
],
),
]
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_causes(asset2.key) == [
StaleCause(
asset2.key,
StaleCauseCategory.DATA,
"stale dependency",
asset1.key,
[
StaleCause(
asset1.key,
StaleCauseCategory.DATA,
"has a new dependency data version",
source1.key,
[
StaleCause(
source1.key, StaleCauseCategory.DATA, "has a new data version"
),
],
),
],
)
]
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH
materialize_assets(all_assets, instance)

# Simulate updating an asset with a new code version
Expand All @@ -329,18 +308,7 @@ def asset1_v2(source1): ...
assert status_resolver.get_stale_causes(asset1.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
]
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_causes(asset2.key) == [
StaleCause(
asset2.key,
StaleCauseCategory.DATA,
"stale dependency",
asset1.key,
[
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
],
),
]
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH

@asset
def asset3(): ...
Expand Down Expand Up @@ -505,7 +473,7 @@ def asset1(): ...
all_assets = [asset1, asset2]
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_status(asset1.key, "foo") == StaleStatus.STALE
assert status_resolver.get_status(asset2.key, "foo") == StaleStatus.STALE
assert status_resolver.get_status(asset2.key, "foo") == StaleStatus.FRESH


def test_stale_status_partitions_enabled() -> None:
Expand Down Expand Up @@ -672,8 +640,8 @@ def asset3(asset2):
assert status_resolver.get_status(asset1.key, "alpha") == StaleStatus.FRESH
assert status_resolver.get_status(asset1.key, "beta") == StaleStatus.FRESH
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_status(asset3.key, "alpha") == StaleStatus.STALE
assert status_resolver.get_status(asset3.key, "beta") == StaleStatus.STALE
assert status_resolver.get_status(asset3.key, "alpha") == StaleStatus.FRESH
assert status_resolver.get_status(asset3.key, "beta") == StaleStatus.FRESH
assert status_resolver.get_stale_causes(asset2.key) == [
StaleCause(
asset2.key,
Expand Down Expand Up @@ -707,10 +675,8 @@ def asset3(asset2):
)
for dep_key in [a1_foo_key, a1_bar_key]
]
assert status_resolver.get_stale_root_causes(asset3.key, "alpha") == [
StaleCause(dep_key, StaleCauseCategory.DATA, "has a new data version")
for dep_key in [a1_foo_key, a1_bar_key]
]
assert status_resolver.get_status(asset3.key, "alpha") == StaleStatus.FRESH
assert status_resolver.get_status(asset3.key, "beta") == StaleStatus.FRESH

materialize_asset(all_assets, asset2, instance)
status_resolver = get_stale_status_resolver(instance, all_assets)
Expand Down Expand Up @@ -834,7 +800,7 @@ def asset2(context, asset1):
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH


def test_stale_status_root_causes_general() -> None:
def test_stale_status_non_transitive_root_causes() -> None:
x = 0

@observable_source_asset
Expand Down Expand Up @@ -870,14 +836,10 @@ def asset1_v2(source1): ...
assert status_resolver.get_stale_root_causes(asset1.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version")
]
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset2.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version")
]
assert status_resolver.get_status(asset3.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset3.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version")
]
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset2.key) == []
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []

observe([source1], instance=instance)
status_resolver = get_stale_status_resolver(instance, all_assets)
Expand All @@ -886,133 +848,20 @@ def asset1_v2(source1): ...
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(source1.key, StaleCauseCategory.DATA, "has a new data version"),
]
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset2.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(source1.key, StaleCauseCategory.DATA, "has a new data version"),
]
assert status_resolver.get_status(asset3.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset3.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(source1.key, StaleCauseCategory.DATA, "has a new data version"),
]

# Simulate updating an asset with a new code version
@asset(name="asset3", code_version="2")
def asset3_v2(asset2): ...

all_assets = [source1, asset1_v2, asset2, asset3_v2]
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_stale_root_causes(asset3.key) == [
StaleCause(asset3.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(source1.key, StaleCauseCategory.DATA, "has a new data version"),
]


def test_stale_status_non_transitive_root_causes() -> None:
with mock.patch.object(DagsterInstance, "use_transitive_stale_causes", False):
x = 0

@observable_source_asset
def source1(_context):
nonlocal x
x = x + 1
return DataVersion(str(x))

@asset(code_version="1")
def asset1(source1): ...

@asset(code_version="1")
def asset2(asset1): ...

@asset(code_version="1")
def asset3(asset2): ...

with instance_for_test() as instance:
all_assets = [source1, asset1, asset2, asset3]
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_stale_root_causes(asset1.key) == []
assert status_resolver.get_stale_root_causes(asset2.key) == []

materialize_assets(all_assets, instance)

# Simulate updating an asset with a new code version
@asset(name="asset1", code_version="2")
def asset1_v2(source1): ...

all_assets = [source1, asset1_v2, asset2, asset3]
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_status(asset1.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset1.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version")
]
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset2.key) == []
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []

observe([source1], instance=instance)
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_status(asset1.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset1.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
StaleCause(source1.key, StaleCauseCategory.DATA, "has a new data version"),
]
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset2.key) == []
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []

materialize_assets(all_assets, instance=instance, selection=[asset1])
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_status(asset1.key) == StaleStatus.FRESH
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset2.key) == [
StaleCause(asset1.key, StaleCauseCategory.DATA, "has a new data version"),
]
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []


def test_stale_status_root_causes_dedup() -> None:
x = 0

@asset(code_version="1")
def asset1():
nonlocal x
x += 1
return Output(x, data_version=DataVersion(str(x)))

@asset
def asset2(asset1): ...

@asset
def asset3(asset1): ...

@asset
def asset4(asset2, asset3): ...

with instance_for_test() as instance:
all_assets = [asset1, asset2, asset3, asset4]
materialize_assets(all_assets, instance)
assert status_resolver.get_status(asset2.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset2.key) == []
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []

# Test dedup from updated data version
materialize_assets([asset1], instance=instance)
materialize_assets(all_assets, instance=instance, selection=[asset1])
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_stale_root_causes(asset4.key) == [
assert status_resolver.get_status(asset1.key) == StaleStatus.FRESH
assert status_resolver.get_status(asset2.key) == StaleStatus.STALE
assert status_resolver.get_stale_root_causes(asset2.key) == [
StaleCause(asset1.key, StaleCauseCategory.DATA, "has a new data version"),
]

# Test dedup from updated code version
@asset(name="asset1", code_version="2")
def asset1_v2(): ...

all_assets = [asset1_v2, asset2, asset3, asset4]
status_resolver = get_stale_status_resolver(instance, all_assets)
assert status_resolver.get_stale_root_causes(asset4.key) == [
StaleCause(asset1.key, StaleCauseCategory.CODE, "has a new code version"),
]
assert status_resolver.get_status(asset3.key) == StaleStatus.FRESH
assert status_resolver.get_stale_root_causes(asset3.key) == []


def test_no_provenance_stale_status():
Expand Down

0 comments on commit a739b4d

Please sign in to comment.