Skip to content

Commit

Permalink
Return recent task failures in query failure exception
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Feb 23, 2024
1 parent 9c0c159 commit acc40c9
Showing 1 changed file with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.SoftReference;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -123,6 +124,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -664,6 +666,7 @@ private static class Scheduler
private static final long SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS = MINUTES.toMillis(10);
private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30);
private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10;
private static final int TASK_FAILURES_LOG_SIZE = 5;

private final QueryStateMachine queryStateMachine;
private final Metadata metadata;
Expand Down Expand Up @@ -700,6 +703,7 @@ private static class Scheduler
private final Stopwatch noEventsStopwatch = Stopwatch.createUnstarted();
private final Stopwatch debugInfoStopwatch = Stopwatch.createUnstarted();
private final Optional<EventDebugInfos> eventDebugInfos;
private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures = new ArrayDeque<>(TASK_FAILURES_LOG_SIZE);

private boolean started;
private boolean runtimeAdaptivePartitioningApplied;
Expand Down Expand Up @@ -1016,6 +1020,11 @@ private boolean checkComplete()
RuntimeException failure = failureCause == null ?
new TrinoException(GENERIC_INTERNAL_ERROR, "stage failed due to unknown error: %s".formatted(execution.getStageId())) :
failureCause.toException();

taskFailures.forEach(taskFailure -> {
failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue()));
});

queryStateMachine.transitionToFailed(failure);
return true;
}
Expand Down Expand Up @@ -1460,6 +1469,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map<Sta
};
StageExecution execution = new StageExecution(
taskDescriptorStorage,
taskFailures,
stage,
taskSource,
sinkPartitioningScheme,
Expand Down Expand Up @@ -1855,6 +1865,7 @@ private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo execut
public static class StageExecution
{
private final TaskDescriptorStorage taskDescriptorStorage;
private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures;

private final SqlStage stage;
private final EventDrivenTaskSource taskSource;
Expand Down Expand Up @@ -1889,6 +1900,7 @@ public static class StageExecution

private StageExecution(
TaskDescriptorStorage taskDescriptorStorage,
Queue<Map.Entry<TaskId, RuntimeException>> taskFailures,
SqlStage stage,
EventDrivenTaskSource taskSource,
FaultTolerantPartitioningScheme sinkPartitioningScheme,
Expand All @@ -1901,6 +1913,7 @@ private StageExecution(
DynamicFilterService dynamicFilterService)
{
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
this.taskFailures = requireNonNull(taskFailures, "taskFailures is null");
this.stage = requireNonNull(stage, "stage is null");
this.taskSource = requireNonNull(taskSource, "taskSource is null");
this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
Expand Down Expand Up @@ -2362,6 +2375,8 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
return ImmutableList.of();
}

recordTaskFailureInLog(taskId, failure);

if (!partition.isSealed()) {
// don't reschedule speculative tasks
return ImmutableList.of();
Expand All @@ -2373,6 +2388,14 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
return ImmutableList.of(PrioritizedScheduledTask.create(stage.getStageId(), partitionId, schedulingPriority));
}

private void recordTaskFailureInLog(TaskId taskId, RuntimeException failure)
{
if (taskFailures.size() == Scheduler.TASK_FAILURES_LOG_SIZE) {
taskFailures.remove();
}
taskFailures.add(Map.entry(taskId, failure));
}

public MemoryRequirements getMemoryRequirements(int partitionId)
{
return getStagePartition(partitionId).getMemoryRequirements();
Expand Down

0 comments on commit acc40c9

Please sign in to comment.