From 85540b988c223e2a3e22dc883a21dc838eded976 Mon Sep 17 00:00:00 2001 From: Jorge Aguilera Date: Fri, 20 Sep 2024 18:26:32 +0200 Subject: [PATCH] Implement an exception handler for api client Signed-off-by: Jorge Aguilera --- .../nomad/config/NomadClientOpts.groovy | 19 +++- .../nextflow/nomad/config/RetryConfig.groovy | 29 +++++++ .../nomad/executor/FailsafeExecutor.groovy | 66 ++++++++++++++ .../nomad/executor/NomadService.groovy | 86 +++++++++++++------ .../test/nextflow/nomad/NomadDSLSpec.groovy | 6 +- 5 files changed, 175 insertions(+), 31 deletions(-) create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy create mode 100644 plugins/nf-nomad/src/main/nextflow/nomad/executor/FailsafeExecutor.groovy diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy index 6c3ab85..c784198 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy @@ -33,15 +33,19 @@ class NomadClientOpts{ final static protected API_VERSION = "v1" - private Map sysEnv - final String address final String token + final int connectionTimeout + + final int readTimeout + final int writeTimeout + + final RetryConfig retryConfig NomadClientOpts(Map nomadClientOpts, Map env=null){ assert nomadClientOpts!=null - sysEnv = env==null ? new HashMap(System.getenv()) : env + def sysEnv = env ?: new HashMap(System.getenv()) def address = (nomadClientOpts.address?.toString() ?: sysEnv.get('NOMAD_ADDR')) assert address != null, "Nomad Address is required" @@ -50,8 +54,17 @@ class NomadClientOpts{ address +="/" this.address = address + API_VERSION this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN') + this.connectionTimeout = (nomadClientOpts.connectionTimeout ?: 6000 ) as Integer + this.readTimeout = (nomadClientOpts.readTimeout ?: 6000 ) as Integer + this.writeTimeout = (nomadClientOpts.writeTimeout ?: 6000 ) as Integer + + this.retryConfig = new RetryConfig(nomadClientOpts.retryConfig as Map ?: Collections.emptyMap()) //TODO: Add mTLS properties and env vars // https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables } + + RetryConfig getRetryConfig() { + return retryConfig + } } \ No newline at end of file diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy new file mode 100644 index 0000000..3c61e3d --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/RetryConfig.groovy @@ -0,0 +1,29 @@ +package nextflow.nomad.config + +import groovy.transform.CompileStatic +import nextflow.util.Duration + + +@CompileStatic +class RetryConfig { + + Duration delay = Duration.of('250ms') + Duration maxDelay = Duration.of('90s') + int maxAttempts = 10 + double jitter = 0.25 + + RetryConfig(){ + this(Collections.emptyMap()) + } + + RetryConfig(Map config){ + if( config.delay ) + delay = config.delay as Duration + if( config.maxDelay ) + maxDelay = config.maxDelay as Duration + if( config.maxAttempts ) + maxAttempts = config.maxAttempts as int + if( config.jitter ) + jitter = config.jitter as double + } +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/FailsafeExecutor.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/FailsafeExecutor.groovy new file mode 100644 index 0000000..4bd2cf3 --- /dev/null +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/FailsafeExecutor.groovy @@ -0,0 +1,66 @@ +package nextflow.nomad.executor + +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent +import dev.failsafe.function.CheckedSupplier +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.nomadproject.client.ApiException +import nextflow.nomad.config.RetryConfig + +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeoutException +import java.util.function.Predicate + +@Slf4j +@CompileStatic +class FailsafeExecutor { + + private RetryConfig config + + FailsafeExecutor(RetryConfig config){ + this.config = config + } + + protected RetryPolicy retryPolicy(Predicate cond) { + + final listener = new EventListener>() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + log.debug("Nomad TooManyRequests response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}") + } + } + return RetryPolicy.builder() + .handleIf(cond) + .withBackoff(config.delay.toMillis(), config.maxDelay.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(config.maxAttempts) + .withJitter(config.jitter) + .onRetry(listener) + .build() + } + + final private static List RETRY_CODES = List.of(408, 429, 500, 502, 503, 504) + + protected T apply(CheckedSupplier action) { + // define the retry condition + final cond = new Predicate() { + @Override + boolean test(Throwable t) { + if( t instanceof ApiException && t.code in RETRY_CODES ) + return true + if( t instanceof IOException || t.cause instanceof IOException ) + return true + if( t instanceof TimeoutException || t.cause instanceof TimeoutException ) + return true + return false + } + } + // create the retry policy object + final policy = retryPolicy(cond) + // apply the action with + return Failsafe.with(policy).get(action) + } + +} diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index a91c01b..b29bf0f 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -45,14 +45,14 @@ class NomadService implements Closeable{ ApiClient apiClient JobsApi jobsApi VariablesApi variablesApi + FailsafeExecutor safeExecutor NomadService(NomadConfig config) { this.config = config - //TODO: Accommodate these connection level options in clientOpts() - final CONNECTION_TIMEOUT_MILLISECONDS = 60000 - final READ_TIMEOUT_MILLISECONDS = 60000 - final WRITE_TIMEOUT_MILLISECONDS = 60000 + final CONNECTION_TIMEOUT_MILLISECONDS = config.clientOpts().connectionTimeout + final READ_TIMEOUT_MILLISECONDS = config.clientOpts().readTimeout + final WRITE_TIMEOUT_MILLISECONDS = config.clientOpts().writeTimeout apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS) apiClient.basePath = config.clientOpts().address @@ -64,6 +64,8 @@ class NomadService implements Closeable{ } this.jobsApi = new JobsApi(apiClient) this.variablesApi = new VariablesApi(apiClient) + + this.safeExecutor = new FailsafeExecutor(config.clientOpts().retryConfig) } @@ -96,8 +98,12 @@ class NomadService implements Closeable{ } try { - JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null) - return jobRegisterResponse.evalID + safeExecutor.apply { + JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, + config.jobOpts().region, config.jobOpts().namespace, + null, null) + jobRegisterResponse.evalID + } } catch (ApiException apiException) { log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException) throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException) @@ -110,7 +116,11 @@ class NomadService implements Closeable{ String getJobState(String jobId){ try { - List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) + List allocations = safeExecutor.apply { + jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, + null, null, null, null, null, null, + null, null) + } AllocationListStub last = allocations?.sort { it.modifyIndex }?.last() @@ -127,7 +137,10 @@ class NomadService implements Closeable{ boolean checkIfRunning(String jobId){ try { - Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) + Job job = safeExecutor.apply { + jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, + null, null, null, null, null, null, null) + } log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status" job.status == "running" }catch (Exception e){ @@ -138,7 +151,10 @@ class NomadService implements Closeable{ boolean checkIfDead(String jobId){ try{ - Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) + Job job = safeExecutor.apply { + jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, + null, null, null, null, null, null, null) + } log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status" job.status == "dead" }catch (Exception e){ @@ -158,7 +174,10 @@ class NomadService implements Closeable{ protected void purgeJob(String jobId, boolean purge){ log.debug "[NOMAD] purgeJob with jobId=${jobId}" try { - jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true) + safeExecutor.apply { + jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, + null, null, purge, true) + } }catch(Exception e){ log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e) } @@ -166,7 +185,11 @@ class NomadService implements Closeable{ String getClientOfJob(String jobId) { try{ - List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) + List allocations = safeExecutor.apply { + jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, + null, null, null, null, null, null, + null, null) + } if( !allocations ){ return null } @@ -183,10 +206,12 @@ class NomadService implements Closeable{ } String getVariableValue(String path, String key){ - var variable = variablesApi.getVariableQuery("$path/$key", - config.jobOpts().region, - config.jobOpts().namespace, - null, null, null, null, null, null, null) + var variable = safeExecutor.apply { + variablesApi.getVariableQuery("$path/$key", + config.jobOpts().region, + config.jobOpts().namespace, + null, null, null, null, null, null, null) + } variable?.items?.find{ it.key == key }?.value } @@ -197,17 +222,22 @@ class NomadService implements Closeable{ void setVariableValue(String path, String key, String value){ var content = Map.of(key,value) var variable = new Variable(path: path, items: content) - variablesApi.postVariable("$path/$key", variable, - config.jobOpts().region, - config.jobOpts().namespace, - null, null, null) + safeExecutor.apply { + variablesApi.postVariable("$path/$key", variable, + config.jobOpts().region, + config.jobOpts().namespace, + null, null, null) + } } List getVariablesList(){ - var listRequest = variablesApi.getVariablesListRequest( - config.jobOpts().region, - config.jobOpts().namespace, - null, null, null, null, null, null, null) + var listRequest = safeExecutor.apply { + variablesApi.getVariablesListRequest( + config.jobOpts().region, + config.jobOpts().namespace, + null, null, null, null, + null, null, null) + } String path = (config.jobOpts().secretOpts?.path ?: '')+"/" listRequest.collect{ it.path - path} } @@ -218,9 +248,11 @@ class NomadService implements Closeable{ void deleteVariable(String path, String key){ var variable = new Variable( items: Map.of(key, "")) - variablesApi.deleteVariable("$path/$key", variable, - config.jobOpts().region, - config.jobOpts().namespace, - null, null, null) + safeExecutor.apply { + variablesApi.deleteVariable("$path/$key", variable, + config.jobOpts().region, + config.jobOpts().namespace, + null, null, null) + } } } diff --git a/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy b/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy index 7d9781c..30b2f1c 100644 --- a/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy +++ b/plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy @@ -195,7 +195,11 @@ class NomadDSLSpec extends Dsl2Spec{ [ client: [ - address : "http://${mockWebServer.hostName}:${mockWebServer.port}" + address : "http://${mockWebServer.hostName}:${mockWebServer.port}", + retryConfig:[ + maxAttempts: 1, + delay: '1ms' + ] ] ] ]).setScript(SCRIPT).execute()