diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy index bf6bacf..a5f0f3f 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy @@ -18,6 +18,8 @@ package nextflow.nomad import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.nomad.config.AffinitySpec +import nextflow.nomad.config.VolumeSpec /** * Nomad Config @@ -30,14 +32,6 @@ import groovy.util.logging.Slf4j class NomadConfig { final static protected API_VERSION = "v1" - final static public String VOLUME_DOCKER_TYPE = "docker" - final static public String VOLUME_CSI_TYPE = "csi" - final static public String VOLUME_HOST_TYPE = "host" - - final static protected String[] VOLUME_TYPES = [ - VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE - ] - final NomadClientOpts clientOpts final NomadJobOpts jobOpts @@ -66,6 +60,7 @@ class NomadConfig { final String namespace final String dockerVolume final VolumeSpec volumeSpec + final AffinitySpec affinitySpec NomadJobOpts(Map nomadJobOpts){ deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ? @@ -93,39 +88,17 @@ class NomadConfig { }else{ volumeSpec = null } - } - } - - class VolumeSpec{ - - private String type - private String name - - String getType() { - return type - } - - String getName() { - return name - } - - VolumeSpec type(String type){ - this.type = type - this - } - - VolumeSpec name(String name){ - this.name = name - this - } - - protected validate(){ - if( !VOLUME_TYPES.contains(type) ) { - throw new IllegalArgumentException("Volume type $type is not supported") - } - if( !this.name ){ - throw new IllegalArgumentException("Volume name is required") + if( nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure){ + this.affinitySpec = new AffinitySpec() + def closure = (nomadJobOpts.affinity as Closure) + def clone = closure.rehydrate(this.affinitySpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + this.affinitySpec.validate() + }else{ + affinitySpec = null } } } + } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy new file mode 100644 index 0000000..0acee10 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/AffinitySpec.groovy @@ -0,0 +1,48 @@ +package nextflow.nomad.config + +class AffinitySpec{ + + private String attribute + private String operator + private String value + private Integer weight + + String getOperator(){ + return operator + } + + String getAttribute() { + return attribute + } + + String getValue() { + return value + } + + Integer getWeight() { + return weight + } + + AffinitySpec attribute(String attribute){ + this.attribute=attribute + this + } + + AffinitySpec operator(String operator){ + this.operator = operator + this + } + + AffinitySpec value(String value){ + this.value = value + this + } + + AffinitySpec weight(int weight){ + this.weight = weight + this + } + + void validate(){ + } +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy new file mode 100644 index 0000000..480ea7b --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy @@ -0,0 +1,44 @@ +package nextflow.nomad.config + +import nextflow.nomad.NomadConfig + +class VolumeSpec { + + final static public String VOLUME_DOCKER_TYPE = "docker" + final static public String VOLUME_CSI_TYPE = "csi" + final static public String VOLUME_HOST_TYPE = "host" + + final static protected String[] VOLUME_TYPES = [ + VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE + ] + + private String type + private String name + + String getType() { + return type + } + + String getName() { + return name + } + + VolumeSpec type(String type){ + this.type = type + this + } + + VolumeSpec name(String name){ + this.name = name + this + } + + void validate(){ + if( !VOLUME_TYPES.contains(type) ) { + throw new IllegalArgumentException("Volume type $type is not supported") + } + if( !this.name ){ + throw new IllegalArgumentException("Volume name is required") + } + } +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index b91cd04..d8da13b 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -21,11 +21,11 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.nomadproject.client.ApiClient import io.nomadproject.client.api.JobsApi +import io.nomadproject.client.model.Affinity import io.nomadproject.client.model.AllocationListStub import io.nomadproject.client.model.Job import io.nomadproject.client.model.JobRegisterRequest import io.nomadproject.client.model.JobRegisterResponse -import io.nomadproject.client.model.JobSummary import io.nomadproject.client.model.ReschedulePolicy import io.nomadproject.client.model.Resources import io.nomadproject.client.model.RestartPolicy @@ -34,6 +34,7 @@ import io.nomadproject.client.model.TaskGroup import io.nomadproject.client.model.VolumeMount import io.nomadproject.client.model.VolumeRequest import nextflow.nomad.NomadConfig +import nextflow.nomad.config.VolumeSpec import nextflow.processor.TaskRun import nextflow.util.MemoryUnit @@ -120,7 +121,7 @@ class NomadService implements Closeable{ ) - if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE){ + if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){ taskGroup.volumes = [:] taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest( type: config.jobOpts.volumeSpec.type, @@ -130,7 +131,7 @@ class NomadService implements Closeable{ ) } - if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE){ + if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){ taskGroup.volumes = [:] taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest( type: config.jobOpts.volumeSpec.type, @@ -180,6 +181,23 @@ class NomadService implements Closeable{ volume: config.jobOpts.volumeSpec.name )] } + + if( config.jobOpts.affinitySpec ){ + def affinity = new Affinity() + if(config.jobOpts.affinitySpec.attribute){ + affinity.ltarget(config.jobOpts.affinitySpec.attribute) + } + + affinity.operand(config.jobOpts.affinitySpec.operator ?: "=") + + if(config.jobOpts.affinitySpec.value){ + affinity.rtarget(config.jobOpts.affinitySpec.value) + } + if(config.jobOpts.affinitySpec.weight != null){ + affinity.weight(config.jobOpts.affinitySpec.weight) + } + taskDef.affinities([affinity]) + } taskDef } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy index 24c90ec..391207c 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy @@ -17,6 +17,7 @@ package nextflow.nomad +import nextflow.nomad.config.VolumeSpec import spock.lang.Specification /** @@ -130,7 +131,7 @@ class NomadConfigSpec extends Specification { then: config.jobOpts.volumeSpec - config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE + config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE config.jobOpts.volumeSpec.name == "test" when: @@ -140,7 +141,7 @@ class NomadConfigSpec extends Specification { then: config2.jobOpts.volumeSpec - config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE + config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE config2.jobOpts.volumeSpec.name == "test" when: @@ -150,7 +151,7 @@ class NomadConfigSpec extends Specification { then: config3.jobOpts.volumeSpec - config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE + config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE config3.jobOpts.volumeSpec.name == "test" when: @@ -161,4 +162,23 @@ class NomadConfigSpec extends Specification { then: thrown(IllegalArgumentException) } + + void "should instantiate an affinity spec if specified"() { + when: + def config = new NomadConfig([ + jobs: [affinity : { + attribute '${meta.my_custom_value}' + operator ">" + value "3" + weight 50 + }] + ]) + + then: + config.jobOpts.affinitySpec + config.jobOpts.affinitySpec.getAttribute() == '${meta.my_custom_value}' + config.jobOpts.affinitySpec.getOperator() == '>' + config.jobOpts.affinitySpec.getValue() == '3' + config.jobOpts.affinitySpec.getWeight() == 50 + } }