diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst index e4a8a96ecc..c5f678c1b3 100644 --- a/docs/running/cliOptions.rst +++ b/docs/running/cliOptions.rst @@ -199,6 +199,10 @@ levels in toil are based on priority from the logging module: 'h_vmem=MEMORY' to the qsub call, and instead rely on TOIL_GRIDGENGINE_ARGS to supply alternative arguments. Requires that TOIL_GRIDGENGINE_ARGS be set. + --memoryIsProduct + If the batch system understands requested memory as a product of the requested + memory and the number of cores, set this flag to properly allocate memory. This + can be fairly common with grid engine clusters (Ex: SGE, PBS, Torque). --runCwlInternalJobsOnWorkers Whether to run CWL internal jobs (e.g. CWLScatter) on the worker nodes instead of the primary node. If false diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 7a1a9d9bca..5f7d570ccd 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -128,7 +128,8 @@ def createJobs(self, newJob: JobTuple) -> bool: len(self.runningJobs) < int(self.boss.config.max_jobs): activity = True jobID, cpu, memory, command, jobName, environment, gpus = self.waitingJobs.pop(0) - + if self.boss.config.memory_is_product and cpu > 1: + memory = memory // cpu # prepare job submission command subLine = self.prepareSubmission(cpu, memory, jobID, command, jobName, environment, gpus) logger.debug("Running %r", subLine) diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index b33f9971fb..bfab8e5718 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -185,6 +185,10 @@ def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) - "systems such as gridengine, htcondor, torque, slurm, and lsf." 
) + parser.add_argument('--memoryIsProduct', dest='memory_is_product', default=False, action="store_true", + help="If the batch system understands requested memory as a product of the requested memory and the number " + "of cores, set this flag to properly allocate memory.") + + for name in get_batch_systems(): # All the batch systems are responsible for adding their own options # with the add_options class method. diff --git a/src/toil/common.py b/src/toil/common.py index 5c4083eefe..62a052be9f 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -243,6 +243,8 @@ class Config: # CWL cwl: bool + memory_is_product: bool + def __init__(self) -> None: # only default options that are not CLI options defined here (thus CLI options are centralized) self.cwl = False # will probably remove later @@ -417,6 +419,8 @@ def set_option(option_name: str, set_option("logLevel") set_option("colored_logs") + set_option("memory_is_product") + # Apply overrides as highest priority # Override workDir with value of TOIL_WORKDIR_OVERRIDE if it exists if os.getenv('TOIL_WORKDIR_OVERRIDE') is not None: diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py new file mode 100644 index 0000000000..77cb94d56e --- /dev/null +++ b/src/toil/lib/aws/s3.py @@ -0,0 +1,28 @@ +# Copyright (C) 2015-2024 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import logging +from typing import List + +from mypy_boto3_s3.type_defs import ListMultipartUploadsOutputTypeDef + +from toil.lib.aws import session, AWSServerErrors +from toil.lib.retry import retry + +logger = logging.getLogger(__name__) + + +@retry(errors=[AWSServerErrors]) +def list_multipart_uploads(bucket: str, region: str, prefix: str, max_uploads: int = 1) -> ListMultipartUploadsOutputTypeDef: + s3_client = session.client("s3", region_name=region) + return s3_client.list_multipart_uploads(Bucket=bucket, MaxUploads=max_uploads, Prefix=prefix)