diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
index 5955a8f8d90c..4749eca08b3e 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
@@ -115,6 +115,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.lang.ref.SoftReference;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -123,6 +124,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -664,6 +666,7 @@ private static class Scheduler
         private static final long SCHEDULER_MAX_DEBUG_INFO_FREQUENCY_MILLIS = MINUTES.toMillis(10);
         private static final long SCHEDULER_STALLED_DURATION_ON_TIME_EXCEEDED_THRESHOLD_MILLIS = SECONDS.toMillis(30);
         private static final int EVENTS_DEBUG_INFOS_PER_BUCKET = 10;
+        private static final int TASK_FAILURES_LOG_SIZE = 5;

         private final QueryStateMachine queryStateMachine;
         private final Metadata metadata;
@@ -700,6 +703,7 @@ private static class Scheduler
         private final Stopwatch noEventsStopwatch = Stopwatch.createUnstarted();
         private final Stopwatch debugInfoStopwatch = Stopwatch.createUnstarted();
         private final Optional<EventDebugInfos> eventDebugInfos;
+        private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures = new ArrayDeque<>(TASK_FAILURES_LOG_SIZE);

         private boolean started;
         private boolean runtimeAdaptivePartitioningApplied;
@@ -1016,6 +1020,11 @@ private boolean checkComplete()
                 RuntimeException failure = failureCause == null ?
                         new TrinoException(GENERIC_INTERNAL_ERROR, "stage failed due to unknown error: %s".formatted(execution.getStageId())) :
                         failureCause.toException();
+
+                taskFailures.forEach(taskFailure -> {
+                    failure.addSuppressed(new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue()));
+                });
+
                 queryStateMachine.transitionToFailed(failure);
                 return true;
             }
@@ -1460,6 +1469,7 @@ private void createStageExecution(SubPlan subPlan, boolean rootFragment, Map
             StageExecution execution = new StageExecution(
                     taskDescriptorStorage,
+                    taskFailures,
                     stage,
                     taskSource,
@@ ... @@ public static class StageExecution
+        private final Queue<Map.Entry<TaskId, RuntimeException>> taskFailures;

         private final SqlStage stage;
         private final EventDrivenTaskSource taskSource;
@@ -1889,6 +1900,7 @@ public static class StageExecution

         private StageExecution(
                 TaskDescriptorStorage taskDescriptorStorage,
+                Queue<Map.Entry<TaskId, RuntimeException>> taskFailures,
                 SqlStage stage,
                 EventDrivenTaskSource taskSource,
                 FaultTolerantPartitioningScheme sinkPartitioningScheme,
@@ -1901,6 +1913,7 @@ private StageExecution(
                 DynamicFilterService dynamicFilterService)
         {
             this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
+            this.taskFailures = requireNonNull(taskFailures, "taskFailures is null");
             this.stage = requireNonNull(stage, "stage is null");
             this.taskSource = requireNonNull(taskSource, "taskSource is null");
             this.sinkPartitioningScheme = requireNonNull(sinkPartitioningScheme, "sinkPartitioningScheme is null");
@@ -2362,6 +2375,8 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
                 return ImmutableList.of();
             }

+            recordTaskFailureInLog(taskId, failure);
+
             if (!partition.isSealed()) {
                 // don't reschedule speculative tasks
                 return ImmutableList.of();
@@ -2373,6 +2388,14 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
             return ImmutableList.of(PrioritizedScheduledTask.create(stage.getStageId(), partitionId, schedulingPriority));
         }

+        private void recordTaskFailureInLog(TaskId taskId, RuntimeException failure)
+        {
+            if (taskFailures.size() == Scheduler.TASK_FAILURES_LOG_SIZE) {
+                taskFailures.remove();
+            }
+            taskFailures.add(Map.entry(taskId, failure));
+        }
+
         public MemoryRequirements getMemoryRequirements(int partitionId)
         {
             return getStagePartition(partitionId).getMemoryRequirements();
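For readers who want the mechanism at a glance without walking the hunks: the patch keeps a small, bounded FIFO of the most recent task failures in the Scheduler, shares that queue with each StageExecution so failures can be recorded where they are observed, and, when the query finally fails, attaches every remembered failure to the top-level exception via addSuppressed. The standalone sketch below reproduces that pattern under simplified assumptions; TaskFailureLogSketch, the String task ids, and the sample messages are illustrative stand-ins rather than Trino types (the real queue holds Map.Entry<TaskId, RuntimeException> entries).

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

// Standalone sketch of the bounded failure log added by this patch; the class name
// and String task ids are hypothetical stand-ins, not Trino code.
public class TaskFailureLogSketch
{
    private static final int TASK_FAILURES_LOG_SIZE = 5;

    // Bounded FIFO of the most recent task failures; the oldest entry is evicted first.
    private final Queue<Map.Entry<String, RuntimeException>> taskFailures = new ArrayDeque<>(TASK_FAILURES_LOG_SIZE);

    // Mirrors StageExecution.recordTaskFailureInLog: drop the oldest entry once the
    // log is full, then remember the new failure keyed by task id.
    public void recordTaskFailure(String taskId, RuntimeException failure)
    {
        if (taskFailures.size() == TASK_FAILURES_LOG_SIZE) {
            taskFailures.remove();
        }
        taskFailures.add(Map.entry(taskId, failure));
    }

    // Mirrors the checkComplete change: attach each remembered task failure to the
    // query-level exception as a suppressed exception so it surfaces in the final error.
    public RuntimeException attachTaskFailures(RuntimeException queryFailure)
    {
        taskFailures.forEach(taskFailure -> queryFailure.addSuppressed(
                new RuntimeException("Task " + taskFailure.getKey() + " failed", taskFailure.getValue())));
        return queryFailure;
    }

    public static void main(String[] args)
    {
        TaskFailureLogSketch log = new TaskFailureLogSketch();
        for (int i = 0; i < 7; i++) {
            log.recordTaskFailure("query_1.2." + i + ".0", new RuntimeException("worker lost: attempt " + i));
        }
        RuntimeException failure = log.attachTaskFailures(new RuntimeException("stage failed due to unknown error"));
        // Only the 5 most recent task failures are attached; the first two were evicted.
        System.out.println("suppressed: " + failure.getSuppressed().length);
        failure.printStackTrace();
    }
}

Note that the explicit size check before add() is what actually bounds the log; the ArrayDeque capacity argument is only an initial sizing hint. Keeping the queue on the Scheduler rather than per stage means the suppressed exceptions reflect the last few failures across the whole query, and, presumably because the scheduler handles these events on its single event-processing thread, the patch introduces no additional synchronization around the queue.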