diff --git a/gradle.properties b/gradle.properties index 6b323aac..a812460b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=3.0.10 \ No newline at end of file +version=3.0.12 \ No newline at end of file diff --git a/src/main/java/io/orkes/conductor/client/ApiClient.java b/src/main/java/io/orkes/conductor/client/ApiClient.java index c9b15404..d8c8ea6d 100644 --- a/src/main/java/io/orkes/conductor/client/ApiClient.java +++ b/src/main/java/io/orkes/conductor/client/ApiClient.java @@ -32,6 +32,7 @@ import java.text.DateFormat; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -1339,9 +1340,12 @@ private KeyStore newEmptyKeyStore(char[] password) throws GeneralSecurityExcepti } } - @SneakyThrows public String getToken() { - return tokenCache.get(TOKEN_CACHE_KEY, () -> refreshToken()); + try { + return tokenCache.get(TOKEN_CACHE_KEY, () -> refreshToken()); + } catch (ExecutionException e) { + return null; + } } private String refreshToken() { diff --git a/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java b/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java index 8850a59c..63ab1517 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java @@ -14,7 +14,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; @@ -58,30 +57,11 @@ public GrpcWorkflowClient(ApiClient apiClient) { .withInterceptors(new HeaderClientInterceptor(apiClient)); this.responseStream = new StartWorkflowResponseStream(executionMonitor); requestStream = stub.startWorkflow(responseStream); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()-> monitorChannel(channel), 1, 1, TimeUnit.SECONDS); } - private void monitorChannel(ManagedChannel channel) { - ConnectivityState state = channel.getState(false); - switch (state) { - case READY: - if(!responseStream.isReady()) { - log.info("Connection State {}", state); - } - break; - case SHUTDOWN: - case CONNECTING: - case TRANSIENT_FAILURE: - break; - default: - log.info("Channel state: {}", state); - } - } - - private boolean reConnect() { + private synchronized boolean reConnect() { try { requestStream = stub.startWorkflow(this.responseStream); - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); return true; } catch (Exception connectException) { log.error("Server not ready {}", connectException.getMessage(), connectException); @@ -89,16 +69,28 @@ private boolean reConnect() { } } - public CompletableFuture executeWorkflow( - StartWorkflowRequest startWorkflowRequest, String waitUntilTask) { + public CompletableFuture executeWorkflow(StartWorkflowRequest startWorkflowRequest, String waitUntilTask) { if (!responseStream.isReady()) { - reConnect(); - throw new RuntimeException("Server is not yet ready to accept the requests"); + int connectAttempts = 3; + int sleepTime = 200; + + while (connectAttempts > 0) { + reConnect(); + log.info("Connection attempt {} backoff for {} millis", connectAttempts, sleepTime); + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + if(responseStream.isReady()) { + break; + } + connectAttempts--; + sleepTime = sleepTime * 2; + } + if(!responseStream.isReady()) { + throw new RuntimeException("Server is not yet ready to accept the requests"); + } } String requestId = UUID.randomUUID().toString(); - OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder = - OrkesWorkflowService.StartWorkflowRequest.newBuilder(); + OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder = OrkesWorkflowService.StartWorkflowRequest.newBuilder(); requestBuilder.setRequestId(requestId).setIdempotencyKey(requestId).setMonitor(true); if (waitUntilTask != null) { requestBuilder.setWaitUntilTask(waitUntilTask); diff --git a/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java b/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java index fcb46d35..21254ba7 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java +++ b/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java @@ -72,12 +72,9 @@ public void onError(Throwable t) { switch (code) { case UNAVAILABLE: case ABORTED: - // We should reconnect here - log.warn("Server aborted connection"); - break; case INTERNAL: case UNKNOWN: - log.error("Received an error from the server {}-{}", code, t.getMessage(), t); + log.error("Received an error from the server {}-{}", code, t.getMessage()); break; case CANCELLED: log.info("Server cancelled"); //TODO: move this to trace diff --git a/src/test/java/io/orkes/conductor/client/Main.java b/src/test/java/io/orkes/conductor/client/Main.java index 838ca430..04fcd9c4 100644 --- a/src/test/java/io/orkes/conductor/client/Main.java +++ b/src/test/java/io/orkes/conductor/client/Main.java @@ -31,7 +31,7 @@ import io.orkes.conductor.common.model.WorkflowRun; public class Main { - + public static void mainxx(String[] args) { ApiClient client = new ApiClient("http://localhost:8080/api"); @@ -90,7 +90,7 @@ public static void main(String[] args) throws InterruptedException { t.printStackTrace(); System.out.println("Error " + t.getMessage()); } - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } System.out.println( "Time to to submit "