From 508184f8b295f6096de2655c40e125c64c8ec893 Mon Sep 17 00:00:00 2001
From: Francesco Nazzaro
Date: Tue, 5 Mar 2024 16:48:37 +0100
Subject: [PATCH] Refactor error handling in Broker class, requeue if killedworker is raised

---
Review notes (text between "---" and the diffstat is ignored by git am):
 * exception.__class__.__name__ is the bare class name ("KilledWorker"), so
   comparing it with "distributed.scheduler.KilledWorker" could never match
   and killed-worker jobs were never re-queued. The check is now an
   isinstance test on the exception object itself.
 * NOTE(review): `request` is not assigned on the re-queue branch; confirm
   that the code following this hunk does not read it on that path.
 * NOTE(review): hunk indentation was reconstructed from a whitespace-
   collapsed copy of this patch; verify it against the target file.

 cads_broker/dispatcher.py | 24 +++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py
index 8c6d581b..c287e57e 100644
--- a/cads_broker/dispatcher.py
+++ b/cads_broker/dispatcher.py
@@ -203,15 +203,21 @@ def on_future_done(self, future: distributed.Future) -> None:
                 )
             elif future.status == "error":
                 exception = future.exception()
-                request = db.set_request_status(
-                    future.key,
-                    job_status,
-                    error_message="".join(traceback.format_exception(exception)),
-                    error_reason=traceback.format_exception_only(exception)[0],
-                    log=log,
-                    user_visible_log=user_visible_log,
-                    session=session,
-                )
+                error_message = "".join(traceback.format_exception(exception))
+                error_reason = exception.__class__.__name__
+                if isinstance(exception, distributed.scheduler.KilledWorker):
+                    logger.info("worker killed: re-queueing", job_id=future.key)
+                    db.requeue_request(request_uid=future.key, session=session)
+                else:
+                    request = db.set_request_status(
+                        future.key,
+                        job_status,
+                        error_message=error_message,
+                        error_reason=error_reason,
+                        log=log,
+                        user_visible_log=user_visible_log,
+                        session=session,
+                    )
             else:
                 # if the dask status is unknown, re-queue it
                 request = db.set_request_status(