Skip to content

Commit

Permalink
Implement an exception handler for api client
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Sep 20, 2024
1 parent b55dc70 commit 85540b9
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ class NomadClientOpts{

final static protected API_VERSION = "v1"

private Map<String,String> sysEnv

final String address
final String token
final int connectionTimeout

final int readTimeout
final int writeTimeout

final RetryConfig retryConfig

NomadClientOpts(Map nomadClientOpts, Map<String,String> env=null){
assert nomadClientOpts!=null

sysEnv = env==null ? new HashMap<String,String>(System.getenv()) : env
def sysEnv = env ?: new HashMap<String,String>(System.getenv())

def address = (nomadClientOpts.address?.toString() ?: sysEnv.get('NOMAD_ADDR'))
assert address != null, "Nomad Address is required"
Expand All @@ -50,8 +54,17 @@ class NomadClientOpts{
address +="/"
this.address = address + API_VERSION
this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN')
this.connectionTimeout = (nomadClientOpts.connectionTimeout ?: 6000 ) as Integer
this.readTimeout = (nomadClientOpts.readTimeout ?: 6000 ) as Integer
this.writeTimeout = (nomadClientOpts.writeTimeout ?: 6000 ) as Integer

this.retryConfig = new RetryConfig(nomadClientOpts.retryConfig as Map ?: Collections.emptyMap())

//TODO: Add mTLS properties and env vars
// https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables
}

RetryConfig getRetryConfig() {
return retryConfig
}
}
29 changes: 29 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package nextflow.nomad.config

import groovy.transform.CompileStatic
import nextflow.util.Duration


@CompileStatic
class RetryConfig {

Duration delay = Duration.of('250ms')
Duration maxDelay = Duration.of('90s')
int maxAttempts = 10
double jitter = 0.25

RetryConfig(){
this(Collections.emptyMap())
}

RetryConfig(Map config){
if( config.delay )
delay = config.delay as Duration
if( config.maxDelay )
maxDelay = config.maxDelay as Duration
if( config.maxAttempts )
maxAttempts = config.maxAttempts as int
if( config.jitter )
jitter = config.jitter as double
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package nextflow.nomad.executor

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiException
import nextflow.nomad.config.RetryConfig

import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import java.util.function.Predicate

@Slf4j
@CompileStatic
class FailsafeExecutor {

private RetryConfig config

FailsafeExecutor(RetryConfig config){
this.config = config
}

protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {

final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
@Override
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
log.debug("Nomad TooManyRequests response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
}
}
return RetryPolicy.<T>builder()
.handleIf(cond)
.withBackoff(config.delay.toMillis(), config.maxDelay.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(config.maxAttempts)
.withJitter(config.jitter)
.onRetry(listener)
.build()
}

final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)

protected <T> T apply(CheckedSupplier<T> action) {
// define the retry condition
final cond = new Predicate<? extends Throwable>() {
@Override
boolean test(Throwable t) {
if( t instanceof ApiException && t.code in RETRY_CODES )
return true
if( t instanceof IOException || t.cause instanceof IOException )
return true
if( t instanceof TimeoutException || t.cause instanceof TimeoutException )
return true
return false
}
}
// create the retry policy object
final policy = retryPolicy(cond)
// apply the action with
return Failsafe.with(policy).get(action)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ class NomadService implements Closeable{
ApiClient apiClient
JobsApi jobsApi
VariablesApi variablesApi
FailsafeExecutor safeExecutor

NomadService(NomadConfig config) {
this.config = config

//TODO: Accommodate these connection level options in clientOpts()
final CONNECTION_TIMEOUT_MILLISECONDS = 60000
final READ_TIMEOUT_MILLISECONDS = 60000
final WRITE_TIMEOUT_MILLISECONDS = 60000
final CONNECTION_TIMEOUT_MILLISECONDS = config.clientOpts().connectionTimeout
final READ_TIMEOUT_MILLISECONDS = config.clientOpts().readTimeout
final WRITE_TIMEOUT_MILLISECONDS = config.clientOpts().writeTimeout

apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
apiClient.basePath = config.clientOpts().address
Expand All @@ -64,6 +64,8 @@ class NomadService implements Closeable{
}
this.jobsApi = new JobsApi(apiClient)
this.variablesApi = new VariablesApi(apiClient)

this.safeExecutor = new FailsafeExecutor(config.clientOpts().retryConfig)
}


Expand Down Expand Up @@ -96,8 +98,12 @@ class NomadService implements Closeable{
}

try {
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null)
return jobRegisterResponse.evalID
safeExecutor.apply {
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest,
config.jobOpts().region, config.jobOpts().namespace,
null, null)
jobRegisterResponse.evalID
}
} catch (ApiException apiException) {
log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
Expand All @@ -110,7 +116,11 @@ class NomadService implements Closeable{

String getJobState(String jobId){
try {
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
AllocationListStub last = allocations?.sort {
it.modifyIndex
}?.last()
Expand All @@ -127,7 +137,10 @@ class NomadService implements Closeable{

boolean checkIfRunning(String jobId){
try {
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
job.status == "running"
}catch (Exception e){
Expand All @@ -138,7 +151,10 @@ class NomadService implements Closeable{

boolean checkIfDead(String jobId){
try{
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
Job job = safeExecutor.apply {
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
job.status == "dead"
}catch (Exception e){
Expand All @@ -158,15 +174,22 @@ class NomadService implements Closeable{
protected void purgeJob(String jobId, boolean purge){
log.debug "[NOMAD] purgeJob with jobId=${jobId}"
try {
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true)
safeExecutor.apply {
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, purge, true)
}
}catch(Exception e){
log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e)
}
}

String getClientOfJob(String jobId) {
try{
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
List<AllocationListStub> allocations = safeExecutor.apply {
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
null, null, null, null, null, null,
null, null)
}
if( !allocations ){
return null
}
Expand All @@ -183,10 +206,12 @@ class NomadService implements Closeable{
}

String getVariableValue(String path, String key){
var variable = variablesApi.getVariableQuery("$path/$key",
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
var variable = safeExecutor.apply {
variablesApi.getVariableQuery("$path/$key",
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
}
variable?.items?.find{ it.key == key }?.value
}

Expand All @@ -197,17 +222,22 @@ class NomadService implements Closeable{
void setVariableValue(String path, String key, String value){
var content = Map.of(key,value)
var variable = new Variable(path: path, items: content)
variablesApi.postVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
safeExecutor.apply {
variablesApi.postVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}
}

List<String> getVariablesList(){
var listRequest = variablesApi.getVariablesListRequest(
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
var listRequest = safeExecutor.apply {
variablesApi.getVariablesListRequest(
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null,
null, null, null)
}
String path = (config.jobOpts().secretOpts?.path ?: '')+"/"
listRequest.collect{ it.path - path}
}
Expand All @@ -218,9 +248,11 @@ class NomadService implements Closeable{

void deleteVariable(String path, String key){
var variable = new Variable( items: Map.of(key, ""))
variablesApi.deleteVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
safeExecutor.apply {
variablesApi.deleteVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}
}
}
6 changes: 5 additions & 1 deletion plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ class NomadDSLSpec extends Dsl2Spec{
[
client:
[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
address : "http://${mockWebServer.hostName}:${mockWebServer.port}",
retryConfig:[
maxAttempts: 1,
delay: '1ms'
]
]
]
]).setScript(SCRIPT).execute()
Expand Down

0 comments on commit 85540b9

Please sign in to comment.