From d02e39ec1023675b4b747e7af276b82532668342 Mon Sep 17 00:00:00 2001 From: Caleb Hattingh Date: Fri, 31 Jan 2025 16:52:05 +0100 Subject: [PATCH 1/2] Treat min_workers=0 as "scale to zero", None to deactivate --- deadpool.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/deadpool.py b/deadpool.py index 22b1633..7d446e2 100644 --- a/deadpool.py +++ b/deadpool.py @@ -265,7 +265,11 @@ def __init__( self.finitializer = finalizer self.finitargs = finalargs self.pool_size = max_workers or len(os.sched_getaffinity(0)) - self.min_workers = min_workers or self.pool_size + if min_workers is None: + self.min_workers = self.pool_size + else: + self.min_workers = min_workers + self.max_tasks_per_child = max_tasks_per_child self.max_worker_memory_bytes = max_worker_memory_bytes self.submitted_jobs: PriorityQueue[PrioritizedItem] = PriorityQueue( From db3cfc32b95fea64c4201076d047e03808fbe653 Mon Sep 17 00:00:00 2001 From: Caleb Hattingh Date: Sat, 1 Feb 2025 14:17:25 +0100 Subject: [PATCH 2/2] Tweak coverage --- deadpool.py | 10 +++++----- tests/test_deadpool.py | 9 +++++++-- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/deadpool.py b/deadpool.py index 7d446e2..b3ead44 100644 --- a/deadpool.py +++ b/deadpool.py @@ -147,10 +147,10 @@ def shutdown(self, wait=True): self.connection_receive_msgs_from_process.close() - if self.connection_send_msgs_to_process.writable: + if self.connection_send_msgs_to_process.writable: # pragma: no branch try: self.connection_send_msgs_to_process.send(None) - except BrokenPipeError: + except BrokenPipeError: # pragma: no cover pass else: self.connection_send_msgs_to_process.close() @@ -608,7 +608,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: try: worker = self.workers.get_nowait() worker.shutdown() - except Empty: + except Empty: # pragma: no cover break # There may be a few processes left in the @@ -677,7 +677,7 @@ def kill_proc_tree( for p in children: try: p.send_signal(sig) - except psutil.NoSuchProcess: + except psutil.NoSuchProcess: # pragma: no cover pass gone, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate) @@ -730,7 +730,7 @@ def conn_send_safe(obj): conn.send(obj) except BrokenPipeError: # pragma: no cover logger.debug("Pipe not usable") - except BaseException: + except BaseException: # pragma: no cover logger.exception("Unexpected pipe error") def timed_out(): diff --git a/tests/test_deadpool.py b/tests/test_deadpool.py index 7622a6f..2a32fa9 100644 --- a/tests/test_deadpool.py +++ b/tests/test_deadpool.py @@ -53,9 +53,14 @@ def test_cancel_all_futures(): @pytest.mark.parametrize("malloc_threshold", [None, 0, 1_000_000]) -def test_simple(malloc_threshold): +@pytest.mark.parametrize("daemon", [True, False]) +@pytest.mark.parametrize("min_workers", [None, 0]) +def test_simple(malloc_threshold, daemon, min_workers): with deadpool.Deadpool( - malloc_trim_rss_memory_threshold_bytes=malloc_threshold + malloc_trim_rss_memory_threshold_bytes=malloc_threshold, + daemon=daemon, + min_workers=min_workers, + max_workers=10, ) as exe: fut = exe.submit(t, 0.05) result = fut.result()