Skip to content

Commit

Permalink
Implement an exception handler for the API client
Browse files Browse the repository at this point in the history
  • Loading branch information
abhi18av committed Jul 17, 2024
1 parent 88474c9 commit 845db90
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class NomadClientOpts{

final String address
final String token
final connectionTimeout

//NOTE: For now, these are not exposed.
final readTimeout = 6000
final writeTimeout = 6000

NomadClientOpts(Map nomadClientOpts, Map<String,String> env=null){
assert nomadClientOpts!=null
Expand All @@ -50,6 +55,7 @@ class NomadClientOpts{
address +="/"
this.address = address + API_VERSION
this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN')
this.connectionTimeout = nomadClientOpts.connectionTimeout ?: 6000

//TODO: Add mTLS properties and env vars
// https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables
Expand Down
127 changes: 66 additions & 61 deletions plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import java.nio.file.Path

@Slf4j
@CompileStatic
class NomadService implements Closeable{
class NomadService implements Closeable {

NomadConfig config

Expand All @@ -49,29 +49,36 @@ class NomadService implements Closeable{
/**
 * Build the Nomad API client from the plugin configuration.
 *
 * Connection options (connect/read/write timeouts) come from
 * {@code config.clientOpts()}; read/write timeouts are currently fixed
 * defaults there, while the connect timeout is user-configurable.
 *
 * @param config the resolved plugin configuration (must not be null)
 */
NomadService(NomadConfig config) {
    this.config = config

    ApiClient apiClient = new ApiClient(connectTimeout: config.clientOpts().connectionTimeout,
            readTimeout: config.clientOpts().readTimeout,
            writeTimeout: config.clientOpts().writeTimeout)

    try {
        apiClient.basePath = config.clientOpts().address
        log.debug "[NOMAD] Client Address: ${config.clientOpts().address}"

        if (config.clientOpts().token) {
            // Log only a prefix of the token to avoid leaking the secret
            log.debug "[NOMAD] Client Token: ${config.clientOpts().token?.take(5)}.."
            apiClient.apiKey = config.clientOpts().token
        }
    } catch (Exception e) {
        log.warn "[NOMAD] Cannot establish connection with the server | ${e.message}"
    }

    // Always construct the JobsApi, even if configuration above failed:
    // otherwise `jobsApi` stays null and every later call throws an
    // opaque NullPointerException instead of a meaningful API error.
    this.jobsApi = new JobsApi(apiClient)
}

protected Resources getResources(TaskRun task) {
final DEFAULT_CPUS = 1
final DEFAULT_MEMORY = "500.MB"

final taskCfg = task.getConfig()
final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer
final taskMemory = taskCfg.get("memory") ? new MemoryUnit( taskCfg.get("memory") as String ) : new MemoryUnit(DEFAULT_MEMORY)
final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer
final taskMemory = taskCfg.get("memory") ? new MemoryUnit(taskCfg.get("memory") as String) : new MemoryUnit(DEFAULT_MEMORY)

final res = new Resources()
.cores(taskCores)
Expand All @@ -84,7 +91,7 @@ class NomadService implements Closeable{
/**
 * No-op: the API client holds no resources needing explicit cleanup;
 * present to satisfy the {@code Closeable} contract of this class.
 */
void close() throws IOException {
}

String submitTask(String id, TaskRun task, List<String> args, Map<String, String>env, Path saveJsonPath=null){
String submitTask(String id, TaskRun task, List<String> args, Map<String, String> env, Path saveJsonPath = null) {
Job job = new Job();
job.ID = id
job.name = task.name
Expand All @@ -99,10 +106,10 @@ class NomadService implements Closeable{
JobRegisterRequest jobRegisterRequest = new JobRegisterRequest()
jobRegisterRequest.setJob(job)

if( saveJsonPath ) try {
if (saveJsonPath) try {
saveJsonPath.text = job.toString()
}
catch( Exception e ) {
catch (Exception e) {
log.debug "WARN: unable to save request json -- cause: ${e.message ?: e}"
}

Expand All @@ -117,26 +124,26 @@ class NomadService implements Closeable{

}

TaskGroup createTaskGroup(TaskRun taskRun, List<String> args, Map<String, String>env){
TaskGroup createTaskGroup(TaskRun taskRun, List<String> args, Map<String, String> env) {
//NOTE: Force a single-allocation with no-retries per nomad job definition
final TASK_RESCHEDULE_ATTEMPTS = 0
final TASK_RESTART_ATTEMPTS = 0

final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(TASK_RESCHEDULE_ATTEMPTS)
final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(TASK_RESTART_ATTEMPTS)
final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(TASK_RESCHEDULE_ATTEMPTS)
final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(TASK_RESTART_ATTEMPTS)

def task = createTask(taskRun, args, env)
def taskGroup = new TaskGroup(
name: "group",
tasks: [ task ],
tasks: [task],
reschedulePolicy: taskReschedulePolicy,
restartPolicy: taskRestartPolicy
)


if( config.jobOpts().volumeSpec ) {
if (config.jobOpts().volumeSpec) {
taskGroup.volumes = [:]
config.jobOpts().volumeSpec.eachWithIndex { volumeSpec , idx->
config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx ->
if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE) {
taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest(
type: volumeSpec.type,
Expand All @@ -159,7 +166,7 @@ class NomadService implements Closeable{
return taskGroup
}

Task createTask(TaskRun task, List<String> args, Map<String, String>env) {
Task createTask(TaskRun task, List<String> args, Map<String, String> env) {
final DRIVER = "docker"
final DRIVER_PRIVILEGED = true

Expand Down Expand Up @@ -190,18 +197,18 @@ class NomadService implements Closeable{
return taskDef
}

protected Task volumes(TaskRun task, Task taskDef, String workingDir){
if( config.jobOpts().dockerVolume){
protected Task volumes(TaskRun task, Task taskDef, String workingDir) {
if (config.jobOpts().dockerVolume) {
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
taskDef.config.mount = [
type : "volume",
target : destinationDir,
source : config.jobOpts().dockerVolume,
readonly : false
type : "volume",
target : destinationDir,
source : config.jobOpts().dockerVolume,
readonly: false
]
}

if( config.jobOpts().volumeSpec){
if (config.jobOpts().volumeSpec) {
taskDef.volumeMounts = []
config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx ->
String destinationDir = volumeSpec.workDir ?
Expand Down Expand Up @@ -236,16 +243,16 @@ class NomadService implements Closeable{
taskDef
}

protected Task constraint(TaskRun task, Task taskDef){
if( config.jobOpts().constraintSpec ){
protected Task constraint(TaskRun task, Task taskDef) {
if (config.jobOpts().constraintSpec) {
def constraint = new Constraint()
if(config.jobOpts().constraintSpec.attribute){
if (config.jobOpts().constraintSpec.attribute) {
constraint.ltarget(config.jobOpts().constraintSpec.attribute)
}

constraint.operand(config.jobOpts().constraintSpec.operator ?: "=")

if(config.jobOpts().constraintSpec.value){
if (config.jobOpts().constraintSpec.value) {
constraint.rtarget(config.jobOpts().constraintSpec.value)
}
taskDef.constraints([constraint])
Expand All @@ -254,49 +261,48 @@ class NomadService implements Closeable{
taskDef
}

/**
 * Collect placement constraints from the static {@code jobOpts} spec and
 * from the per-process `constraints` directive (a Closure), and attach
 * them to the task definition when any are present.
 *
 * @param task    the Nextflow task being submitted
 * @param taskDef the Nomad task definition under construction
 * @return the (possibly updated) task definition
 */
protected Task constraints(TaskRun task, Task taskDef) {
    List<Constraint> collected = []

    def staticSpec = config.jobOpts().constraintsSpec
    if (staticSpec) {
        collected.addAll(ConstraintsBuilder.constraintsSpecToList(staticSpec))
    }

    // The process-level directive is only honoured when given as a Closure
    def directive = task.processor?.config?.get(TaskDirectives.CONSTRAINTS)
    if (directive instanceof Closure) {
        JobConstraints parsedSpec = JobConstraints.parse(directive as Closure)
        collected.addAll(ConstraintsBuilder.constraintsSpecToList(parsedSpec))
    }

    if (!collected.isEmpty()) {
        taskDef.constraints(collected)
    }
    taskDef
}



/**
 * Apply the per-process `datacenters` directive to the job, if present.
 * The directive may be a list of names, a Closure yielding one name, or
 * any other value (used via its string form).
 *
 * @param task the Nextflow task carrying the directive
 * @param job  the Nomad job under construction
 * @return the job, with datacenters assigned when the directive was set
 */
protected Job assignDatacenters(TaskRun task, Job job) {
    def directive = task.processor?.config?.get(TaskDirectives.DATACENTERS)
    if (!directive) {
        return job
    }

    if (directive instanceof List<String>) {
        job.datacenters(directive as List<String>)
        return job
    }

    if (directive instanceof Closure) {
        // Evaluate the closure once and use its string result as the name
        String name = directive.call().toString()
        job.datacenters([name])
        return job
    }

    // Fallback: treat any scalar value as a single datacenter name
    job.datacenters([directive.toString()] as List<String>)
    return job
}

String getJobState(String jobId){
String getJobState(String jobId) {
try {
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
AllocationListStub last = allocations?.sort {
Expand All @@ -305,31 +311,30 @@ class NomadService implements Closeable{
String currentState = last?.taskStates?.values()?.last()?.state
log.debug "Task $jobId , state=$currentState"
currentState ?: "Unknown"
}catch(Exception e){
} catch (Exception e) {
log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
"dead"
}
}



/**
 * Report whether the given job is currently in the "running" state.
 * Any lookup failure is logged and treated as not-running.
 *
 * @param jobId the Nomad job identifier
 * @return true only when Nomad reports status "running"
 */
boolean checkIfRunning(String jobId) {
    try {
        Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
        log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
        return job.status == "running"
    } catch (Exception e) {
        log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e)
        return false
    }
}

boolean checkIfDead(String jobId){
try{
boolean checkIfDead(String jobId) {
try {
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
} catch (Exception e) {
log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e)
true
}
Expand All @@ -339,28 +344,28 @@ class NomadService implements Closeable{
purgeJob(jobId, false)
}

/**
 * Deregister the given job and purge it from Nomad's state store.
 *
 * @param jobId the Nomad job identifier
 */
void jobPurge(String jobId) {
    purgeJob(jobId, true)
}

/**
 * Deregister a job, optionally purging it from Nomad's state store.
 * Best-effort: failures are logged at debug level and swallowed.
 *
 * @param jobId the Nomad job identifier
 * @param purge when true, remove the job from Nomad state entirely
 */
protected void purgeJob(String jobId, boolean purge) {
    log.debug "[NOMAD] purgeJob with jobId=${jobId}"
    try {
        jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true)
    } catch (Exception e) {
        log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e)
    }
}

/**
 * Return the node name of the client running the job's first allocation,
 * or null when the job has no allocations yet.
 *
 * @param jobId the Nomad job identifier
 * @return the allocation's node name, or null when none exist
 * @throws ProcessSubmitException when the allocation lookup fails
 */
String getClientOfJob(String jobId) {
    try {
        List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
        if (!allocations) {
            return null
        }
        AllocationListStub jobAllocation = allocations.first()
        return jobAllocation.nodeName
    } catch (Exception e) {
        log.debug("[NOMAD] Failed to get job allocations ${jobId} -- Cause: ${e.message ?: e}", e)
        // Fixed typo in the user-facing message: "alloactions" -> "allocations"
        throw new ProcessSubmitException("[NOMAD] Failed to get allocations ${jobId} -- Cause: ${e.message ?: e}", e)
    }
Expand Down

0 comments on commit 845db90

Please sign in to comment.