Skip to content

Commit

Permalink
Log key collision count in update_graph log event (#8692)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Jun 13, 2024
1 parent d8dc8ad commit f7921a1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
26 changes: 19 additions & 7 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4590,11 +4590,6 @@ def _create_taskstate_from_graph(

lost_keys = self._match_graph_with_tasks(dsk, dependencies, keys)

if len(dsk) > 1:
self.log_event(
["all", client], {"action": "update_graph", "count": len(dsk)}
)

if lost_keys:
self.report({"op": "cancelled-keys", "keys": lost_keys}, client=client)
self.client_releases_keys(
Expand All @@ -4616,13 +4611,28 @@ def _create_taskstate_from_graph(
computation.annotations.update(global_annotations)
del global_annotations

runnable, touched_tasks, new_tasks = self._generate_taskstates(
(
runnable,
touched_tasks,
new_tasks,
colliding_task_count,
) = self._generate_taskstates(
keys=keys,
dsk=dsk,
dependencies=dependencies,
computation=computation,
)

if len(dsk) > 1 or colliding_task_count:
self.log_event(
["all", client],
{
"action": "update_graph",
"count": len(dsk),
"key-collisions": colliding_task_count,
},
)

keys_with_annotations = self._apply_annotations(
tasks=new_tasks,
annotations_by_type=annotations_by_type,
Expand Down Expand Up @@ -4815,6 +4825,7 @@ def _generate_taskstates(
touched_keys = set()
touched_tasks = []
tgs_with_bad_run_spec = set()
colliding_task_count = 0
while stack:
k = stack.pop()
if k in touched_keys:
Expand Down Expand Up @@ -4860,6 +4871,7 @@ def _generate_taskstates(
# dask/dask#9888.
dependencies[k] = deps_lhs

colliding_task_count += 1
if ts.group not in tgs_with_bad_run_spec:
tgs_with_bad_run_spec.add(ts.group)
logger.warning(
Expand Down Expand Up @@ -4912,7 +4924,7 @@ def _generate_taskstates(
len(touched_tasks),
len(keys),
)
return runnable, touched_tasks, new_tasks
return runnable, touched_tasks, new_tasks, colliding_task_count

def _apply_annotations(
self,
Expand Down
19 changes: 19 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4789,6 +4789,23 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de
For a real world example where this can trigger, see
https://github.com/dask/dask/issues/9888
"""
seen = False

def _match(event):
_, msg = event
return (
isinstance(msg, dict)
and msg.get("action", None) == "update_graph"
and msg["key-collisions"] > 0
)

def handler(ev):
if _match(ev):
nonlocal seen
seen = True

c.subscribe_topic("all", handler)

x1 = c.submit(inc, 1, key="x1")
y_old = c.submit(inc, x1, key="y")

Expand All @@ -4803,6 +4820,8 @@ async def test_resubmit_different_task_same_key_before_previous_is_done(c, s, de

assert "Detected different `run_spec` for key 'y'" in log.getvalue()

await async_poll_for(lambda: seen, timeout=5)

async with Worker(s.address):
# Used old run_spec
assert await y_old == 3
Expand Down

0 comments on commit f7921a1

Please sign in to comment.