Skip to content

Commit

Permalink
fix logging and token cache
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Oct 11, 2022
1 parent 410e230 commit ff3c355
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 36 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.0.10
version=3.0.12
8 changes: 6 additions & 2 deletions src/main/java/io/orkes/conductor/client/ApiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.text.DateFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1339,9 +1340,12 @@ private KeyStore newEmptyKeyStore(char[] password) throws GeneralSecurityExcepti
}
}

@SneakyThrows
public String getToken() {
return tokenCache.get(TOKEN_CACHE_KEY, () -> refreshToken());
try {
return tokenCache.get(TOKEN_CACHE_KEY, () -> refreshToken());
} catch (ExecutionException e) {
return null;
}
}

private String refreshToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
Expand Down Expand Up @@ -58,47 +57,40 @@ public GrpcWorkflowClient(ApiClient apiClient) {
.withInterceptors(new HeaderClientInterceptor(apiClient));
this.responseStream = new StartWorkflowResponseStream(executionMonitor);
requestStream = stub.startWorkflow(responseStream);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()-> monitorChannel(channel), 1, 1, TimeUnit.SECONDS);
}

private void monitorChannel(ManagedChannel channel) {
ConnectivityState state = channel.getState(false);
switch (state) {
case READY:
if(!responseStream.isReady()) {
log.info("Connection State {}", state);
}
break;
case SHUTDOWN:
case CONNECTING:
case TRANSIENT_FAILURE:
break;
default:
log.info("Channel state: {}", state);
}
}

private boolean reConnect() {
private synchronized boolean reConnect() {
try {
requestStream = stub.startWorkflow(this.responseStream);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return true;
} catch (Exception connectException) {
log.error("Server not ready {}", connectException.getMessage(), connectException);
return false;
}
}

public CompletableFuture<WorkflowRun> executeWorkflow(
StartWorkflowRequest startWorkflowRequest, String waitUntilTask) {
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest startWorkflowRequest, String waitUntilTask) {
if (!responseStream.isReady()) {
reConnect();
throw new RuntimeException("Server is not yet ready to accept the requests");
int connectAttempts = 3;
int sleepTime = 200;

while (connectAttempts > 0) {
reConnect();
log.info("Connection attempt {} backoff for {} millis", connectAttempts, sleepTime);
Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
if(responseStream.isReady()) {
break;
}
connectAttempts--;
sleepTime = sleepTime * 2;
}
if(!responseStream.isReady()) {
throw new RuntimeException("Server is not yet ready to accept the requests");
}
}
String requestId = UUID.randomUUID().toString();

OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder =
OrkesWorkflowService.StartWorkflowRequest.newBuilder();
OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder = OrkesWorkflowService.StartWorkflowRequest.newBuilder();
requestBuilder.setRequestId(requestId).setIdempotencyKey(requestId).setMonitor(true);
if (waitUntilTask != null) {
requestBuilder.setWaitUntilTask(waitUntilTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,9 @@ public void onError(Throwable t) {
switch (code) {
case UNAVAILABLE:
case ABORTED:
// We should reconnect here
log.warn("Server aborted connection");
break;
case INTERNAL:
case UNKNOWN:
log.error("Received an error from the server {}-{}", code, t.getMessage(), t);
log.error("Received an error from the server {}-{}", code, t.getMessage());
break;
case CANCELLED:
log.info("Server cancelled"); //TODO: move this to trace
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/orkes/conductor/client/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.orkes.conductor.common.model.WorkflowRun;

public class Main {

public static void mainxx(String[] args) {
ApiClient client = new ApiClient("http://localhost:8080/api");

Expand Down Expand Up @@ -90,7 +90,7 @@ public static void main(String[] args) throws InterruptedException {
t.printStackTrace();
System.out.println("Error " + t.getMessage());
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
System.out.println(
"Time to to submit "
Expand Down

0 comments on commit ff3c355

Please sign in to comment.