Skip to content

Commit

Permalink
map further statuses to running or completed
Browse files Browse the repository at this point in the history
  • Loading branch information
abhi18av committed Dec 10, 2024
1 parent fe9b9b7 commit b740505
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class JobBuilder {

def task = createTask(taskRun, args, env, jobOpts)
def taskGroup = new TaskGroup(
name: "group",
name: "nf-taskgroup",
tasks: [ task ],
reschedulePolicy: taskReschedulePolicy,
restartPolicy: taskRestartPolicy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ class NomadService implements Closeable{
it.modifyIndex
}?.last() : null
TaskState currentState = last?.taskStates?.values()?.last()
log.debug "Task $jobId , state=${currentState?.state}"
log.debug "[NOMAD] getTaskStatus $jobId , state=${currentState?.state}"
currentState ?: new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}catch(Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
log.debug("[NOMAD] getTaskStatus Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}
}
Expand All @@ -142,10 +142,10 @@ class NomadService implements Closeable{
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
log.debug "[NOMAD] getJobStatus jobID=$job.ID; status=$job.status"
job.status
}catch (Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
log.debug("[NOMAD] getJobStatus Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"unknown"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,41 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
}


//-------------------------------------------------
//
// NOTE: From https://github.com/hashicorp/nomad/blob/6a41dc7b2f1fdbbc5a20ed267b4ad25fc2a14489/api/jobs.go#L1263-L1287
//
//-------------------------------------------------
// type JobChildrenSummary struct {
// Pending int64
// Running int64
// Dead int64
// }
//-------------------------------------------------
// type TaskGroupSummary struct {
// Queued int
// Complete int
// Failed int
// Running int
// Starting int
// Lost int
// Unknown int
// }
//-------------------------------------------------


@Override
boolean checkIfRunning() {
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
if( !jobName ) throw new IllegalStateException("[NOMAD] Missing Nomad Job name -- cannot check if running")
if(isSubmitted()) {
def state = taskState0()

log.debug "[NOMAD] checkIfRunning task=$task.name; state=${state?.state}"

// include `terminated` state to allow the handler status to progress
if( state && ( ["running","terminated"].contains(state.state))){
status = TaskStatus.RUNNING
if( state && ( ["running","pending","unknown"].contains(state.state))){
this.status = TaskStatus.RUNNING
determineClientNode()
return true
}
Expand All @@ -89,15 +116,14 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfCompleted() {
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
if( !jobName ) throw new IllegalStateException("[NOMAD] Missing Nomad Job name -- cannot check if running")
def isFinished = false

def state = taskState0()

final isFinished = state && (state.finishedAt != null || state.state == "unknow")

log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"
log.debug "[NOMAD] checkIfCompleted task=$task.name; state=${state?.state}"

if (isFinished) {
if( state && ( ["dead","complete"].contains(state.state))){
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
Expand Down Expand Up @@ -193,7 +219,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
if (!status || delta >= 1_000) {

def newState = nomadService.getTaskState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}"
log.debug "[NOMAD] taskState0 jobName=$jobName currentState=${state?.state} newState=${newState?.state}"

if (newState) {
state = newState
Expand Down
6 changes: 3 additions & 3 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class NomadDSLSpec extends Dsl2Spec{
}

when:
def SCRIPT = '''
def SCRIPT = '''
process sayHello{
container 'ubuntu:22.0.4'
input:
Expand All @@ -136,10 +136,10 @@ class NomadDSLSpec extends Dsl2Spec{
"""
echo '$x world!\'
"""
}
}
workflow {
channel.of('hi!') | sayHello | view
}
}
'''
and:
def result = new MockScriptRunner([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class JobBuilderSpec extends Specification {
def taskGroup = JobBuilder.createTaskGroup(taskRun, args, env, jobOpts)

then:
taskGroup.name == "group"
taskGroup.name == "nf-taskgroup"
taskGroup.tasks.size() == 1
taskGroup.tasks[0].name == "nf-task"
taskGroup.tasks[0].config.image == "test-container"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class NomadServiceSpec extends Specification{
body.Job.Name == name
body.Job.Type == "batch"
body.Job.TaskGroups.size() == 1
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Name == "nf-taskgroup"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Resources.Cores == 1
Expand Down Expand Up @@ -224,7 +224,7 @@ class NomadServiceSpec extends Specification{
body.Job.Name == name
body.Job.Type == "batch"
body.Job.TaskGroups.size() == 1
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Name == "nf-taskgroup"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Resources.Cores == 1
Expand Down
17 changes: 9 additions & 8 deletions validation/sun-nomadlab/nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ wave {
fusion {
enabled = true
exportStorageCredentials = true
logLevel = 'verbose' // 'info' | 'debug'
}

nomad {
Expand All @@ -45,15 +46,15 @@ nomad {

volume = { type "host" name "scratch" }

constraints = {
// attr {
// unique = [hostname:'nomad03']
// //raw 'platform.aws.instance-type', '=', 'm4.xlarge'
// constraints = {
// // attr {
// // unique = [hostname:'nomad03']
// // //raw 'platform.aws.instance-type', '=', 'm4.xlarge'

node {
unique = [name: "nomad03"]
}
// node {
// unique = [name: "nomad03"]
// }

}
// }
}
}

0 comments on commit b740505

Please sign in to comment.