diff --git a/setup.py b/setup.py index a5a62b9ab2..ccc5864873 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ def runSetup(): addict = 'addict<=2.2.0' sphinx = 'sphinx==1.7.5' pathlib2 = 'pathlib2==2.3.2' + enum34 = 'enum34==1.1.10' core_reqs = [ dill, @@ -71,7 +72,8 @@ def runSetup(): subprocess32, addict, sphinx, - pathlib2] + pathlib2, + enum34] aws_reqs = [ boto, diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 5d87676c8e..ce5ed377e7 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -16,6 +16,7 @@ standard_library.install_aliases() from future.utils import with_metaclass from builtins import object +import enum import os import shutil import logging @@ -37,6 +38,44 @@ logger = logging.getLogger(__name__) +# Value to use as exitStatus in UpdatedBatchJobInfo.exitStatus when status is not available. +EXIT_STATUS_UNAVAILABLE_VALUE = 255 + +class BatchJobExitReason(enum.IntEnum): + FINISHED = 1 + """Successfully finished.""" + FAILED = 2 + """Job finished, but failed.""" + LOST = 3 + """Preemptable failure (job's executing host went away).""" + KILLED = 4 + """Job killed before finishing.""" + ERROR = 5 + """Internal error.""" + MEMLIMIT = 6 + """Job hit batch system imposed memory limit.""" + MISSING = 7 + """Job disappeared from the scheduler without actually stopping, so Toil killed it.""" + MAXJOBDURATION = 8 + """Job ran longer than --maxJobDuration, so Toil killed it.""" + PARTITION = 9 + """Job was not able to talk to the leader via the job store, so Toil declared it failed.""" + + + @classmethod + def to_string(cls, value): + """ + Convert to human-readable string. + + Given an int that may be or may be equal to a value from the enum, + produce the string value of its matching enum entry, or a stringified + int. + """ + try: + return cls(value).name + except ValueError: + return str(value) + # A class containing the information required for worker cleanup on shutdown of the batch system. WorkerCleanupInfo = namedtuple('WorkerCleanupInfo', ( diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 0b9f4fac4b..03583c054e 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -27,6 +27,7 @@ from toil import subprocess from toil.lib.objects import abstractclassmethod +from toil.lib.misc import CalledProcessErrorStderr from toil.batchSystems.abstractBatchSystem import BatchSystemLocalSupport @@ -34,17 +35,23 @@ def with_retries(operation, *args, **kwargs): - retries = 3 - latest_err = None - while retries: - retries -= 1 + """Add an incremental sleep after each retry.""" + latest_err = Exception + + for i in [1, 5, 10, 60, 90, 120]: try: return operation(*args, **kwargs) - except subprocess.CalledProcessError as err: + except CalledProcessErrorStderr as err: latest_err = err logger.error( "Operation %s failed with code %d: %s", - operation, err.returncode, err.output) + operation, + err.returncode, + err.output, + ) + logger.error("Retrying in %s", str(i)) + time.sleep(i) + raise latest_err @@ -234,7 +241,7 @@ def run(self): activity |= self.createJobs(newJob) activity |= self.checkOnJobs() if not activity: - logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds()) + pass # logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds()) @abstractmethod def prepareSubmission(self, cpu, memory, jobID, command, jobName): diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index b7eed8a9b6..37b11e8d84 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -15,27 +15,83 @@ from __future__ import absolute_import from __future__ import division from builtins import str +from collections import defaultdict from past.utils import old_div -import logging -import os from pipes import quote -from toil import subprocess -import time +import logging import math +import os +import subprocess # Python 3 compatibility imports -from six.moves.queue import Empty, Queue -from six import iteritems from toil.batchSystems import MemoryString -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, with_retries +from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE +from toil.lib.humanize import bytes2human +from toil.lib.misc import CalledProcessErrorStderr, call_command logger = logging.getLogger(__name__) + +MAX_MEMORY = 256 * 1e9 # More than 256 GB is hard to get +OUT_OF_MEM_RETRIES = 2 + +TERMINAL_STATES = { + "BOOT_FAIL": BatchJobExitReason.LOST, + "CANCELLED": BatchJobExitReason.KILLED, + "COMPLETED": BatchJobExitReason.FINISHED, + "DEADLINE": BatchJobExitReason.KILLED, + "FAILED": BatchJobExitReason.FAILED, + "NODE_FAIL": BatchJobExitReason.LOST, + "OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT, + "PREEMPTED": BatchJobExitReason.KILLED, + "REVOKED": BatchJobExitReason.KILLED, + "SPECIAL_EXIT": BatchJobExitReason.FAILED, + "TIMEOUT": BatchJobExitReason.KILLED +} + +# If a job is in one of these states, it might eventually move to a different +# state. +NONTERMINAL_STATES = { + "CONFIGURING", + "COMPLETING", + "PENDING", + "RUNNING", + "RESV_DEL_HOLD", + "REQUEUE_FED", + "REQUEUE_HOLD", + "REQUEUED", + "RESIZING", + "SIGNALING", + "STAGE_OUT", + "STOPPED", + "SUSPENDED" +} + class SlurmBatchSystem(AbstractGridEngineBatchSystem): + def __init__(self, *args, **kwargs): + """Create a mapping table for JobIDs to JobNodes.""" + super(SlurmBatchSystem, self).__init__(*args, **kwargs) + self.Id2Node = {} + self.resourceRetryCount = defaultdict(int) + + def issueBatchJob(self, jobDesc): + """Load the jobDesc into the JobID mapping table.""" + jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc) + self.Id2Node[jobID] = jobDesc + return jobID + + class Worker(AbstractGridEngineBatchSystem.Worker): + def forgetJob(self, jobID): + """Remove jobNode from the mapping table when forgetting.""" + self.boss.Id2Node.pop(jobID, None) + self.boss.resourceRetryCount.pop(jobID, None) + return super(SlurmBatchSystem.Worker, self).forgetJob(jobID) + def getRunningJobIDs(self): # Should return a dictionary of Job IDs and number of seconds times = {} @@ -46,7 +102,7 @@ def getRunningJobIDs(self): # -h for no header # --format to get jobid i, state %t and time days-hours:minutes:seconds - lines = subprocess.check_output(['squeue', '-h', '--format', '%i %t %M']).decode('utf-8').split('\n') + lines = with_retries(call_command, ['squeue', '-h', '--format', '%i %t %M'], quiet=True).split('\n') for line in lines: values = line.split() if len(values) < 3: @@ -59,110 +115,310 @@ def getRunningJobIDs(self): return times def killJob(self, jobID): - subprocess.check_call(['scancel', self.getBatchSystemID(jobID)]) + with_retries(call_command, ['scancel', self.getBatchSystemID(jobID)]) def prepareSubmission(self, cpu, memory, jobID, command, jobName): return self.prepareSbatch(cpu, memory, jobID, jobName) + ['--wrap={}'.format(command)] def submitJob(self, subLine): try: - output = subprocess.check_output(subLine, stderr=subprocess.STDOUT).decode('utf-8') + output = with_retries(call_command, subLine) # sbatch prints a line like 'Submitted batch job 2954103' result = int(output.strip().split()[-1]) - logger.debug("sbatch submitted job %d", result) + logger.info("sbatch submitted job %d", result) + subprocess.check_call(["echo {0} >> $PWD/job_ids.txt".format(result)], shell=True) return result except OSError as e: logger.error("sbatch command failed") raise e - def getJobExitCode(self, slurmJobID): - logger.debug("Getting exit code for slurm job %d", int(slurmJobID)) - - state, rc = self._getJobDetailsFromSacct(slurmJobID) - - if rc == -999: - state, rc = self._getJobDetailsFromScontrol(slurmJobID) - - logger.debug("s job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): + def getJobExitCode(self, batchJobID): + """ + Get job exit code for given batch job ID. + :param batchJobID: string of the form "[.]". + :return: integer job exit code. + """ + logger.debug("Getting exit code for slurm job %d", int(batchJobID)) + + slurm_job_id = int(batchJobID.split('.')[0]) + status_dict = self._get_job_details([slurm_job_id]) + status = status_dict[slurm_job_id] + + exit_status = self._get_job_return_code(status) + if exit_status is None: + return None + + exit_code, exit_reason = exit_status + if exit_reason == BatchJobExitReason.MEMLIMIT: + # Retry job with 2x memory if it was killed because of memory + jobID = self._getJobID(slurm_job_id) + exit_code = self._customRetry(jobID, slurm_job_id) + elif exit_reason == BatchJobExitReason.FINISHED: + pass #self._collectMetrics(slurm_job_id) + return exit_code + + def _getJobID(self, slurm_job_id): + """Get toil job ID from the slurm job ID.""" + job_ids_dict = {slurm_job[0]: toil_job for toil_job, slurm_job in self.batchJobIDs.items()} + if slurm_job_id not in job_ids_dict: + raise RuntimeError("Unknown slurmJobID, could not be converted") + return job_ids_dict[slurm_job_id] + + def _customRetry(self, jobID, slurm_job_id): + """Increase the job memory 2x and retry, when it's killed by memlimit problems.""" + try: + jobNode = self.boss.Id2Node[jobID] + except KeyError: + logger.error("Can't resource retry %s, jobNode not found", jobID) + return 1 + + job_retries = self.boss.resourceRetryCount[jobID] + if job_retries < OUT_OF_MEM_RETRIES and jobNode.memory < MAX_MEMORY: + jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry " + str(job_retries) + memory = jobNode.memory * (job_retries + 1) * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY + + sbatch_line = self.prepareSubmission( + jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName + ) + logger.debug("Running %r", sbatch_line) + new_slurm_job_id = with_retries(self.submitJob, sbatch_line) + self.batchJobIDs[jobID] = (new_slurm_job_id, None) + self.boss.resourceRetryCount[jobID] += 1 + logger.info( + "Detected job %s killed by SLURM, attempting retry with 2x memory: %s", + slurm_job_id, new_slurm_job_id + ) + logger.info( + "Issued job %s with job batch system ID: " + "%s and cores: %s, disk: %s, and memory: %s", + jobNode, str(new_slurm_job_id), int(jobNode.cores), + bytes2human(jobNode.disk), bytes2human(memory) + ) + with self.runningJobsLock: + self.runningJobs.add(jobID) + else: + logger.error("Can't retry job %s for memlimit more than twice") + return 1 + return None + + def _get_job_details(self, batch_job_ids): + """ + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + Fetch job details from Slurm's accounting system or job control system. + :param batch_job_ids: list of integer Job IDs. + :return: dict of job statuses, where key is the integer job ID, and value is a tuple + containing the job's state and exit code. + """ + try: + status_dict = self._getJobDetailsFromSacct(batch_job_ids) + except CalledProcessErrorStderr: + status_dict = self._getJobDetailsFromScontrol(batch_job_ids) + return status_dict + + def _get_job_return_code(self, status): + """ + Given a Slurm return code, status pair, summarize them into a Toil return code, exit reason pair. + + The return code may have already been OR'd with the 128-offset + Slurm-reported signal. + + Slurm will report return codes of 0 even if jobs time out instead + of succeeding: + + 2093597|TIMEOUT|0:0 + 2093597.batch|CANCELLED|0:15 + + So we guarantee here that, if the Slurm status string is not a + successful one as defined in + , we + will not return a successful return code. + + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + :param status: tuple containing the job's state and it's return code from Slurm. + :return: the job's return code for Toil if it's completed, otherwise None. + """ + state, rc = status + + if state not in TERMINAL_STATES: + # Don't treat the job as exited yet return None + + exit_reason = TERMINAL_STATES[state] + + if exit_reason == BatchJobExitReason.FINISHED: + # The only state that should produce a 0 ever is COMPLETED. So + # if the job is COMPLETED and the exit reason is thus FINISHED, + # pass along the code it has. + return (rc, exit_reason) + + if rc == 0: + # The job claims to be in a state other than COMPLETED, but + # also to have not encountered a problem. Say the exit status + # is unavailable. + return (EXIT_STATUS_UNAVAILABLE_VALUE, exit_reason) + + # If the code is nonzero, pass it along. + return (rc, exit_reason) + + def _canonicalize_state(self, state): + """ + Turn a state string form SLURM into just the state token like "CANCELED". + """ + + # Slurm will sometimes send something like "CANCELED by 30065" in + # the state column for some reason. - return rc + state_token = state + + if " " in state_token: + state_token = state.split(" ", 1)[0] + + if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES: + raise RuntimeError("Toil job in unimplemented Slurm state " + state) - def _getJobDetailsFromSacct(self, slurmJobID): - # SLURM job exit codes are obtained by running sacct. + return state_token + + def _collectMetrics(self, job_id): + """Print Slurm Job Metrics to file.""" + slurm_jobs_details = os.path.join(self.boss.config.writeLogs, "slurm_metrics.txt") + args = [ + 'sacct', + '-n' if os.path.isfile(slurm_jobs_details) else '', + '-X', + '-j', + str(job_id), + '--format=JobID,JobName%20,AllocCPUS,State,ExitCode,Start,End,Elapsed,NodeList,ReqMem,MaxRSS,MaxVMSize' + '>>', + slurm_jobs_details + ] + with_retries(call_command, args, quiet=True) + + def _getJobDetailsFromSacct(self, job_id_list): + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ + job_ids = ",".join(str(id) for id in job_id_list) args = ['sacct', - '-n', # no header - '-j', str(slurmJobID), # job - '--format', 'State,ExitCode', # specify output columns - '-P', # separate columns with pipes - '-S', '1970-01-01'] # override start time limit - - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - rc = process.returncode - - if rc != 0: - # no accounting system or some other error - return (None, -999) - - for line in process.stdout: - values = line.decode('utf-8').strip().split('|') - if len(values) < 2: + '-n', # no header + '-X', # Only main job + '-j', job_ids, # job + '--format', 'JobIDRaw,State,ExitCode', # specify output columns + '-P', # separate columns with pipes + '-S', '1970-01-01'] # override start time limit + try: + stdout = with_retries(call_command, args, quiet=True) + except CalledProcessErrorStderr as error: + logger.error("Error calling sacct: %s", str(error)) + raise error + + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `sacct`. + job_statuses = {} + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + + for line in stdout.splitlines(): + values = line.strip().split('|') + if len(values) < 3: continue - state, exitcode = values - logger.debug("sacct job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - status, signal = [int(n) for n in exitcode.split(':')] + job_id_raw, state, exitcode = values + state = self._canonicalize_state(state) + logger.debug("%s state of job %s is %s", args[0], job_id_raw, state) + # JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps. + job_id_parts = job_id_raw.split(".") + if len(job_id_parts) > 1: + continue + job_id = int(job_id_parts[0]) + status, signal = (int(n) for n in exitcode.split(':')) if signal > 0: # A non-zero signal may indicate e.g. an out-of-memory killed job status = 128 + signal - logger.debug("sacct exit code is %s, returning status %d", exitcode, status) - return (state, status) - logger.debug("Did not find exit code for job in sacct output") - return None + logger.debug("%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + job_statuses[job_id] = state, status + logger.debug("%s returning job statuses: %s", args[0], job_statuses) + return job_statuses - def _getJobDetailsFromScontrol(self, slurmJobID): + def _getJobDetailsFromScontrol(self, job_id_list): + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `scontrol`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ args = ['scontrol', 'show', - 'job', - str(slurmJobID)] - - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - job = dict() - for line in process.stdout: - values = line.decode('utf-8').strip().split() - - # If job information is not available an error is issued: - # slurm_load_jobs error: Invalid job id specified - # There is no job information, so exit. - if len(values)>0 and values[0] == 'slurm_load_jobs': - return (None, None) - - # Output is in the form of many key=value pairs, multiple pairs on each line - # and multiple lines in the output. Each pair is pulled out of each line and - # added to a dictionary - for v in values: - bits = v.split('=') - job[bits[0]] = bits[1] - - state = job['JobState'] - try: - exitcode = job['ExitCode'] - if exitcode is not None: - status, signal = [int(n) for n in exitcode.split(':')] - if signal > 0: - # A non-zero signal may indicate e.g. an out-of-memory killed job - status = 128 + signal - logger.debug("scontrol exit code is %s, returning status %d", exitcode, status) - rc = status - else: + 'job'] + # `scontrol` can only return information about a single job, + # or all the jobs it knows about. + if len(job_id_list) == 1: + args.append(str(job_id_list[0])) + + stdout = with_retries(call_command, args, quiet=True) + + # Job records are separated by a blank line. + if isinstance(stdout, str): + job_records = stdout.strip().split('\n\n') + elif isinstance(stdout, bytes): + job_records = stdout.decode('utf-8').strip().split('\n\n') + + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `scontrol`. + job_statuses = {} + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + + # `scontrol` will report "No jobs in the system", if there are no jobs in the system, + # and if no job-id was passed as argument to `scontrol`. + if len(job_records) > 0 and job_records[0] == "No jobs in the system": + return job_statuses + + for record in job_records: + job = {} + for line in record.splitlines(): + for item in line.split(): + # Output is in the form of many key=value pairs, multiple pairs on each line + # and multiple lines in the output. Each pair is pulled out of each line and + # added to a dictionary. + # Note: In some cases, the value itself may contain white-space. So, if we find + # a key without a value, we consider that key part of the previous value. + bits = item.split('=', 1) + if len(bits) == 1: + job[key] += ' ' + bits[0] + else: + key = bits[0] + job[key] = bits[1] + # The first line of the record contains the JobId. Stop processing the remainder + # of this record, if we're not interested in this job. + job_id = int(job['JobId']) + if job_id not in job_id_list: + logger.debug("%s job %d is not in the list", args[0], job_id) + break + if job_id not in job_id_list: + continue + state = job['JobState'] + state = self._canonicalize_state(state) + logger.debug("%s state of job %s is %s", args[0], job_id, state) + try: + exitcode = job['ExitCode'] + if exitcode is not None: + status, signal = (int(n) for n in exitcode.split(':')) + if signal > 0: + # A non-zero signal may indicate e.g. an out-of-memory killed job + status = 128 + signal + logger.debug("%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + rc = status + else: + rc = None + except KeyError: rc = None - except KeyError: - rc = None - - return (state, rc) + job_statuses[job_id] = (state, rc) + logger.debug("%s returning job statuses: %s", args[0], job_statuses) + return job_statuses """ Implementation-specific helper methods @@ -183,7 +439,11 @@ def prepareSbatch(self, cpu, mem, jobID, jobName): if mem is not None: # memory passed in is in bytes, but slurm expects megabytes - sbatch_line.append('--mem={}'.format(old_div(int(mem), 2 ** 20))) + per_cpu = os.getenv("TOIL_SLURM_PER_CPU") + if per_cpu == "Y": + sbatch_line.append('--mem-per-cpu={}'.format(old_div(int(mem), 2 ** 20))) + else: + sbatch_line.append('--mem={}'.format(old_div(int(mem), 2 ** 20))) if cpu is not None: sbatch_line.append('--cpus-per-task={}'.format(int(math.ceil(cpu)))) @@ -237,7 +497,7 @@ def obtainSystemConstants(cls): # --format to get memory, cpu max_cpu = 0 max_mem = MemoryString('0') - lines = subprocess.check_output(['sinfo', '-Nhe', '--format', '%m %c']).decode('utf-8').split('\n') + lines = with_retries(call_command, ['sinfo', '-Nhe', '--format', '%m %c'], quiet=True).split('\n') for line in lines: values = line.split() if len(values) < 2: diff --git a/src/toil/fileStores/abstractFileStore.py b/src/toil/fileStores/abstractFileStore.py index 8801c190b2..25f8530c1e 100644 --- a/src/toil/fileStores/abstractFileStore.py +++ b/src/toil/fileStores/abstractFileStore.py @@ -465,6 +465,9 @@ def _pidExists(pid): if err.errno == errno.ESRCH: # ESRCH == No such process return False + elif err.errno == errno.EPERM: + # EPERM == operation not permitted + return False else: raise else: diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index f71cf902f7..3e30a2638b 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -221,8 +221,11 @@ def update(self, job): # function is atomic. with open(self._getJobFileName(job.jobStoreID) + ".new", 'wb') as f: pickle.dump(job, f) - # This should be atomic for the file system - os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + try: + os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + except OSError: + # Try move when renaming between different file systems fail. + shutil.move(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) def delete(self, jobStoreID): # The jobStoreID is the relative path to the directory containing the job, @@ -417,6 +420,7 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # It worked! return except OSError as e: + # For the list of the possible errno codes, see: https://linux.die.net/man/2/symlink if e.errno == errno.EEXIST: # Overwrite existing file, emulating shutil.copyfile(). os.unlink(localFilePath) @@ -426,7 +430,12 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # Now we succeeded and don't need to copy return + elif e.errno == errno.EPERM: + # On some filesystems, the creation of symbolic links is not possible. + # In this case, we try to make a hard link. + pass else: + logger.error("Unexpected OSError when reading file " + jobStoreFilePath + " from job store") raise # If we get here, symlinking isn't an option. @@ -440,23 +449,38 @@ def readFile(self, jobStoreFileID, localFilePath, symlink=False): # It worked! return except OSError as e: + # For the list of the possible errno codes, see: https://linux.die.net/man/2/symlink if e.errno == errno.EEXIST: # Overwrite existing file, emulating shutil.copyfile(). os.unlink(localFilePath) # It would be very unlikely to fail again for same reason but possible # nonetheless in which case we should just give up. os.link(jobStoreFilePath, localFilePath) - # Now we succeeded and don't need to copy return elif e.errno == errno.EXDEV: # It's a cross-device link even though it didn't appear to be. # Just keep going and hit the file copy case. pass + # See https://github.com/DataBiosphere/toil/pull/4284 + elif e.errno == errno.EPERM: + # It's a cross-device link even though it didn't appear to be. + # Just keep going and hit the file copy case. + pass + elif e.errno == errno.EPERM: + # On some filesystems, hardlinking could be disallowed by permissions. + # In this case, we also fall back to making a complete copy. + pass + elif e.errno == errno.ELOOP: + # Too many symbolic links were encountered. Just keep going and hit the + # file copy case. + pass + elif e.errno == errno.EMLINK: + # The maximum number of links to file is reached. Just keep going and + # hit the file copy case. + pass else: - logger.critical('Unexpected OSError when reading file from job store') - logger.critical('jobStoreFilePath: ' + jobStoreFilePath + ' ' + str(os.path.exists(jobStoreFilePath))) - logger.critical('localFilePath: ' + localFilePath + ' ' + str(os.path.exists(localFilePath))) + logger.error("Unexpected OSError when reading file " + jobStoreFilePath + " from job store") raise # If we get here, neither a symlink nor a hardlink will work. @@ -567,7 +591,12 @@ def readStatsAndLogging(self, callback, readAll=False): newName = tempFile.rsplit('.', 1)[0] + '.new' newAbsTempFile = os.path.join(tempDir, newName) # Mark this item as read - os.rename(absTempFile, newAbsTempFile) + try: + os.rename(absTempFile, newAbsTempFile) + except OSError: + # Try move as rename fail between different file systems + shutil.move(absTempFile, newAbsTempFile) + return numberOfFilesProcessed ########################################## diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index e682e3310f..8b9ef582a9 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -1,19 +1,39 @@ import random from six.moves import xrange from math import sqrt +import datetime import errno +import logging import os import shutil +import subprocess import sys import time import socket from contextlib import contextmanager + if sys.version_info[0] < 3: # Define a usable FileNotFoundError as will be raised by os.remove on a # nonexistent file. FileNotFoundError = OSError +logger = logging.getLogger(__name__) +class CalledProcessErrorStderr(subprocess.CalledProcessError): + """Version of CalledProcessError that include stderr in the error message if it is set""" + + def __init__(self, returncode, cmd, output=None, stderr=None): + """Initialize parent class without stderr.""" + subprocess.CalledProcessError.__init__(self, returncode, cmd, output) + self.stderr = stderr + + def __str__(self): + if (self.returncode < 0) or (self.stderr is None): + return str(super(CalledProcessErrorStderr, self)) + else: + err = self.stderr if isinstance(self.stderr, str) else self.stderr.decode("ascii", errors="replace") + return "Command '%s' exit status %d:\n%s" % (" ".join(self.cmd), self.returncode, err) + def mkdir_p(path): """The equivalent of mkdir -p""" @@ -243,3 +263,46 @@ def atomic_copyobj(src_fh, dest_path, length=16384): with AtomicFileCreate(dest_path) as dest_path_tmp: with open(dest_path_tmp, 'wb') as dest_path_fh: shutil.copyfileobj(src_fh, dest_path_fh, length=length) + + +def call_command(cmd, input=None, timeout=None, useCLocale=True, env=None, quiet=False, *args): + """ + Simplified calling of external commands. + + If the process fails, CalledProcessErrorStderr is raised. + + The captured stderr is always printed, regardless of + if an exception occurs, so it can be logged. + + Always logs the command at debug log level. + + :param quiet: If True, do not log the command output. If False (the + default), do log the command output at debug log level. + + :param useCLocale: If True, C locale is forced, to prevent failures that + can occur in some batch systems when using UTF-8 locale. + + :returns: Command standard output, decoded as utf-8. + """ + + # NOTE: Interface MUST be kept in sync with call_sacct and call_scontrol in + # test_slurm.py, which monkey-patch this! + + # using non-C locales can cause GridEngine commands, maybe other to + # generate errors + if useCLocale: + env = dict(os.environ) if env is None else dict(env) # copy since modifying + env["LANGUAGE"] = env["LC_ALL"] = "C" + + logger.debug("run command: {}".format(" ".join(cmd))) + start_time = datetime.datetime.now() + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate(input=input) + end_time = datetime.datetime.now() + runtime = (end_time - start_time).total_seconds() + sys.stderr.write(stderr) + if proc.returncode != 0: + logger.debug("command failed in {}s: {}: {}".format(runtime, " ".join(cmd), stderr.rstrip())) + raise CalledProcessErrorStderr(proc.returncode, cmd, output=stdout, stderr=stderr) + logger.debug("command succeeded in {}s: {}{}".format(runtime, " ".join(cmd), (': ' + stdout.rstrip()) if not quiet else '')) + return stdout