Commit

Merge branch 'master' into issues/5025-import-on-workers
stxue1 committed Sep 19, 2024
2 parents 8fee9a9 + 8faca0f commit 62a0cec
Showing 5 changed files with 61 additions and 3 deletions.
2 changes: 1 addition & 1 deletion requirements-cwl.txt
@@ -1,4 +1,4 @@
-cwltool==3.1.20240708091337
+cwltool==3.1.20240909164951
schema-salad>=8.4.20230128170514,<9
galaxy-tool-util<25
galaxy-util<25
9 changes: 8 additions & 1 deletion src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -257,6 +257,8 @@ def _runStep(self):
            newJob = self.newJobsQueue.get()
            if newJob is None:
                logger.debug('Received queue sentinel.')
+               # Send out kill signals before stopping
+               self.killJobs()
                return False
        if self.killJobs():
            activity = True
@@ -414,7 +416,7 @@ def issueBatchJob(self, command: str, jobDesc, job_environment: Optional[Dict[st
                    gpus = accelerator['count']
        else:
            gpus = jobDesc.accelerators

        self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()),
                               job_environment, gpus))
        logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID),
@@ -497,6 +499,11 @@ def shutdown(self) -> None:
"""
Signals thread to shutdown (via sentinel) then cleanly joins the thread
"""

for jobID in self.getIssuedBatchJobIDs():
# Send kill signals to any jobs that might be running
self.killQueue.put(jobID)

self.shutdownLocal()
newJobsQueue = self.newJobsQueue
self.newJobsQueue = None
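Note on the two hunks above: the worker thread that talks to the grid engine drains newJobsQueue until it receives the None sentinel, and the change makes kill signals go out both at that point and from shutdown(), which now queues a kill for every issued job before joining the thread. Below is a minimal, self-contained sketch of that sentinel-plus-kill-queue pattern; the names (worker_loop, kill_jobs, issued_jobs, shutdown) are simplified stand-ins for illustration, not Toil's actual API.

import queue
import threading
import time

# Simplified stand-ins for the batch system's internal state (illustrative only).
new_jobs_queue = queue.Queue()   # holds job IDs, or None as the shutdown sentinel
kill_queue = queue.Queue()       # holds IDs of jobs that should be cancelled
issued_jobs = set()              # IDs of jobs currently issued to the scheduler


def kill_jobs():
    # Drain the kill queue and cancel anything that is still issued.
    while not kill_queue.empty():
        job_id = kill_queue.get()
        if job_id in issued_jobs:
            issued_jobs.discard(job_id)
            print(f"sent kill signal for job {job_id}")


def worker_loop():
    # Background thread: issue new jobs until the sentinel arrives.
    while True:
        new_job = new_jobs_queue.get()
        if new_job is None:
            # Queue sentinel: send out kill signals before stopping,
            # mirroring the _runStep() change above.
            kill_jobs()
            return
        issued_jobs.add(new_job)
        kill_jobs()


def shutdown(worker):
    # Mirror of the shutdown() change: queue a kill for every issued job,
    # then send the sentinel and join the thread.
    for job_id in list(issued_jobs):
        kill_queue.put(job_id)
    new_jobs_queue.put(None)
    worker.join()


worker = threading.Thread(target=worker_loop)
worker.start()
new_jobs_queue.put(1)
new_jobs_queue.put(2)
time.sleep(0.2)   # let the worker pick both jobs up before shutting down
shutdown(worker)

Run as-is, the sketch should print one "sent kill signal" line for each job that was still issued when shutdown ran.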
34 changes: 34 additions & 0 deletions src/toil/test/wdl/testfiles/wait.wdl
@@ -0,0 +1,34 @@
version 1.0

workflow wait {
    input {
    }

    call waiter_task {
        input:
    }

    output {
        String result = read_lines(waiter_task.out)[0]
    }
}

task waiter_task {
    input {
    }

    command <<<
        sleep 10 &
        sleep 2 &
        wait
        echo "waited"
    >>>

    output {
        File out = stdout()
    }

    runtime {
        docker: "ubuntu:22.04"
    }
}
16 changes: 16 additions & 0 deletions src/toil/test/wdl/wdltoil_test.py
@@ -17,6 +17,7 @@
from toil.fileStores import FileID
from toil.provisioners import cluster_factory
from toil.test import (ToilTest,
+                      needs_docker,
                       needs_docker_cuda,
                       needs_google_storage,
                       needs_singularity_or_docker,
@@ -165,6 +166,21 @@ def test_url_to_file(self):
        assert 'url_to_file.first_line' in result
        assert isinstance(result['url_to_file.first_line'], str)
        self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328')
+
+   @needs_docker
+   def test_wait(self):
+       """
+       Test if Bash "wait" works in WDL scripts.
+       """
+       wdl = os.path.abspath('src/toil/test/wdl/testfiles/wait.wdl')
+
+       result_json = subprocess.check_output(
+           self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--wdlContainer=docker'])
+       result = json.loads(result_json)
+
+       assert 'wait.result' in result
+       assert isinstance(result['wait.result'], str)
+       self.assertEqual(result['wait.result'], 'waited')

    def test_url_to_optional_file(self):
        """
3 changes: 2 additions & 1 deletion src/toil/wdl/wdltoil.py
@@ -2089,7 +2089,8 @@ def add_injections(self, command_string: str, task_container: TaskContainer) ->
            }
            """)
        parts.append(script)
-       parts.append(f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &")
+       # Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell
+       parts.append(f"(_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &)")

        if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin":
            # With gRPC FUSE file sharing, files immediately downloaded before
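Note on why the subshell matters: Bash's wait with no arguments blocks on every background child of the current shell, so a monitor launched with a plain "... &" in the injected preamble would also be waited on by a task command that ends in wait (as exercised by wait.wdl above). Wrapping the launch in "( ... & )" makes the monitor a child of a short-lived subshell instead, so the task's own wait no longer sees it. A small standalone illustration of the difference, with a long sleep standing in for the resource monitor (an assumed example, not Toil code):

import subprocess
import time


def run_bash(snippet):
    # Run a Bash snippet and return roughly how long it took, in seconds.
    start = time.monotonic()
    subprocess.run(["bash", "-c", snippet], check=True)
    return time.monotonic() - start


# "Monitor" backgrounded in the main shell: wait also blocks on the 5-second sleep.
in_main_shell = run_bash("sleep 5 & sleep 1 & wait")

# "Monitor" backgrounded inside a subshell: wait only sees the 1-second job.
in_subshell = run_bash("(sleep 5 &); sleep 1 & wait")

print(f"plain background job:    wait took ~{in_main_shell:.1f}s")
print(f"subshell background job: wait took ~{in_subshell:.1f}s")

The first call should take roughly five seconds and the second roughly one, which is the behavioural difference the one-line change in add_injections targets.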
