diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 5480c7de054..7bc68c3c22a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2173,6 +2173,7 @@ object CelebornConf extends Logging { .doc("Reserved memory when sorting a shuffle file off-heap.") .version("0.3.0") .bytesConf(ByteUnit.BYTE) + .checkValue(v => v < Int.MaxValue, "Reserved memory per partition must be less than 2GB.") .createWithDefaultString("1mb") val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] = diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 51f7af3f0a5..39361b909af 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -133,6 +133,7 @@ public PartitionFilesSorter( () -> { try { task.sort(); + memoryManager.releaseSortMemory(reservedMemoryPerPartition); } catch (InterruptedException e) { logger.warn( "File sorter thread was interrupted when expanding padding buffer."); @@ -586,8 +587,6 @@ public void sort() throws InterruptedException { sortedBlockInfoMap.put(mapId, sortedShuffleBlocks); } - memoryManager.releaseSortMemory(reserveMemory); - writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs); updateSortedShuffleFiles(shuffleKey, fileId, originFileLen); deleteOriginFiles();