From e98bde4a250b3d52f34436504931f3fcc56a5180 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 13 May 2022 14:43:29 -0700 Subject: [PATCH 1/2] make compatible with 3.8.1 --- build.gradle | 2 +- .../client/automator/TaskRunner.java | 85 +++++++++++-------- .../automator/TaskRunnerConfigurer.java | 8 +- 3 files changed, 59 insertions(+), 36 deletions(-) diff --git a/build.gradle b/build.gradle index 9897766a..5422b9b0 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ ext { versions = [ awaitility : '3.1.6', commonsLang: '3.12.0', - conductor : '3.5.0', + conductor : '3.8.1', eureka : '1.10.10', groovy : '2.5.15', jackson : '2.11.4!!', diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java index 3b1974b9..49b14e0f 100644 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java +++ b/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java @@ -8,7 +8,6 @@ import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.common.utils.RetryUtil; import com.netflix.discovery.EurekaClient; import com.netflix.spectator.api.Registry; import com.netflix.spectator.api.Spectator; @@ -27,6 +26,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; +import java.util.function.Function; class TaskRunner { private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class); @@ -224,43 +224,32 @@ private void executeTask(Worker worker, Task task) { worker.getClass().getSimpleName(), worker.getIdentity(), result.getStatus()); - updateWithRetry(updateRetryCount, task, result, worker); + updateTaskResult(updateRetryCount, task, result, worker); } - private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) { + private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) { try { - String updateTaskDesc = String.format( - "Retry updating task result: %s for task: %s in worker: %s", - result.toString(), task.getTaskDefName(), worker.getIdentity()); - String evaluatePayloadDesc = String.format( - "Evaluate Task payload for task: %s in worker: %s", - task.getTaskDefName(), worker.getIdentity()); - String methodName = "updateWithRetry"; - TaskResult finalResult = new RetryUtil() - .retryOnException( - () -> { - TaskResult taskResult = result.copy(); - taskClient.evaluateAndUploadLargePayload( - taskResult, task.getTaskType()); - return taskResult; - }, - null, - null, + // upload if necessary + Optional optionalExternalStorageLocation = + retryOperation( + (TaskResult taskResult) -> upload(taskResult, task.getTaskType()), count, - evaluatePayloadDesc, - methodName); + result, + "evaluateAndUploadLargePayload"); - new RetryUtil<>() - .retryOnException( - () -> { - taskClient.updateTask(finalResult); - return null; - }, - null, - null, - count, - updateTaskDesc, - methodName); + if (optionalExternalStorageLocation.isPresent()) { + result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get()); + result.setOutputData(null); + } + + retryOperation( + (TaskResult taskResult) -> { + taskClient.updateTask(taskResult); + return null; + }, + count, + result, + "updateTask"); } catch (Exception e) { worker.onErrorUpdate(task); MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e); @@ -272,6 +261,34 @@ private void updateWithRetry(int count, Task task, TaskResult result, Worker wor } } + private Optional upload(TaskResult result, String taskType) { + try { + return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType); + } catch (IllegalArgumentException iae) { + result.setReasonForIncompletion(iae.getMessage()); + result.setOutputData(null); + result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR); + return Optional.empty(); + } + } + + private R retryOperation(Function operation, int count, T input, String opName) { + int index = 0; + while (index < count) { + try { + return operation.apply(input); + } catch (Exception e) { + index++; + try { + Thread.sleep(500L); + } catch (InterruptedException ie) { + LOGGER.error("Retry interrupted", ie); + } + } + } + throw new RuntimeException("Exhausted retries performing " + opName); + } + private void handleException(Throwable t, TaskResult result, Worker worker, Task task) { LOGGER.error(String.format("Error while executing task %s", task.toString()), t); MetricsContainer.incrementTaskExecutionErrorCount(worker.getTaskDefName(), t); @@ -280,6 +297,6 @@ private void handleException(Throwable t, TaskResult result, Worker worker, Task StringWriter stringWriter = new StringWriter(); t.printStackTrace(new PrintWriter(stringWriter)); result.log(stringWriter.toString()); - updateWithRetry(updateRetryCount, task, result, worker); + updateTaskResult(updateRetryCount, task, result, worker); } } diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java index af7dde30..92785a45 100644 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java +++ b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java @@ -4,7 +4,6 @@ import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.worker.Worker; import com.netflix.discovery.EurekaClient; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,11 +136,17 @@ public TaskRunnerConfigurer.Builder withTaskToDomain(Map taskToD return this; } + public TaskRunnerConfigurer.Builder withTaskThreadCount(Map taskToThreadCount) { + this.taskToThreadCount = taskToThreadCount; + return this; + } + public TaskRunnerConfigurer.Builder withTaskToThreadCount(Map taskToThreadCount) { this.taskToThreadCount = taskToThreadCount; return this; } + public TaskRunnerConfigurer.Builder withTaskToTimeout(Map taskToTimeout) { this.taskToTimeout = taskToTimeout; return this; @@ -154,6 +159,7 @@ public TaskRunnerConfigurer.Builder withTaskToTimeout(Map taskT * Please see {@link TaskRunnerConfigurer#init()} method. The method must be * called after * this constructor for the polling to start. + * @return Builder instance */ public TaskRunnerConfigurer build() { return new TaskRunnerConfigurer(this); From e4ea4c02dcb203bc1447bb1b00b2dad4c3fe553e Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 13 May 2022 14:45:08 -0700 Subject: [PATCH 2/2] Update gradle.properties --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index caeaf120..62044170 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.2.0 +version=0.3.0