Skip to content

Commit

Permalink
Merge pull request #31 from nextflow-io/improve-pooling
Browse files Browse the repository at this point in the history
Improve pooling
  • Loading branch information
abhi18av authored Mar 13, 2024
2 parents f734b51 + d29b1d7 commit 4cb52d7
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 32 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.0.3-rc
version=0.0.3
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.models.AllocationListStub
import io.nomadproject.client.models.Job
import io.nomadproject.client.models.JobRegisterRequest
import io.nomadproject.client.models.JobRegisterResponse
Expand All @@ -30,7 +31,6 @@ import io.nomadproject.client.models.Resources
import io.nomadproject.client.models.RestartPolicy
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
import io.nomadproject.client.models.VolumeMount
import io.nomadproject.client.models.VolumeRequest
import nextflow.nomad.NomadConfig
Expand Down Expand Up @@ -120,7 +120,7 @@ class NomadService implements Closeable{
)


if( config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand All @@ -130,7 +130,7 @@ class NomadService implements Closeable{
)
}

if( config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand Down Expand Up @@ -185,24 +185,13 @@ class NomadService implements Closeable{


String state(String jobId){
JobSummary summary = jobsApi.getJobSummary(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null)
TaskGroupSummary taskGroupSummary = summary?.summary?.values()?.first()
switch (taskGroupSummary){
case {taskGroupSummary?.starting }:
return TaskGroupSummary.SERIALIZED_NAME_STARTING
case {taskGroupSummary?.complete }:
return TaskGroupSummary.SERIALIZED_NAME_COMPLETE
case {taskGroupSummary?.failed }:
return TaskGroupSummary.SERIALIZED_NAME_FAILED
case {taskGroupSummary?.lost }:
return TaskGroupSummary.SERIALIZED_NAME_LOST
case {taskGroupSummary?.queued }:
return TaskGroupSummary.SERIALIZED_NAME_QUEUED
case {taskGroupSummary?.running }:
return TaskGroupSummary.SERIALIZED_NAME_RUNNING
default:
TaskGroupSummary.SERIALIZED_NAME_UNKNOWN
}
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts.region, config.jobOpts.namespace, null, null, null, null, null, null, null, null)
AllocationListStub last = allocations?.sort{
it.modifyIndex
}?.last()
String currentState = last?.taskStates?.values()?.last()?.state
log.debug "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ class NomadTaskHandler extends TaskHandler implements FusionAwareTask {
state = taskState0(this.jobName)

final isDone = [
TaskGroupSummary.SERIALIZED_NAME_COMPLETE,
TaskGroupSummary.SERIALIZED_NAME_FAILED,
TaskGroupSummary.SERIALIZED_NAME_LOST].contains(state)
"complete",
"failed",
"dead",
"lost"].contains(state)

log.debug "[NOMAD] Task status $task.name taskId=${this.jobName}; state=$state completed=$isDone"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class NomadDSLSpec extends Dsl2Spec{
MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
switch (recordedRequest.method.toLowerCase()){
case "get":
if( recordedRequest.path.endsWith("/summary")) {
if( recordedRequest.path.endsWith("/allocations")) {
summary = true
return new MockResponse().setResponseCode(200)
.setBody('{"Summary": {"test":{"Complete":1}}}').addHeader("Content-Type", "application/json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,159 @@ class NomadServiceSpec extends Specification{

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/summary"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Unknown"

when:
mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["JobID":"test","Summary":[
test:[Starting:1]
]]).toString())
.setBody("""
[
{
"ID": "ed344e0a-7290-d117-41d3-a64f853ca3c2",
"EvalID": "a9c5effc-2242-51b2-f1fe-054ee11ab189",
"Name": "example.cache[0]",
"NodeID": "cb1f6030-a220-4f92-57dc-7baaabdc3823",
"PreviousAllocation": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"NextAllocation": "cd13d9b9-4f97-7184-c88b-7b451981616b",
"RescheduleTracker": {
"Events": [
{
"PrevAllocID": "516d2753-0513-cfc7-57ac-2d6fac18b9dc",
"PrevNodeID": "9230cd3b-3bda-9a3f-82f9-b2ea8dedb20e",
"RescheduleTime": 1517434161192946200,
"Delay": 5000000000
}
]
},
"JobID": "example",
"TaskGroup": "cache",
"DesiredStatus": "run",
"DesiredDescription": "",
"ClientStatus": "running",
"ClientDescription": "",
"TaskStates": {
"redis": {
"State": "running",
"Failed": false,
"StartedAt": "2017-05-25T23:41:23.240184101Z",
"FinishedAt": "0001-01-01T00:00:00Z",
"Events": [
{
"Type": "Received",
"Time": 1495755675956923000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Task Setup",
"Time": 1495755675957466400,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "Building Task Directory",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
},
{
"Type": "Driver",
"Time": 1495755675970286800,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": "Downloading image redis:7"
},
{
"Type": "Started",
"Time": 1495755683227522000,
"FailsTask": false,
"RestartReason": "",
"SetupError": "",
"DriverError": "",
"ExitCode": 0,
"Signal": 0,
"Message": "",
"KillTimeout": 0,
"KillError": "",
"KillReason": "",
"StartDelay": 0,
"DownloadError": "",
"ValidationError": "",
"DiskLimit": 0,
"FailedSibling": "",
"VaultError": "",
"TaskSignalReason": "",
"TaskSignal": "",
"DriverMessage": ""
}
]
}
},
"CreateIndex": 9,
"ModifyIndex": 13,
"CreateTime": 1495755675944527600,
"ModifyTime": 1495755675944527600
}
]
""")
.addHeader("Content-Type", "application/json"));

state = service.state("theId")
recordedRequest = mockWebServer.takeRequest();

then:
recordedRequest.method == "GET"
recordedRequest.path == "/v1/job/theId/summary"
recordedRequest.path == "/v1/job/theId/allocations"

and:
state == "Starting"
state == "running"

}

Expand Down

0 comments on commit 4cb52d7

Please sign in to comment.