From bb4623a2f1f48257b84ca38577df653894a3bfab Mon Sep 17 00:00:00 2001 From: rafa-be Date: Thu, 6 Feb 2025 21:25:31 +0100 Subject: [PATCH] Disable load-balancing by default and improve logging on exception on worker or processor failure. (#55) Signed-off-by: rafa-be --- scaler/about.py | 2 +- scaler/io/config.py | 3 ++- scaler/worker/agent/processor/processor.py | 3 +++ scaler/worker/worker.py | 2 ++ tests/test_balance.py | 7 ++++++- 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/scaler/about.py b/scaler/about.py index 0a0a43a..38cf6db 100644 --- a/scaler/about.py +++ b/scaler/about.py @@ -1 +1 @@ -__version__ = "1.9.0" +__version__ = "1.9.1" diff --git a/scaler/io/config.py b/scaler/io/config.py index 09ccb0d..a20f5fe 100644 --- a/scaler/io/config.py +++ b/scaler/io/config.py @@ -37,7 +37,8 @@ DEFAULT_CLIENT_TIMEOUT_SECONDS = 60 # number of seconds for load balance, if value is 0 means disable load balance -DEFAULT_LOAD_BALANCE_SECONDS = 1 +# FIXME: load balancing is currently disabled by default as it's causing some issues under heavy load. +DEFAULT_LOAD_BALANCE_SECONDS = 0 # when load balance advice happened repeatedly and always be the same, we issue load balance request when exact repeated # times happened diff --git a/scaler/worker/agent/processor/processor.py b/scaler/worker/agent/processor/processor.py index 97ae428..45d57f0 100644 --- a/scaler/worker/agent/processor/processor.py +++ b/scaler/worker/agent/processor/processor.py @@ -135,6 +135,9 @@ def __run_forever(self): except (KeyboardInterrupt, InterruptedError): pass + except Exception as e: + logging.exception(f"Processor[{self.pid}]: failed with unhandled exception:\n{(e)}") + finally: self._object_cache.destroy() self._connector.close() diff --git a/scaler/worker/worker.py b/scaler/worker/worker.py index 590c17a..dd24997 100644 --- a/scaler/worker/worker.py +++ b/scaler/worker/worker.py @@ -173,6 +173,8 @@ async def __get_loops(self): pass except (ClientShutdownException, TimeoutError) as e: logging.info(f"Worker[{self.pid}]: {str(e)}") + except Exception as e: + logging.exception(f"Worker[{self.pid}]: failed with unhandled exception:\n{(e)}") await self._connector_external.send(DisconnectRequest.new_msg(self._connector_external.identity)) diff --git a/tests/test_balance.py b/tests/test_balance.py index e355326..b980cd9 100644 --- a/tests/test_balance.py +++ b/tests/test_balance.py @@ -28,7 +28,12 @@ def test_balance(self): N_WORKERS = N_TASKS address = f"tcp://127.0.0.1:{get_available_tcp_port()}" - combo = SchedulerClusterCombo(address=address, n_workers=1, per_worker_queue_size=N_TASKS) + combo = SchedulerClusterCombo( + address=address, + n_workers=1, + per_worker_queue_size=N_TASKS, + load_balance_seconds=1, # FIXME: re-enable balancing as it's currently disabled by default + ) client = Client(address=address)