diff --git a/batch_systems/lsf_client/lsf_client.py b/batch_systems/lsf_client/lsf_client.py index 547c18d9..3c9851fc 100644 --- a/batch_systems/lsf_client/lsf_client.py +++ b/batch_systems/lsf_client/lsf_client.py @@ -1,5 +1,5 @@ """ -Submit and monitor LSF jobs +Submit, monitor, and control LSF jobs """ import os import re @@ -11,6 +11,10 @@ from orchestrator.models import Status +def format_lsf_job_id(job_id): + return "/{}".format(job_id) + + class LSFClient(object): """ Client for LSF @@ -25,7 +29,7 @@ def __init__(self): """ self.logger = logging.getLogger("LSF_client") - def submit(self, command, job_args, stdout, env={}): + def submit(self, command, job_args, stdout, job_id, env={}): """ Submit command to LSF and store log in stdout @@ -38,7 +42,7 @@ def submit(self, command, job_args, stdout, env={}): Returns: int: lsf job id """ - bsub_command = ["bsub", "-sla", settings.LSF_SLA, "-oo", stdout] + job_args + bsub_command = ["bsub", "-sla", settings.LSF_SLA, "-g", format_lsf_job_id(job_id), "-oo", stdout] + job_args bsub_command.extend(command) current_env = os.environ.copy() @@ -57,17 +61,18 @@ def submit(self, command, job_args, stdout, env={}): ) return self._parse_procid(process.stdout) - def abort(self, external_job_id): + def abort(self, job_id): """ Kill LSF job Args: - external_job_id (str): external_job_id + job_id (str): job_id Returns: bool: successful """ - bkill_command = ["bkill", external_job_id] + self.logger.debug("Aborting LSF jobs for job %s", job_id) + bkill_command = ["bkill", "-g", format_lsf_job_id(job_id), "0"] process = subprocess.run(bkill_command, check=True, stdout=subprocess.PIPE, universal_newlines=True) if process.returncode == 0: return True @@ -211,15 +216,31 @@ def status(self, external_job_id): status = self._parse_status(process.stdout, external_job_id) return status - def suspend(self, external_job_id): - bsub_command = ["bstop", str(external_job_id)] + def suspend(self, job_id): + """ + Suspend LSF job + Args: + extrnsl_job_id (str): id of job + Returns: + bool: successful + """ + self.logger.debug("Suspending LSF jobs for job %s", job_id) + bsub_command = ["bstop", "-g", format_lsf_job_id(job_id), "0"] process = subprocess.run(bsub_command, stdout=subprocess.PIPE, universal_newlines=True) if process.returncode == 0: return True return False - def resume(self, external_job_id): - bsub_command = ["bresume", str(external_job_id)] + def resume(self, job_id): + """ + Resume LSF job + Args: + job_id (str): id of job + Returns: + bool: successful + """ + self.logger.debug("Unsuspending LSF jobs for job %s", job_id) + bsub_command = ["bresume", "-g", format_lsf_job_id(job_id), "0"] process = subprocess.run(bsub_command, stdout=subprocess.PIPE, universal_newlines=True) if process.returncode == 0: return True diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 1dc63d70..c2307098 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -3,7 +3,6 @@ import logging from datetime import timedelta from celery import shared_task -from batch_systems.lsf_client import LSFClient from django.conf import settings from django.db import transaction from django.utils.timezone import now @@ -50,8 +49,16 @@ def on_failure_to_submit(self, exc, task_id, args, kwargs, einfo): def suspend_job(job): if Status(job.status).transition(Status.SUSPENDED): - client = LSFClient() - if not client.suspend(job.external_id): + submitter = JobSubmitterFactory.factory( + job.type, + str(job.id), + job.app, + job.inputs, + job.root_dir, + job.resume_job_store_location, + ) + job_suspended = submitter.suspend() + if not job_suspended: raise RetryException("Failed to suspend job: %s" % str(job.id)) job.update_status(Status.SUSPENDED) return @@ -59,8 +66,16 @@ def suspend_job(job): def resume_job(job): if Status(job.status) == Status.SUSPENDED: - client = LSFClient() - if not client.resume(job.external_id): + submitter = JobSubmitterFactory.factory( + job.type, + str(job.id), + job.app, + job.inputs, + job.root_dir, + job.resume_job_store_location, + ) + job_resumed = submitter.resume() + if not job_resumed: raise RetryException("Failed to resume job: %s" % str(job.id)) job.update_status(Status.RUNNING) return @@ -270,7 +285,7 @@ def abort_job(job): job.root_dir, job.resume_job_store_location, ) - job_killed = submitter.abort(job.external_id) + job_killed = submitter.abort() if not job_killed: raise RetryException("Failed to abort job %s" % str(job.id)) job.abort() diff --git a/ridgeback/__init__.py b/ridgeback/__init__.py index 638c1217..30244104 100644 --- a/ridgeback/__init__.py +++ b/ridgeback/__init__.py @@ -1 +1 @@ -__version__ = "1.16.0" +__version__ = "1.17.0" diff --git a/submitter/jobsubmitter.py b/submitter/jobsubmitter.py index 730be315..c5884485 100644 --- a/submitter/jobsubmitter.py +++ b/submitter/jobsubmitter.py @@ -3,8 +3,9 @@ class JobSubmitter(object): - def __init__(self, app, inputs, walltime, memlimit): + def __init__(self, job_id, app, inputs, walltime, memlimit): self.app = App.factory(app) + self.job_id = job_id self.inputs = inputs self.lsf_client = LSFClient() self.walltime = walltime @@ -20,8 +21,23 @@ def submit(self): def status(self, external_id): return self.lsf_client.status(external_id) - def abort(self, external_id): - return self.lsf_client.abort(external_id) + def abort(self): + """ + Aborts the job + """ + return self.lsf_client.abort(self.job_id) + + def resume(self): + """ + Resumes the job + """ + return self.lsf_client.resume(self.job_id) + + def suspend(self): + """ + Suspends the job + """ + return self.lsf_client.suspend(self.job_id) def get_commandline_status(self, cache): """ diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index 3cd1697f..dda28e99 100644 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -32,8 +32,7 @@ def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, mem :param root_dir: :param resume_jobstore: """ - JobSubmitter.__init__(self, app, inputs, walltime, memlimit) - self.job_id = job_id + JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -52,7 +51,7 @@ def submit(self): env["JAVA_HOME"] = "/opt/common/CentOS_7/java/jdk1.8.0_202/" env["PATH"] = env["JAVA_HOME"] + "bin:" + os.environ["PATH"] env["TMPDIR"] = self.job_tmp_dir - external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, env) + external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env) return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir def _job_args(self): diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index 96afd11f..4fd21d9e 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -25,8 +25,7 @@ def translate_toil_to_model_status(status): class ToilJobSubmitter(JobSubmitter): def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit): - JobSubmitter.__init__(self, app, inputs, walltime, memlimit) - self.job_id = job_id + JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -54,7 +53,7 @@ def submit(self): ]: env[e] = None - external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, env) + external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env) return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir def get_commandline_status(self, cache): @@ -156,7 +155,9 @@ def _memlimit(self): return ["-M", self.memlimit] if self.memlimit else [] def _command_line(self): - if "access" in self.app.github.lower() and "nucleo" not in self.app.github.lower(): + bypass_access_workflows = ["nucleo", "access_qc_generation"] + should_bypass_access_env = any([w in self.app.github.lower() for w in bypass_access_workflows]) + if "access" in self.app.github.lower() and not should_bypass_access_env: """ Start ACCESS-specific code """ diff --git a/tests/test_lsf_client.py b/tests/test_lsf_client.py index 4c3162d7..6ededa8d 100644 --- a/tests/test_lsf_client.py +++ b/tests/test_lsf_client.py @@ -16,6 +16,8 @@ def setUp(self): Cannot connect to LSF. Please wait ... """ self.example_id = 12345678 + self.example_job_id = 12345 + self.example_lsf_id = "/12345" self.submit_response = "Job <{}> is submitted".format(self.example_id) self.submit_response_please_wait = please_wait_str + self.submit_response self.lsf_client = LSFClient() @@ -65,8 +67,10 @@ def test_submit(self, submit_process): submit_process_obj = Mock() submit_process_obj.stdout = self.submit_response submit_process.return_value = submit_process_obj - lsf_id = self.lsf_client.submit(command, args, stdout_file, {}) - expected_command = ["bsub", "-sla", settings.LSF_SLA, "-oo", stdout_file] + args + command + lsf_id = self.lsf_client.submit(command, args, stdout_file, self.example_job_id, {}) + expected_command = ( + ["bsub", "-sla", settings.LSF_SLA, "-g", self.example_lsf_id, "-oo", stdout_file] + args + command + ) self.assertEqual(lsf_id, self.example_id) self.assertEqual(submit_process.call_args[0][0], expected_command) @@ -81,7 +85,7 @@ def test_submit_slow_lsf(self, submit_process): submit_process_obj = Mock() submit_process_obj.stdout = self.submit_response_please_wait submit_process.return_value = submit_process_obj - lsf_id = self.lsf_client.submit(command, args, stdout_file, {}) + lsf_id = self.lsf_client.submit(command, args, stdout_file, self.example_job_id, {}) self.assertEqual(lsf_id, self.example_id) @patch("subprocess.run") @@ -92,8 +96,8 @@ def test_abort(self, abort_process): abort_process_obj = Mock() abort_process_obj.returncode = 0 abort_process.return_value = abort_process_obj - expected_command = ["bkill", self.example_id] - aborted = self.lsf_client.abort(self.example_id) + expected_command = ["bkill", "-g", self.example_lsf_id, "0"] + aborted = self.lsf_client.abort(self.example_job_id) self.assertEqual(abort_process.call_args[0][0], expected_command) self.assertEqual(aborted, True)