diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy index 7f181f0..624efc2 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy @@ -30,6 +30,14 @@ import groovy.util.logging.Slf4j class NomadConfig { final static protected API_VERSION = "v1" + final static public String VOLUME_DOCKER_TYPE = "docker" + final static public String VOLUME_CSI_TYPE = "csi" + final static public String VOLUME_HOST_TYPE = "host" + + final static protected String[] VOLUME_TYPES = [ + VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE + ] + final NomadClientOpts clientOpts final NomadJobOpts jobOpts @@ -57,6 +65,7 @@ class NomadConfig { final String region final String namespace final String dockerVolume + final VolumeSpec volumeSpec NomadJobOpts(Map nomadJobOpts){ deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ? @@ -71,6 +80,52 @@ class NomadConfig { region = nomadJobOpts.region ?: null namespace = nomadJobOpts.namespace ?: null dockerVolume = nomadJobOpts.dockerVolume ?: null + if( dockerVolume ){ + log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead" + } + if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){ + this.volumeSpec = new VolumeSpec() + def closure = (nomadJobOpts.volume as Closure) + def clone = closure.rehydrate(this.volumeSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + this.volumeSpec.validate() + }else{ + volumeSpec = null + } + } + } + + class VolumeSpec{ + + private String type + private String name + + String getType() { + return type + } + + String getName() { + return name + } + + VolumeSpec type(String type){ + this.type = type + this + } + + VolumeSpec name(String name){ + this.name = name + this + } + + protected validate(){ + if( !VOLUME_TYPES.contains(type) ) { + throw new IllegalArgumentException("Volume type $type is not supported") + } + if( !this.name ){ + throw new IllegalArgumentException("Volume name is required") + } } } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index e210645..7c1cdd5 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -1,4 +1,5 @@ /* + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain * Copyright 2023, Stellenbosch University, South Africa * Copyright 2022, Center for Medical Genetics, Ghent * @@ -28,6 +29,8 @@ import io.nomadproject.client.models.JobSummary import io.nomadproject.client.models.Task import io.nomadproject.client.models.TaskGroup import io.nomadproject.client.models.TaskGroupSummary +import io.nomadproject.client.models.VolumeMount +import io.nomadproject.client.models.VolumeRequest import nextflow.nomad.NomadConfig /** @@ -80,6 +83,15 @@ class NomadService implements Closeable{ name: "group", tasks: [ task ] ) + if( config.jobOpts.volumeSpec){ + taskGroup.volumes = [:] + taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest( + type: config.jobOpts.volumeSpec.type, + source: config.jobOpts.volumeSpec.name, + attachmentMode: "file-system", + accessMode: "multi-node-multi-writer" + ) + } return taskGroup } @@ -105,6 +117,13 @@ class NomadService implements Closeable{ readonly : false ] } + if( config.jobOpts.volumeSpec){ + String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) + task.volumeMounts = [ new VolumeMount( + destination: destinationDir, + volume: config.jobOpts.volumeSpec.name + )] + } task } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy index e85f117..51fa952 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy @@ -119,4 +119,44 @@ class NomadConfigSpec extends Specification { expect: config.jobOpts.namespace == "namespace" } + + void "should instantiate a volume spec if specified"() { + when: + def config = new NomadConfig([ + jobs: [volume : { type "docker" name "test" }] + ]) + + then: + config.jobOpts.volumeSpec + config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE + config.jobOpts.volumeSpec.name == "test" + + when: + def config2 = new NomadConfig([ + jobs: [volume : { type "csi" name "test" }] + ]) + + then: + config2.jobOpts.volumeSpec + config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE + config2.jobOpts.volumeSpec.name == "test" + + when: + def config3 = new NomadConfig([ + jobs: [volume : { type "host" name "test" }] + ]) + + then: + config3.jobOpts.volumeSpec + config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE + config3.jobOpts.volumeSpec.name == "test" + + when: + new NomadConfig([ + jobs: [volume : { type "not-supported" name "test" }] + ]) + + then: + thrown(IllegalArgumentException) + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 9fdbc73..3e7e69d 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -187,4 +187,62 @@ class NomadServiceSpec extends Specification{ state == "Starting" } + + void "submit a task with a volume"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + jobs:[ + volume: { type "csi" name "test" } + ] + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "a/b/c" + Mapenv = [test:"test"] + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, name, image, args, workingDir,env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job + body.Job.ID == id + body.Job.Name == name + body.Job.Datacenters == [] + body.Job.Type == "batch" + body.Job.TaskGroups.size() == 1 + body.Job.TaskGroups[0].Name == "group" + body.Job.TaskGroups[0].Tasks.size() == 1 + body.Job.TaskGroups[0].Tasks[0].Name == "nf-task" + body.Job.TaskGroups[0].Tasks[0].Driver == "docker" + body.Job.TaskGroups[0].Tasks[0].Config.image == image + body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir + body.Job.TaskGroups[0].Tasks[0].Config.command == args[0] + body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1) + + body.Job.TaskGroups[0].Volumes.size() == 1 + body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"] + body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1 + body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"a", Volume:"test"] + + } }