diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index e67f1ecdf0..b277e5cccc 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -477,6 +477,22 @@ mesos:
     - make test threads="${TEST_THREADS}" tests=src/toil/test/src/promisedRequirementTest.py::MesosPromisedRequirementsTest
     - make test threads="${TEST_THREADS}" tests="src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSStaticAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTestMultipleNodeTypes src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSRestartTest::testAutoScaledCluster"
 
+batchsystem:
+  rules:
+    - if: $CI_PIPELINE_SOURCE == "schedule"
+    - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH =~ /.*-fix-ci/
+    - if: $CI_COMMIT_BRANCH
+      changes:
+        compare_to: 'refs/heads/master'
+        paths:
+          - 'src/toil/test/batchSystems/test_gridengine.py'
+          - 'src/toil/batchSystems/gridengine.py'
+  stage: integration
+  script:
+    - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all]
+    - make test threads="${TEST_THREADS}" tests=src/toil/test/batchSystems/test_gridengine.py::GridEngineTest
+
 # Cactus-on-Kubernetes integration (as a script and not a pytest test)
 cactus_integration:
   rules:
diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py
index 3fa70bdd3a..7a1a9d9bca 100644
--- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py
+++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -26,7 +26,7 @@
 from toil.bus import ExternalBatchIdMessage, get_job_kind
 from toil.job import AcceleratorRequirement
 from toil.lib.misc import CalledProcessErrorStderr
-from toil.lib.retry import old_retry, DEFAULT_DELAYS
+from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry
 
 logger = logging.getLogger(__name__)
 
@@ -41,6 +41,10 @@
 # Accelerator requirements for the job
 JobTuple = Tuple[int, float, int, str, str, Dict[str, str], List[AcceleratorRequirement]]
 
+class ExceededRetryAttempts(Exception):
+    def __init__(self):
+        super().__init__("Exceeded retry attempts talking to scheduler.")
+
 class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
     """
     A partial implementation of BatchSystemSupport for batch systems run on a
@@ -208,24 +212,15 @@ def checkOnJobs(self):
             running_job_list = list(self.runningJobs)
             batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
             if batch_job_id_list:
-                try:
-                    # Get the statuses as a batch
-                    statuses = self.boss.with_retries(
-                        self.coalesce_job_exit_codes, batch_job_id_list
+                # Get the statuses as a batch
+                statuses = self.boss.with_retries(
+                    self.coalesce_job_exit_codes, batch_job_id_list
+                )
+                # We got the statuses as a batch
+                for running_job_id, status in zip(running_job_list, statuses):
+                    activity = self._handle_job_status(
+                        running_job_id, status, activity
                     )
-                except NotImplementedError:
-                    # We have to get the statuses individually
-                    for running_job_id, batch_job_id in zip(running_job_list, batch_job_id_list):
-                        status = self.boss.with_retries(self.getJobExitCode, batch_job_id)
-                        activity = self._handle_job_status(
-                            running_job_id, status, activity
-                        )
-                else:
-                    # We got the statuses as a batch
-                    for running_job_id, status in zip(running_job_list, statuses):
-                        activity = self._handle_job_status(
-                            running_job_id, status, activity
-                        )
             self._checkOnJobsCache = activity
             self._checkOnJobsTimestamp = datetime.now()
 
@@ -292,13 +287,19 @@ def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tu
 
             Called by GridEngineThread.checkOnJobs().
 
-            This is an optional part of the interface. It should raise
-            NotImplementedError if not actually implemented for a particular
-            scheduler.
+            The default implementation falls back on self.getJobExitCode and polls each job individually
 
             :param string batch_job_id_list: List of batch system job ID
             """
-            raise NotImplementedError()
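+            # Poll each job in turn with getJobExitCode; the returned list has one
+            # status per entry of batch_job_id_list, in the same order.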
+            statuses = []
+            try:
+                for batch_job_id in batch_job_id_list:
+                    statuses.append(self.boss.with_retries(self.getJobExitCode, batch_job_id))
+            except CalledProcessErrorStderr as err:
+                # This avoids the nested retry issue where we could issue n^2 retries when the backing scheduler somehow disappears
+                # We catch the internal retry exception and raise something else so the outer retry doesn't retry the entire function again
+                raise ExceededRetryAttempts() from err
+            return statuses
 
         @abstractmethod
         def prepareSubmission(self,
diff --git a/src/toil/test/batchSystems/test_gridengine.py b/src/toil/test/batchSystems/test_gridengine.py
new file mode 100644
index 0000000000..816865fe3d
--- /dev/null
+++ b/src/toil/test/batchSystems/test_gridengine.py
@@ -0,0 +1,159 @@
+import textwrap
+from queue import Queue
+
+import pytest
+
+import toil.batchSystems.gridengine
+from toil.batchSystems.abstractGridEngineBatchSystem import ExceededRetryAttempts
+from toil.common import Config
+from toil.lib.misc import CalledProcessErrorStderr
+from toil.test import ToilTest
+
+
+class FakeBatchSystem:
+    """
+    Class that implements a minimal batch system, needed to create a GridEngineThread (see below).
+    """
+
+    def __init__(self):
+        self.config = self.__fake_config()
+
+    def getWaitDuration(self):
+        return 10
+
+    def __fake_config(self):
+        """
+        Returns a dummy config for the batch system tests. We need a workflowID to be set up
+        since we are running tests without setting up a jobstore.
+
+        :rtype: toil.common.Config
+        """
+        config = Config()
+        from uuid import uuid4
+        config.workflowID = str(uuid4())
+        config.cleanWorkDir = 'always'
+        return config
+
+    def with_retries(self, operation, *args, **kwargs):
+        """
+        The grid engine batch system needs a with_retries function when running the GridEngineThread, so fake one.
+        """
+        return operation(*args, **kwargs)
+
+
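+# Stand-in for toil.batchSystems.gridengine.call_command: the tests below monkeypatch it
+# in so that `qstat -j` and `qacct -j` appear to run against a scheduler that knows about
+# five fake jobs with fixed outcomes.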
+def call_qstat_or_qacct(args, **_):
+    # Example outputs taken from https://2021.help.altair.com/2021.1/AltairGridEngine/8.7.0/UsersGuideGE.pdf
+    qacct_info = {}
+    job_id_info = {1: {"failed": True, "exit_code": 0, "completed": True}, 2: {"failed": True, "exit_code": 2, "completed": True},
+                   3: {"failed": False, "exit_code": 0, "completed": True}, 4: {"failed": False, "exit_code": 10, "completed": True},
+                   5: {"failed": False, "exit_code": 0, "completed": False}}
+    for job_id, status_info in job_id_info.items():
+        failed = 1 if status_info["failed"] else 0
+        exit_status = status_info["exit_code"]
+        qacct_info[job_id] = textwrap.dedent(f"""\
+            ==============================================================
+            qname        all.q
+            hostname     kailua
+            group        users
+            owner        jondoe
+            project      NONE
+            department   defaultdepartment
+            jobname      Sleeper
+            jobnumber    10
+            taskid       undefined
+            account      sge
+            priority     0
+            qsub_time    Thu Mar 10 19:58:35 2011
+            start_time   Thu Mar 10 19:58:42 2011
+            end_time     Thu Mar 10 19:59:43 2011
+            granted_pe   NONE
+            slots        1
+            failed       {failed}
+            exit_status  {exit_status}
+            ru_wallclock 61
+            ru_utime     0.070
+            ru_stime     0.050
+            ru_maxrss    1220
+            ru_ixrss     0
+            ru_ismrss    0
+            ru_idrss     0
+            """)
+    if args[0] == "qstat":
+        # This is a guess for what qstat will return given a job. I'm unable to find an example for qstat.
+        # This also assumes the second argument args[1] is -j, as that is what we try to use.
+        job_id = int(args[2])
+        if job_id not in job_id_info.keys() or job_id_info[job_id]["completed"]:
+            stderr = f"Following jobs do not exist {job_id}"
+        else:
+            # This is not the output of qstat when the job is running, and is just a guess.
+            # We test on the existence of the string "Following jobs do not exist", so this should be okay for now.
+            stderr = f"Job exists {job_id}"
+        raise CalledProcessErrorStderr(2, args, stderr=stderr)
+    elif args[0] == "qacct":
+        if args[1] != "-j":
+            # Documentation for qacct says if -j is not found then all jobs are listed:
+            # https://gridscheduler.sourceforge.net/htmlman/htmlman1/qacct.html
+            # This is a guess for the output of qacct. We don't have a SGE cluster and I can't find a bare qacct example output online.
+            qacct_response = "\n".join(qacct_info.values())
+        else:
+            job_id = int(args[2])
+            if job_id not in job_id_info.keys():
+                # This is a guess of the behavior when the job does not exist. Since the behavior is unknown, this is not currently tested.
+                return ""
+            qacct_response = qacct_info[job_id]
+
+        return qacct_response
+
+
+class GridEngineTest(ToilTest):
+    """
+    Class for unit-testing GridEngineBatchSystem
+    """
+
+    def setUp(self):
+        self.monkeypatch = pytest.MonkeyPatch()
+        self.worker = toil.batchSystems.gridengine.GridEngineBatchSystem.GridEngineThread(
+            newJobsQueue=Queue(),
+            updatedJobsQueue=Queue(),
+            killQueue=Queue(),
+            killedJobsQueue=Queue(),
+            boss=FakeBatchSystem())
+
+    ###
+    ### Tests for coalesce_job_exit_codes for gridengine.
+    ###
+
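+    # The fake scheduler above reports jobs 1 and 2 as failed, job 3 as succeeded,
+    # job 4 as finished with exit status 10, and job 5 as still running.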
+ ### + + def test_coalesce_job_exit_codes_one_exists(self): + self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct) + job_ids = ['1'] # FAILED + expected_result = [1] + result = self.worker.coalesce_job_exit_codes(job_ids) + assert result == expected_result, f"{result} != {expected_result}" + + def test_coalesce_job_exit_codes_one_still_running(self): + self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct) + job_ids = ['5'] # Still running. We currently raise an exception when this happens + try: + self.worker.coalesce_job_exit_codes(job_ids) + except ExceededRetryAttempts: + pass + else: + raise RuntimeError("Test did not raise an exception!") + + def test_coalesce_job_exit_codes_many_all_exist(self): + self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct) + job_ids = ['1', # FAILED, + '2', # FAILED (with exit code that we ignore), + '3', # SUCCEEDED, + '4'] # EXIT CODE 10 + # RUNNING and PENDING jobs should return None + expected_result = [ + 1, + 1, + 0, + 10 + ] + result = self.worker.coalesce_job_exit_codes(job_ids) + assert result == expected_result, f"{result} != {expected_result}" +