From b55dc70462d53c4495782c311c3142358364deaa Mon Sep 17 00:00:00 2001 From: Abhinav Sharma Date: Wed, 18 Sep 2024 12:17:21 +0200 Subject: [PATCH] Refactor NomadService to rely on Builder classes (#89) * implement job builder * refactor the job modification methods * finalize the tests for JobBuilder and NomadJobOpts classes --- .../nextflow/nomad/builders/JobBuilder.groovy | 316 ++++++++++++++++++ .../nomad/executor/NomadService.groovy | 263 +-------------- .../nomad/builders/JobBuilderSpec.groovy | 102 ++++++ .../nomad/config/NomadJobOptsSpec.groovy | 133 ++++++++ .../nomad/executor/NomadServiceSpec.groovy | 2 +- 5 files changed, 569 insertions(+), 247 deletions(-) create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/builders/JobBuilder.groovy create mode 100644 plugins/nf-nomad/src/test/nextflow/nomad/builders/JobBuilderSpec.groovy create mode 100644 plugins/nf-nomad/src/test/nextflow/nomad/config/NomadJobOptsSpec.groovy diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/builders/JobBuilder.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/builders/JobBuilder.groovy new file mode 100644 index 0000000..e23a7a8 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/builders/JobBuilder.groovy @@ -0,0 +1,316 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024-, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.nomad.builders + +import io.nomadproject.client.model.Affinity +import io.nomadproject.client.model.Constraint +import io.nomadproject.client.model.Job +import io.nomadproject.client.model.Spread +import io.nomadproject.client.model.TaskGroup +import io.nomadproject.client.model.Task +import io.nomadproject.client.model.ReschedulePolicy +import io.nomadproject.client.model.RestartPolicy +import io.nomadproject.client.model.Resources +import io.nomadproject.client.model.Template +import io.nomadproject.client.model.VolumeMount +import io.nomadproject.client.model.VolumeRequest +import nextflow.nomad.config.NomadJobOpts +import nextflow.nomad.executor.TaskDirectives +import nextflow.nomad.models.ConstraintsBuilder +import nextflow.nomad.models.JobConstraints +import nextflow.nomad.models.JobSpreads +import nextflow.nomad.models.JobVolume +import nextflow.nomad.models.SpreadsBuilder +import nextflow.processor.TaskRun +import nextflow.util.MemoryUnit + + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + +/** +* +* @author Abhinav Sharma +*/ + +@Slf4j +@CompileStatic +class JobBuilder { + private Job job = new Job() + + JobBuilder withId(String id) { + job.ID = id + return this + } + + JobBuilder withName(String name) { + job.name = name + return this + } + + JobBuilder withType(String type) { + job.type = type + return this + } + + JobBuilder withDatacenters(List datacenters) { + job.datacenters = datacenters + return this + } + + static Job assignDatacenters(TaskRun task, Job job){ + def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS) + if( datacenters ){ + if( datacenters instanceof List) { + job.datacenters( datacenters as List) + return job; + } + if( datacenters instanceof Closure) { + String str = datacenters.call().toString() + job.datacenters( [str]) + return job; + } + job.datacenters( [datacenters.toString()] as List) + return job + } + job + } + + JobBuilder withNamespace(String namespace) { + job.namespace = namespace + return this + } + + JobBuilder withTaskGroups(List taskGroups) { + job.taskGroups = taskGroups + return this + } + + Job build() { + return job + } + + static protected Resources getResources(TaskRun task) { + final DEFAULT_CPUS = 1 + final DEFAULT_MEMORY = "500.MB" + + final taskCfg = task.getConfig() + final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer + final taskMemory = taskCfg.get("memory") ? new MemoryUnit( taskCfg.get("memory") as String ) : new MemoryUnit(DEFAULT_MEMORY) + + final res = new Resources() + .cores(taskCores) + .memoryMB(taskMemory.toMega() as Integer) + + return res + } + + static TaskGroup createTaskGroup(TaskRun taskRun, List args, Mapenv, NomadJobOpts jobOpts){ + final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(jobOpts.rescheduleAttempts) + final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(jobOpts.restartAttempts) + + def task = createTask(taskRun, args, env, jobOpts) + def taskGroup = new TaskGroup( + name: "group", + tasks: [ task ], + reschedulePolicy: taskReschedulePolicy, + restartPolicy: taskRestartPolicy + ) + + + if( jobOpts.volumeSpec ) { + taskGroup.volumes = [:] + jobOpts.volumeSpec.eachWithIndex { volumeSpec , idx-> + if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE) { + taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( + type: volumeSpec.type, + source: volumeSpec.name, + attachmentMode: volumeSpec.attachmentMode, + accessMode: volumeSpec.accessMode, + readOnly: volumeSpec.readOnly, + ) + } + + if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_HOST_TYPE) { + taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( + type: volumeSpec.type, + source: volumeSpec.name, + readOnly: volumeSpec.readOnly, + ) + } + } + } + return taskGroup + } + + static Task createTask(TaskRun task, List args, Mapenv, NomadJobOpts jobOpts) { + final DRIVER = "docker" + final DRIVER_PRIVILEGED = true + + final imageName = task.container + final workingDir = task.workDir.toAbsolutePath().toString() + final taskResources = getResources(task) + + + def taskDef = new Task( + name: "nf-task", + driver: DRIVER, + resources: taskResources, + config: [ + image : imageName, + privileged: DRIVER_PRIVILEGED, + work_dir : workingDir, + command : args.first(), + args : args.tail(), + ] as Map, + env: env, + ) + + volumes(task, taskDef, workingDir, jobOpts) + affinity(task, taskDef, jobOpts) + constraint(task, taskDef, jobOpts) + constraints(task, taskDef, jobOpts) + secrets(task, taskDef, jobOpts) + return taskDef + } + + static protected Task volumes(TaskRun task, Task taskDef, String workingDir, NomadJobOpts jobOpts){ + if( jobOpts.dockerVolume){ + String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) + taskDef.config.mount = [ + type : "volume", + target : destinationDir, + source : jobOpts.dockerVolume, + readonly : false + ] + } + + if( jobOpts.volumeSpec){ + taskDef.volumeMounts = [] + jobOpts.volumeSpec.eachWithIndex { volumeSpec, idx -> + String destinationDir = volumeSpec.workDir ? + workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path + taskDef.volumeMounts.add new VolumeMount( + destination: destinationDir, + volume: "vol_${idx}".toString() + ) + } + } + + taskDef + } + + static protected Task affinity(TaskRun task, Task taskDef, NomadJobOpts jobOpts) { + if (jobOpts.affinitySpec) { + def affinity = new Affinity() + if (jobOpts.affinitySpec.attribute) { + affinity.ltarget(jobOpts.affinitySpec.attribute) + } + + affinity.operand(jobOpts.affinitySpec.operator ?: "=") + + if (jobOpts.affinitySpec.value) { + affinity.rtarget(jobOpts.affinitySpec.value) + } + if (jobOpts.affinitySpec.weight != null) { + affinity.weight(jobOpts.affinitySpec.weight) + } + taskDef.affinities([affinity]) + } + taskDef + } + + protected static Task constraint(TaskRun task, Task taskDef, NomadJobOpts jobOpts){ + if( jobOpts.constraintSpec ){ + def constraint = new Constraint() + if(jobOpts.constraintSpec.attribute){ + constraint.ltarget(jobOpts.constraintSpec.attribute) + } + + constraint.operand(jobOpts.constraintSpec.operator ?: "=") + + if(jobOpts.constraintSpec.value){ + constraint.rtarget(jobOpts.constraintSpec.value) + } + taskDef.constraints([constraint]) + } + + taskDef + } + + protected static Task constraints(TaskRun task, Task taskDef, NomadJobOpts jobOpts){ + def constraints = [] as List + + if( jobOpts.constraintsSpec ){ + def list = ConstraintsBuilder.constraintsSpecToList(jobOpts.constraintsSpec) + constraints.addAll(list) + } + + if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) && + task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) { + Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure + JobConstraints constraintsSpec = JobConstraints.parse(closure) + def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec) + constraints.addAll(list) + } + + if( constraints.size()) { + taskDef.constraints(constraints) + } + taskDef + } + + protected static Task secrets(TaskRun task, Task taskDef, NomadJobOpts jobOpts){ + if( jobOpts?.secretOpts?.enabled) { + def secrets = task.processor?.config?.get(TaskDirectives.SECRETS) + if (secrets) { + Template template = new Template(envvars: true, destPath: "/secrets/nf-nomad") + String secretPath = jobOpts?.secretOpts?.path + String tmpl = secrets.collect { String name -> + "${name}={{ with nomadVar \"$secretPath/${name}\" }}{{ .${name} }}{{ end }}" + }.join('\n').stripIndent() + template.embeddedTmpl(tmpl) + taskDef.addTemplatesItem(template) + } + } + taskDef + } + + static Job spreads(TaskRun task, Job jobDef, NomadJobOpts jobOpts){ + def spreads = [] as List + if( jobOpts.spreadsSpec ){ + def list = SpreadsBuilder.spreadsSpecToList(jobOpts.spreadsSpec) + spreads.addAll(list) + } + if( task.processor?.config?.get(TaskDirectives.SPREAD) && + task.processor?.config?.get(TaskDirectives.SPREAD) instanceof Map) { + Map map = task.processor?.config?.get(TaskDirectives.SPREAD) as Map + JobSpreads spreadSpec = new JobSpreads() + spreadSpec.spread(map) + def list = SpreadsBuilder.spreadsSpecToList(spreadSpec) + spreads.addAll(list) + } + + spreads.each{ + jobDef.addSpreadsItem(it) + } + jobDef + } + + +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index 42b25a6..a91c01b 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -24,14 +24,9 @@ import io.nomadproject.client.ApiException import io.nomadproject.client.api.JobsApi import io.nomadproject.client.api.VariablesApi import io.nomadproject.client.model.* -import nextflow.nomad.models.ConstraintsBuilder -import nextflow.nomad.models.JobConstraints +import nextflow.nomad.builders.JobBuilder import nextflow.nomad.config.NomadConfig -import nextflow.nomad.models.JobSpreads -import nextflow.nomad.models.JobVolume -import nextflow.nomad.models.SpreadsBuilder import nextflow.processor.TaskRun -import nextflow.util.MemoryUnit import nextflow.exception.ProcessSubmitException import java.nio.file.Path @@ -71,271 +66,47 @@ class NomadService implements Closeable{ this.variablesApi = new VariablesApi(apiClient) } - protected Resources getResources(TaskRun task) { - final DEFAULT_CPUS = 1 - final DEFAULT_MEMORY = "500.MB" - - final taskCfg = task.getConfig() - final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer - final taskMemory = taskCfg.get("memory") ? new MemoryUnit( taskCfg.get("memory") as String ) : new MemoryUnit(DEFAULT_MEMORY) - - final res = new Resources() - .cores(taskCores) - .memoryMB(taskMemory.toMega() as Integer) - - return res - } @Override void close() throws IOException { } - String submitTask(String id, TaskRun task, List args, Mapenv, Path saveJsonPath=null){ - Job job = new Job(); - job.ID = id - job.name = task.name - job.type = "batch" - job.datacenters = this.config.jobOpts().datacenters - job.namespace = this.config.jobOpts().namespace - job.taskGroups = [createTaskGroup(task, args, env)] - assignDatacenters(task, job) - spreads(task, job) + String submitTask(String id, TaskRun task, List args, Map env, Path saveJsonPath = null) { + Job job = new JobBuilder() + .withId(id) + .withName(task.name) + .withType("batch") +// .withDatacenters(task, this.config.jobOpts().datacenters) + .withNamespace(this.config.jobOpts().namespace) + .withTaskGroups([JobBuilder.createTaskGroup(task, args, env, this.config.jobOpts())]) + .build() + + JobBuilder.assignDatacenters(task, job) + JobBuilder.spreads(task, job, this.config.jobOpts()) JobRegisterRequest jobRegisterRequest = new JobRegisterRequest() jobRegisterRequest.setJob(job) - if( saveJsonPath ) try { + if (saveJsonPath) try { saveJsonPath.text = job.toString() - } - catch( Exception e ) { + } catch (Exception e) { log.debug "WARN: unable to save request json -- cause: ${e.message ?: e}" } - try { JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null) - jobRegisterResponse.evalID - } catch( ApiException apiException){ + return jobRegisterResponse.evalID + } catch (ApiException apiException) { log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException) throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException) } catch (Throwable e) { log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${e.message ?: e}", e) throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${e.message ?: e}", e) } - - } - - TaskGroup createTaskGroup(TaskRun taskRun, List args, Mapenv){ - final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(this.config.jobOpts().rescheduleAttempts) - final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(this.config.jobOpts().restartAttempts) - - def task = createTask(taskRun, args, env) - def taskGroup = new TaskGroup( - name: "group", - tasks: [ task ], - reschedulePolicy: taskReschedulePolicy, - restartPolicy: taskRestartPolicy - ) - - - if( config.jobOpts().volumeSpec ) { - taskGroup.volumes = [:] - config.jobOpts().volumeSpec.eachWithIndex { volumeSpec , idx-> - if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE) { - taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( - type: volumeSpec.type, - source: volumeSpec.name, - attachmentMode: volumeSpec.attachmentMode, - accessMode: volumeSpec.accessMode, - readOnly: volumeSpec.readOnly, - ) - } - - if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_HOST_TYPE) { - taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( - type: volumeSpec.type, - source: volumeSpec.name, - readOnly: volumeSpec.readOnly, - ) - } - } - } - return taskGroup - } - - Task createTask(TaskRun task, List args, Mapenv) { - final DRIVER = "docker" - final DRIVER_PRIVILEGED = true - - final imageName = task.container - final workingDir = task.workDir.toAbsolutePath().toString() - final taskResources = getResources(task) - - - def taskDef = new Task( - name: "nf-task", - driver: DRIVER, - resources: taskResources, - config: [ - image : imageName, - privileged: DRIVER_PRIVILEGED, - work_dir : workingDir, - command : args.first(), - args : args.tail(), - ] as Map, - env: env, - ) - - volumes(task, taskDef, workingDir) - affinity(task, taskDef) - constraint(task, taskDef) - constraints(task, taskDef) - secrets(task, taskDef) - return taskDef - } - - protected Task volumes(TaskRun task, Task taskDef, String workingDir){ - if( config.jobOpts().dockerVolume){ - String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) - taskDef.config.mount = [ - type : "volume", - target : destinationDir, - source : config.jobOpts().dockerVolume, - readonly : false - ] - } - - if( config.jobOpts().volumeSpec){ - taskDef.volumeMounts = [] - config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx -> - String destinationDir = volumeSpec.workDir ? - workingDir.split(File.separator).dropRight(2).join(File.separator) : volumeSpec.path - taskDef.volumeMounts.add new VolumeMount( - destination: destinationDir, - volume: "vol_${idx}".toString() - ) - } - } - - taskDef } - protected Task affinity(TaskRun task, Task taskDef) { - if (config.jobOpts().affinitySpec) { - def affinity = new Affinity() - if (config.jobOpts().affinitySpec.attribute) { - affinity.ltarget(config.jobOpts().affinitySpec.attribute) - } - - affinity.operand(config.jobOpts().affinitySpec.operator ?: "=") - - if (config.jobOpts().affinitySpec.value) { - affinity.rtarget(config.jobOpts().affinitySpec.value) - } - if (config.jobOpts().affinitySpec.weight != null) { - affinity.weight(config.jobOpts().affinitySpec.weight) - } - taskDef.affinities([affinity]) - } - taskDef - } - - protected Task constraint(TaskRun task, Task taskDef){ - if( config.jobOpts().constraintSpec ){ - def constraint = new Constraint() - if(config.jobOpts().constraintSpec.attribute){ - constraint.ltarget(config.jobOpts().constraintSpec.attribute) - } - - constraint.operand(config.jobOpts().constraintSpec.operator ?: "=") - - if(config.jobOpts().constraintSpec.value){ - constraint.rtarget(config.jobOpts().constraintSpec.value) - } - taskDef.constraints([constraint]) - } - - taskDef - } - - protected Task constraints(TaskRun task, Task taskDef){ - def constraints = [] as List - - if( config.jobOpts().constraintsSpec ){ - def list = ConstraintsBuilder.constraintsSpecToList(config.jobOpts().constraintsSpec) - constraints.addAll(list) - } - - if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) && - task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) { - Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure - JobConstraints constraintsSpec = JobConstraints.parse(closure) - def list = ConstraintsBuilder.constraintsSpecToList(constraintsSpec) - constraints.addAll(list) - } - - if( constraints.size()) { - taskDef.constraints(constraints) - } - taskDef - } - - protected Task secrets(TaskRun task, Task taskDef){ - if( config.jobOpts()?.secretOpts?.enabled) { - def secrets = task.processor?.config?.get(TaskDirectives.SECRETS) - if (secrets) { - Template template = new Template(envvars: true, destPath: "/secrets/nf-nomad") - String secretPath = config.jobOpts()?.secretOpts?.path - String tmpl = secrets.collect { String name -> - "${name}={{ with nomadVar \"$secretPath/${name}\" }}{{ .${name} }}{{ end }}" - }.join('\n').stripIndent() - template.embeddedTmpl(tmpl) - taskDef.addTemplatesItem(template) - } - } - taskDef - } - - protected Job assignDatacenters(TaskRun task, Job job){ - def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS) - if( datacenters ){ - if( datacenters instanceof List) { - job.datacenters( datacenters as List) - return job; - } - if( datacenters instanceof Closure) { - String str = datacenters.call().toString() - job.datacenters( [str]) - return job; - } - job.datacenters( [datacenters.toString()] as List) - return job - } - job - } - - protected Job spreads(TaskRun task, Job jobDef){ - def spreads = [] as List - if( config.jobOpts().spreadsSpec ){ - def list = SpreadsBuilder.spreadsSpecToList(config.jobOpts().spreadsSpec) - spreads.addAll(list) - } - if( task.processor?.config?.get(TaskDirectives.SPREAD) && - task.processor?.config?.get(TaskDirectives.SPREAD) instanceof Map) { - Map map = task.processor?.config?.get(TaskDirectives.SPREAD) as Map - JobSpreads spreadSpec = new JobSpreads() - spreadSpec.spread(map) - def list = SpreadsBuilder.spreadsSpecToList(spreadSpec) - spreads.addAll(list) - } - - spreads.each{ - jobDef.addSpreadsItem(it) - } - jobDef - } String getJobState(String jobId){ try { diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/builders/JobBuilderSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/builders/JobBuilderSpec.groovy new file mode 100644 index 0000000..c515368 --- /dev/null +++ b/plugins/nf-nomad/src/test/nextflow/nomad/builders/JobBuilderSpec.groovy @@ -0,0 +1,102 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nextflow.nomad.builders + +import nextflow.nomad.config.NomadConfig +import nextflow.nomad.config.NomadJobOpts +import nextflow.processor.TaskRun +import okhttp3.mockwebserver.MockWebServer +import spock.lang.Specification + +/** + * Unit test for Nomad JobBuilder + * + * @author : Abhinav Sharma + */ + + +class JobBuilderSpec extends Specification { + + def "test JobBuilder withId method"() { + given: + def jobBuilder = new JobBuilder() + + when: + def jb = jobBuilder + .withId("test-id") + .build() + + then: + jb.ID == "test-id" + } + + + def "test createTask method"() { + given: + def jobOpts = Mock(NomadJobOpts) + def taskRun = Mock(TaskRun) + def args = ["arg1", "arg2"] + def env = ["key": "value"] + + taskRun.container >> "test-container" + taskRun.workDir >> new File("/test/workdir").toPath() + taskRun.getConfig() >> [cpus: 2, memory: "1GB"] + + when: + def task = JobBuilder.createTask(taskRun, args, env, jobOpts) + + then: + task.name == "nf-task" + task.driver == "docker" + task.config.image == "test-container" + task.config.command == "arg1" + task.config.args == ["arg2"] + task.env == env + task.resources.cores == 2 + task.resources.memoryMB == 1024 + } + + + def "test createTaskGroup method"() { + given: + def volumes = [{ type "csi" path "/container/path"}] + + def jobOpts = Mock(NomadJobOpts) + + def taskRun = Mock(TaskRun) + def args = ["arg1", "arg2"] + def env = ["key": "value"] + + taskRun.container >> "test-container" + taskRun.workDir >> new File("/test/workdir").toPath() + taskRun.getConfig() >> [cpus: 2, memory: "1GB"] + + when: + def taskGroup = JobBuilder.createTaskGroup(taskRun, args, env, jobOpts) + + then: + taskGroup.name == "group" + taskGroup.tasks.size() == 1 + taskGroup.tasks[0].name == "nf-task" + taskGroup.tasks[0].config.image == "test-container" + taskGroup.tasks[0].config.command == "arg1" + taskGroup.tasks[0].config.args == ["arg2"] + taskGroup.tasks[0].env == env + } + + +} diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadJobOptsSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadJobOptsSpec.groovy new file mode 100644 index 0000000..61fd04e --- /dev/null +++ b/plugins/nf-nomad/src/test/nextflow/nomad/config/NomadJobOptsSpec.groovy @@ -0,0 +1,133 @@ +/* + * Copyright 2023-, Stellenbosch University, South Africa + * Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.nomad.config + +import spock.lang.Specification + +/** + * Unit test for NomadJobOpts + * + * @author : Abhinav Sharma + */ +class NomadJobOptsSpec extends Specification { + + + def "test default values"() { + given: + def nomadJobOpts = new NomadJobOpts([:]) + + expect: + nomadJobOpts.rescheduleAttempts == 1 + nomadJobOpts.restartAttempts == 1 + nomadJobOpts.dockerVolume == null + } + + def "test rescheduleAttempts and restartAttempts"() { + given: + def nomadJobOpts = new NomadJobOpts([rescheduleAttempts: 3, restartAttempts: 2]) + + expect: + nomadJobOpts.rescheduleAttempts == 3 + nomadJobOpts.restartAttempts == 2 + } + + def "test dockerVolume"() { + given: + def nomadJobOpts = new NomadJobOpts([dockerVolume: "test-volume"]) + + expect: + nomadJobOpts.dockerVolume == "test-volume" + } + + def "test parseVolumes with single volume"() { + given: + def volumeClosure = { type "csi" name "test" } + def nomadJobOpts = new NomadJobOpts([volume: volumeClosure]) + + when: + def volumes = nomadJobOpts.parseVolumes([volume: volumeClosure]) + + then: + volumes.size() == 1 + volumes[0].workDir == true + } + + def "test parseVolumes with multiple volumes"() { + given: + def volumeClosure1 = { type "host" name "test" path "/volume/csi" } + def volumeClosure2 = { type "csi" name "test" path "/volume/host"} + def nomadJobOpts = new NomadJobOpts([volumes: [volumeClosure1, volumeClosure2]]) + + when: + def volumes = nomadJobOpts.parseVolumes([volumes: [volumeClosure1, volumeClosure2]]) + + then: + volumes.size() == 2 + volumes[0].path == "/volume/csi" + volumes[1].path == "/volume/host" + } + + def "test parseAffinity"() { + given: + def affinityClosure = { -> } + def nomadJobOpts = new NomadJobOpts([affinity: affinityClosure]) + + when: + def affinity = nomadJobOpts.parseAffinity([affinity: affinityClosure]) + + then: + affinity != null + } + + def "test parseConstraint"() { + given: + def constraintClosure = { -> } + def nomadJobOpts = new NomadJobOpts([constraint: constraintClosure]) + + when: + def constraint = nomadJobOpts.parseConstraint([constraint: constraintClosure]) + + then: + constraint != null + } + + def "test parseConstraints"() { + given: + def constraintsClosure = { -> } + def nomadJobOpts = new NomadJobOpts([constraints: constraintsClosure]) + + when: + def constraints = nomadJobOpts.parseConstraints([constraints: constraintsClosure]) + + then: + constraints != null + } + + + def "test parseSpreads"() { + given: + def spreadsClosure = { -> } + def nomadJobOpts = new NomadJobOpts([spreads: spreadsClosure]) + + when: + def spreads = nomadJobOpts.parseSpreads([spreads: spreadsClosure]) + + then: + spreads != null + } +} \ No newline at end of file diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 2902db3..66f84ad 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -557,7 +557,7 @@ class NomadServiceSpec extends Specification{ outputJson.text.indexOf(" Job {") != -1 } - void "submit a task with a datasource directive"(){ + void "submit a task with a datacenter directive"(){ given: def config = new NomadConfig( client:[