Skip to content

Commit

Permalink
Disable load-balancing by default and improve logging on exception on…
Browse files Browse the repository at this point in the history
… worker or processor failure. (Citi#55)

Signed-off-by: rafa-be <[email protected]>
  • Loading branch information
rafa-be authored Feb 6, 2025
1 parent 5153efc commit bb4623a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 3 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.9.0"
__version__ = "1.9.1"
3 changes: 2 additions & 1 deletion scaler/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
DEFAULT_CLIENT_TIMEOUT_SECONDS = 60

# number of seconds for load balance, if value is 0 means disable load balance
DEFAULT_LOAD_BALANCE_SECONDS = 1
# FIXME: load balancing is currently disabled by default as it's causing some issues under heavy load.
DEFAULT_LOAD_BALANCE_SECONDS = 0

# when load balance advice happened repeatedly and always be the same, we issue load balance request when exact repeated
# times happened
Expand Down
3 changes: 3 additions & 0 deletions scaler/worker/agent/processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def __run_forever(self):
except (KeyboardInterrupt, InterruptedError):
pass

except Exception as e:
logging.exception(f"Processor[{self.pid}]: failed with unhandled exception:\n{(e)}")

finally:
self._object_cache.destroy()
self._connector.close()
Expand Down
2 changes: 2 additions & 0 deletions scaler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ async def __get_loops(self):
pass
except (ClientShutdownException, TimeoutError) as e:
logging.info(f"Worker[{self.pid}]: {str(e)}")
except Exception as e:
logging.exception(f"Worker[{self.pid}]: failed with unhandled exception:\n{(e)}")

await self._connector_external.send(DisconnectRequest.new_msg(self._connector_external.identity))

Expand Down
7 changes: 6 additions & 1 deletion tests/test_balance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ def test_balance(self):
N_WORKERS = N_TASKS

address = f"tcp://127.0.0.1:{get_available_tcp_port()}"
combo = SchedulerClusterCombo(address=address, n_workers=1, per_worker_queue_size=N_TASKS)
combo = SchedulerClusterCombo(
address=address,
n_workers=1,
per_worker_queue_size=N_TASKS,
load_balance_seconds=1, # FIXME: re-enable balancing as it's currently disabled by default
)

client = Client(address=address)

Expand Down

0 comments on commit bb4623a

Please sign in to comment.