From e8f28ab45b94f685a38c4542f67928be8cc8494b Mon Sep 17 00:00:00 2001 From: Jorge Date: Mon, 16 Sep 2024 16:44:31 +0200 Subject: [PATCH] add spread feature (#88) Signed-off-by: Jorge Aguilera --- .../nextflow/nomad/config/NomadJobOpts.groovy | 14 ++++ .../nomad/executor/NomadService.groovy | 24 +++++++ .../nomad/executor/TaskDirectives.groovy | 5 +- .../nextflow/nomad/models/JobSpreads.groovy | 64 +++++++++++++++++++ .../nomad/models/SpreadsBuilder.groovy | 30 +++++++++ .../nomad/config/NomadConfigSpec.groovy | 32 ++++++++++ .../nomad/executor/NomadServiceSpec.groovy | 63 ++++++++++++++++++ validation/spread/main.nf | 18 ++++++ validation/spread/node-nextflow.config | 35 ++++++++++ 9 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/models/JobSpreads.groovy create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/models/SpreadsBuilder.groovy create mode 100644 validation/spread/main.nf create mode 100644 validation/spread/node-nextflow.config diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy index 10ef8d3..f68bcdc 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadJobOpts.groovy @@ -22,6 +22,7 @@ import groovy.util.logging.Slf4j import nextflow.nomad.models.JobAffinity import nextflow.nomad.models.JobConstraint import nextflow.nomad.models.JobConstraints +import nextflow.nomad.models.JobSpreads import nextflow.nomad.models.JobVolume @@ -45,6 +46,7 @@ class NomadJobOpts{ JobAffinity affinitySpec JobConstraint constraintSpec JobConstraints constraintsSpec + JobSpreads spreadsSpec Integer rescheduleAttempts Integer restartAttempts @@ -85,6 +87,7 @@ class NomadJobOpts{ this.constraintSpec = parseConstraint(nomadJobOpts) this.constraintsSpec = parseConstraints(nomadJobOpts) this.secretOpts = parseSecrets(nomadJobOpts) + this.spreadsSpec = parseSpreads(nomadJobOpts) } JobVolume[] parseVolumes(Map nomadJobOpts){ @@ -177,4 +180,15 @@ class NomadJobOpts{ } } + JobSpreads parseSpreads(Map nomadJobOpts){ + if( nomadJobOpts.spreads && nomadJobOpts.spreads instanceof Closure){ + def spec = new JobSpreads() + def closure = (nomadJobOpts.spreads as Closure) + def clone = closure.rehydrate(spec, closure.owner, closure.thisObject) + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone() + spec.validate() + spec + } + } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index 15f1ca7..42b25a6 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -27,7 +27,9 @@ import io.nomadproject.client.model.* import nextflow.nomad.models.ConstraintsBuilder import nextflow.nomad.models.JobConstraints import nextflow.nomad.config.NomadConfig +import nextflow.nomad.models.JobSpreads import nextflow.nomad.models.JobVolume +import nextflow.nomad.models.SpreadsBuilder import nextflow.processor.TaskRun import nextflow.util.MemoryUnit import nextflow.exception.ProcessSubmitException @@ -99,6 +101,7 @@ class NomadService implements Closeable{ job.taskGroups = [createTaskGroup(task, args, env)] assignDatacenters(task, job) + spreads(task, job) JobRegisterRequest jobRegisterRequest = new JobRegisterRequest() jobRegisterRequest.setJob(job) @@ -313,6 +316,27 @@ class NomadService implements Closeable{ job } + protected Job spreads(TaskRun task, Job jobDef){ + def spreads = [] as List + if( config.jobOpts().spreadsSpec ){ + def list = SpreadsBuilder.spreadsSpecToList(config.jobOpts().spreadsSpec) + spreads.addAll(list) + } + if( task.processor?.config?.get(TaskDirectives.SPREAD) && + task.processor?.config?.get(TaskDirectives.SPREAD) instanceof Map) { + Map map = task.processor?.config?.get(TaskDirectives.SPREAD) as Map + JobSpreads spreadSpec = new JobSpreads() + spreadSpec.spread(map) + def list = SpreadsBuilder.spreadsSpecToList(spreadSpec) + spreads.addAll(list) + } + + spreads.each{ + jobDef.addSpreadsItem(it) + } + jobDef + } + String getJobState(String jobId){ try { List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/TaskDirectives.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/TaskDirectives.groovy index e72c381..473975b 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/TaskDirectives.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/TaskDirectives.groovy @@ -8,9 +8,12 @@ class TaskDirectives { public static final String SECRETS = "secret" + public static final String SPREAD = "spread" + public static final List ALL = [ DATACENTERS, CONSTRAINTS, - SECRETS + SECRETS, + SPREAD ] } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/models/JobSpreads.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/models/JobSpreads.groovy new file mode 100644 index 0000000..e86abae --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/models/JobSpreads.groovy @@ -0,0 +1,64 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package nextflow.nomad.models + +import org.apache.commons.lang3.tuple.Pair +import org.apache.commons.lang3.tuple.Triple + +/** + * Nomad Job Spread Spec + * + * @author Jorge Aguilera + */ + +class JobSpreads { + + private List>>> raws= [] + + List>>> getRaws() { + return raws + } + + JobSpreads setSpread(Map map){ + spread(map) + } + + JobSpreads spread(Map map){ + if( map.containsKey("name") && map.containsKey("weight")){ + def name = map.name as String + def weight = map.weight as int + def targets = [] as List + if( map.containsKey("targets") && map.targets instanceof Map){ + (map.targets as Map).entrySet().each{entry-> + def target = entry.key as String + if( entry.value.toString().isNumber() ){ + def targetW = entry.value as int + targets.add( Pair.of(target, targetW)) + } + } + } + raws.add Triple.of(name, weight, targets) + } + this + } + + void validate(){ + + } +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/models/SpreadsBuilder.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/models/SpreadsBuilder.groovy new file mode 100644 index 0000000..8bb601d --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/models/SpreadsBuilder.groovy @@ -0,0 +1,30 @@ +package nextflow.nomad.models + +import groovy.transform.CompileStatic +import io.nomadproject.client.model.Spread +import io.nomadproject.client.model.SpreadTarget + +/** + * Nomad Job Spread Spec Builder + * + * @author Jorge Aguilera + */ + +@CompileStatic +class SpreadsBuilder { + + static List spreadsSpecToList(JobSpreads spreads){ + def ret = [] as List + + spreads.raws.each{raw-> + def targets = [] as List + raw.right.each { + targets.add( new SpreadTarget(value: it.left, percent: it.right) ) + } + ret.add new Spread(attribute: raw.left, weight: raw.middle, spreadTarget: targets) + } + + return ret + } + +} diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConfigSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConfigSpec.groovy index 3856a57..0534576 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConfigSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadConfigSpec.groovy @@ -295,4 +295,36 @@ class NomadConfigSpec extends Specification { then: thrown(IllegalArgumentException) } + + void "should instantiate a spread spec if specified"() { + when: + def config = new NomadConfig([ + jobs: [spreads : { + spread = [name:'test', weight:100] + }] + ]) + + then: + config.jobOpts.spreadsSpec + config.jobOpts.spreadsSpec.getRaws().size() == 1 + config.jobOpts.spreadsSpec.getRaws().first().left == 'test' + config.jobOpts.spreadsSpec.getRaws().first().middle == 100 + config.jobOpts.spreadsSpec.getRaws().first().right.size() == 0 + + when: + def config2 = new NomadConfig([ + jobs: [spreads : { + spread = [name:'test', weight:100, targets:[ a:50, b:100]] + }] + ]) + + then: + config2.jobOpts.spreadsSpec + config2.jobOpts.spreadsSpec.getRaws().size() == 1 + config2.jobOpts.spreadsSpec.getRaws().first().left == 'test' + config2.jobOpts.spreadsSpec.getRaws().first().middle == 100 + config2.jobOpts.spreadsSpec.getRaws().first().right.size() == 2 + config2.jobOpts.spreadsSpec.getRaws().first().right.first().left == 'a' + config2.jobOpts.spreadsSpec.getRaws().first().right.first().right == 50 + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 4683617..2902db3 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -623,4 +623,67 @@ class NomadServiceSpec extends Specification{ ({ 'a'*10 }) | ['aaaaaaaaaa'] } + void "submit a task with a spread"(){ + given: + def config = new NomadConfig( + client:[ + address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + ], + jobs:[ + spreads : { + spread = [name:'test', weight:50, targets:['a':30]] + } + ] + ) + def service = new NomadService(config) + + String id = "theId" + String name = "theName" + String image = "theImage" + List args = ["theCommand", "theArgs"] + String workingDir = "/a/b/c" + Mapenv = [test:"test"] + + def mockTask = Mock(TaskRun){ + getName() >> name + getContainer() >> image + getConfig() >> Mock(TaskConfig) + getWorkDirStr() >> workingDir + getContainer() >> "ubuntu" + getProcessor() >> Mock(TaskProcessor){ + getExecutor() >> Mock(Executor){ + isFusionEnabled() >> false + } + } + getWorkDir() >> Path.of(workingDir) + toTaskBean() >> Mock(TaskBean){ + getWorkDir() >> Path.of(workingDir) + getScript() >> "theScript" + getShell() >> ["bash"] + getInputFiles() >> [:] + } + } + + mockWebServer.enqueue(new MockResponse() + .setBody(JsonOutput.toJson(["EvalID":"test"]).toString()) + .addHeader("Content-Type", "application/json")); + when: + + def idJob = service.submitTask(id, mockTask, args, env) + def recordedRequest = mockWebServer.takeRequest(); + def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8()) + + then: + idJob + + and: + recordedRequest.method == "POST" + recordedRequest.path == "/v1/jobs" + + and: + body.Job.Spreads[0].Attribute == 'test' + body.Job.Spreads[0].Weight == 50 + body.Job.Spreads[0].SpreadTarget.first().Value == 'a' + body.Job.Spreads[0].SpreadTarget.first().Percent == 30 + } } diff --git a/validation/spread/main.nf b/validation/spread/main.nf new file mode 100644 index 0000000..87f26f8 --- /dev/null +++ b/validation/spread/main.nf @@ -0,0 +1,18 @@ +#!/usr/bin/env nextflow + +process sayHello { + container 'ubuntu:20.04' + + input: + val x + output: + stdout + script: + """ + echo '$x world!' + """ +} + +workflow { + Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view +} \ No newline at end of file diff --git a/validation/spread/node-nextflow.config b/validation/spread/node-nextflow.config new file mode 100644 index 0000000..031cab3 --- /dev/null +++ b/validation/spread/node-nextflow.config @@ -0,0 +1,35 @@ +plugins { + id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}" +} + +process { + executor = "nomad" +} + +nomad { + + client { + address = "http://localhost:4646" + } + + jobs { + deleteOnCompletion = false + volume = { type "host" name "scratchdir" } + + spreads = { + spread = [ name:'node.datacenter', weight: 50 ] + } + } + +} + +profiles{ + localnomad{ + process { + withName: sayHello { + datacenters = ['test-datacenter', 'demo-datacenter'] + spread = [ name:'node.datacenter', weight: 50, targets : ['us-east1':70, 'us-east2':30] ] + } + } + } +} \ No newline at end of file