Skip to content

Commit

Permalink
Resolve NotImplementedError in gridengine (#5061)
Browse files Browse the repository at this point in the history
* Resolve NotImplementedError by defining base method

* Add mocked unit tests for gridengine. Also define custom exception when
avoiding nested retry issue

* unused import

* Add to CI

* Add missing CI setup line

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
stxue1 and github-actions[bot] committed Aug 21, 2024
1 parent 9e0ab6c commit 966c322
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 22 deletions.
16 changes: 16 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,22 @@ mesos:
- make test threads="${TEST_THREADS}" tests=src/toil/test/src/promisedRequirementTest.py::MesosPromisedRequirementsTest
- make test threads="${TEST_THREADS}" tests="src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSStaticAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTestMultipleNodeTypes src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSRestartTest::testAutoScaledCluster"

batchsystem:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_COMMIT_TAG
- if: $CI_COMMIT_BRANCH =~ /.*-fix-ci/
- if: $CI_COMMIT_BRANCH
changes:
compare_to: 'refs/heads/master'
paths:
- 'src/toil/test/batchSystems/test_gridengine.py'
- 'src/toil/batchSystems/gridengine.py'
stage: integration
script:
- ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all]
- make test threads="${TEST_THREADS}" tests=src/toil/test/batchSystems/test_gridengine.py::GridEngineTest

# Cactus-on-Kubernetes integration (as a script and not a pytest test)
cactus_integration:
rules:
Expand Down
45 changes: 23 additions & 22 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS
from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

logger = logging.getLogger(__name__)

Expand All @@ -41,6 +41,10 @@
# Accelerator requirements for the job
JobTuple = Tuple[int, float, int, str, str, Dict[str, str], List[AcceleratorRequirement]]

class ExceededRetryAttempts(Exception):
def __init__(self):
super().__init__("Exceeded retry attempts talking to scheduler.")

class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
"""
A partial implementation of BatchSystemSupport for batch systems run on a
Expand Down Expand Up @@ -208,24 +212,15 @@ def checkOnJobs(self):
running_job_list = list(self.runningJobs)
batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
if batch_job_id_list:
try:
# Get the statuses as a batch
statuses = self.boss.with_retries(
self.coalesce_job_exit_codes, batch_job_id_list
# Get the statuses as a batch
statuses = self.boss.with_retries(
self.coalesce_job_exit_codes, batch_job_id_list
)
# We got the statuses as a batch
for running_job_id, status in zip(running_job_list, statuses):
activity = self._handle_job_status(
running_job_id, status, activity
)
except NotImplementedError:
# We have to get the statuses individually
for running_job_id, batch_job_id in zip(running_job_list, batch_job_id_list):
status = self.boss.with_retries(self.getJobExitCode, batch_job_id)
activity = self._handle_job_status(
running_job_id, status, activity
)
else:
# We got the statuses as a batch
for running_job_id, status in zip(running_job_list, statuses):
activity = self._handle_job_status(
running_job_id, status, activity
)

self._checkOnJobsCache = activity
self._checkOnJobsTimestamp = datetime.now()
Expand Down Expand Up @@ -292,13 +287,19 @@ def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tu
Called by GridEngineThread.checkOnJobs().
This is an optional part of the interface. It should raise
NotImplementedError if not actually implemented for a particular
scheduler.
The default implementation falls back on self.getJobExitCode and polls each job individually
:param string batch_job_id_list: List of batch system job ID
"""
raise NotImplementedError()
statuses = []
try:
for batch_job_id in batch_job_id_list:
statuses.append(self.boss.with_retries(self.getJobExitCode, batch_job_id))
except CalledProcessErrorStderr as err:
# This avoids the nested retry issue where we could issue n^2 retries when the backing scheduler somehow disappears
# We catch the internal retry exception and raise something else so the outer retry doesn't retry the entire function again
raise ExceededRetryAttempts() from err
return statuses

