Skip to content

Commit

Permalink
Merge pull request #238 from mskcc/release/1.17.0
Browse files Browse the repository at this point in the history
Release 1.17.0
  • Loading branch information
sivkovic authored Jul 19, 2021
2 parents fdc0590 + b415144 commit afd7da9
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 32 deletions.
41 changes: 31 additions & 10 deletions batch_systems/lsf_client/lsf_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Submit and monitor LSF jobs
Submit, monitor, and control LSF jobs
"""
import os
import re
Expand All @@ -11,6 +11,10 @@
from orchestrator.models import Status


def format_lsf_job_id(job_id):
return "/{}".format(job_id)


class LSFClient(object):
"""
Client for LSF
Expand All @@ -25,7 +29,7 @@ def __init__(self):
"""
self.logger = logging.getLogger("LSF_client")

def submit(self, command, job_args, stdout, env={}):
def submit(self, command, job_args, stdout, job_id, env={}):
"""
Submit command to LSF and store log in stdout
Expand All @@ -38,7 +42,7 @@ def submit(self, command, job_args, stdout, env={}):
Returns:
int: lsf job id
"""
bsub_command = ["bsub", "-sla", settings.LSF_SLA, "-oo", stdout] + job_args
bsub_command = ["bsub", "-sla", settings.LSF_SLA, "-g", format_lsf_job_id(job_id), "-oo", stdout] + job_args

bsub_command.extend(command)
current_env = os.environ.copy()
Expand All @@ -57,17 +61,18 @@ def submit(self, command, job_args, stdout, env={}):
)
return self._parse_procid(process.stdout)

def abort(self, external_job_id):
def abort(self, job_id):
"""
Kill LSF job
Args:
external_job_id (str): external_job_id
job_id (str): job_id
Returns:
bool: successful
"""
bkill_command = ["bkill", external_job_id]
self.logger.debug("Aborting LSF jobs for job %s", job_id)
bkill_command = ["bkill", "-g", format_lsf_job_id(job_id), "0"]
process = subprocess.run(bkill_command, check=True, stdout=subprocess.PIPE, universal_newlines=True)
if process.returncode == 0:
return True
Expand Down Expand Up @@ -211,15 +216,31 @@ def status(self, external_job_id):
status = self._parse_status(process.stdout, external_job_id)
return status

def suspend(self, external_job_id):
bsub_command = ["bstop", str(external_job_id)]
def suspend(self, job_id):
"""
Suspend LSF job
Args:
extrnsl_job_id (str): id of job
Returns:
bool: successful
"""
self.logger.debug("Suspending LSF jobs for job %s", job_id)
bsub_command = ["bstop", "-g", format_lsf_job_id(job_id), "0"]
process = subprocess.run(bsub_command, stdout=subprocess.PIPE, universal_newlines=True)
if process.returncode == 0:
return True
return False

def resume(self, external_job_id):
bsub_command = ["bresume", str(external_job_id)]
def resume(self, job_id):
"""
Resume LSF job
Args:
job_id (str): id of job
Returns:
bool: successful
"""
self.logger.debug("Unsuspending LSF jobs for job %s", job_id)
bsub_command = ["bresume", "-g", format_lsf_job_id(job_id), "0"]
process = subprocess.run(bsub_command, stdout=subprocess.PIPE, universal_newlines=True)
if process.returncode == 0:
return True
Expand Down
27 changes: 21 additions & 6 deletions orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
from datetime import timedelta
from celery import shared_task
from batch_systems.lsf_client import LSFClient
from django.conf import settings
from django.db import transaction
from django.utils.timezone import now
Expand Down Expand Up @@ -50,17 +49,33 @@ def on_failure_to_submit(self, exc, task_id, args, kwargs, einfo):

def suspend_job(job):
if Status(job.status).transition(Status.SUSPENDED):
client = LSFClient()
if not client.suspend(job.external_id):
submitter = JobSubmitterFactory.factory(
job.type,
str(job.id),
job.app,
job.inputs,
job.root_dir,
job.resume_job_store_location,
)
job_suspended = submitter.suspend()
if not job_suspended:
raise RetryException("Failed to suspend job: %s" % str(job.id))
job.update_status(Status.SUSPENDED)
return


def resume_job(job):
if Status(job.status) == Status.SUSPENDED:
client = LSFClient()
if not client.resume(job.external_id):
submitter = JobSubmitterFactory.factory(
job.type,
str(job.id),
job.app,
job.inputs,
job.root_dir,
job.resume_job_store_location,
)
job_resumed = submitter.resume()
if not job_resumed:
raise RetryException("Failed to resume job: %s" % str(job.id))
job.update_status(Status.RUNNING)
return
Expand Down Expand Up @@ -270,7 +285,7 @@ def abort_job(job):
job.root_dir,
job.resume_job_store_location,
)
job_killed = submitter.abort(job.external_id)
job_killed = submitter.abort()
if not job_killed:
raise RetryException("Failed to abort job %s" % str(job.id))
job.abort()
Expand Down
2 changes: 1 addition & 1 deletion ridgeback/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.16.0"
__version__ = "1.17.0"
22 changes: 19 additions & 3 deletions submitter/jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@


class JobSubmitter(object):
def __init__(self, app, inputs, walltime, memlimit):
def __init__(self, job_id, app, inputs, walltime, memlimit):
self.app = App.factory(app)
self.job_id = job_id
self.inputs = inputs
self.lsf_client = LSFClient()
self.walltime = walltime
Expand All @@ -20,8 +21,23 @@ def submit(self):
def status(self, external_id):
return self.lsf_client.status(external_id)

def abort(self, external_id):
return self.lsf_client.abort(external_id)
def abort(self):
"""
Aborts the job
"""
return self.lsf_client.abort(self.job_id)

def resume(self):
"""
Resumes the job
"""
return self.lsf_client.resume(self.job_id)

def suspend(self):
"""
Suspends the job
"""
return self.lsf_client.suspend(self.job_id)

def get_commandline_status(self, cache):
"""
Expand Down
5 changes: 2 additions & 3 deletions submitter/nextflow_submitter/nextflow_jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, mem
:param root_dir:
:param resume_jobstore:
"""
JobSubmitter.__init__(self, app, inputs, walltime, memlimit)
self.job_id = job_id
JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit)
self.resume_jobstore = resume_jobstore
if resume_jobstore:
self.job_store_dir = resume_jobstore
Expand All @@ -52,7 +51,7 @@ def submit(self):
env["JAVA_HOME"] = "/opt/common/CentOS_7/java/jdk1.8.0_202/"
env["PATH"] = env["JAVA_HOME"] + "bin:" + os.environ["PATH"]
env["TMPDIR"] = self.job_tmp_dir
external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, env)
external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env)
return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir

def _job_args(self):
Expand Down
9 changes: 5 additions & 4 deletions submitter/toil_submitter/toil_jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ def translate_toil_to_model_status(status):

class ToilJobSubmitter(JobSubmitter):
def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit):
JobSubmitter.__init__(self, app, inputs, walltime, memlimit)
self.job_id = job_id
JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit)
self.resume_jobstore = resume_jobstore
if resume_jobstore:
self.job_store_dir = resume_jobstore
Expand Down Expand Up @@ -54,7 +53,7 @@ def submit(self):
]:
env[e] = None

external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, env)
external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env)
return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir

def get_commandline_status(self, cache):
Expand Down Expand Up @@ -156,7 +155,9 @@ def _memlimit(self):
return ["-M", self.memlimit] if self.memlimit else []

def _command_line(self):
if "access" in self.app.github.lower() and "nucleo" not in self.app.github.lower():
bypass_access_workflows = ["nucleo", "access_qc_generation"]
should_bypass_access_env = any([w in self.app.github.lower() for w in bypass_access_workflows])
if "access" in self.app.github.lower() and not should_bypass_access_env:
"""
Start ACCESS-specific code
"""
Expand Down
14 changes: 9 additions & 5 deletions tests/test_lsf_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def setUp(self):
Cannot connect to LSF. Please wait ...
"""
self.example_id = 12345678
self.example_job_id = 12345
self.example_lsf_id = "/12345"
self.submit_response = "Job <{}> is submitted".format(self.example_id)
self.submit_response_please_wait = please_wait_str + self.submit_response
self.lsf_client = LSFClient()
Expand Down Expand Up @@ -65,8 +67,10 @@ def test_submit(self, submit_process):
submit_process_obj = Mock()
submit_process_obj.stdout = self.submit_response
submit_process.return_value = submit_process_obj
lsf_id = self.lsf_client.submit(command, args, stdout_file, {})
expected_command = ["bsub", "-sla", settings.LSF_SLA, "-oo", stdout_file] + args + command
lsf_id = self.lsf_client.submit(command, args, stdout_file, self.example_job_id, {})
expected_command = (
["bsub", "-sla", settings.LSF_SLA, "-g", self.example_lsf_id, "-oo", stdout_file] + args + command
)
self.assertEqual(lsf_id, self.example_id)
self.assertEqual(submit_process.call_args[0][0], expected_command)

Expand All @@ -81,7 +85,7 @@ def test_submit_slow_lsf(self, submit_process):
submit_process_obj = Mock()
submit_process_obj.stdout = self.submit_response_please_wait
submit_process.return_value = submit_process_obj
lsf_id = self.lsf_client.submit(command, args, stdout_file, {})
lsf_id = self.lsf_client.submit(command, args, stdout_file, self.example_job_id, {})
self.assertEqual(lsf_id, self.example_id)

@patch("subprocess.run")
Expand All @@ -92,8 +96,8 @@ def test_abort(self, abort_process):
abort_process_obj = Mock()
abort_process_obj.returncode = 0
abort_process.return_value = abort_process_obj
expected_command = ["bkill", self.example_id]
aborted = self.lsf_client.abort(self.example_id)
expected_command = ["bkill", "-g", self.example_lsf_id, "0"]
aborted = self.lsf_client.abort(self.example_job_id)
self.assertEqual(abort_process.call_args[0][0], expected_command)
self.assertEqual(aborted, True)

Expand Down

0 comments on commit afd7da9

Please sign in to comment.