Skip to content

Commit

Permalink
Merge pull request #152 from orkes-io/workflow_apis
Browse files Browse the repository at this point in the history
Workflow idempotency changes.
  • Loading branch information
manan164 authored Jan 29, 2024
2 parents ad5b2d4 + 2b8e595 commit d55a13a
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 152 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ext {
versions = [
awaitility : '4.2.0',
commonsLang : '3.12.0',
conductor : '3.9.24-orkes',
conductor : '3.9.30-orkes',
jackson : '2.11.4!!',
junit : '5.9.0',
slf4j : '1.7.36',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.netflix.conductor.common.run.WorkflowTestRequest;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;

import io.orkes.conductor.client.http.ConflictException;

public abstract class WorkflowClient {

/** Creates a default workflow client */
Expand All @@ -39,7 +41,7 @@ public WorkflowClient() {
* @param startWorkflowRequest the {@link StartWorkflowRequest} object to start the workflow
* @return the id of the workflow instance that can be used for tracking
*/
public abstract String startWorkflow(StartWorkflowRequest startWorkflowRequest);
public abstract String startWorkflow(StartWorkflowRequest startWorkflowRequest) throws ConflictException;

/**
* Retrieve a workflow by workflow id
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/io/orkes/conductor/client/ApiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.threeten.bp.OffsetDateTime;
import org.threeten.bp.format.DateTimeFormatter;

import com.netflix.conductor.common.validation.ErrorResponse;

import io.orkes.conductor.client.http.*;
import io.orkes.conductor.client.http.api.TokenResourceApi;
import io.orkes.conductor.client.http.auth.ApiKeyAuth;
Expand Down Expand Up @@ -863,6 +865,19 @@ public <T> T handleResponse(Response response, Type returnType) throws ApiExcept
if (response.body() != null) {
try {
respBody = response.body().string();
if (response.code() == 409) {
ErrorResponse errorResponse = json.deserialize(respBody, ErrorResponse.class);
String message = null;
if (errorResponse != null && errorResponse.getMessage() != null) {
message = errorResponse.getMessage();
} else {
message = response.message();
}
throw new ConflictException(
message,
response.code(),
response.headers().toMultimap(), respBody);
}
} catch (IOException e) {
throw new ApiException(
response.message(),
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/io/orkes/conductor/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.netflix.conductor.common.run.WorkflowTestRequest;

import io.orkes.conductor.client.http.ApiException;
import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest;
import io.orkes.conductor.client.model.WorkflowStateUpdate;
import io.orkes.conductor.client.model.WorkflowStatus;
import io.orkes.conductor.common.model.WorkflowRun;
Expand Down Expand Up @@ -94,8 +93,6 @@ public abstract Map<String, List<Workflow>> getWorkflowsByNamesAndCorrelationIds
*/
public abstract Workflow updateVariables(String workflowId, Map<String, Object> variables);

public abstract void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest);

public abstract void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest body);

/**
Expand All @@ -107,7 +104,7 @@ public abstract Map<String, List<Workflow>> getWorkflowsByNamesAndCorrelationIds
* the call will return with the current status of the workflow
* @param updateRequest Payload for updating state of workflow.
*
* @return
* @return Returns updated workflow execution
*/
public abstract WorkflowRun updateWorkflow(String workflowId, List<String> waitUntilTaskRefNames, Integer waitForSeconds,
WorkflowStateUpdate updateRequest);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2023 Orkes, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.client.http;

import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;

import io.orkes.conductor.client.OrkesClientException;

public class ConflictException extends OrkesClientException {

private int code;
private Map<String, List<String>> responseHeaders;

private String responseBody;

private String message;

public ConflictException(
String message,
Throwable throwable,
int code,
Map<String, List<String>> responseHeaders,
String responseBody) {
super(message, throwable);
super.setCode(String.valueOf(code));
super.setStatus(code);
this.code = code;
this.responseHeaders = responseHeaders;
this.responseBody = responseBody;
this.message = message;
}

public ConflictException(String message, int code, Map<String, List<String>> responseHeaders, String responseBody) {
this(message, null, code, responseHeaders, responseBody);
super.setCode(String.valueOf(code));
super.setStatus(code);
super.setMessage(message);
this.code = code;
this.message = message;
this.responseBody = responseBody;
}

/**
*
* @return HTTP status code
*/
public int getStatusCode() {
return code;
}

/**
* Get the HTTP response headers.
*
* @return A map of list of string
*/
public Map<String, List<String>> getResponseHeaders() {
return responseHeaders;
}

@Override
public String getMessage() {
return getStatusCode()
+ ":"
+ (StringUtils.isBlank(responseBody) ? super.getMessage() : responseBody);
}

@Override
public String toString() {
return responseBody;
}

@Override
public boolean isClientError() {
return code > 399 && code < 499;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.orkes.conductor.client.http.api.WorkflowBulkResourceApi;
import io.orkes.conductor.client.http.api.WorkflowResourceApi;
import io.orkes.conductor.client.model.CorrelationIdsSearchRequest;
import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest;
import io.orkes.conductor.client.model.WorkflowStateUpdate;
import io.orkes.conductor.client.model.WorkflowStatus;
import io.orkes.conductor.common.model.WorkflowRun;
Expand Down Expand Up @@ -88,12 +87,12 @@ public WorkflowClient withConnectTimeout(int connectTimeout) {
}

@Override
public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
public String startWorkflow(StartWorkflowRequest startWorkflowRequest) throws ConflictException {
return httpClient.startWorkflow(startWorkflowRequest);
}

@Override
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask) {
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask) throws ConflictException {
if(apiClient.isUseGRPC()) {
return grpcWorkflowClient.executeWorkflow(request, waitUntilTask);
} else {
Expand All @@ -102,7 +101,7 @@ public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest reque
}

@Override
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds) {
public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds) throws ConflictException {
if(apiClient.isUseGRPC()) {
return grpcWorkflowClient.executeWorkflow(request, waitUntilTask, waitForSeconds);
} else {
Expand Down Expand Up @@ -343,11 +342,6 @@ public void shutdown() {
}
}

@Override
public void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest) {
httpClient.jumpToTaskWithHttpInfo(jumpWorkflowExecutionRequest, workflowId);
}

@Override
public void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest upgradeWorkflowRequest ) {
httpClient.upgradeRunningWorkflow(upgradeWorkflowRequest, workflowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3834,95 +3834,6 @@ private Call updateVariablesCall(String workflowId, Map<String, Object> variable
null);
}

/**
* Build call for jumpToTask
* @param body (required)
* @param workflowId (required)
* @param progressListener Progress listener
* @param progressRequestListener Progress request listener
* @return Call to execute
* @throws ApiException If fail to serialize the request body object
*/
public com.squareup.okhttp.Call jumpToTaskCall(JumpWorkflowExecutionRequest body, String workflowId, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
Object localVarPostBody = body;

// create path and map variables
String localVarPath = "/workflow/{workflowId}/jump"
.replaceAll("\\{" + "workflowId" + "\\}", apiClient.escapeString(workflowId.toString()));

List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();

Map<String, String> localVarHeaderParams = new HashMap<String, String>();

Map<String, Object> localVarFormParams = new HashMap<String, Object>();

final String[] localVarAccepts = {

};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept);

final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
localVarHeaderParams.put("Content-Type", localVarContentType);

if(progressListener != null) {
apiClient.getHttpClient().networkInterceptors().add(new com.squareup.okhttp.Interceptor() {
@Override
public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Chain chain) throws IOException {
com.squareup.okhttp.Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new ProgressResponseBody(originalResponse.body(), progressListener))
.build();
}
});
}

String[] localVarAuthNames = new String[] { "api_key" };
return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call jumpToTaskValidateBeforeCall(JumpWorkflowExecutionRequest body, String workflowId, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
// verify the required parameter 'body' is set
if (body == null) {
throw new ApiException("Missing the required parameter 'body' when calling jumpToTask(Async)");
}
// verify the required parameter 'workflowId' is set
if (workflowId == null) {
throw new ApiException("Missing the required parameter 'workflowId' when calling jumpToTask(Async)");
}

com.squareup.okhttp.Call call = jumpToTaskCall(body, workflowId, progressListener, progressRequestListener);
return call;
}

/**
* Jump workflow execution to given task
* Jump workflow execution to given task.
* @param body (required)
* @param workflowId (required)
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
*/
public void jumpToTask(JumpWorkflowExecutionRequest body, String workflowId) throws ApiException {
jumpToTaskWithHttpInfo(body, workflowId);
}

/**
* Jump workflow execution to given task
* Jump workflow execution to given task.
* @param body (required)
* @param workflowId (required)
* @return ApiResponse&lt;Void&gt;
* @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body
*/
public ApiResponse<Void> jumpToTaskWithHttpInfo(JumpWorkflowExecutionRequest body, String workflowId) throws ApiException {
com.squareup.okhttp.Call call = jumpToTaskValidateBeforeCall(body, workflowId, null, null);
return apiClient.execute(call);
}
/**
* Upgrade running workflow to newer version
* Upgrade running workflow to newer version
Expand Down

This file was deleted.

2 changes: 2 additions & 0 deletions src/main/java/io/orkes/conductor/client/model/TagObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public static TypeEnum fromValue(String input) {
}

@SerializedName("type")
@Deprecated
// This is not required anymore. Type has been moved to WorkflowDef.RateLimitConfig as METADATA type
private TypeEnum type = null;

@SerializedName("value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.orkes.conductor.client.util.ApiUtil;

public abstract class ClientTest {

protected static OrkesClients orkesClients;

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void testSkipTaskFromWorkflow() throws Exception {
() -> workflowClient.terminateWorkflowsWithFailure(List.of(workflowId), null, false));
}

@Test
public void testUpdateVariables() {
ConductorWorkflow<Object> workflow = new ConductorWorkflow<>(workflowExecutor);
workflow.add(new SimpleTask("simple_task", "simple_task_ref"));
Expand Down
Loading

0 comments on commit d55a13a

Please sign in to comment.