Skip to content

Commit

Permalink
Merge pull request #46 from nextflow-io/multiple-volumes
Browse files Browse the repository at this point in the history
implementation of possibility to add multiple volumes
  • Loading branch information
abhi18av authored Jun 21, 2024
2 parents f692563 + 3593b8e commit fa034a0
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 39 deletions.
37 changes: 30 additions & 7 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class NomadConfig {
final String region
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec
final VolumeSpec[] volumeSpec
final AffinitySpec affinitySpec
final ConstraintSpec constraintSpec

Expand All @@ -83,23 +83,46 @@ class NomadConfig {
log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead"
}

this.volumeSpec = parseVolume(nomadJobOpts)
this.volumeSpec = parseVolumes(nomadJobOpts)
this.affinitySpec = parseAffinity(nomadJobOpts)
this.constraintSpec = parseConstraint(nomadJobOpts)
}

VolumeSpec parseVolume(Map nomadJobOpts){
VolumeSpec[] parseVolumes(Map nomadJobOpts){
List<VolumeSpec> ret = []
if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Closure){
def volumeSpec = new VolumeSpec()
def closure = (nomadJobOpts.volume as Closure)
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
volumeSpec.validate()
volumeSpec
}else{
null
volumeSpec.workDir(true)
ret.add volumeSpec
}

if( nomadJobOpts.volumes && nomadJobOpts.volumes instanceof List){
nomadJobOpts.volumes.each{ closure ->
if( closure instanceof Closure){
def volumeSpec = new VolumeSpec()
def clone = closure.rehydrate(volumeSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
ret.add volumeSpec
}
}
}

if( ret.size() && !ret.find{ it.workDir } ){
ret.first().workDir(true)
}

ret*.validate()

if( ret.findAll{ it.workDir}.size() > 1 ){
throw new IllegalArgumentException("No more than a workdir volume allowed")
}

return ret as VolumeSpec[]
}

AffinitySpec parseAffinity(Map nomadJobOpts) {
Expand Down
27 changes: 24 additions & 3 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package nextflow.nomad.config

import nextflow.nomad.NomadConfig

class VolumeSpec {

final static public String VOLUME_DOCKER_TYPE = "docker"
Expand All @@ -14,6 +12,8 @@ class VolumeSpec {

private String type
private String name
private String path
private boolean workDir = false

String getType() {
return type
Expand All @@ -23,6 +23,14 @@ class VolumeSpec {
return name
}

boolean getWorkDir() {
return workDir
}

String getPath() {
return path
}

VolumeSpec type(String type){
this.type = type
this
Expand All @@ -33,12 +41,25 @@ class VolumeSpec {
this
}

VolumeSpec workDir(boolean b){
this.workDir = b
this
}

VolumeSpec path(String path){
this.path = path
this
}

void validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
throw new IllegalArgumentException("Volume name is required (type $type)")
}
if( !this.workDir && !this.path ){
throw new IllegalArgumentException("Volume path is required in secondary volumes")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,26 @@ class NomadService implements Closeable{
)


if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){
if( config.jobOpts.volumeSpec ) {
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}

if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
)
config.jobOpts.volumeSpec.eachWithIndex { volumeSpec , idx->
if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}

if (volumeSpec && volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
source: volumeSpec.name,
)
}
}
}

return taskGroup
}

Expand Down Expand Up @@ -187,11 +189,15 @@ class NomadService implements Closeable{
}

if( config.jobOpts.volumeSpec){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
taskDef.volumeMounts = [ new VolumeMount(
destination: destinationDir,
volume: config.jobOpts.volumeSpec.name
)]
taskDef.volumeMounts = []
config.jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx ->
String destinationDir = volumeSpec.workDir ?
workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path
taskDef.volumeMounts.add new VolumeMount(
destination: destinationDir,
volume: "vol_${idx}".toString()
)
}
}

if( config.jobOpts.affinitySpec ){
Expand Down
82 changes: 76 additions & 6 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class NomadConfigSpec extends Specification {

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"
config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec[0].name == "test"

when:
def config2 = new NomadConfig([
Expand All @@ -141,8 +141,8 @@ class NomadConfigSpec extends Specification {

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"
config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec[0].name == "test"

when:
def config3 = new NomadConfig([
Expand All @@ -151,8 +151,8 @@ class NomadConfigSpec extends Specification {

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"
config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec[0].name == "test"

when:
new NomadConfig([
Expand Down Expand Up @@ -198,4 +198,74 @@ class NomadConfigSpec extends Specification {
config.jobOpts.constraintSpec.getOperator() == '>'
config.jobOpts.constraintSpec.getValue() == '3'
}

void "should instantiate multiple volumes spec if specified"() {
when:
def config = new NomadConfig([
jobs: [
volumes : [
{ type "docker" name "test" }
]
]
])

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec[0].name == "test"
config.jobOpts.volumeSpec[0].workDir

when:
new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" },
{ type "docker" name "test" },
]
]
])

then:
thrown(IllegalArgumentException)

when:
def config2 = new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" },
{ type "docker" name "test" path '/data' },
]
]
])

then:
config2.jobOpts.volumeSpec.size()==2
config2.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec[0].name == "test"
config2.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_DOCKER_TYPE
config2.jobOpts.volumeSpec[1].name == "test"

config.jobOpts.volumeSpec[0].workDir
config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1

when:
def config3 = new NomadConfig([
jobs: [
volumes : [
{ type "csi" name "test" path '/data'},
{ type "docker" name "test" path '/data'},
],
volume : { type "host" name "test" },
]
])

then:
config3.jobOpts.volumeSpec.size()==3
config3.jobOpts.volumeSpec[0].type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec[1].type == VolumeSpec.VOLUME_CSI_TYPE
config3.jobOpts.volumeSpec[2].type == VolumeSpec.VOLUME_DOCKER_TYPE

config.jobOpts.volumeSpec[0].workDir
config.jobOpts.volumeSpec.findAll{ it.workDir}.size() == 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ class NomadServiceSpec extends Specification{
body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1)

body.Job.TaskGroups[0].Volumes.size() == 1
body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Volumes['vol_0'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"test"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"/a", Volume:"vol_0"]

}

Expand Down
23 changes: 23 additions & 0 deletions validation/multiple-volumes/2-volumes.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
plugins {
id 'nf-nomad@latest'
}

process {
executor = "nomad"
}

nomad {

client {
address = "http://localhost:4646"
}

jobs {
deleteOnCompletion = false
volumes = [
{ type "host" name "scratchdir" },
{ type "host" name "scratchdir" path "/var/data" }, // can mount same volume in different path
]
}

}
26 changes: 26 additions & 0 deletions validation/multiple-volumes/3-volumes.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id 'nf-nomad@latest'
}

process {
executor = "nomad"
}

nomad {

client {
address = "http://localhost:4646"
}

jobs {
deleteOnCompletion = false

volume = { type "host" name "scratchdir" }

volumes = [
{ type "host" name "scratchdir" path "/var/data1" },
{ type "host" name "scratchdir" path "/var/data2" }
]
}

}
18 changes: 18 additions & 0 deletions validation/multiple-volumes/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env nextflow

process sayHello {
container 'ubuntu:20.04'

input:
val x
output:
stdout
script:
"""
echo '$x world!'
"""
}

workflow {
Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view
}

0 comments on commit fa034a0

Please sign in to comment.