Skip to content

Commit

Permalink
Don't consider scheduler idle while executing ``Scheduler.update_graph`` (#8877)
Browse files Browse the repository at this point in the history

Co-authored-by: James Bourbeau <[email protected]>
  • Loading branch information
hendrikmakait and jrbourbeau authored Sep 24, 2024
1 parent 42168f7 commit f870325
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
14 changes: 12 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3686,6 +3686,7 @@ class Scheduler(SchedulerState, ServerNode):
_client_connections_removed_total: int
_workers_added_total: int
_workers_removed_total: int
_active_graph_updates: int

def __init__(
self,
Expand Down Expand Up @@ -4049,6 +4050,7 @@ async def post(self):
self._client_connections_removed_total = 0
self._workers_added_total = 0
self._workers_removed_total = 0
self._active_graph_updates = 0

##################
# Administration #
Expand Down Expand Up @@ -4841,6 +4843,7 @@ async def update_graph(
stimulus_id: str | None = None,
) -> None:
start = time()
self._active_graph_updates += 1
try:
try:
graph = deserialize(graph_header, graph_frames).data
Expand Down Expand Up @@ -4913,8 +4916,11 @@ async def update_graph(
# (which may not have been added to who_wants yet)
client=client,
)
end = time()
self.digest_metric("update-graph-duration", end - start)
finally:
self._active_graph_updates -= 1
assert self._active_graph_updates >= 0
end = time()
self.digest_metric("update-graph-duration", end - start)

def _generate_taskstates(
self,
Expand Down Expand Up @@ -8607,6 +8613,10 @@ def check_idle(self) -> float | None:
self.idle_since = None
return None

if self._active_graph_updates > 0:
self.idle_since = None
return None

if (
self.queued
or self.unrunnable
Expand Down
28 changes: 28 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,34 @@ async def test_idle_timeout(c, s, a, b):
pc.stop()


@gen_cluster(client=True)
async def test_idle_during_update_graph(c, s, a, b):
    """Check that ``check_idle`` does not report idle from within ``update_graph``.

    A scheduler plugin calls ``Scheduler.check_idle`` from its ``update_graph``
    hook and records whether an idle timestamp came back. After a single task
    has been submitted and awaited, the recorded value must be ``False`` and
    the scheduler's ``idle_since`` must fall after the submission started.
    """

    class UpdateGraphTrackerPlugin(SchedulerPlugin):
        """Record what ``check_idle`` reports inside the ``update_graph`` hook."""

        def start(self, scheduler):
            # ``None`` means the ``update_graph`` hook has not fired yet
            self.idle_during_update_graph = None
            self.scheduler = scheduler

        def update_graph(self, *args, **kwargs):
            reported_idle_since = self.scheduler.check_idle()
            self.idle_during_update_graph = reported_idle_since is not None

    await c.register_plugin(UpdateGraphTrackerPlugin(), name="tracker")
    tracker = s.plugins["tracker"]

    # No work has ever been submitted, so the scheduler reports being idle
    assert s.check_idle() is not None
    before_submit = time()
    assert s.idle_since < before_submit

    await c.submit(lambda x: x, 1)

    # This first check may still report "not idle" because of the unit of work
    s.check_idle()
    # ... but the following check must report idle again
    assert s.check_idle() is not None
    after_idle = time()

    # The idle period must have restarted between the submission and now
    assert before_submit <= s.idle_since <= after_idle

    # The scheduler must not have looked idle while `Scheduler.update_graph` ran
    assert tracker.idle_during_update_graph is False


@gen_cluster(client=True, nthreads=[])
async def test_idle_timeout_no_workers(c, s):
"""Test that idle-timeout is not triggered if there are no workers available
Expand Down

0 comments on commit f870325

Please sign in to comment.