From ddd637983be803ca3a6089de7f7abdb411969389 Mon Sep 17 00:00:00 2001 From: mingji Date: Thu, 27 Jul 2023 15:16:04 +0800 Subject: [PATCH] [CELEBORN-845][BUG] Sort memory counter won't decrease after sort failed ### What changes were proposed in this pull request? Decrease sort memory counter after sorting procedure is complete. ### Why are the changes needed? Fix incorrect sort memory counter. ### Does this PR introduce _any_ user-facing change? NO. ### How was this patch tested? UT. Closes #1766 from FMX/CELEBORN-845. Authored-by: mingji Signed-off-by: mingji (cherry picked from commit 90959cbfd7097d83a8d4e076ab889d5b406300b9) Signed-off-by: mingji --- .../scala/org/apache/celeborn/common/CelebornConf.scala | 1 + .../deploy/worker/storage/PartitionFilesSorter.java | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 8f969f9114f..7702bad835c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2173,6 +2173,7 @@ object CelebornConf extends Logging { .doc("Reserved memory when sorting a shuffle file off-heap.") .version("0.3.0") .bytesConf(ByteUnit.BYTE) + .checkValue(v => v < Int.MaxValue, "Reserved memory per partition must be less than 2GB.") .createWithDefaultString("1mb") val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] = diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 51f7af3f0a5..e9bdfbf34db 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -134,8 +134,9 @@ public PartitionFilesSorter( try { task.sort(); } catch (InterruptedException e) { - logger.warn( - "File sorter thread was interrupted when expanding padding buffer."); + logger.warn("File sorter thread was interrupted."); + } finally { + memoryManager.releaseSortMemory(reservedMemoryPerPartition); } }); } @@ -586,8 +587,6 @@ public void sort() throws InterruptedException { sortedBlockInfoMap.put(mapId, sortedShuffleBlocks); } - memoryManager.releaseSortMemory(reserveMemory); - writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs); updateSortedShuffleFiles(shuffleKey, fileId, originFileLen); deleteOriginFiles();