diff --git a/build.gradle b/build.gradle index 36ae3def..25a7f546 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ ext { versions = [ awaitility : '4.2.0', commonsLang : '3.12.0', - conductor : '3.9.24-orkes', + conductor : '3.9.30-orkes', jackson : '2.11.4!!', junit : '5.9.0', slf4j : '1.7.36', diff --git a/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 914c08a9..dbd37c2b 100644 --- a/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -23,6 +23,8 @@ import com.netflix.conductor.common.run.WorkflowTestRequest; import com.netflix.conductor.common.utils.ExternalPayloadStorage; +import io.orkes.conductor.client.http.ConflictException; + public abstract class WorkflowClient { /** Creates a default workflow client */ @@ -39,7 +41,7 @@ public WorkflowClient() { * @param startWorkflowRequest the {@link StartWorkflowRequest} object to start the workflow * @return the id of the workflow instance that can be used for tracking */ - public abstract String startWorkflow(StartWorkflowRequest startWorkflowRequest); + public abstract String startWorkflow(StartWorkflowRequest startWorkflowRequest) throws ConflictException; /** * Retrieve a workflow by workflow id diff --git a/src/main/java/io/orkes/conductor/client/ApiClient.java b/src/main/java/io/orkes/conductor/client/ApiClient.java index d1737ca2..bdf234df 100644 --- a/src/main/java/io/orkes/conductor/client/ApiClient.java +++ b/src/main/java/io/orkes/conductor/client/ApiClient.java @@ -48,6 +48,8 @@ import org.threeten.bp.OffsetDateTime; import org.threeten.bp.format.DateTimeFormatter; +import com.netflix.conductor.common.validation.ErrorResponse; + import io.orkes.conductor.client.http.*; import io.orkes.conductor.client.http.api.TokenResourceApi; import io.orkes.conductor.client.http.auth.ApiKeyAuth; @@ -863,6 +865,19 @@ public T handleResponse(Response response, Type returnType) throws ApiExcept if (response.body() != null) { try { respBody = response.body().string(); + if (response.code() == 409) { + ErrorResponse errorResponse = json.deserialize(respBody, ErrorResponse.class); + String message = null; + if (errorResponse != null && errorResponse.getMessage() != null) { + message = errorResponse.getMessage(); + } else { + message = response.message(); + } + throw new ConflictException( + message, + response.code(), + response.headers().toMultimap(), respBody); + } } catch (IOException e) { throw new ApiException( response.message(), diff --git a/src/main/java/io/orkes/conductor/client/WorkflowClient.java b/src/main/java/io/orkes/conductor/client/WorkflowClient.java index 0a8655ad..b5e08958 100644 --- a/src/main/java/io/orkes/conductor/client/WorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/WorkflowClient.java @@ -26,7 +26,6 @@ import com.netflix.conductor.common.run.WorkflowTestRequest; import io.orkes.conductor.client.http.ApiException; -import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -94,8 +93,6 @@ public abstract Map> getWorkflowsByNamesAndCorrelationIds */ public abstract Workflow updateVariables(String workflowId, Map variables); - public abstract void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest); - public abstract void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest body); /** @@ -107,7 +104,7 @@ public abstract Map> getWorkflowsByNamesAndCorrelationIds * the call will return with the current status of the workflow * @param updateRequest Payload for updating state of workflow. * - * @return + * @return Returns updated workflow execution */ public abstract WorkflowRun updateWorkflow(String workflowId, List waitUntilTaskRefNames, Integer waitForSeconds, WorkflowStateUpdate updateRequest); diff --git a/src/main/java/io/orkes/conductor/client/http/ConflictException.java b/src/main/java/io/orkes/conductor/client/http/ConflictException.java new file mode 100644 index 00000000..ae4d54f2 --- /dev/null +++ b/src/main/java/io/orkes/conductor/client/http/ConflictException.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.http; + +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import io.orkes.conductor.client.OrkesClientException; + +public class ConflictException extends OrkesClientException { + + private int code; + private Map> responseHeaders; + + private String responseBody; + + private String message; + + public ConflictException( + String message, + Throwable throwable, + int code, + Map> responseHeaders, + String responseBody) { + super(message, throwable); + super.setCode(String.valueOf(code)); + super.setStatus(code); + this.code = code; + this.responseHeaders = responseHeaders; + this.responseBody = responseBody; + this.message = message; + } + + public ConflictException(String message, int code, Map> responseHeaders, String responseBody) { + this(message, null, code, responseHeaders, responseBody); + super.setCode(String.valueOf(code)); + super.setStatus(code); + super.setMessage(message); + this.code = code; + this.message = message; + this.responseBody = responseBody; + } + + /** + * + * @return HTTP status code + */ + public int getStatusCode() { + return code; + } + + /** + * Get the HTTP response headers. + * + * @return A map of list of string + */ + public Map> getResponseHeaders() { + return responseHeaders; + } + + @Override + public String getMessage() { + return getStatusCode() + + ":" + + (StringUtils.isBlank(responseBody) ? super.getMessage() : responseBody); + } + + @Override + public String toString() { + return responseBody; + } + + @Override + public boolean isClientError() { + return code > 399 && code < 499; + } +} diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 59d530d4..3eca4225 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -34,7 +34,6 @@ import io.orkes.conductor.client.http.api.WorkflowBulkResourceApi; import io.orkes.conductor.client.http.api.WorkflowResourceApi; import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; -import io.orkes.conductor.client.model.JumpWorkflowExecutionRequest; import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.client.model.WorkflowStatus; import io.orkes.conductor.common.model.WorkflowRun; @@ -88,12 +87,12 @@ public WorkflowClient withConnectTimeout(int connectTimeout) { } @Override - public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { + public String startWorkflow(StartWorkflowRequest startWorkflowRequest) throws ConflictException { return httpClient.startWorkflow(startWorkflowRequest); } @Override - public CompletableFuture executeWorkflow(StartWorkflowRequest request, String waitUntilTask) { + public CompletableFuture executeWorkflow(StartWorkflowRequest request, String waitUntilTask) throws ConflictException { if(apiClient.isUseGRPC()) { return grpcWorkflowClient.executeWorkflow(request, waitUntilTask); } else { @@ -102,7 +101,7 @@ public CompletableFuture executeWorkflow(StartWorkflowRequest reque } @Override - public CompletableFuture executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds) { + public CompletableFuture executeWorkflow(StartWorkflowRequest request, String waitUntilTask, Integer waitForSeconds) throws ConflictException { if(apiClient.isUseGRPC()) { return grpcWorkflowClient.executeWorkflow(request, waitUntilTask, waitForSeconds); } else { @@ -343,11 +342,6 @@ public void shutdown() { } } - @Override - public void jumpToTask(String workflowId, JumpWorkflowExecutionRequest jumpWorkflowExecutionRequest) { - httpClient.jumpToTaskWithHttpInfo(jumpWorkflowExecutionRequest, workflowId); - } - @Override public void upgradeRunningWorkflow(String workflowId, UpgradeWorkflowRequest upgradeWorkflowRequest ) { httpClient.upgradeRunningWorkflow(upgradeWorkflowRequest, workflowId); diff --git a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java index c761592f..30bad469 100644 --- a/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java +++ b/src/main/java/io/orkes/conductor/client/http/api/WorkflowResourceApi.java @@ -3834,95 +3834,6 @@ private Call updateVariablesCall(String workflowId, Map variable null); } - /** - * Build call for jumpToTask - * @param body (required) - * @param workflowId (required) - * @param progressListener Progress listener - * @param progressRequestListener Progress request listener - * @return Call to execute - * @throws ApiException If fail to serialize the request body object - */ - public com.squareup.okhttp.Call jumpToTaskCall(JumpWorkflowExecutionRequest body, String workflowId, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { - Object localVarPostBody = body; - - // create path and map variables - String localVarPath = "/workflow/{workflowId}/jump" - .replaceAll("\\{" + "workflowId" + "\\}", apiClient.escapeString(workflowId.toString())); - - List localVarQueryParams = new ArrayList(); - List localVarCollectionQueryParams = new ArrayList(); - - Map localVarHeaderParams = new HashMap(); - - Map localVarFormParams = new HashMap(); - - final String[] localVarAccepts = { - - }; - final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); - if (localVarAccept != null) localVarHeaderParams.put("Accept", localVarAccept); - - final String[] localVarContentTypes = { - "application/json" - }; - final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); - localVarHeaderParams.put("Content-Type", localVarContentType); - - if(progressListener != null) { - apiClient.getHttpClient().networkInterceptors().add(new com.squareup.okhttp.Interceptor() { - @Override - public com.squareup.okhttp.Response intercept(com.squareup.okhttp.Interceptor.Chain chain) throws IOException { - com.squareup.okhttp.Response originalResponse = chain.proceed(chain.request()); - return originalResponse.newBuilder() - .body(new ProgressResponseBody(originalResponse.body(), progressListener)) - .build(); - } - }); - } - - String[] localVarAuthNames = new String[] { "api_key" }; - return apiClient.buildCall(localVarPath, "POST", localVarQueryParams, localVarCollectionQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarAuthNames, progressRequestListener); - } - - @SuppressWarnings("rawtypes") - private com.squareup.okhttp.Call jumpToTaskValidateBeforeCall(JumpWorkflowExecutionRequest body, String workflowId, final ProgressResponseBody.ProgressListener progressListener, final ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException { - // verify the required parameter 'body' is set - if (body == null) { - throw new ApiException("Missing the required parameter 'body' when calling jumpToTask(Async)"); - } - // verify the required parameter 'workflowId' is set - if (workflowId == null) { - throw new ApiException("Missing the required parameter 'workflowId' when calling jumpToTask(Async)"); - } - - com.squareup.okhttp.Call call = jumpToTaskCall(body, workflowId, progressListener, progressRequestListener); - return call; - } - - /** - * Jump workflow execution to given task - * Jump workflow execution to given task. - * @param body (required) - * @param workflowId (required) - * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body - */ - public void jumpToTask(JumpWorkflowExecutionRequest body, String workflowId) throws ApiException { - jumpToTaskWithHttpInfo(body, workflowId); - } - - /** - * Jump workflow execution to given task - * Jump workflow execution to given task. - * @param body (required) - * @param workflowId (required) - * @return ApiResponse<Void> - * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body - */ - public ApiResponse jumpToTaskWithHttpInfo(JumpWorkflowExecutionRequest body, String workflowId) throws ApiException { - com.squareup.okhttp.Call call = jumpToTaskValidateBeforeCall(body, workflowId, null, null); - return apiClient.execute(call); - } /** * Upgrade running workflow to newer version * Upgrade running workflow to newer version diff --git a/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java b/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java deleted file mode 100644 index 78f7ff08..00000000 --- a/src/main/java/io/orkes/conductor/client/model/JumpWorkflowExecutionRequest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2024 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.model; - -import java.util.Map; - -public class JumpWorkflowExecutionRequest { - - public Map getSkippedTasksOutput() { - return skippedTasksOutput; - } - - public void setSkippedTasksOutput(Map skippedTasksOutput) { - this.skippedTasksOutput = skippedTasksOutput; - } - - public Map getJumpTaskInput() { - return jumpTaskInput; - } - - public void setJumpTaskInput(Map jumpTaskInput) { - this.jumpTaskInput = jumpTaskInput; - } - - public String getTaskReferenceName() { - return taskReferenceName; - } - - public void setTaskReferenceName(String taskReferenceName) { - this.taskReferenceName = taskReferenceName; - } - private Map skippedTasksOutput; - - private Map jumpTaskInput; - - private String taskReferenceName; - -} diff --git a/src/main/java/io/orkes/conductor/client/model/TagObject.java b/src/main/java/io/orkes/conductor/client/model/TagObject.java index 6a487301..5c2bdba4 100644 --- a/src/main/java/io/orkes/conductor/client/model/TagObject.java +++ b/src/main/java/io/orkes/conductor/client/model/TagObject.java @@ -55,6 +55,8 @@ public static TypeEnum fromValue(String input) { } @SerializedName("type") + @Deprecated + // This is not required anymore. Type has been moved to WorkflowDef.RateLimitConfig as METADATA type private TypeEnum type = null; @SerializedName("value") diff --git a/src/test/java/io/orkes/conductor/client/api/ClientTest.java b/src/test/java/io/orkes/conductor/client/api/ClientTest.java index 81171714..f162b2af 100644 --- a/src/test/java/io/orkes/conductor/client/api/ClientTest.java +++ b/src/test/java/io/orkes/conductor/client/api/ClientTest.java @@ -16,6 +16,7 @@ import io.orkes.conductor.client.util.ApiUtil; public abstract class ClientTest { + protected static OrkesClients orkesClients; static { diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java index b793c5f5..e5a104c9 100644 --- a/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowClientTests.java @@ -206,6 +206,7 @@ public void testSkipTaskFromWorkflow() throws Exception { () -> workflowClient.terminateWorkflowsWithFailure(List.of(workflowId), null, false)); } + @Test public void testUpdateVariables() { ConductorWorkflow workflow = new ConductorWorkflow<>(workflowExecutor); workflow.add(new SimpleTask("simple_task", "simple_task_ref")); diff --git a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java index 1505179f..e18f85b8 100644 --- a/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java +++ b/src/test/java/io/orkes/conductor/client/api/WorkflowStateUpdateTests.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -23,16 +24,19 @@ import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.IdempotencyStrategy; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.run.Workflow; import io.orkes.conductor.client.WorkflowClient; +import io.orkes.conductor.client.http.ConflictException; import io.orkes.conductor.client.model.WorkflowStateUpdate; import io.orkes.conductor.common.model.WorkflowRun; import lombok.SneakyThrows; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class WorkflowStateUpdateTests extends ClientTest { @@ -96,4 +100,29 @@ public void test() { .collect(Collectors.toList())); } + + @Test + public void testIdempotency() { + StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); + startWorkflowRequest.setName("sync_task_variable_updates"); + startWorkflowRequest.setVersion(1); + String idempotencyKey = UUID.randomUUID().toString(); + startWorkflowRequest.setIdempotencyKey(idempotencyKey); + startWorkflowRequest.setIdempotencyStrategy(IdempotencyStrategy.FAIL); + String workflowId = workflowClient.startWorkflow(startWorkflowRequest); + + + startWorkflowRequest.setIdempotencyStrategy(IdempotencyStrategy.RETURN_EXISTING); + String workflowId2 = workflowClient.startWorkflow(startWorkflowRequest); + assertEquals(workflowId, workflowId2); + + startWorkflowRequest.setIdempotencyStrategy(IdempotencyStrategy.FAIL); + boolean conflict = false; + try { + workflowClient.startWorkflow(startWorkflowRequest); + } catch (ConflictException ce) { + conflict = true; + } + assertTrue(conflict); + } } diff --git a/src/test/java/io/orkes/conductor/client/util/Commons.java b/src/test/java/io/orkes/conductor/client/util/Commons.java index f69e5626..4c92caf3 100644 --- a/src/test/java/io/orkes/conductor/client/util/Commons.java +++ b/src/test/java/io/orkes/conductor/client/util/Commons.java @@ -32,6 +32,7 @@ public class Commons { public static TagObject getTagObject() { TagObject tagObject = new TagObject(); + tagObject.setType(null); tagObject.setKey("a"); tagObject.setValue("b"); return tagObject; @@ -39,6 +40,7 @@ public static TagObject getTagObject() { public static TagString getTagString() { TagString tagString = new TagString(); + tagString.setType(null); tagString.setKey("a"); tagString.setValue("b"); return tagString;