Skip to content

Commit

Permalink
Merge pull request #113 from orkes-io/sync_workflow_api_changes
Browse files Browse the repository at this point in the history
Sync workflow api changes
  • Loading branch information
v1r3n authored May 22, 2023
2 parents 65f0dd4 + 0e12d69 commit 2a94ed7
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/orkes/conductor/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import io.orkes.conductor.common.model.WorkflowRun;

public abstract class WorkflowClient extends com.netflix.conductor.client.http.WorkflowClient {

@Deprecated
public abstract CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask);

public abstract CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds);

public abstract WorkflowRun executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Duration waitTimeout) throws ExecutionException, InterruptedException, TimeoutException;

public abstract BulkResponse pauseWorkflow(List<String> workflowIds) throws ApiException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,41 @@ public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest start
return future;
}

public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest startWorkflowRequest, String waitUntilTask, Integer waitForSeconds) {
if (!responseStream.isReady()) {
int connectAttempts = 3;
int sleepTime = 200;

while (connectAttempts > 0) {
reConnect();
log.info("Connection attempt {} backoff for {} millis", connectAttempts, sleepTime);
Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
if(responseStream.isReady()) {
break;
}
connectAttempts--;
sleepTime = sleepTime * 2;
}
if(!responseStream.isReady()) {
throw new RuntimeException("Server is not yet ready to accept the requests");
}
}
String requestId = UUID.randomUUID().toString();

OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder = OrkesWorkflowService.StartWorkflowRequest.newBuilder();
requestBuilder.setRequestId(requestId).setIdempotencyKey(requestId).setMonitor(true);
if (waitUntilTask != null) {
requestBuilder.setWaitUntilTask(waitUntilTask);
}
requestBuilder.setRequest(protoMappingHelper.toProto(startWorkflowRequest));
CompletableFuture<WorkflowRun> future = executionMonitor.monitorRequest(requestId);
future.orTimeout(waitForSeconds, TimeUnit.SECONDS);
synchronized (requestStream) {
requestStream.onNext(requestBuilder.build());
}
return future;
}

public void shutdown() {
channel.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest reque
}
}

@Override
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds) {
if(apiClient.isUseGRPC()) {
return grpcWorkflowClient.executeWorkflow(request, waitUntilTask, waitForSeconds);
} else {
return executeWorkflowHttp(request, waitUntilTask, waitForSeconds);
}
}

@Override
public WorkflowRun executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Duration waitTimeout) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<WorkflowRun> future = executeWorkflow(request, waitUntilTask);
Expand Down Expand Up @@ -129,6 +138,29 @@ private CompletableFuture<WorkflowRun> executeWorkflowHttp(StartWorkflowRequest
return future;
}

