Resolve NotImplementedError in gridengine #5061

Merged
8 commits merged on Aug 21, 2024
16 changes: 16 additions & 0 deletions .gitlab-ci.yml
@@ -477,6 +477,22 @@ mesos:
- make test threads="${TEST_THREADS}" tests=src/toil/test/src/promisedRequirementTest.py::MesosPromisedRequirementsTest
- make test threads="${TEST_THREADS}" tests="src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSStaticAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTestMultipleNodeTypes src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSRestartTest::testAutoScaledCluster"

batchsystem:
@stxue1 (Contributor, Author) commented on Aug 16, 2024:
I don't think test_slurm.py and test_lsf_helper.py are ever run on CI. But if necessary, they can be added to this section. I've added the basic grid engine tests in the meantime.

rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_COMMIT_TAG
- if: $CI_COMMIT_BRANCH =~ /.*-fix-ci/
- if: $CI_COMMIT_BRANCH
changes:
compare_to: 'refs/heads/master'
paths:
- 'src/toil/test/batchSystems/test_gridengine.py'
- 'src/toil/batchSystems/gridengine.py'
stage: integration
script:
- ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all]
- make test threads="${TEST_THREADS}" tests=src/toil/test/batchSystems/test_gridengine.py::GridEngineTest

# Cactus-on-Kubernetes integration (as a script and not a pytest test)
cactus_integration:
rules:
45 changes: 23 additions & 22 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -26,7 +26,7 @@
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS
from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

logger = logging.getLogger(__name__)

@@ -41,6 +41,10 @@
# Accelerator requirements for the job
JobTuple = Tuple[int, float, int, str, str, Dict[str, str], List[AcceleratorRequirement]]

class ExceededRetryAttempts(Exception):
def __init__(self):
super().__init__("Exceeded retry attempts talking to scheduler.")

class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
"""
A partial implementation of BatchSystemSupport for batch systems run on a
@@ -208,24 +212,15 @@ def checkOnJobs(self):
running_job_list = list(self.runningJobs)
batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
if batch_job_id_list:
try:
# Get the statuses as a batch
statuses = self.boss.with_retries(
self.coalesce_job_exit_codes, batch_job_id_list
# Get the statuses as a batch
statuses = self.boss.with_retries(
self.coalesce_job_exit_codes, batch_job_id_list
)
# We got the statuses as a batch
for running_job_id, status in zip(running_job_list, statuses):
activity = self._handle_job_status(
running_job_id, status, activity
)
except NotImplementedError:
# We have to get the statuses individually
for running_job_id, batch_job_id in zip(running_job_list, batch_job_id_list):
status = self.boss.with_retries(self.getJobExitCode, batch_job_id)
activity = self._handle_job_status(
running_job_id, status, activity
)
else:
# We got the statuses as a batch
for running_job_id, status in zip(running_job_list, statuses):
activity = self._handle_job_status(
running_job_id, status, activity
)

self._checkOnJobsCache = activity
self._checkOnJobsTimestamp = datetime.now()
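
Read in one piece, the change above drops the try/except NotImplementedError path entirely: checkOnJobs now always asks for the statuses as a batch, and the per-job fallback lives inside coalesce_job_exit_codes itself. A reconstruction of the new body from the added lines (illustrative, indentation approximate):

```python
# Reconstructed from the added lines above, not a verbatim copy.
running_job_list = list(self.runningJobs)
batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
if batch_job_id_list:
    # Get the statuses as a batch; the base-class coalesce_job_exit_codes
    # now does the per-job polling itself, so no NotImplementedError
    # fallback is needed here.
    statuses = self.boss.with_retries(
        self.coalesce_job_exit_codes, batch_job_id_list
    )
    # We got the statuses as a batch
    for running_job_id, status in zip(running_job_list, statuses):
        activity = self._handle_job_status(
            running_job_id, status, activity
        )

self._checkOnJobsCache = activity
self._checkOnJobsTimestamp = datetime.now()
```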
@@ -292,13 +287,19 @@ def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tu

Called by GridEngineThread.checkOnJobs().

This is an optional part of the interface. It should raise
NotImplementedError if not actually implemented for a particular
scheduler.
The default implementation falls back on self.getJobExitCode and polls each job individually.

:param string batch_job_id_list: List of batch system job IDs
"""
raise NotImplementedError()
statuses = []
try:
for batch_job_id in batch_job_id_list:
statuses.append(self.boss.with_retries(self.getJobExitCode, batch_job_id))
except CalledProcessErrorStderr as err:
# This avoids the nested retry issue where we could issue n^2 retries when the backing scheduler somehow disappears
# We catch the internal retry exception and raise something else so the outer retry doesn't retry the entire function again
raise ExceededRetryAttempts() from err
return statuses

@abstractmethod
def prepareSubmission(self,
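The default above is only a per-job fallback; a scheduler whose tooling can report many jobs in one call can still override coalesce_job_exit_codes. A standalone sketch of that shape (every name below is invented for illustration; this is not the GridEngine implementation, whose command handling lives in src/toil/batchSystems/gridengine.py):

```python
# Illustrative only: the shape of a batch-capable override, with invented names.
from typing import Dict, List, Optional


def query_scheduler_once(job_ids: List[str]) -> Dict[str, Optional[int]]:
    """Stand-in for a single backend query (e.g. one accounting-command run).

    Returns an exit code per job, or None for jobs that are still running.
    """
    return {job_id: (None if job_id == "5" else 0) for job_id in job_ids}


def coalesce_job_exit_codes(batch_job_id_list: List[str]) -> List[Optional[int]]:
    # One backend query for the whole batch instead of one
    # getJobExitCode call per job.
    statuses = query_scheduler_once(batch_job_id_list)
    # Preserve the caller's ordering: checkOnJobs zips the result back
    # against its running_job_list.
    return [statuses[job_id] for job_id in batch_job_id_list]


if __name__ == "__main__":
    print(coalesce_job_exit_codes(["1", "5"]))  # [0, None]
```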
159 changes: 159 additions & 0 deletions src/toil/test/batchSystems/test_gridengine.py
@@ -0,0 +1,159 @@
import textwrap
from queue import Queue

import pytest

import toil.batchSystems.gridengine
from toil.batchSystems.abstractGridEngineBatchSystem import ExceededRetryAttempts
from toil.common import Config
from toil.lib.misc import CalledProcessErrorStderr
from toil.test import ToilTest


class FakeBatchSystem:
"""
Class that implements a minimal Batch System, needed to create a Worker (see below).
"""

def __init__(self):
self.config = self.__fake_config()

def getWaitDuration(self):
return 10

def __fake_config(self):
"""
Returns a dummy config for the batch system tests. We need a workflowID to be set up
since we are running tests without setting up a jobstore. This is the class version
to be used when an instance is not available.

:rtype: toil.common.Config
"""
config = Config()
from uuid import uuid4
config.workflowID = str(uuid4())
config.cleanWorkDir = 'always'
return config

def with_retries(self, operation, *args, **kwargs):
"""
The grid engine batch system needs a with_retries function when running the GridEngineThread, so fake one
"""
return operation(*args, **kwargs)


def call_qstat_or_qacct(args, **_):
# example outputs taken from https://2021.help.altair.com/2021.1/AltairGridEngine/8.7.0/UsersGuideGE.pdf
qacct_info = {}
job_id_info = {1: {"failed": True, "exit_code": 0, "completed": True}, 2: {"failed": True, "exit_code": 2, "completed": True},
3: {"failed": False, "exit_code": 0, "completed": True}, 4: {"failed": False, "exit_code": 10, "completed": True},
5: {"failed": False, "exit_code": 0, "completed": False}}
for job_id, status_info in job_id_info.items():
failed = 1 if status_info["failed"] else 0
exit_status = status_info["exit_code"]
qacct_info[job_id] = textwrap.dedent(f"""\
==============================================================
qname all.q
hostname kailua
group users
owner jondoe
project NONE
department defaultdepartment
jobname Sleeper
jobnumber 10
taskid undefined
account sge
priority 0
qsub_time Thu Mar 10 19:58:35 2011
start_time Thu Mar 10 19:58:42 2011
end_time Thu Mar 10 19:59:43 2011
granted_pe NONE
slots 1
failed {failed}
exit_status {exit_status}
ru_wallclock 61
ru_utime 0.070
ru_stime 0.050
ru_maxrss 1220
ru_ixrss 0
ru_ismrss 0
ru_idrss 0
""")
if args[0] == "qstat":
# This is a guess for what qstat will return for a given job. I'm unable to find an example of qstat output.
# This also assumes the second argument args[1] is -j, as that is what we try to use
job_id = int(args[2])
if job_id not in job_id_info.keys() or job_id_info[job_id]["completed"]:
stderr = f"Following jobs do not exist {job_id}"
else:
# This is not the output of qstat when the job is running, and is just a guess
# We test on the existence of the string "Following jobs do not exist", so this should be okay for now
stderr = f"Job exists {job_id}"
raise CalledProcessErrorStderr(2, args, stderr=stderr)
elif args[0] == "qacct":
if args[1] != "-j":
# Documentation for qacct says if -j is not found then all jobs are listed
# https://gridscheduler.sourceforge.net/htmlman/htmlman1/qacct.html
# This is a guess for the output of qacct. We don't have an SGE cluster and I can't find a bare qacct example output online
qacct_response = "\n".join(qacct_info.values())
else:
job_id = int(args[2])
if job_id not in job_id_info.keys():
# This is a guess of the behavior when the job does not exist. Since the behavior is unknown, this is not currently tested
return ""
qacct_response = qacct_info[job_id]

return qacct_response


class GridEngineTest(ToilTest):
"""
Class for unit-testing GridEngineBatchSystem
"""

def setUp(self):
self.monkeypatch = pytest.MonkeyPatch()
self.worker = toil.batchSystems.gridengine.GridEngineBatchSystem.GridEngineThread(
newJobsQueue=Queue(),
updatedJobsQueue=Queue(),
killQueue=Queue(),
killedJobsQueue=Queue(),
boss=FakeBatchSystem())

###
### Tests for coalesce_job_exit_codes for gridengine.
###

def test_coalesce_job_exit_codes_one_exists(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['1'] # FAILED
expected_result = [1]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"

def test_coalesce_job_exit_codes_one_still_running(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['5'] # Still running. We currently raise an exception when this happens
try:
self.worker.coalesce_job_exit_codes(job_ids)
except ExceededRetryAttempts:
pass
else:
raise RuntimeError("Test did not raise an exception!")

def test_coalesce_job_exit_codes_many_all_exist(self):
self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
job_ids = ['1', # FAILED,
'2', # FAILED (with exit code that we ignore),
'3', # SUCCEEDED,
'4'] # EXIT CODE 10
# RUNNING and PENDING jobs should return None
expected_result = [
1,
1,
0,
10
]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
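
For reference, the still-running case (job 5) expects coalesce_job_exit_codes to raise ExceededRetryAttempts, as the inline comment notes. The same expectation could also be written with pytest.raises; this is only an alternative style, not part of the PR:

```python
def test_coalesce_job_exit_codes_one_still_running_alt(self):
    # Equivalent to the try/except/else version above, using pytest.raises.
    self.monkeypatch.setattr(toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct)
    with pytest.raises(ExceededRetryAttempts):
        self.worker.coalesce_job_exit_codes(['5'])  # job 5 is still running
```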
