Skip to content

Commit

Permalink
add affinity spec configuration (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn authored Jun 12, 2024
1 parent f287683 commit 58726b2
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 46 deletions.
53 changes: 13 additions & 40 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package nextflow.nomad

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.nomad.config.AffinitySpec
import nextflow.nomad.config.VolumeSpec

/**
* Nomad Config
Expand All @@ -30,14 +32,6 @@ import groovy.util.logging.Slf4j
class NomadConfig {
final static protected API_VERSION = "v1"

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

final NomadClientOpts clientOpts
final NomadJobOpts jobOpts

Expand Down Expand Up @@ -66,6 +60,7 @@ class NomadConfig {
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec
final AffinitySpec affinitySpec

NomadJobOpts(Map nomadJobOpts){
deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ?
Expand Down Expand Up @@ -93,39 +88,17 @@ class NomadConfig {
}else{
volumeSpec = null
}
}
}

class VolumeSpec{

private String type
private String name

String getType() {
return type
}

String getName() {
return name
}

VolumeSpec type(String type){
this.type = type
this
}

VolumeSpec name(String name){
this.name = name
this
}

protected validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
if( nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure){
this.affinitySpec = new AffinitySpec()
def closure = (nomadJobOpts.affinity as Closure)
def clone = closure.rehydrate(this.affinitySpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
this.affinitySpec.validate()
}else{
affinitySpec = null
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package nextflow.nomad.config

class AffinitySpec{

private String attribute
private String operator
private String value
private Integer weight

String getOperator(){
return operator
}

String getAttribute() {
return attribute
}

String getValue() {
return value
}

Integer getWeight() {
return weight
}

AffinitySpec attribute(String attribute){
this.attribute=attribute
this
}

AffinitySpec operator(String operator){
this.operator = operator
this
}

AffinitySpec value(String value){
this.value = value
this
}

AffinitySpec weight(int weight){
this.weight = weight
this
}

void validate(){
}
}
44 changes: 44 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package nextflow.nomad.config

import nextflow.nomad.NomadConfig

class VolumeSpec {

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

private String type
private String name

String getType() {
return type
}

String getName() {
return name
}

VolumeSpec type(String type){
this.type = type
this
}

VolumeSpec name(String name){
this.name = name
this
}

void validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.model.Affinity
import io.nomadproject.client.model.AllocationListStub
import io.nomadproject.client.model.Job
import io.nomadproject.client.model.JobRegisterRequest
import io.nomadproject.client.model.JobRegisterResponse
import io.nomadproject.client.model.JobSummary
import io.nomadproject.client.model.ReschedulePolicy
import io.nomadproject.client.model.Resources
import io.nomadproject.client.model.RestartPolicy
Expand All @@ -34,6 +34,7 @@ import io.nomadproject.client.model.TaskGroup
import io.nomadproject.client.model.VolumeMount
import io.nomadproject.client.model.VolumeRequest
import nextflow.nomad.NomadConfig
import nextflow.nomad.config.VolumeSpec
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit

Expand Down Expand Up @@ -120,7 +121,7 @@ class NomadService implements Closeable{
)


if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand All @@ -130,7 +131,7 @@ class NomadService implements Closeable{
)
}

if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand Down Expand Up @@ -180,6 +181,23 @@ class NomadService implements Closeable{
volume: config.jobOpts.volumeSpec.name
)]
}

if( config.jobOpts.affinitySpec ){
def affinity = new Affinity()
if(config.jobOpts.affinitySpec.attribute){
affinity.ltarget(config.jobOpts.affinitySpec.attribute)
}

affinity.operand(config.jobOpts.affinitySpec.operator ?: "=")

if(config.jobOpts.affinitySpec.value){
affinity.rtarget(config.jobOpts.affinitySpec.value)
}
if(config.jobOpts.affinitySpec.weight != null){
affinity.weight(config.jobOpts.affinitySpec.weight)
}
taskDef.affinities([affinity])
}
taskDef
}

Expand Down
26 changes: 23 additions & 3 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package nextflow.nomad

import nextflow.nomad.config.VolumeSpec
import spock.lang.Specification

/**
Expand Down Expand Up @@ -130,7 +131,7 @@ class NomadConfigSpec extends Specification {

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -140,7 +141,7 @@ class NomadConfigSpec extends Specification {

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -150,7 +151,7 @@ class NomadConfigSpec extends Specification {

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -161,4 +162,23 @@ class NomadConfigSpec extends Specification {
then:
thrown(IllegalArgumentException)
}

void "should instantiate an affinity spec if specified"() {
when:
def config = new NomadConfig([
jobs: [affinity : {
attribute '${meta.my_custom_value}'
operator ">"
value "3"
weight 50
}]
])

then:
config.jobOpts.affinitySpec
config.jobOpts.affinitySpec.getAttribute() == '${meta.my_custom_value}'
config.jobOpts.affinitySpec.getOperator() == '>'
config.jobOpts.affinitySpec.getValue() == '3'
config.jobOpts.affinitySpec.getWeight() == 50
}
}

0 comments on commit 58726b2

Please sign in to comment.