diff --git a/.github/workflows/ghpages.yml b/.github/workflows/ghpages.yml index d4fc66a..f018918 100644 --- a/.github/workflows/ghpages.yml +++ b/.github/workflows/ghpages.yml @@ -2,7 +2,7 @@ name: ghpages on: push: branches: - - 'main' + - 'master' workflow_dispatch: @@ -46,4 +46,4 @@ jobs: steps: - name: Deploy to GitHub Pages id: deployment - uses: actions/deploy-pages@v2 \ No newline at end of file + uses: actions/deploy-pages@v2 diff --git a/.gitignore b/.gitignore index 61c50b1..3353b66 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ oracle-nomad-cluster .settings /validation/nomad_temp/** /validation/nomad +**/*.tsv \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 82d0a4d..9b1ae50 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=0.2.0-edge1 +version=0.3.1 github_organization=nextflow-io \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy index 96a6b3b..5973aaf 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadExecutor.groovy @@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint { @Override TaskHandler createTaskHandler(TaskRun task) { + assert task + assert task.workDir + log.trace "[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}" return new NomadTaskHandler(task, this.config, service) } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index b29bf0f..a04ad2e 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -28,6 +28,7 @@ import nextflow.nomad.builders.JobBuilder import 
nextflow.nomad.config.NomadConfig import nextflow.processor.TaskRun import nextflow.exception.ProcessSubmitException +import org.threeten.bp.OffsetDateTime import java.nio.file.Path @@ -114,52 +115,38 @@ class NomadService implements Closeable{ } - String getJobState(String jobId){ + TaskState getTaskState(String jobId){ try { List allocations = safeExecutor.apply { jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) } - AllocationListStub last = allocations?.sort { + AllocationListStub last = allocations ? allocations.sort { it.modifyIndex - }?.last() - String currentState = last?.taskStates?.values()?.last()?.state - log.debug "Task $jobId , state=$currentState" - currentState ?: "Unknown" + }?.last() : null + TaskState currentState = last?.taskStates?.values()?.last() + log.debug "Task $jobId , state=${currentState?.state}" + currentState ?: new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now()) }catch(Exception e){ log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) - "dead" + new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now()) } } - boolean checkIfRunning(String jobId){ + String getJobStatus(String jobId){ try { Job job = safeExecutor.apply { jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) } log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status" - job.status == "running" + job.status }catch (Exception e){ log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) - false - } - } - - boolean checkIfDead(String jobId){ - try{ - Job job = safeExecutor.apply { - jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, - null, null, null, null, null, null, null) - } - log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status" - job.status == "dead" - }catch (Exception 
e){ - log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e) - true + "unknown" } } diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy index c5a91ae..87a52b5 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadTaskHandler.groovy @@ -19,6 +19,7 @@ package nextflow.nomad.executor import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import io.nomadproject.client.model.TaskState import nextflow.exception.ProcessSubmitException import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.BashWrapperBuilder @@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus import nextflow.trace.TraceRecord import nextflow.util.Escape import nextflow.SysEnv +import org.threeten.bp.OffsetDateTime import java.nio.file.Path @@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { private String clientName = null - private String state + private TaskState state private long timestamp @@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { @Override boolean checkIfRunning() { - if(isActive()) { - determineClientNode() + if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running") + if(isSubmitted()) { + def state = taskState0() + // include `terminated` state to allow the handler status to progress + if( state && ( ["running","terminated"].contains(state.state))){ + status = TaskStatus.RUNNING + determineClientNode() + return true + } } - nomadService.checkIfRunning(this.jobName) + return false } @Override boolean checkIfCompleted() { - if (!nomadService.checkIfDead(this.jobName)) { - return false - } + if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running") - state = 
taskState0(this.jobName) + def state = taskState0() - final isFinished = [ - "complete", - "failed", - "dead", - "lost"].contains(state) + final isFinished = state && (state.finishedAt != null || state.state == "unknown") - log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished" + log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished" if (isFinished) { // finalize the task task.exitStatus = readExitFile() task.stdout = outputFile task.stderr = errorFile - this.status = TaskStatus.COMPLETED - if (state == "failed" || state == "lost" || state == "unknown") + status = TaskStatus.COMPLETED + if ( !state || state.failed ) { task.error = new ProcessUnrecoverableException() + task.aborted = true + } if (shouldDelete()) { nomadService.jobPurge(this.jobName) } + updateTimestamps(state?.startedAt, state?.finishedAt) + determineClientNode() return true } @@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { return ret } - protected String taskState0(String taskName) { + protected TaskState taskState0() { final now = System.currentTimeMillis() final delta = now - timestamp; if (!status || delta >= 1_000) { - def newState = nomadService.getJobState(jobName) - log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState" + def newState = nomadService.getTaskState(jobName) + log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}" if (newState) { state = newState @@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask { return result } + void updateTimestamps(OffsetDateTime start, OffsetDateTime end){ + try { + startTimeMillis = start.toInstant().toEpochMilli() + completeTimeMillis = end.toInstant().toEpochMilli() + } catch( Exception e ) { + // Only update if startTimeMillis hasn't already been set. 
+ // If startTimeMillis _has_ been set, then both startTimeMillis + // and completeTimeMillis will have been set with the normal + // TaskHandler mechanism, so there's no need to reset them here. + if (!startTimeMillis) { + startTimeMillis = System.currentTimeMillis() + completeTimeMillis = System.currentTimeMillis() + } + } + } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy index 30b2f1c..193f3a5 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy @@ -110,7 +110,7 @@ class NomadDSLSpec extends Dsl2Spec{ if( recordedRequest.path.endsWith("/allocations")) { summary = true return new MockResponse().setResponseCode(200) - .setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json") + .setBody(this.getClass().getResourceAsStream("/allocations.json").text).addHeader("Content-Type", "application/json") }else { return new MockResponse().setResponseCode(200) .setBody('{"Status": "dead"}').addHeader("Content-Type", "application/json") diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy index 66f84ad..7fa442e 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/executor/NomadServiceSpec.groovy @@ -65,7 +65,7 @@ class NomadServiceSpec extends Specification{ mockWebServer.enqueue(new MockResponse() .addHeader("Content-Type", "application/json")); - def state = service.getJobState("theId") + def state = service.getTaskState("theId") def recordedRequest = mockWebServer.takeRequest(); then: @@ -73,148 +73,14 @@ class NomadServiceSpec extends Specification{ recordedRequest.path == "/v1/job/theId/allocations" and: - state == "Unknown" + state.state == "unknown" when: 
mockWebServer.enqueue(new MockResponse() - .setBody(""" - [ - { - "ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2", - "EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189", - "Name": "example.cache[0]", - "NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823", - "PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc", - "NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b", - "RescheduleTracker": { - "Events": [ - { - "PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc", - "PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e", - "RescheduleTime": 1517434161192946200, - "Delay": 5000000000 - } - ] - }, - "JobID": "example", - "TaskGroup": "cache", - "DesiredStatus": "run", - "DesiredDescription": "", - "ClientStatus": "running", - "ClientDescription": "", - "TaskStates": { - "redis": { - "State": "running", - "Failed": false, - "StartedAt": "2017-05-25T23:41:23.240184101Z", - "FinishedAt": "0001-01-01T00:00:00Z", - "Events": [ - { - "Type": "Received", - "Time": 1495755675956923000, - "FailsTask": false, - "RestartReason": "", - "SetupError": "", - "DriverError": "", - "ExitCode": 0, - "Signal": 0, - "Message": "", - "KillTimeout": 0, - "KillError": "", - "KillReason": "", - "StartDelay": 0, - "DownloadError": "", - "ValidationError": "", - "DiskLimit": 0, - "FailedSibling": "", - "VaultError": "", - "TaskSignalReason": "", - "TaskSignal": "", - "DriverMessage": "" - }, - { - "Type": "Task Setup", - "Time": 1495755675957466400, - "FailsTask": false, - "RestartReason": "", - "SetupError": "", - "DriverError": "", - "ExitCode": 0, - "Signal": 0, - "Message": "Building Task Directory", - "KillTimeout": 0, - "KillError": "", - "KillReason": "", - "StartDelay": 0, - "DownloadError": "", - "ValidationError": "", - "DiskLimit": 0, - "FailedSibling": "", - "VaultError": "", - "TaskSignalReason": "", - "TaskSignal": "", - "DriverMessage": "" - }, - { - "Type": "Driver", - "Time": 1495755675970286800, - "FailsTask": false, - "RestartReason": "", - "SetupError": "", - "DriverError": 
"", - "ExitCode": 0, - "Signal": 0, - "Message": "", - "KillTimeout": 0, - "KillError": "", - "KillReason": "", - "StartDelay": 0, - "DownloadError": "", - "ValidationError": "", - "DiskLimit": 0, - "FailedSibling": "", - "VaultError": "", - "TaskSignalReason": "", - "TaskSignal": "", - "DriverMessage": "Downloading image redis:7" - }, - { - "Type": "Started", - "Time": 1495755683227522000, - "FailsTask": false, - "RestartReason": "", - "SetupError": "", - "DriverError": "", - "ExitCode": 0, - "Signal": 0, - "Message": "", - "KillTimeout": 0, - "KillError": "", - "KillReason": "", - "StartDelay": 0, - "DownloadError": "", - "ValidationError": "", - "DiskLimit": 0, - "FailedSibling": "", - "VaultError": "", - "TaskSignalReason": "", - "TaskSignal": "", - "DriverMessage": "" - } - ] - } - }, - "CreateIndex": 9, - "ModifyIndex": 13, - "CreateTime": 1495755675944527600, - "ModifyTime": 1495755675944527600 - } - ] - - """) + .setBody(this.getClass().getResourceAsStream("/allocations.json").text) .addHeader("Content-Type", "application/json")); - state = service.getJobState("theId") + state = service.getTaskState("theId") recordedRequest = mockWebServer.takeRequest(); then: @@ -222,7 +88,7 @@ class NomadServiceSpec extends Specification{ recordedRequest.path == "/v1/job/theId/allocations" and: - state == "running" + state.state == "running" } diff --git a/plugins/nf-nomad/src/testResources/allocations.json b/plugins/nf-nomad/src/testResources/allocations.json new file mode 100644 index 0000000..bd7b68e --- /dev/null +++ b/plugins/nf-nomad/src/testResources/allocations.json @@ -0,0 +1,132 @@ +[ + { + "ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2", + "EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189", + "Name": "example.cache[0]", + "NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823", + "PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc", + "NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b", + "RescheduleTracker": { + "Events": [ + { + "PrevAllocID": 
"516d2753-0513-cfc7-57ac-2d6fac18b9dc", + "PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e", + "RescheduleTime": 1517434161192946200, + "Delay": 5000000000 + } + ] + }, + "JobID": "example", + "TaskGroup": "cache", + "DesiredStatus": "run", + "DesiredDescription": "", + "ClientStatus": "running", + "ClientDescription": "", + "TaskStates": { + "redis": { + "State": "running", + "Failed": false, + "StartedAt": "2017-05-25T23:41:23.240184101Z", + "FinishedAt": "0001-01-01T00:00:00Z", + "Events": [ + { + "Type": "Received", + "Time": 1495755675956923000, + "FailsTask": false, + "RestartReason": "", + "SetupError": "", + "DriverError": "", + "ExitCode": 0, + "Signal": 0, + "Message": "", + "KillTimeout": 0, + "KillError": "", + "KillReason": "", + "StartDelay": 0, + "DownloadError": "", + "ValidationError": "", + "DiskLimit": 0, + "FailedSibling": "", + "VaultError": "", + "TaskSignalReason": "", + "TaskSignal": "", + "DriverMessage": "" + }, + { + "Type": "Task Setup", + "Time": 1495755675957466400, + "FailsTask": false, + "RestartReason": "", + "SetupError": "", + "DriverError": "", + "ExitCode": 0, + "Signal": 0, + "Message": "Building Task Directory", + "KillTimeout": 0, + "KillError": "", + "KillReason": "", + "StartDelay": 0, + "DownloadError": "", + "ValidationError": "", + "DiskLimit": 0, + "FailedSibling": "", + "VaultError": "", + "TaskSignalReason": "", + "TaskSignal": "", + "DriverMessage": "" + }, + { + "Type": "Driver", + "Time": 1495755675970286800, + "FailsTask": false, + "RestartReason": "", + "SetupError": "", + "DriverError": "", + "ExitCode": 0, + "Signal": 0, + "Message": "", + "KillTimeout": 0, + "KillError": "", + "KillReason": "", + "StartDelay": 0, + "DownloadError": "", + "ValidationError": "", + "DiskLimit": 0, + "FailedSibling": "", + "VaultError": "", + "TaskSignalReason": "", + "TaskSignal": "", + "DriverMessage": "Downloading image redis:7" + }, + { + "Type": "Started", + "Time": 1495755683227522000, + "FailsTask": false, + 
"RestartReason": "", + "SetupError": "", + "DriverError": "", + "ExitCode": 0, + "Signal": 0, + "Message": "", + "KillTimeout": 0, + "KillError": "", + "KillReason": "", + "StartDelay": 0, + "DownloadError": "", + "ValidationError": "", + "DiskLimit": 0, + "FailedSibling": "", + "VaultError": "", + "TaskSignalReason": "", + "TaskSignal": "", + "DriverMessage": "" + } + ] + } + }, + "CreateIndex": 9, + "ModifyIndex": 13, + "CreateTime": 1495755675944527600, + "ModifyTime": 1495755675944527600 + } +] \ No newline at end of file diff --git a/validation/run-all.sh b/validation/run-all.sh index 8d41317..54bf01e 100755 --- a/validation/run-all.sh +++ b/validation/run-all.sh @@ -43,13 +43,15 @@ if [ "$SKIPLOCAL" == 0 ]; then -r dev -profile test,docker \ --outdir $(pwd)/nomad_temp/scratchdir/out - ./run-pipeline.sh -c basic/nextflow.config bactopia/bactopia \ - --accession SRX4563634 --coverage 100 --genome_size 2800000 \ - -profile test,docker --outdir $(pwd)/nomad_temp/scratchdir/bactopia/outdir \ - --datasets_cache $(pwd)/nomad_temp/scratchdir/bactopia/datasets +# Batctopia is failing with current version of nextflow due some bug in --max_cpus but cant find a fix +# ./run-pipeline.sh -c basic/nextflow.config bactopia/bactopia \ +# --accession SRX4563634 --coverage 100 --genome_size median --max_cpus 2 \ +# -profile test,docker --outdir $(pwd)/nomad_temp/scratchdir/bactopia/outdir \ +# --datasets_cache $(pwd)/nomad_temp/scratchdir/bactopia/datasets ./run-pipeline.sh -c secrets/nextflow.config secrets/main.nf + ./run-pipeline.sh -c tower/nextflow.config tower/main.nf else echo "skip local" fi diff --git a/validation/tower/main.nf b/validation/tower/main.nf new file mode 100644 index 0000000..2c53e53 --- /dev/null +++ b/validation/tower/main.nf @@ -0,0 +1,19 @@ +#!/usr/bin/env nextflow + +process sayHello { + container 'ubuntu:20.04' + + input: + val x + output: + stdout + script: + """ + sleep 10 + echo '$x world!' 
+ """ +} + +workflow { + Channel.of('Bonjour', 'Ciao', 'Hello', 'Hola') | sayHello | view +} \ No newline at end of file diff --git a/validation/tower/nextflow.config b/validation/tower/nextflow.config new file mode 100644 index 0000000..fb92b2f --- /dev/null +++ b/validation/tower/nextflow.config @@ -0,0 +1,25 @@ +plugins { + id "nf-nomad@${System.getenv("NOMAD_PLUGIN_VERSION") ?: "latest"}" +} + +process { + executor = "nomad" +} + +tower { + enabled = true + workspaceId = "276172789442513" +} + +nomad { + + client { + address = "http://localhost:4646" + } + + jobs { + deleteOnCompletion = false + volume = { type "host" name "scratchdir" } + } + +}