Skip to content

Commit

Permalink
Merge branch 'master' into matthdsm-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
abhi18av authored Dec 8, 2024
2 parents d47df39 + 86ca369 commit bdc4d9c
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 190 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ghpages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: ghpages
on:
push:
branches:
- 'main'
- 'master'

workflow_dispatch:

Expand Down Expand Up @@ -46,4 +46,4 @@ jobs:
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v2
uses: actions/deploy-pages@v2
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ oracle-nomad-cluster
.settings
/validation/nomad_temp/**
/validation/nomad
**/*.tsv
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.2.0-edge1
version=0.3.1
github_organization=nextflow-io
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint {

@Override
TaskHandler createTaskHandler(TaskRun task) {
assert task
assert task.workDir
log.trace "[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}"
return new NomadTaskHandler(task, this.config, service)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import nextflow.nomad.builders.JobBuilder
import nextflow.nomad.config.NomadConfig
import nextflow.processor.TaskRun
import nextflow.exception.ProcessSubmitException
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand Down Expand Up @@ -114,52 +115,38 @@ class NomadService implements Closeable{
}


String getJobState(String jobId){
TaskState getTaskState(String jobId){
try {
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
AllocationListStub last = allocations?.sort {
AllocationListStub last = allocations ? allocations.sort {
it.modifyIndex
}?.last()
String currentState = last?.taskStates?.values()?.last()?.state
log.debug "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}?.last() : null
TaskState currentState = last?.taskStates?.values()?.last()
log.debug "Task $jobId , state=${currentState?.state}"
currentState ?: new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}catch(Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"dead"
new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}
}



boolean checkIfRunning(String jobId){
String getJobStatus(String jobId){
try {
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
job.status == "running"
job.status
}catch (Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
false
}
}

boolean checkIfDead(String jobId){
try{
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e)
true
"unknown"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.nomad.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.model.TaskState
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.BashWrapperBuilder
Expand All @@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
import nextflow.util.Escape
import nextflow.SysEnv
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand All @@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

private String clientName = null

private String state
private TaskState state

private long timestamp

Expand All @@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfRunning() {
if(isActive()) {
determineClientNode()
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
if(isSubmitted()) {
def state = taskState0()
// include `terminated` state to allow the handler status to progress
if( state && ( ["running","terminated"].contains(state.state))){
status = TaskStatus.RUNNING
determineClientNode()
return true
}
}
nomadService.checkIfRunning(this.jobName)
return false
}

@Override
boolean checkIfCompleted() {
if (!nomadService.checkIfDead(this.jobName)) {
return false
}
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")

state = taskState0(this.jobName)
def state = taskState0()

final isFinished = [
"complete",
"failed",
"dead",
"lost"].contains(state)
final isFinished = state && (state.finishedAt != null || state.state == "unknow")

log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished"
log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"

if (isFinished) {
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
task.stderr = errorFile
this.status = TaskStatus.COMPLETED
if (state == "failed" || state == "lost" || state == "unknown")
status = TaskStatus.COMPLETED
if ( !state || state.failed ) {
task.error = new ProcessUnrecoverableException()
task.aborted = true
}

if (shouldDelete()) {
nomadService.jobPurge(this.jobName)
}

updateTimestamps(state?.startedAt, state?.finishedAt)
determineClientNode()
return true
}

Expand Down Expand Up @@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return ret
}

protected String taskState0(String taskName) {
protected TaskState taskState0() {
final now = System.currentTimeMillis()
final delta = now - timestamp;
if (!status || delta >= 1_000) {

def newState = nomadService.getJobState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState"
def newState = nomadService.getTaskState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}"

if (newState) {
state = newState
Expand Down Expand Up @@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return result
}

void updateTimestamps(OffsetDateTime start, OffsetDateTime end){
try {
startTimeMillis = start.toInstant().toEpochMilli()
completeTimeMillis = end.toInstant().toEpochMilli()
} catch( Exception e ) {
// Only update if startTimeMillis hasn't already been set.
// If startTimeMillis _has_ been set, then both startTimeMillis
// and completeTimeMillis will have been set with the normal
// TaskHandler mechanism, so there's no need to reset them here.
if (!startTimeMillis) {
startTimeMillis = System.currentTimeMillis()
completeTimeMillis = System.currentTimeMillis()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class NomadDSLSpec extends Dsl2Spec{
if( recordedRequest.path.endsWith("/allocations")) {
summary = true
return new MockResponse().setResponseCode(200)
.setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json")
.setBody(this.getClass().getResourceAsStream("/allocations.json").text).addHeader("Content-Type", "application/json")
}else {
return new MockResponse().setResponseCode(200)
.setBody('{"Status": "dead"}').addHeader("Content-Type", "application/json")
Expand Down
Loading

0 comments on commit bdc4d9c

Please sign in to comment.