diff --git a/requirements-cwl.txt b/requirements-cwl.txt index 5d1c3202c7..ff3084b490 100644 --- a/requirements-cwl.txt +++ b/requirements-cwl.txt @@ -1,4 +1,4 @@ -cwltool==3.1.20240708091337 +cwltool==3.1.20240909164951 schema-salad>=8.4.20230128170514,<9 galaxy-tool-util<25 galaxy-util<25 diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 5f7d570ccd..24bd4bb4fb 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -257,6 +257,8 @@ def _runStep(self): newJob = self.newJobsQueue.get() if newJob is None: logger.debug('Received queue sentinel.') + # Send out kill signals before stopping + self.killJobs() return False if self.killJobs(): activity = True @@ -414,7 +416,7 @@ def issueBatchJob(self, command: str, jobDesc, job_environment: Optional[Dict[st gpus = accelerator['count'] else: gpus = jobDesc.accelerators - + self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()), job_environment, gpus)) logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID), @@ -497,6 +499,11 @@ def shutdown(self) -> None: """ Signals thread to shutdown (via sentinel) then cleanly joins the thread """ + + for jobID in self.getIssuedBatchJobIDs(): + # Send kill signals to any jobs that might be running + self.killQueue.put(jobID) + self.shutdownLocal() newJobsQueue = self.newJobsQueue self.newJobsQueue = None diff --git a/src/toil/test/wdl/testfiles/wait.wdl b/src/toil/test/wdl/testfiles/wait.wdl new file mode 100644 index 0000000000..08024fffc5 --- /dev/null +++ b/src/toil/test/wdl/testfiles/wait.wdl @@ -0,0 +1,34 @@ +version 1.0 + +workflow wait { + input { + } + + call waiter_task { + input: + } + + output { + String result = read_lines(waiter_task.out)[0] + } +} + +task waiter_task { + input { + } + + command <<< + sleep 10 & + sleep 2 & + wait + echo "waited" + >>> + + output { + File out = stdout() + } + + runtime { + docker: "ubuntu:22.04" + } +} diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 4873f5a7bf..6de389b11c 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -17,6 +17,7 @@ from toil.fileStores import FileID from toil.provisioners import cluster_factory from toil.test import (ToilTest, + needs_docker, needs_docker_cuda, needs_google_storage, needs_singularity_or_docker, @@ -165,6 +166,21 @@ def test_url_to_file(self): assert 'url_to_file.first_line' in result assert isinstance(result['url_to_file.first_line'], str) self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328') + + @needs_docker + def test_wait(self): + """ + Test if Bash "wait" works in WDL scripts. + """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/wait.wdl') + + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--wdlContainer=docker']) + result = json.loads(result_json) + + assert 'wait.result' in result + assert isinstance(result['wait.result'], str) + self.assertEqual(result['wait.result'], 'waited') def test_url_to_optional_file(self): """ diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 12094a42d6..158398dd5f 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -2089,7 +2089,8 @@ def add_injections(self, command_string: str, task_container: TaskContainer) -> } """) parts.append(script) - parts.append(f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &") + # Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell + parts.append(f"(_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &)") if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin": # With gRPC FUSE file sharing, files immediately downloaded before