private CompletableFuture<WorkflowRun> executeWorkflowHttp(StartWorkflowRequest startWorkflowRequest, String waitUntilTask, Integer waitForSeconds) {
CompletableFuture<WorkflowRun> future = new CompletableFuture<>();
String requestId = UUID.randomUUID().toString();
executorService.submit(
() -> {
try {
WorkflowRun response =
httpClient.executeWorkflow(
startWorkflowRequest,
startWorkflowRequest.getName(),
startWorkflowRequest.getVersion(),
waitUntilTask,
requestId,
waitForSeconds);
future.complete(response);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});

return future;
}

@Override
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
return httpClient.getExecutionStatus(workflowId, includeTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,99 @@ public WorkflowRun executeWorkflow(
return resp.getData();
}

/**
* Execute a workflow synchronously
*
* @param req (required)
* @param name (required)
* @param version (required)
* @param waitUntilTaskRef (optional)
* @param requestId (optional)
* @param waitForSeconds (optional)
* @return WorkflowRun
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the
* response body
*/
public WorkflowRun executeWorkflow(
StartWorkflowRequest req,
String name,
Integer version,
String waitUntilTaskRef,
String requestId,
Integer waitForSeconds)
throws ApiException {
ApiResponse<WorkflowRun> resp =
executeWorkflowWithHttpInfo(req, name, version, waitUntilTaskRef, requestId, waitForSeconds);
return resp.getData();
}

/**
* Execute a workflow synchronously
*
* @param body (required)
* @param name (required)
* @param version (required)
* @param waitUntilTaskRef (optional)
* @param requestId (required)
* @param waitForSeconds (optional)
* @return ApiResponse&lt;WorkflowRun&gt;
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the
* response body
*/
public ApiResponse<WorkflowRun> executeWorkflowWithHttpInfo(
StartWorkflowRequest body,
String name,
Integer version,
String waitUntilTaskRef,
String requestId,
Integer waitForSeconds)
throws ApiException {
com.squareup.okhttp.Call call =
executeWorkflowValidateBeforeCall(
body, name, version, waitUntilTaskRef, requestId, waitForSeconds,null, null);
Type localVarReturnType = new TypeToken<WorkflowRun>() {}.getType();
return apiClient.execute(call, localVarReturnType);
}

private com.squareup.okhttp.Call executeWorkflowValidateBeforeCall(
StartWorkflowRequest body,
String name,
Integer version,
String waitUntilTaskRef,
String requestId,
Integer waitForSeconds,
final ProgressResponseBody.ProgressListener progressListener,
final ProgressRequestBody.ProgressRequestListener progressRequestListener)
throws ApiException {
// verify the required parameter 'body' is set
if (body == null) {
throw new ApiException(
"Missing the required parameter 'body' when calling executeWorkflow(Async)");
}
// verify the required parameter 'name' is set
if (name == null) {
throw new ApiException(
"Missing the required parameter 'name' when calling executeWorkflow(Async)");
}
// verify the required parameter 'version' is set
if (version == null) {
throw new ApiException(
"Missing the required parameter 'version' when calling executeWorkflow(Async)");
}

com.squareup.okhttp.Call call =
executeWorkflowCall(
body,
name,
version,
waitUntilTaskRef,
requestId,
waitForSeconds,
progressListener,
progressRequestListener);
return call;
}

/**
* Execute a workflow synchronously
*
Expand Down Expand Up @@ -213,6 +306,87 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

public com.squareup.okhttp.Call executeWorkflowCall(
StartWorkflowRequest body,
String name,
Integer version,
String waitUntilTaskRef,
String requestId,
Integer waitForSeconds,
final ProgressResponseBody.ProgressListener progressListener,
final ProgressRequestBody.ProgressRequestListener progressRequestListener)
throws ApiException {
Object localVarPostBody = body;

// create path and map variables
String localVarPath =
"/workflow/execute/{name}/{version}"
.replaceAll("\\{" + "name" + "\\}", apiClient.escapeString(name.toString()))
.replaceAll(
"\\{" + "version" + "\\}",
apiClient.escapeString(version.toString()));

List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
if (requestId != null)
localVarQueryParams.addAll(apiClient.parameterToPair("requestId", requestId));

if (waitUntilTaskRef != null)
localVarQueryParams.addAll(
apiClient.parameterToPair("waitUntilTaskRef", waitUntilTaskRef));

if (waitForSeconds != null)
localVarQueryParams.addAll(
apiClient.parameterToPair("waitForSeconds", waitForSeconds));

Map<String, String> localVarHeaderParams = new HashMap<String, String>();

Map<String, Object> localVarFormParams = new HashMap<String, Object>();

final String[] localVarAccepts = {"application/json"};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept);

final String[] localVarContentTypes = {"application/json"};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
localVarHeaderParams.put("Content-Type", localVarContentType);

if (progressListener != null) {
apiClient
.getHttpClient()
.networkInterceptors()
.add(
new com.squareup.okhttp.Interceptor() {
@Override
public com.squareup.okhttp.Response intercept(
com.squareup.okhttp.Interceptor.Chain chain)
throws IOException {
com.squareup.okhttp.Response originalResponse =
chain.proceed(chain.request());
return originalResponse
.newBuilder()
.body(
new ProgressResponseBody(
originalResponse.body(),
progressListener))
.build();
}
});
}

String[] localVarAuthNames = new String[] {"api_key"};
return apiClient.buildCall(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarPostBody,
localVarHeaderParams,
localVarFormParams,
localVarAuthNames,
progressRequestListener);
}

/**
* Execute a workflow synchronously (asynchronously)
*
Expand Down
Loading

0 comments on commit 2a94ed7

Please sign in to comment.