From 36d78ecf772c567c119b36c1fd524838450b4ce9 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 14 Nov 2024 11:39:01 +0100
Subject: [PATCH] Do not log full worker info in retire_workers

---
 distributed/scheduler.py                      | 30 +++++++++----------
 .../tests/test_active_memory_manager.py      |  8 +++--
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index db1d1c89cd..3bce832c7b 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7443,7 +7443,7 @@ async def retire_workers(
         close_workers: bool = False,
         remove: bool = True,
         stimulus_id: str | None = None,
-    ) -> dict[str, Any]: ...
+    ) -> list[str]: ...
 
     @overload
     async def retire_workers(
@@ -7453,7 +7453,7 @@ async def retire_workers(
         close_workers: bool = False,
         remove: bool = True,
         stimulus_id: str | None = None,
-    ) -> dict[str, Any]: ...
+    ) -> list[str]: ...
 
     @overload
     async def retire_workers(
@@ -7469,7 +7469,7 @@ async def retire_workers(
         minimum: int | None = None,
         target: int | None = None,
         attribute: str = "address",
-    ) -> dict[str, Any]: ...
+    ) -> list[str]: ...
 
     @log_errors
     async def retire_workers(
@@ -7481,7 +7481,7 @@ async def retire_workers(
         remove: bool = True,
         stimulus_id: str | None = None,
         **kwargs: Any,
-    ) -> dict[str, Any]:
+    ) -> list[str]:
         """Gracefully retire workers from cluster. Any key that is in memory
         exclusively on the retired workers is replicated somewhere else.
 
@@ -7559,7 +7559,7 @@ async def retire_workers(
                 self.workers[address] for address in self.workers_to_close(**kwargs)
             }
         if not wss:
-            return {}
+            return []
 
         stop_amm = False
         amm: ActiveMemoryManagerExtension | None = self.extensions.get("amm")
@@ -7609,13 +7609,13 @@ async def retire_workers(
                 # time (depending on interval settings)
                 amm.run_once()
 
-            workers_info_ok = {}
-            workers_info_abort = {}
-            for addr, result, info in await asyncio.gather(*coros):
+            workers_info_ok = []
+            workers_info_abort = []
+            for addr, result in await asyncio.gather(*coros):
                 if result == "OK":
-                    workers_info_ok[addr] = info
+                    workers_info_ok.append(addr)
                 else:
-                    workers_info_abort[addr] = info
+                    workers_info_abort.append(addr)
 
         finally:
             if stop_amm:
@@ -7625,8 +7625,8 @@ async def retire_workers(
             "all",
             {
                 "action": "retire-workers",
-                "retired": workers_info_ok,
-                "could-not-retire": workers_info_abort,
+                "retired": list(workers_info_ok),
+                "could-not-retire": list(workers_info_abort),
                 "stimulus_id": stimulus_id,
             },
         )
@@ -7649,7 +7649,7 @@ async def _track_retire_worker(
         close: bool,
         remove: bool,
         stimulus_id: str,
-    ) -> tuple[str, Literal["OK", "no-recipients"], dict]:
+    ) -> tuple[str, Literal["OK", "no-recipients"]]:
         while not policy.done():
             # Sleep 0.01s when there are 4 tasks or less
             # Sleep 0.5s when there are 200 or more
@@ -7671,7 +7671,7 @@ async def _track_retire_worker(
                 f"Could not retire worker {ws.address!r}: unique data could not be "
                 f"moved to any other worker ({stimulus_id=!r})"
            )
-            return ws.address, "no-recipients", ws.identity()
+            return ws.address, "no-recipients"
         logger.debug(
             f"All unique keys on worker {ws.address!r} have been replicated elsewhere"
         )
@@ -7685,7 +7685,7 @@ async def _track_retire_worker(
             self.close_worker(ws.address)
 
         logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})")
-        return ws.address, "OK", ws.identity()
+        return ws.address, "OK"
 
     def add_keys(
         self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None
diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py
index 4f55158622..bcc0d48d71 100644
--- a/distributed/tests/test_active_memory_manager.py
+++ b/distributed/tests/test_active_memory_manager.py
@@ -977,7 +977,7 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
 
     x = await c.scatter("x", workers=[a.address])
     out = await c.retire_workers([a.address])
-    assert out == {}
+    assert not out
 
     assert not s.extensions["amm"].policies
     assert set(s.workers) == {a.address, b.address}
@@ -1230,7 +1230,7 @@ async def test_RetireWorker_with_actor(c, s, a, b, has_proxy):
 
     with captured_logger("distributed.active_memory_manager", logging.WARNING) as log:
         out = await c.retire_workers([a.address])
-    assert out == {}
+    assert not out
     assert "it holds actor(s)" in log.getvalue()
 
     assert "x" in a.state.actors
@@ -1250,7 +1250,7 @@ async def test_RetireWorker_with_actor_proxy(c, s, a, b):
     assert "y" in b.data
 
     out = await c.retire_workers([b.address])
-    assert out.keys() == {b.address}
+    assert out == (b.address,)
 
     assert "x" in a.state.actors
     assert "y" in a.data
@@ -1301,6 +1301,7 @@ async def tensordot_stress(c, s):
     assert sum(t.start == "memory" for t in s.transition_log) == expected_tasks
 
 
+@pytest.mark.slow
 @gen_cluster(
     client=True,
     nthreads=[("", 1)] * 4,
@@ -1356,6 +1357,7 @@ async def test_ReduceReplicas_stress(c, s, *workers):
     await tensordot_stress(c, s)
 
 
+@pytest.mark.slow
 @pytest.mark.parametrize("use_ReduceReplicas", [False, True])
 @gen_cluster(
     client=True,
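
Reviewer note, not part of the patch: a minimal sketch of the changed return
value as seen from the client side. The cluster setup and names below are
illustrative assumptions; Client.retire_workers simply forwards the
scheduler's return value, which after this change is a flat sequence of
retired addresses rather than an {address: Worker.identity()} mapping.

# Sketch only; assumes a throwaway local cluster (Python >= 3.10).
import asyncio

from distributed import Client, Scheduler, Worker


async def main() -> None:
    async with (
        Scheduler(dashboard_address=":0") as s,
        Worker(s.address) as a,
        Client(s.address, asynchronous=True) as client,
    ):
        retired = await client.retire_workers([a.address])
        # New behavior: a bare sequence of addresses (the comm layer may
        # deserialize the list as a tuple, as the updated tests expect)...
        assert a.address in retired
        # ...instead of the old dict of full Worker.identity() payloads,
        # which was also dumped verbatim into the scheduler's event log.


asyncio.run(main())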