Skip to content

Commit

Permalink
Merge pull request #9 from orkes-io/3.8.1-compatibility
Browse files Browse the repository at this point in the history
make compatible with 3.8.1
  • Loading branch information
v1r3n authored May 13, 2022
2 parents e96eeeb + e4ea4c0 commit cdbfa01
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 37 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ext {
versions = [
awaitility : '3.1.6',
commonsLang: '3.12.0',
conductor : '3.5.0',
conductor : '3.8.1',
eureka : '1.10.10',
groovy : '2.5.15',
jackson : '2.11.4!!',
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.2.0
version=0.3.0
85 changes: 51 additions & 34 deletions src/main/java/io/orkes/conductor/client/automator/TaskRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.discovery.EurekaClient;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
Expand All @@ -27,6 +26,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Function;

class TaskRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
Expand Down Expand Up @@ -224,43 +224,32 @@ private void executeTask(Worker worker, Task task) {
worker.getClass().getSimpleName(),
worker.getIdentity(),
result.getStatus());
updateWithRetry(updateRetryCount, task, result, worker);
updateTaskResult(updateRetryCount, task, result, worker);
}

private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) {
try {
String updateTaskDesc = String.format(
"Retry updating task result: %s for task: %s in worker: %s",
result.toString(), task.getTaskDefName(), worker.getIdentity());
String evaluatePayloadDesc = String.format(
"Evaluate Task payload for task: %s in worker: %s",
task.getTaskDefName(), worker.getIdentity());
String methodName = "updateWithRetry";
TaskResult finalResult = new RetryUtil<TaskResult>()
.retryOnException(
() -> {
TaskResult taskResult = result.copy();
taskClient.evaluateAndUploadLargePayload(
taskResult, task.getTaskType());
return taskResult;
},
null,
null,
// upload if necessary
Optional<String> optionalExternalStorageLocation =
retryOperation(
(TaskResult taskResult) -> upload(taskResult, task.getTaskType()),
count,
evaluatePayloadDesc,
methodName);
result,
"evaluateAndUploadLargePayload");

new RetryUtil<>()
.retryOnException(
() -> {
taskClient.updateTask(finalResult);
return null;
},
null,
null,
count,
updateTaskDesc,
methodName);
if (optionalExternalStorageLocation.isPresent()) {
result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get());
result.setOutputData(null);
}

retryOperation(
(TaskResult taskResult) -> {
taskClient.updateTask(taskResult);
return null;
},
count,
result,
"updateTask");
} catch (Exception e) {
worker.onErrorUpdate(task);
MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
Expand All @@ -272,6 +261,34 @@ private void updateWithRetry(int count, Task task, TaskResult result, Worker wor
}
}

private Optional<String> upload(TaskResult result, String taskType) {
try {
return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
} catch (IllegalArgumentException iae) {
result.setReasonForIncompletion(iae.getMessage());
result.setOutputData(null);
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
return Optional.empty();
}
}

private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
int index = 0;
while (index < count) {
try {
return operation.apply(input);
} catch (Exception e) {
index++;
try {
Thread.sleep(500L);
} catch (InterruptedException ie) {
LOGGER.error("Retry interrupted", ie);
}
}
}
throw new RuntimeException("Exhausted retries performing " + opName);
}

private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
LOGGER.error(String.format("Error while executing task %s", task.toString()), t);
MetricsContainer.incrementTaskExecutionErrorCount(worker.getTaskDefName(), t);
Expand All @@ -280,6 +297,6 @@ private void handleException(Throwable t, TaskResult result, Worker worker, Task
StringWriter stringWriter = new StringWriter();
t.printStackTrace(new PrintWriter(stringWriter));
result.log(stringWriter.toString());
updateWithRetry(updateRetryCount, task, result, worker);
updateTaskResult(updateRetryCount, task, result, worker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.discovery.EurekaClient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,11 +136,17 @@ public TaskRunnerConfigurer.Builder withTaskToDomain(Map<String, String> taskToD
return this;
}

public TaskRunnerConfigurer.Builder withTaskThreadCount(Map<String, Integer> taskToThreadCount) {
this.taskToThreadCount = taskToThreadCount;
return this;
}

public TaskRunnerConfigurer.Builder withTaskToThreadCount(Map<String, Integer> taskToThreadCount) {
this.taskToThreadCount = taskToThreadCount;
return this;
}


public TaskRunnerConfigurer.Builder withTaskToTimeout(Map<String, Integer> taskToTimeout) {
this.taskToTimeout = taskToTimeout;
return this;
Expand All @@ -154,6 +159,7 @@ public TaskRunnerConfigurer.Builder withTaskToTimeout(Map<String, Integer> taskT
* Please see {@link TaskRunnerConfigurer#init()} method. The method must be
* called after
* this constructor for the polling to start.
* @return Builder instance
*/
public TaskRunnerConfigurer build() {
return new TaskRunnerConfigurer(this);
Expand Down

0 comments on commit cdbfa01

Please sign in to comment.