Skip to content

Commit

Permalink
Detect if the GridEngine worker thread has crashed to prevent hanging…
Browse files Browse the repository at this point in the history
… the workflow (#4873)

* Debug envvar

* add error to message

* Add logic for unexpected background thread failure

* Set block back to true

* Don't duplicate thread exception message and print at end

* Revert "Debug envvar"

This reverts commit 1339285.

* Apply suggestions from code review

Co-authored-by: Adam Novak <[email protected]>

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Adam Novak <[email protected]>
  • Loading branch information
3 people authored Apr 26, 2024
1 parent c566f31 commit c743848
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 28 deletions.
55 changes: 35 additions & 20 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,21 @@ class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
A partial implementation of BatchSystemSupport for batch systems run on a
standard HPC cluster. By default auto-deployment is not implemented.
"""
class GridEngineThreadException(Exception):
pass

class Worker(Thread, metaclass=ABCMeta):

class GridEngineThread(Thread, metaclass=ABCMeta):
def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queue, killedJobsQueue: Queue, boss: 'AbstractGridEngineBatchSystem') -> None:
"""
Abstract worker interface class. All instances are created with five
Abstract thread interface class. All instances are created with five
initial arguments (below). Note the Queue instances passed are empty.
:param newJobsQueue: a Queue of new (unsubmitted) jobs
:param updatedJobsQueue: a Queue of jobs that have been updated
:param killQueue: a Queue of active jobs that need to be killed
:param killedJobsQueue: Queue of killed jobs for this worker
:param killedJobsQueue: Queue of killed jobs for this thread
:param boss: the AbstractGridEngineBatchSystem instance that
controls this AbstractGridEngineWorker
controls this GridEngineThread
"""
Thread.__init__(self)
Expand All @@ -77,6 +78,7 @@ def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue, killQueue: Queu
self.batchJobIDs: Dict[int, str] = dict()
self._checkOnJobsCache = None
self._checkOnJobsTimestamp = None
self.exception = None

def getBatchSystemID(self, jobID: int) -> str:
"""
Expand Down Expand Up @@ -110,7 +112,7 @@ def createJobs(self, newJob: JobTuple) -> bool:
"""
Create a new job with the given attributes.
Implementation-specific; called by AbstractGridEngineWorker.run()
Implementation-specific; called by GridEngineThread.run()
"""
activity = False
# Load new job id if present:
Expand Down Expand Up @@ -146,7 +148,7 @@ def createJobs(self, newJob: JobTuple) -> bool:

def killJobs(self):
"""
Kill any running jobs within worker
Kill any running jobs within thread
"""
killList = list()
while True:
Expand Down Expand Up @@ -277,14 +279,17 @@ def run(self):
while self._runStep():
pass
except Exception as ex:
logger.error("GridEngine like batch system failure", exc_info=ex)
raise
self.exception = ex
logger.error("GridEngine like batch system failure: %s", ex)
# don't raise exception as is_alive will still be set to false,
# signalling exception in the thread as we expect the thread to
# always be running for the duration of the workflow

def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]]:
"""
Returns exit codes and possibly exit reasons for a list of jobs, or None if they are running.
Called by AbstractGridEngineWorker.checkOnJobs().
Called by GridEngineThread.checkOnJobs().
This is an optional part of the interface. It should raise
NotImplementedError if not actually implemented for a particular
Expand Down Expand Up @@ -345,7 +350,7 @@ def getRunningJobIDs(self):
def killJob(self, jobID):
"""
Kill specific job with the Toil job ID. Implementation-specific; called
by AbstractGridEngineWorker.killJobs()
by GridEngineThread.killJobs()
:param string jobID: Toil job ID
"""
Expand All @@ -360,7 +365,7 @@ def getJobExitCode(self, batchJobID) -> Union[int, Tuple[int, Optional[BatchJobE
If the job is not running but the exit code is not available, it
will be EXIT_STATUS_UNAVAILABLE_VALUE. Implementation-specific;
called by AbstractGridEngineWorker.checkOnJobs().
called by GridEngineThread.checkOnJobs().
The exit code will only be 0 if the job affirmatively succeeded.
Expand All @@ -379,10 +384,10 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
self.updatedJobsQueue = Queue()
self.killQueue = Queue()
self.killedJobsQueue = Queue()
# get the associated worker class here
self.worker = self.Worker(self.newJobsQueue, self.updatedJobsQueue,
self.killQueue, self.killedJobsQueue, self)
self.worker.start()
# get the associated thread class here
self.background_thread = self.GridEngineThread(self.newJobsQueue, self.updatedJobsQueue,
self.killQueue, self.killedJobsQueue, self)
self.background_thread.start()
self._getRunningBatchJobIDsTimestamp = None
self._getRunningBatchJobIDsCache = {}

Expand Down Expand Up @@ -428,7 +433,12 @@ def killBatchJobs(self, jobIDs):
for jobID in jobIDs:
self.killQueue.put(jobID)
while jobIDs:
killedJobId = self.killedJobsQueue.get()
try:
killedJobId = self.killedJobsQueue.get(timeout=10)
except Empty:
if not self.background_thread.is_alive():
raise self.GridEngineThreadException("Grid engine thread failed unexpectedly") from self.background_thread.exception
continue
if killedJobId is None:
break
jobIDs.remove(killedJobId)
Expand Down Expand Up @@ -460,14 +470,19 @@ def getRunningBatchJobIDs(self):
self.config.statePollingWait):
batchIds = self._getRunningBatchJobIDsCache
else:
batchIds = self.with_retries(self.worker.getRunningJobIDs)
batchIds = self.with_retries(self.background_thread.getRunningJobIDs)
self._getRunningBatchJobIDsCache = batchIds
self._getRunningBatchJobIDsTimestamp = datetime.now()
batchIds.update(self.getRunningLocalJobIDs())
return batchIds

def getUpdatedBatchJob(self, maxWait):
local_tuple = self.getUpdatedLocalJob(0)

if not self.background_thread.is_alive():
# kill remaining jobs on the thread
self.background_thread.killJobs()
raise self.GridEngineThreadException("Unexpected GridEngineThread failure") from self.background_thread.exception
if local_tuple:
return local_tuple
else:
Expand All @@ -481,14 +496,14 @@ def getUpdatedBatchJob(self, maxWait):

def shutdown(self) -> None:
"""
Signals worker to shutdown (via sentinel) then cleanly joins the thread
Signals thread to shutdown (via sentinel) then cleanly joins the thread
"""
self.shutdownLocal()
newJobsQueue = self.newJobsQueue
self.newJobsQueue = None

newJobsQueue.put(None)
self.worker.join()
self.background_thread.join()

def setEnv(self, name, value=None):
if value and ',' in value:
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

class GridEngineBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
"""
Grid Engine-specific AbstractGridEngineWorker methods
"""
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
class HTCondorBatchSystem(AbstractGridEngineBatchSystem):
# When using HTCondor, the Schedd handles scheduling

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):

# Override the createJobs method so that we can use htcondor.Submit objects
# and so that we can get disk allocation requests and ceil the CPU request.
Expand Down
4 changes: 2 additions & 2 deletions src/toil/batchSystems/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@

class LSFBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
"""LSF specific AbstractGridEngineWorker methods."""
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
"""LSF specific GridEngineThread methods."""

def getRunningJobIDs(self):
times = {}
Expand Down
4 changes: 2 additions & 2 deletions src/toil/batchSystems/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

class SlurmBatchSystem(AbstractGridEngineBatchSystem):

class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):

def getRunningJobIDs(self):
# Should return a dictionary of Job IDs and number of seconds
Expand Down Expand Up @@ -135,7 +135,7 @@ def submitJob(self, subLine):
logger.debug("sbatch submitted job %d", result)
return result
except OSError as e:
logger.error("sbatch command failed")
logger.error(f"sbatch command failed with error: {e}")
raise e

def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]]:
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class TorqueBatchSystem(AbstractGridEngineBatchSystem):

# class-specific Worker
class Worker(AbstractGridEngineBatchSystem.Worker):
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
def __init__(
self, newJobsQueue, updatedJobsQueue, killQueue, killedJobsQueue, boss
):
Expand Down
2 changes: 2 additions & 0 deletions src/toil/lib/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ def old_retry(
>>> i
1
"""
if timeout is None:
timeout = DEFAULT_TIMEOUT
if timeout > 0:
go = [ None ]

Expand Down
2 changes: 1 addition & 1 deletion src/toil/test/batchSystems/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class SlurmTest(ToilTest):

def setUp(self):
self.monkeypatch = pytest.MonkeyPatch()
self.worker = toil.batchSystems.slurm.SlurmBatchSystem.Worker(
self.worker = toil.batchSystems.slurm.SlurmBatchSystem.GridEngineThread(
newJobsQueue=Queue(),
updatedJobsQueue=Queue(),
killQueue=Queue(),
Expand Down

0 comments on commit c743848

Please sign in to comment.