Skip to content

Commit

Permalink
implement ConstraintAttrSpec
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Jul 14, 2024
1 parent d63c50f commit 3e30508
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package nextflow.nomad.config

class ConstraintAttrSpec {

private String arch = null
private Integer numcores= null
private Integer reservablecores= null
private Double totalcompute= null

String getArch() {
return arch
}

Integer getNumcores() {
return numcores
}

Integer getReservablecores() {
return reservablecores
}

Double getTotalcompute() {
return totalcompute
}

ConstraintAttrSpec setCpu(Map map){
cpu(map)
}

ConstraintAttrSpec cpu(Map map){
this.arch = map.containsKey("arch") ? map["arch"].toString() : null
this.numcores = map.containsKey("numcores") ? map["numcores"] as int : null
this.reservablecores = map.containsKey("reservablecores") ? map["reservablecores"] as int : null
this.totalcompute = map.containsKey("totalcompute") ? map["totalcompute"] as double : null
this
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@ package nextflow.nomad.config
class ConstraintsSpec {

List<ConstraintNodeSpec> nodeSpecs = []
List<ConstraintAttrSpec> attrSpecs = []

ConstraintsSpec node( @DelegatesTo(ConstraintNodeSpec)Closure closure){
ConstraintNodeSpec constraintNodeSpec = new ConstraintNodeSpec()
def clone = closure.rehydrate(constraintNodeSpec, closure.owner, closure.thisObject)
ConstraintNodeSpec constraintSpec = new ConstraintNodeSpec()
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
nodeSpecs << constraintNodeSpec
nodeSpecs << constraintSpec
this
}

ConstraintsSpec attr( @DelegatesTo(ConstraintAttrSpec)Closure closure){
ConstraintAttrSpec constraintSpec = new ConstraintAttrSpec()
def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
attrSpecs << constraintSpec
this
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package nextflow.nomad.executor

import io.nomadproject.client.model.Constraint
import nextflow.nomad.config.ConstraintAttrSpec
import nextflow.nomad.config.ConstraintNodeSpec
import nextflow.nomad.config.ConstraintsSpec

class ConstraintsBuilder {

protected static List<Constraint> constraintsSpecToList(ConstraintsSpec spec){
def constraints = [] as List<Constraint>
if( spec?.nodeSpecs ){
def nodes = spec.nodeSpecs
?.collect({ nodeConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
if( spec?.attrSpecs ){
def nodes = spec.attrSpecs
?.collect({ attrConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
return constraints
}

protected static List<Constraint> nodeConstraints(ConstraintNodeSpec nodeSpec){
def ret = [] as List<Constraint>
if( nodeSpec.id ){
ret.add new Constraint()
.ltarget('${node.unique.id}')
.operand("=")
.rtarget(nodeSpec.id)
}
if( nodeSpec.name ){
ret.add new Constraint()
.ltarget('${node.unique.name}')
.operand("=")
.rtarget(nodeSpec.name)
}
if( nodeSpec.clientClass ){
ret.add new Constraint()
.ltarget('${node.class}')
.operand("=")
.rtarget(nodeSpec.clientClass)
}
if( nodeSpec.dataCenter ){
ret.add new Constraint()
.ltarget('${node.datacenter}')
.operand("=")
.rtarget(nodeSpec.dataCenter)
}
if( nodeSpec.region ){
ret.add new Constraint()
.ltarget('${node.region}')
.operand("=")
.rtarget(nodeSpec.region)
}
if( nodeSpec.pool ){
ret.add new Constraint()
.ltarget('${node.pool}')
.operand("=")
.rtarget(nodeSpec.pool)
}
ret
}

protected static List<Constraint> attrConstraints(ConstraintAttrSpec nodeSpec) {
def ret = [] as List<Constraint>
if (nodeSpec.arch) {
ret.add new Constraint()
.ltarget('${attr.cpu.arch}')
.operand("=")
.rtarget(nodeSpec.arch)
}
if (nodeSpec.numcores) {
ret.add new Constraint()
.ltarget('${attr.cpu.numcores}')
.operand("=")
.rtarget("$nodeSpec.numcores")
}
if (nodeSpec.reservablecores) {
ret.add new Constraint()
.ltarget('${attr.cpu.reservablecores}')
.operand("=")
.rtarget("$nodeSpec.reservablecores")
}
if (nodeSpec.totalcompute) {
ret.add new Constraint()
.ltarget('${attr.cpu.totalcompute}')
.operand("=")
.rtarget("$nodeSpec.totalcompute")
}
ret
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.model.*
import nextflow.nomad.config.ConstraintNodeSpec
import nextflow.nomad.config.ConstraintsSpec
import nextflow.nomad.config.NomadConfig
import nextflow.nomad.config.VolumeSpec
Expand Down Expand Up @@ -258,15 +257,15 @@ class NomadService implements Closeable{
def constraints = [] as List<Constraint>

if( config.jobOpts().constraintsSpec ){
def list = constraintsSpecToList(config.jobOpts().constraintsSpec)
def list = ConstraintsBuilder.constraintsSpecToList(config.jobOpts().constraintsSpec)
constraints.addAll(list)
}

if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) &&
task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) {
Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure
ConstraintsSpec constraintsSpec = ConstraintsSpec.parse(closure)
def list = constraintsSpecToList(constraintsSpec)
def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec)
constraints.addAll(list)
}

Expand All @@ -276,59 +275,7 @@ class NomadService implements Closeable{
taskDef
}

protected List<Constraint> constraintsSpecToList(ConstraintsSpec spec){
def constraints = [] as List<Constraint>
if( spec?.nodeSpecs ){
def nodes = config.jobOpts()
.constraintsSpec
?.nodeSpecs
?.collect({ nodeConstraints(it)})
?.flatten() as List<Constraint>
constraints.addAll(nodes)
}
return constraints
}

protected List<Constraint> nodeConstraints(ConstraintNodeSpec nodeSpec){
def ret = [] as List<Constraint>
if( nodeSpec.id ){
ret.add new Constraint()
.ltarget('${node.unique.id}')
.operand("=")
.rtarget(nodeSpec.id)
}
if( nodeSpec.name ){
ret.add new Constraint()
.ltarget('${node.unique.name}')
.operand("=")
.rtarget(nodeSpec.name)
}
if( nodeSpec.clientClass ){
ret.add new Constraint()
.ltarget('${node.class}')
.operand("=")
.rtarget(nodeSpec.clientClass)
}
if( nodeSpec.dataCenter ){
ret.add new Constraint()
.ltarget('${node.datacenter}')
.operand("=")
.rtarget(nodeSpec.dataCenter)
}
if( nodeSpec.region ){
ret.add new Constraint()
.ltarget('${node.region}')
.operand("=")
.rtarget(nodeSpec.region)
}
if( nodeSpec.pool ){
ret.add new Constraint()
.ltarget('${node.pool}')
.operand("=")
.rtarget(nodeSpec.pool)
}
ret
}

protected Job assignDatacenters(TaskRun task, Job job){
def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,25 @@ class NomadConstraintsSpec extends Specification {
dataCenter = 'dc1'
region = 'us'
}
attr{
cpu = [arch:'286']
}
}
]
])

then:
config.jobOpts.constraintsSpec
config.jobOpts.constraintsSpec.nodeSpecs.size()
config.jobOpts.constraintsSpec.nodeSpecs.size() == 1
config.jobOpts.constraintsSpec.nodeSpecs[0].id == "node-id"
config.jobOpts.constraintsSpec.nodeSpecs[0].name == "node-name"
config.jobOpts.constraintsSpec.nodeSpecs[0].clientClass == "linux-64bit"
config.jobOpts.constraintsSpec.nodeSpecs[0].pool == "custom-pool"
config.jobOpts.constraintsSpec.nodeSpecs[0].dataCenter == "dc1"
config.jobOpts.constraintsSpec.nodeSpecs[0].region == "us"

config.jobOpts.constraintsSpec.attrSpecs.size() == 1
config.jobOpts.constraintsSpec.attrSpecs[0].arch == '286'
}

void "should instantiate a no completed constraints spec"() {
Expand Down
Loading

0 comments on commit 3e30508

Please sign in to comment.