diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ExponentialGrowthPartitionMemoryEstimator.java index 4b902f04027a..722324f2cec5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ExponentialGrowthPartitionMemoryEstimator.java @@ -41,6 +41,7 @@ import java.util.stream.IntStream; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.units.DataSize.Unit.PETABYTE; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultCoordinatorTaskMemory; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile; @@ -176,9 +177,14 @@ public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements prev // start with the maximum of previously used memory and actual usage DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory); if (shouldIncreaseMemoryRequirement(errorCode)) { - // multiply if we hit an oom error - - newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE); + if (remainingAttempts == 1) { + // on last attempt try as much memory as possible + newMemory = DataSize.of(1, PETABYTE); + } + else { + // multiply if we hit an oom error + newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE); + } } // if we are still below current estimate for new partition let's bump further diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestExponentialGrowthPartitionMemoryEstimator.java index 6222df56b589..7eeee548fbbb 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestExponentialGrowthPartitionMemoryEstimator.java @@ -131,6 +131,32 @@ public void testEstimator() 5)) .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE))); + assertThat( + estimator.getNextRetryMemoryRequirements( + new MemoryRequirements(DataSize.of(50, MEGABYTE)), + DataSize.of(10, MEGABYTE), + EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode(), + 2)) + .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE))); + + // on last retry we expect whole node on memory error + assertThat( + estimator.getNextRetryMemoryRequirements( + new MemoryRequirements(DataSize.of(50, MEGABYTE)), + DataSize.of(10, MEGABYTE), + StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode(), + 1)) + .isEqualTo(new MemoryRequirements(DataSize.of(64, GIGABYTE))); + + // standard logic even for last retry on non memory related error + assertThat( + estimator.getNextRetryMemoryRequirements( + new MemoryRequirements(DataSize.of(50, MEGABYTE)), + DataSize.of(10, MEGABYTE), + StandardErrorCode.CORRUPT_PAGE.toErrorCode(), + 1)) + .isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE))); + // peak memory of failed task 70MB assertThat( estimator.getNextRetryMemoryRequirements(