diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintAttrSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintAttrSpec.groovy new file mode 100644 index 0000000..0e3cda8 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintAttrSpec.groovy @@ -0,0 +1,38 @@ +package nextflow.nomad.config + +class ConstraintAttrSpec { + + private String arch = null + private Integer numcores= null + private Integer reservablecores= null + private Double totalcompute= null + + String getArch() { + return arch + } + + Integer getNumcores() { + return numcores + } + + Integer getReservablecores() { + return reservablecores + } + + Double getTotalcompute() { + return totalcompute + } + + ConstraintAttrSpec setCpu(Map map){ + cpu(map) + } + + ConstraintAttrSpec cpu(Map map){ + this.arch = map.containsKey("arch") ? map["arch"].toString() : null + this.numcores = map.containsKey("numcores") ? map["numcores"] as int : null + this.reservablecores = map.containsKey("reservablecores") ? map["reservablecores"] as int : null + this.totalcompute = map.containsKey("totalcompute") ? map["totalcompute"] as double : null + this + } + +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintsSpec.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintsSpec.groovy index 1d5aedd..0de58da 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintsSpec.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/ConstraintsSpec.groovy @@ -3,13 +3,23 @@ package nextflow.nomad.config class ConstraintsSpec { List nodeSpecs = [] + List attrSpecs = [] ConstraintsSpec node( @DelegatesTo(ConstraintNodeSpec)Closure closure){ - ConstraintNodeSpec constraintNodeSpec = new ConstraintNodeSpec() - def clone = closure.rehydrate(constraintNodeSpec, closure.owner, closure.thisObject) + ConstraintNodeSpec constraintSpec = new ConstraintNodeSpec() + def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject) clone.resolveStrategy = Closure.DELEGATE_FIRST clone() - nodeSpecs << constraintNodeSpec + nodeSpecs << constraintSpec + this + } + + ConstraintsSpec attr( @DelegatesTo(ConstraintAttrSpec)Closure closure){ + ConstraintAttrSpec constraintSpec = new ConstraintAttrSpec() + def clone = closure.rehydrate(constraintSpec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + attrSpecs << constraintSpec this } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/ConstraintsBuilder.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/ConstraintsBuilder.groovy new file mode 100644 index 0000000..5c5cc05 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/ConstraintsBuilder.groovy @@ -0,0 +1,97 @@ +package nextflow.nomad.executor + +import io.nomadproject.client.model.Constraint +import nextflow.nomad.config.ConstraintAttrSpec +import nextflow.nomad.config.ConstraintNodeSpec +import nextflow.nomad.config.ConstraintsSpec + +class ConstraintsBuilder { + + protected static List constraintsSpecToList(ConstraintsSpec spec){ + def constraints = [] as List + if( spec?.nodeSpecs ){ + def nodes = spec.nodeSpecs + ?.collect({ nodeConstraints(it)}) + ?.flatten() as List + constraints.addAll(nodes) + } + if( spec?.attrSpecs ){ + def nodes = spec.attrSpecs + ?.collect({ attrConstraints(it)}) + ?.flatten() as List + constraints.addAll(nodes) + } + return constraints + } + + protected static List nodeConstraints(ConstraintNodeSpec nodeSpec){ + def ret = [] as List + if( nodeSpec.id ){ + ret.add new Constraint() + .ltarget('${node.unique.id}') + .operand("=") + .rtarget(nodeSpec.id) + } + if( nodeSpec.name ){ + ret.add new Constraint() + .ltarget('${node.unique.name}') + .operand("=") + .rtarget(nodeSpec.name) + } + if( nodeSpec.clientClass ){ + ret.add new Constraint() + .ltarget('${node.class}') + .operand("=") + .rtarget(nodeSpec.clientClass) + } + if( nodeSpec.dataCenter ){ + ret.add new Constraint() + .ltarget('${node.datacenter}') + .operand("=") + .rtarget(nodeSpec.dataCenter) + } + if( nodeSpec.region ){ + ret.add new Constraint() + .ltarget('${node.region}') + .operand("=") + .rtarget(nodeSpec.region) + } + if( nodeSpec.pool ){ + ret.add new Constraint() + .ltarget('${node.pool}') + .operand("=") + .rtarget(nodeSpec.pool) + } + ret + } + + protected static List attrConstraints(ConstraintAttrSpec nodeSpec) { + def ret = [] as List + if (nodeSpec.arch) { + ret.add new Constraint() + .ltarget('${attr.cpu.arch}') + .operand("=") + .rtarget(nodeSpec.arch) + } + if (nodeSpec.numcores) { + ret.add new Constraint() + .ltarget('${attr.cpu.numcores}') + .operand("=") + .rtarget("$nodeSpec.numcores") + } + if (nodeSpec.reservablecores) { + ret.add new Constraint() + .ltarget('${attr.cpu.reservablecores}') + .operand("=") + .rtarget("$nodeSpec.reservablecores") + } + if (nodeSpec.totalcompute) { + ret.add new Constraint() + .ltarget('${attr.cpu.totalcompute}') + .operand("=") + .rtarget("$nodeSpec.totalcompute") + } + ret + } + +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index 8c7d942..cd800c9 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -22,7 +22,6 @@ import groovy.util.logging.Slf4j import io.nomadproject.client.ApiClient import io.nomadproject.client.api.JobsApi import io.nomadproject.client.model.* -import nextflow.nomad.config.ConstraintNodeSpec import nextflow.nomad.config.ConstraintsSpec import nextflow.nomad.config.NomadConfig import nextflow.nomad.config.VolumeSpec @@ -258,7 +257,7 @@ class NomadService implements Closeable{ def constraints = [] as List if( config.jobOpts().constraintsSpec ){ - def list = constraintsSpecToList(config.jobOpts().constraintsSpec) + def list = ConstraintsBuilder.constraintsSpecToList(config.jobOpts().constraintsSpec) constraints.addAll(list) } @@ -266,7 +265,7 @@ class NomadService implements Closeable{ task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) { Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure ConstraintsSpec constraintsSpec = ConstraintsSpec.parse(closure) - def list = constraintsSpecToList(constraintsSpec) + def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec) constraints.addAll(list) } @@ -276,59 +275,7 @@ class NomadService implements Closeable{ taskDef } - protected List constraintsSpecToList(ConstraintsSpec spec){ - def constraints = [] as List - if( spec?.nodeSpecs ){ - def nodes = config.jobOpts() - .constraintsSpec - ?.nodeSpecs - ?.collect({ nodeConstraints(it)}) - ?.flatten() as List - constraints.addAll(nodes) - } - return constraints - } - protected List nodeConstraints(ConstraintNodeSpec nodeSpec){ - def ret = [] as List - if( nodeSpec.id ){ - ret.add new Constraint() - .ltarget('${node.unique.id}') - .operand("=") - .rtarget(nodeSpec.id) - } - if( nodeSpec.name ){ - ret.add new Constraint() - .ltarget('${node.unique.name}') - .operand("=") - .rtarget(nodeSpec.name) - } - if( nodeSpec.clientClass ){ - ret.add new Constraint() - .ltarget('${node.class}') - .operand("=") - .rtarget(nodeSpec.clientClass) - } - if( nodeSpec.dataCenter ){ - ret.add new Constraint() - .ltarget('${node.datacenter}') - .operand("=") - .rtarget(nodeSpec.dataCenter) - } - if( nodeSpec.region ){ - ret.add new Constraint() - .ltarget('${node.region}') - .operand("=") - .rtarget(nodeSpec.region) - } - if( nodeSpec.pool ){ - ret.add new Constraint() - .ltarget('${node.pool}') - .operand("=") - .rtarget(nodeSpec.pool) - } - ret - } protected Job assignDatacenters(TaskRun task, Job job){ def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS) diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConstraintsSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConstraintsSpec.groovy index ac2a253..43bb352 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConstraintsSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConstraintsSpec.groovy @@ -40,19 +40,25 @@ class NomadConstraintsSpec extends Specification { dataCenter = 'dc1' region = 'us' } + attr{ + cpu = [arch:'286'] + } } ] ]) then: config.jobOpts.constraintsSpec - config.jobOpts.constraintsSpec.nodeSpecs.size() + config.jobOpts.constraintsSpec.nodeSpecs.size() == 1 config.jobOpts.constraintsSpec.nodeSpecs[0].id == "node-id" config.jobOpts.constraintsSpec.nodeSpecs[0].name == "node-name" config.jobOpts.constraintsSpec.nodeSpecs[0].clientClass == "linux-64bit" config.jobOpts.constraintsSpec.nodeSpecs[0].pool == "custom-pool" config.jobOpts.constraintsSpec.nodeSpecs[0].dataCenter == "dc1" config.jobOpts.constraintsSpec.nodeSpecs[0].region == "us" + + config.jobOpts.constraintsSpec.attrSpecs.size() == 1 + config.jobOpts.constraintsSpec.attrSpecs[0].arch == '286' } void "should instantiate a no completed constraints spec"() { diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceConstraintsSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceConstraintsSpec.groovy new file mode 100644 index 0000000..11a2d77 --- /dev/null +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceConstraintsSpec.groovy @@ -0,0 +1,251 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.executor + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import nextflow.executor.Executor +import nextflow.nomad.config.NomadConfig +import nextflow.processor.TaskBean +import nextflow.processor.TaskConfig +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun +import nextflow.script.ProcessConfig +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import spock.lang.Specification + +import java.nio.file.Files +import java.nio.file.Path + +/** + * Unit test for Nomad Service + * + * Validate requests using a Mock WebServer + * + * @author : Jorge Aguilera + */ +class NomadServiceConstraintsSpec extends Specification{ + + MockWebServer mockWebServer + + def setup() { + mockWebServer = new MockWebServer() + mockWebServer.start() + } + + def cleanup() { + mockWebServer.shutdown() + } + + void "submit a task with a node constraint"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + jobs:[ + constraints: { + node { + unique = [name:'test'] + } + } + ] + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "/a/b/c" + Mapenv = [test:"test"] + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, mockTask, args, env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job.TaskGroups[0].Tasks[0].Constraints[0].LTarget == '${node.unique.name}' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].RTarget == 'test' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].Operand == '=' + } + + void "submit a task with a config attr constraint"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + jobs:[ + constraints: { + attr { + cpu = [arch:'286'] + } + } + ] + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "/a/b/c" + Mapenv = [test:"test"] + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, mockTask, args, env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job.TaskGroups[0].Tasks[0].Constraints[0].LTarget == '${attr.cpu.arch}' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].RTarget == '286' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].Operand == '=' + } + + void "submit a task with an attr constraint"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "/a/b/c" + Mapenv = [test:"test"] + + def contraints = { + attr { + cpu = [arch:'286'] + } + } + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + getConfig() >> Mock(ProcessConfig){ + get("constraints") >> contraints + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, mockTask, args, env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job.TaskGroups[0].Tasks[0].Constraints[0].LTarget == '${attr.cpu.arch}' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].RTarget == '286' + body.Job.TaskGroups[0].Tasks[0].Constraints[0].Operand == '=' + } +} diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 2904264..4683617 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -623,69 +623,4 @@ class NomadServiceSpec extends Specification{ ({ 'a'*10 }) | ['aaaaaaaaaa'] } - void "submit a task with a node constraint"(){ - given: - def config = new NomadConfig( - client:[ - address : "http://${mockWebServer.hostName}:${mockWebServer.port}" - ], - jobs:[ - constraints: { - node { - unique = [name:'test'] - } - } - ] - ) - def service = new NomadService(config) - - String id = "theId" - String name = "theName" - String image = "theImage" - List args = ["theCommand", "theArgs"] - String workingDir = "/a/b/c" - Mapenv = [test:"test"] - - def mockTask = Mock(TaskRun){ - getName() >> name - getContainer() >> image - getConfig() >> Mock(TaskConfig) - getWorkDirStr() >> workingDir - getContainer() >> "ubuntu" - getProcessor() >> Mock(TaskProcessor){ - getExecutor() >> Mock(Executor){ - isFusionEnabled() >> false - } - } - getWorkDir() >> Path.of(workingDir) - toTaskBean() >> Mock(TaskBean){ - getWorkDir() >> Path.of(workingDir) - getScript() >> "theScript" - getShell() >> ["bash"] - getInputFiles() >> [:] - } - } - - mockWebServer.enqueue(new MockResponse() - .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) - .addHeader("Content-Type", "application/json")); - when: - - def idJob = service.submitTask(id, mockTask, args, env) - def recordedRequest = mockWebServer.takeRequest(); - def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) - - then: - idJob - - and: - recordedRequest.method == "POST" - recordedRequest.path == "/v1/jobs" - - and: - body.Job.TaskGroups[0].Tasks[0].Constraints[0].LTarget == '${node.unique.name}' - body.Job.TaskGroups[0].Tasks[0].Constraints[0].RTarget == 'test' - body.Job.TaskGroups[0].Tasks[0].Constraints[0].Operand == '=' - } - }