Skip to content

Commit

Permalink
refactor job status query logic
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Nov 9, 2024
1 parent 4f9e45f commit 284305d
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import nextflow.nomad.builders.JobBuilder
import nextflow.nomad.config.NomadConfig
import nextflow.processor.TaskRun
import nextflow.exception.ProcessSubmitException
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand Down Expand Up @@ -125,11 +126,11 @@ class NomadService implements Closeable{
it.modifyIndex
}?.last() : null
TaskState currentState = last?.taskStates?.values()?.last()
log.debug "Task $jobId , state=${currentState.state}"
currentState
log.debug "Task $jobId , state=${currentState?.state}"
currentState ?: new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}catch(Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
new TaskState(state: "unknow")
new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}
}

Expand All @@ -145,7 +146,7 @@ class NomadService implements Closeable{
job.status
}catch (Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"Unknow"
"unknown"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
if(isSubmitted()) {
def state = taskState0()
// include `terminated` state to allow the handler status to progress
if( state && ( "running" == state.state || "terminated" == state.state)){
if( state && ( ["running","terminated"].contains(state.state))){
status = TaskStatus.RUNNING
determineClientNode()
return true
Expand All @@ -93,7 +93,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

def state = taskState0()

final isFinished = state && state.finishedAt != null
final isFinished = state && (state.finishedAt != null || state.state == "unknow")

log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class NomadDSLSpec extends Dsl2Spec{
if( recordedRequest.path.endsWith("/allocations")) {
summary = true
return new MockResponse().setResponseCode(200)
.setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json")
.setBody(this.getClass().getResourceAsStream("/allocations.json").text).addHeader("Content-Type", "application/json")
}else {
return new MockResponse().setResponseCode(200)
.setBody('{"Status": "dead"}').addHeader("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,145 +73,11 @@ class NomadServiceSpec extends Specification{
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Unknown"
state.state == "unknown"

when:
mockWebServer.enqueue(new MockResponse()
.setBody("""
[
{
"ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2",
"EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189",
"Name": "example.cache[0]",
"NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823",
"PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b",
"RescheduleTracker": {
"Events": [
{
"PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e",
"RescheduleTime": 1517434161192946200,
"Delay": 5000000000
}
]
},
"JobID": "example",
"TaskGroup": "cache",
"DesiredStatus": "run",
"DesiredDescription": "",
"ClientStatus": "running",
"ClientDescription": "",
"TaskStates": {
"redis": {
"State": "running",
"Failed": false,
"StartedAt": "2017-05-25T23:41:23.240184101Z",
"FinishedAt": "0001-01-01T00:00:00Z",
"Events": [
{
"Type": "Received",
"Time": 1495755675956923000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Task Setup",
"Time": 1495755675957466400,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "Building Task Directory",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Driver",
"Time": 1495755675970286800,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": "Downloading image redis:7"
},
{
"Type": "Started",
"Time": 1495755683227522000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
}
]
}
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]
""")
.setBody(this.getClass().getResourceAsStream("/allocations.json").text)
.addHeader("Content-Type", "application/json"));

state = service.getTaskState("theId")
Expand All @@ -222,7 +88,7 @@ class NomadServiceSpec extends Specification{
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "running"
state.state == "running"

}

Expand Down
132 changes: 132 additions & 0 deletions plugins/nf-nomad/src/testResources/allocations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
[
{
"ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2",
"EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189",
"Name": "example.cache[0]",
"NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823",
"PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b",
"RescheduleTracker": {
"Events": [
{
"PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e",
"RescheduleTime": 1517434161192946200,
"Delay": 5000000000
}
]
},
"JobID": "example",
"TaskGroup": "cache",
"DesiredStatus": "run",
"DesiredDescription": "",
"ClientStatus": "running",
"ClientDescription": "",
"TaskStates": {
"redis": {
"State": "running",
"Failed": false,
"StartedAt": "2017-05-25T23:41:23.240184101Z",
"FinishedAt": "0001-01-01T00:00:00Z",
"Events": [
{
"Type": "Received",
"Time": 1495755675956923000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Task Setup",
"Time": 1495755675957466400,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "Building Task Directory",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Driver",
"Time": 1495755675970286800,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": "Downloading image redis:7"
},
{
"Type": "Started",
"Time": 1495755683227522000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
}
]
}
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]
10 changes: 6 additions & 4 deletions validation/run-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ if [ "$SKIPLOCAL" == 0 ]; then
-r dev -profile test,docker \
--outdir $(pwd)/nomad_temp/scratchdir/out

./run-pipeline.sh -c basic/nextflow.config bactopia/bactopia \
--accession SRX4563634 --coverage 100 --genome_size 2800000 \
-profile test,docker --outdir $(pwd)/nomad_temp/scratchdir/bactopia/outdir \
--datasets_cache $(pwd)/nomad_temp/scratchdir/bactopia/datasets
# Batctopia is failing with current version of nextflow due some bug in --max_cpus but cant find a fix
# ./run-pipeline.sh -c basic/nextflow.config bactopia/bactopia \
# --accession SRX4563634 --coverage 100 --genome_size median --max_cpus 2 \
# -profile test,docker --outdir $(pwd)/nomad_temp/scratchdir/bactopia/outdir \
# --datasets_cache $(pwd)/nomad_temp/scratchdir/bactopia/datasets

./run-pipeline.sh -c secrets/nextflow.config secrets/main.nf

./run-pipeline.sh -c tower/nextflow.config tower/main.nf
else
echo "skip local"
fi
Expand Down

0 comments on commit 284305d

Please sign in to comment.