Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor job status query logic as per Tower (Seqera Platform) integration. #91

Merged
merged 3 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ oracle-nomad-cluster
.settings
/validation/nomad_temp/**
/validation/nomad
**/*.tsv
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.2.0-edge1
version=0.3.1
github_organization=nextflow-io
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class NomadExecutor extends Executor implements ExtensionPoint {

/**
 * Builds the Nomad task handler for the given task.
 *
 * The task and its work directory must both be set; either assertion fails otherwise.
 *
 * @param task the task run to be handled
 * @return a new {@code NomadTaskHandler} bound to this executor's config and service
 */
@Override
TaskHandler createTaskHandler(TaskRun task) {
    // Preconditions: a task with a resolved work directory.
    assert task
    assert task.workDir

    log.trace "[NOMAD] launching process > ${task.name} -- work folder: ${task.workDirStr}"

    final handler = new NomadTaskHandler(task, this.config, service)
    return handler
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import nextflow.nomad.builders.JobBuilder
import nextflow.nomad.config.NomadConfig
import nextflow.processor.TaskRun
import nextflow.exception.ProcessSubmitException
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand Down Expand Up @@ -114,52 +115,38 @@ class NomadService implements Closeable{
}


String getJobState(String jobId){
TaskState getTaskState(String jobId){
try {
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
AllocationListStub last = allocations?.sort {
AllocationListStub last = allocations ? allocations.sort {
it.modifyIndex
}?.last()
String currentState = last?.taskStates?.values()?.last()?.state
log.debug "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}?.last() : null
TaskState currentState = last?.taskStates?.values()?.last()
log.debug "Task $jobId , state=${currentState?.state}"
currentState ?: new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}catch(Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"dead"
new TaskState(state: "unknown", failed: true, finishedAt: OffsetDateTime.now())
}
}



boolean checkIfRunning(String jobId){
String getJobStatus(String jobId){
try {
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
job.status == "running"
job.status
}catch (Exception e){
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
false
}
}

boolean checkIfDead(String jobId){
try{
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e)
true
"unknown"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.nomad.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.model.TaskState
import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.BashWrapperBuilder
Expand All @@ -31,6 +32,7 @@ import nextflow.processor.TaskStatus
import nextflow.trace.TraceRecord
import nextflow.util.Escape
import nextflow.SysEnv
import org.threeten.bp.OffsetDateTime

import java.nio.file.Path

Expand All @@ -51,7 +53,7 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

private String clientName = null

private String state
private TaskState state

private long timestamp

Expand All @@ -72,41 +74,46 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {

@Override
boolean checkIfRunning() {
if(isActive()) {
determineClientNode()
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if running")
if(isSubmitted()) {
def state = taskState0()
// include `terminated` state to allow the handler status to progress
if( state && ( ["running","terminated"].contains(state.state))){
status = TaskStatus.RUNNING
determineClientNode()
return true
}
}
nomadService.checkIfRunning(this.jobName)
return false
}

@Override
boolean checkIfCompleted() {
if (!nomadService.checkIfDead(this.jobName)) {
return false
}
// Without a job name there is nothing to look up; the message names the check that actually failed.
if( !jobName ) throw new IllegalStateException("Missing Nomad Job name -- cannot check if completed")

state = taskState0(this.jobName)
def state = taskState0()

final isFinished = [
"complete",
"failed",
"dead",
"lost"].contains(state)
// Finished when a finish time has been recorded, or when the state is the "unknown" fallback.
final isFinished = state && (state.finishedAt != null || state.state == "unknown")

log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=$state completed=$isFinished"
log.debug "[NOMAD] checkIfCompleted task.name=$task.name; state=${state?.state} completed=$isFinished"

if (isFinished) {
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
task.stderr = errorFile
this.status = TaskStatus.COMPLETED
if (state == "failed" || state == "lost" || state == "unknown")
status = TaskStatus.COMPLETED
if ( !state || state.failed ) {
task.error = new ProcessUnrecoverableException()
task.aborted = true
}

if (shouldDelete()) {
nomadService.jobPurge(this.jobName)
}

updateTimestamps(state?.startedAt, state?.finishedAt)
determineClientNode()
return true
}

Expand Down Expand Up @@ -180,13 +187,13 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return ret
}

protected String taskState0(String taskName) {
protected TaskState taskState0() {
final now = System.currentTimeMillis()
final delta = now - timestamp;
if (!status || delta >= 1_000) {

def newState = nomadService.getJobState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=$state newState=$newState"
def newState = nomadService.getTaskState(jobName)
log.debug "[NOMAD] Check jobState: jobName=$jobName currentState=${state?.state} newState=${newState?.state}"

if (newState) {
state = newState
Expand Down Expand Up @@ -229,4 +236,19 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
return result
}

/**
 * Records the task start and completion times from the given timestamps.
 *
 * Both timestamps are converted before either field is assigned, so a null or
 * unconvertible value can never leave the start time overwritten while the
 * completion time stays unset.
 *
 * @param start the task start time; may be null
 * @param end the task finish time; may be null
 */
void updateTimestamps(OffsetDateTime start, OffsetDateTime end){
    try {
        // Convert first, assign afterwards: a failure on either value leaves both fields untouched.
        final long startedMillis = start.toInstant().toEpochMilli()
        final long finishedMillis = end.toInstant().toEpochMilli()
        startTimeMillis = startedMillis
        completeTimeMillis = finishedMillis
    } catch( Exception e ) {
        // Only update if startTimeMillis hasn't already been set.
        // If startTimeMillis _has_ been set, then both startTimeMillis
        // and completeTimeMillis will have been set with the normal
        // TaskHandler mechanism, so there's no need to reset them here.
        if (!startTimeMillis) {
            final long now = System.currentTimeMillis()
            startTimeMillis = now
            completeTimeMillis = now
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class NomadDSLSpec extends Dsl2Spec{
if( recordedRequest.path.endsWith("/allocations")) {
summary = true
return new MockResponse().setResponseCode(200)
.setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json")
.setBody(this.getClass().getResourceAsStream("/allocations.json").text).addHeader("Content-Type", "application/json")
}else {
return new MockResponse().setResponseCode(200)
.setBody('{"Status": "dead"}').addHeader("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,164 +65,30 @@ class NomadServiceSpec extends Specification{
mockWebServer.enqueue(new MockResponse()
.addHeader("Content-Type", "application/json"));

def state = service.getJobState("theId")
def state = service.getTaskState("theId")
def recordedRequest = mockWebServer.takeRequest();

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Unknown"
state.state == "unknown"

when:
mockWebServer.enqueue(new MockResponse()
.setBody("""
[
{
"ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2",
"EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189",
"Name": "example.cache[0]",
"NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823",
"PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b",
"RescheduleTracker": {
"Events": [
{
"PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e",
"RescheduleTime": 1517434161192946200,
"Delay": 5000000000
}
]
},
"JobID": "example",
"TaskGroup": "cache",
"DesiredStatus": "run",
"DesiredDescription": "",
"ClientStatus": "running",
"ClientDescription": "",
"TaskStates": {
"redis": {
"State": "running",
"Failed": false,
"StartedAt": "2017-05-25T23:41:23.240184101Z",
"FinishedAt": "0001-01-01T00:00:00Z",
"Events": [
{
"Type": "Received",
"Time": 1495755675956923000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Task Setup",
"Time": 1495755675957466400,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "Building Task Directory",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Driver",
"Time": 1495755675970286800,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": "Downloading image redis:7"
},
{
"Type": "Started",
"Time": 1495755683227522000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
}
]
}
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]

""")
.setBody(this.getClass().getResourceAsStream("/allocations.json").text)
.addHeader("Content-Type", "application/json"));

state = service.getJobState("theId")
state = service.getTaskState("theId")
recordedRequest = mockWebServer.takeRequest();

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "running"
state.state == "running"

}

Expand Down
Loading
Loading