Skip to content

Commit

Permalink
ParallelTaskGroup fixes (#541)
Browse files Browse the repository at this point in the history
- use the correct ExecutorManager method, rather than suppressing a warning
- remove a comment with incorrect information about ParallelTaskGroup being used only for Redshift (it is also used for Greenplum)
- (x2) replace throws Exception with a more precise declaration
- remove redundant extension of Callable
- remove redundant generics
- add missing annotations
  • Loading branch information
misolt committed Sep 4, 2024
1 parent 82ecf32 commit c08e096
Showing 1 changed file with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.google.common.base.Preconditions;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.plugin.ext.jdk.concurrent.ExecutorManager;
import java.util.concurrent.Callable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.csv.CSVPrinter;

// Redshift is really slow, and is the only thing that uses this.
public class ParallelTaskGroup extends TaskGroup {

public ParallelTaskGroup(String name) {
Expand All @@ -42,21 +44,21 @@ public void addTask(Task<?> task) {
super.addTask(task);
}

private static class TaskRunner<T> implements Callable<T> {
private static class TaskRunner {

private final TaskRunContext context;
private final Task<T> task;
private final CSVPrinter printer;
@Nonnull private final TaskRunContext context;
@Nonnull private final Task<?> task;
@Nonnull private final CSVPrinter printer;

public TaskRunner(TaskRunContext context, Task<T> task, CSVPrinter printer) {
public TaskRunner(
@Nonnull TaskRunContext context, @Nonnull Task<?> task, @Nonnull CSVPrinter printer) {
this.context = context;
this.task = task;
this.printer = printer;
}

@Override
public T call() throws Exception {
T result = context.runChildTask(task);
public @Nullable Object call() throws IOException {
Object result = context.runChildTask(task);
TaskState state = context.getTaskState(task);
synchronized (printer) {
printer.printRecord(task, state);
Expand All @@ -66,15 +68,16 @@ public T call() throws Exception {
}

@Override
@SuppressWarnings(
"FutureReturnValueIgnored") // It's an ExecutorManager, which tracks the Future internally.
protected void doRun(TaskRunContext context, CSVPrinter printer, Handle handle) throws Exception {
protected void doRun(
@Nonnull TaskRunContext context, @Nonnull CSVPrinter printer, @Nonnull Handle handle)
throws ExecutionException, InterruptedException {
// Throws ExecutionException if any sub-task threw. However, runChildTask() is nothrow, so that
// never happens.
// We safely publish the CSVPrinter to the ExecutorManager.
try (ExecutorManager executorManager = new ExecutorManager(context.getExecutorService())) {
for (Task<?> task : getTasks()) {
executorManager.submit(new TaskRunner<>(context, task, printer));
TaskRunner runner = new TaskRunner(context, task, printer);
executorManager.execute(runner::call);
}
}
// We now, by the t-w-r, safely collect the CSVPrinter from the sub-threads.
Expand Down

0 comments on commit c08e096

Please sign in to comment.