Skip to content

Commit

Permalink
Improve the polling logic using the allocation response
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Mar 3, 2024
1 parent 21831ac commit 477e519
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.models.AllocationListStub
import io.nomadproject.client.models.Job
import io.nomadproject.client.models.JobRegisterRequest
import io.nomadproject.client.models.JobRegisterResponse
import io.nomadproject.client.models.JobSummary
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
import nextflow.nomad.NomadConfig

/**
Expand Down Expand Up @@ -110,24 +109,13 @@ class NomadService implements Closeable{


String state(String jobId){
JobSummary summary = jobsApi.getJobSummary(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null)
TaskGroupSummary taskGroupSummary = summary?.summary?.values()?.first()
switch (taskGroupSummary){
case {taskGroupSummary?.starting }:
return TaskGroupSummary.SERIALIZED_NAME_STARTING
case {taskGroupSummary?.complete }:
return TaskGroupSummary.SERIALIZED_NAME_COMPLETE
case {taskGroupSummary?.failed }:
return TaskGroupSummary.SERIALIZED_NAME_FAILED
case {taskGroupSummary?.lost }:
return TaskGroupSummary.SERIALIZED_NAME_LOST
case {taskGroupSummary?.queued }:
return TaskGroupSummary.SERIALIZED_NAME_QUEUED
case {taskGroupSummary?.running }:
return TaskGroupSummary.SERIALIZED_NAME_RUNNING
default:
TaskGroupSummary.SERIALIZED_NAME_UNKNOWN
}
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null, null)
AllocationListStub last = allocations?.sort{
it.modifyIndex
}?.last()
String currentState = last?.taskStates?.values()?.last()?.state
log.info "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}

boolean checkIfRunning(String jobId){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
state = taskState0(this.jobName)

final isDone = [
TaskGroupSummary.SERIALIZED_NAME_COMPLETE,
TaskGroupSummary.SERIALIZED_NAME_FAILED,
TaskGroupSummary.SERIALIZED_NAME_LOST].contains(state)
"complete",
"failed",
"dead",
"lost"].contains(state)

log.debug "[NOMAD] Task status $task.name taskId=${this.jobName}; state=$state completed=$isDone"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class NomadDSLSpec extends Dsl2Spec{
MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
switch (recordedRequest.method.toLowerCase()){
case "get":
if( recordedRequest.path.endsWith("/summary")) {
if( recordedRequest.path.endsWith("/allocations")) {
summary = true
return new MockResponse().setResponseCode(200)
.setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,27 +164,159 @@ class NomadServiceSpec extends Specification{

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/summary"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Unknown"

when:
mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["JobID":"test","Summary":[
test:[Starting:1]
]]).toString())
.setBody("""
[
{
"ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2",
"EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189",
"Name": "example.cache[0]",
"NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823",
"PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b",
"RescheduleTracker": {
"Events": [
{
"PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e",
"RescheduleTime": 1517434161192946200,
"Delay": 5000000000
}
]
},
"JobID": "example",
"TaskGroup": "cache",
"DesiredStatus": "run",
"DesiredDescription": "",
"ClientStatus": "running",
"ClientDescription": "",
"TaskStates": {
"redis": {
"State": "running",
"Failed": false,
"StartedAt": "2017-05-25T23:41:23.240184101Z",
"FinishedAt": "0001-01-01T00:00:00Z",
"Events": [
{
"Type": "Received",
"Time": 1495755675956923000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Task Setup",
"Time": 1495755675957466400,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "Building Task Directory",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Driver",
"Time": 1495755675970286800,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": "Downloading image redis:7"
},
{
"Type": "Started",
"Time": 1495755683227522000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
}
]
}
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]
""")
.addHeader("Content-Type", "application/json"));

state = service.state("theId")
recordedRequest = mockWebServer.takeRequest();

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/summary"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Starting"
state == "running"

}
}

0 comments on commit 477e519

Please sign in to comment.