Skip to content

Commit

Permalink
Merge pull request #22 from nextflow-io/mount-volume
Browse files Browse the repository at this point in the history
Mount volume spec
  • Loading branch information
abhi18av authored Mar 3, 2024
2 parents 21831ac + 374996e commit 6376aee
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 0 deletions.
55 changes: 55 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import groovy.util.logging.Slf4j
class NomadConfig {
final static protected API_VERSION = "v1"

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

final NomadClientOpts clientOpts
final NomadJobOpts jobOpts

Expand Down Expand Up @@ -57,6 +65,7 @@ class NomadConfig {
final String region
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec

NomadJobOpts(Map nomadJobOpts){
deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ?
Expand All @@ -71,6 +80,52 @@ class NomadConfig {
region = nomadJobOpts.region ?: null
namespace = nomadJobOpts.namespace ?: null
dockerVolume = nomadJobOpts.dockerVolume ?: null
if( dockerVolume ){
log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead"
}
if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){
this.volumeSpec = new VolumeSpec()
def closure = (nomadJobOpts.volume as Closure)
def clone = closure.rehydrate(this.volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
this.volumeSpec.validate()
}else{
volumeSpec = null
}
}
}

class VolumeSpec{

private String type
private String name

String getType() {
return type
}

String getName() {
return name
}

VolumeSpec type(String type){
this.type = type
this
}

VolumeSpec name(String name){
this.name = name
this
}

protected validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain
* Copyright 2023, Stellenbosch University, South Africa
* Copyright 2022, Center for Medical Genetics, Ghent
*
Expand Down Expand Up @@ -28,6 +29,8 @@ import io.nomadproject.client.models.JobSummary
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
import io.nomadproject.client.models.VolumeMount
import io.nomadproject.client.models.VolumeRequest
import nextflow.nomad.NomadConfig

/**
Expand Down Expand Up @@ -80,6 +83,15 @@ class NomadService implements Closeable{
name: "group",
tasks: [ task ]
)
if( config.jobOpts.volumeSpec){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}
return taskGroup
}

Expand All @@ -105,6 +117,13 @@ class NomadService implements Closeable{
readonly : false
]
}
if( config.jobOpts.volumeSpec){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
task.volumeMounts = [ new VolumeMount(
destination: destinationDir,
volume: config.jobOpts.volumeSpec.name
)]
}
task
}

Expand Down
40 changes: 40 additions & 0 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,44 @@ class NomadConfigSpec extends Specification {
expect:
config.jobOpts.namespace == "namespace"
}

void "should instantiate a volume spec if specified"() {
when:
def config = new NomadConfig([
jobs: [volume : { type "docker" name "test" }]
])

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"

when:
def config2 = new NomadConfig([
jobs: [volume : { type "csi" name "test" }]
])

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"

when:
def config3 = new NomadConfig([
jobs: [volume : { type "host" name "test" }]
])

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"

when:
new NomadConfig([
jobs: [volume : { type "not-supported" name "test" }]
])

then:
thrown(IllegalArgumentException)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,62 @@ class NomadServiceSpec extends Specification{
state == "Starting"

}

void "submit a task with a volume"(){
given:
def config = new NomadConfig(
client:[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
],
jobs:[
volume: { type "csi" name "test" }
]
)
def service = new NomadService(config)

String id = "theId"
String name = "theName"
String image = "theImage"
List<String> args = ["theCommand", "theArgs"]
String workingDir = "a/b/c"
Map<String, String>env = [test:"test"]

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

then:
idJob

and:
recordedRequest.method == "POST"
recordedRequest.path == "/v1/jobs"

and:
body.Job
body.Job.ID == id
body.Job.Name == name
body.Job.Datacenters == []
body.Job.Type == "batch"
body.Job.TaskGroups.size() == 1
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Driver == "docker"
body.Job.TaskGroups[0].Tasks[0].Config.image == image
body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir
body.Job.TaskGroups[0].Tasks[0].Config.command == args[0]
body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1)

body.Job.TaskGroups[0].Volumes.size() == 1
body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"a", Volume:"test"]

}
}

0 comments on commit 6376aee

Please sign in to comment.