@abstractmethod
def prepareSubmission(self,
Expand Down
159 changes: 159 additions & 0 deletions src/toil/test/batchSystems/test_gridengine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import textwrap
from queue import Queue

import pytest

import toil.batchSystems.gridengine
from toil.batchSystems.abstractGridEngineBatchSystem import ExceededRetryAttempts
from toil.common import Config
from toil.lib.misc import CalledProcessErrorStderr
from toil.test import ToilTest


class FakeBatchSystem:
"""
Class that implements a minimal Batch System, needed to create a Worker (see below).
"""

def __init__(self):
self.config = self.__fake_config()

def getWaitDuration(self):
return 10

def __fake_config(self):
"""
Returns a dummy config for the batch system tests. We need a workflowID to be set up
since we are running tests without setting up a jobstore. This is the class version
to be used when an instance is not available.
:rtype: toil.common.Config
"""
config = Config()
from uuid import uuid4
config.workflowID = str(uuid4())
config.cleanWorkDir = 'always'
return config

def with_retries(self, operation, *args, **kwargs):
"""
The grid engine batch system needs a with_retries function when running the GridEngineThread, so fake one
"""
return operation(*args, **kwargs)


def call_qstat_or_qacct(args, **_):
# example outputs taken from https://2021.help.altair.com/2021.1/AltairGridEngine/8.7.0/UsersGuideGE.pdf
qacct_info = {}
job_id_info = {1: {"failed": True, "exit_code": 0, "completed": True}, 2: {"failed": True, "exit_code": 2, "completed": True},
3: {"failed": False, "exit_code": 0, "completed": True}, 4: {"failed": False, "exit_code": 10, "completed": True},
5: {"failed": False, "exit_code": 0, "completed": False}}
for job_id, status_info in job_id_info.items():
failed = 1 if status_info["failed"] else 0
exit_status = status_info["exit_code"]
qacct_info[job_id] = textwrap.dedent(f"""\
==============================================================
qname all.q
hostname kailua
group users
owner jondoe
project NONE
department defaultdepartment
jobname Sleeper
jobnumber 10
taskid undefined
account sge
priority 0
qsub_time Thu Mar 10 19:58:35 2011
start_time Thu Mar 10 19:58:42 2011
end_time Thu Mar 10 19:59:43 2011
granted_pe NONE
slots 1
failed {failed}
exit_status {exit_status}
ru_wallclock 61
ru_utime 0.070
ru_stime 0.050
ru_maxrss 1220
ru_ixrss 0
ru_ismrss 0
ru_idrss 0
""")
if args[0] == "qstat":
# This is guess for what qstat will return given a job. I'm unable to find an example for qstat.
# This also assumes the second argument args[1] is -j, as that is what we try to use
job_id = int(args[2])
if job_id not in job_id_info.keys() or job_id_info[job_id]["completed"]:
stderr = f"Following jobs do not exist {job_id}"
else:
# This is not the output of qstat when the job is running, and is just a guess
# We test on the existence of the string "Following jobs do not exist", so this should be okay for now
stderr = f"Job exists {job_id}"
raise CalledProcessErrorStderr(2, args, stderr=stderr)
elif args[0] == "qacct":
if args[1] != "-j":
# Documentation for qacct says if -j is not found then all jobs are listed
# https://gridscheduler.sourceforge.net/htmlman/htmlman1/qacct.html
# This is a guess for the output of qacct. We don't have a SGE cluster and I can't find a bare qacct example output online
qacct_response = "\n".join(qacct_info.values())
else:
job_id = int(args[2])
if job_id not in job_id_info.keys():
# This is a guess of the behavior when the job does not exist. Since the behavior is unknown, this is not currently tested
return ""
qacct_response = qacct_info[job_id]

return qacct_response


class GridEngineTest(ToilTest):
"""
Class for unit-testing GridEngineBatchSystem
"""

def setUp(self):
self.monkeypatch = pytest.MonkeyPatch()
self.worker = toil.batchSystems.gridengine.GridEngineBatchSystem.GridEngineThread(
newJobsQueue=Queue(),
updatedJobsQueue=Queue(),
killQueue=Queue(),
killedJobsQueue=Queue(),
boss=FakeBatchSystem())

###
### Tests for coalesce_job_exit_codes for gridengine.
###

def test_coalesce_job_exit_codes_one_exists(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['1'] # FAILED
expected_result = [1]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"

def test_coalesce_job_exit_codes_one_still_running(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['5'] # Still running. We currently raise an exception when this happens
try:
self.worker.coalesce_job_exit_codes(job_ids)
except ExceededRetryAttempts:
pass
else:
raise RuntimeError("Test did not raise an exception!")

def test_coalesce_job_exit_codes_many_all_exist(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['1', # FAILED,
'2', # FAILED (with exit code that we ignore),
'3', # SUCCEEDED,
'4'] # EXIT CODE 10
# RUNNING and PENDING jobs should return None
expected_result = [
1,
1,
0,
10
]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"

0 comments on commit 966c322

Please sign in to comment.