Skip to content

Commit

Permalink
[CELEBORN-845][BUG] Sort memory counter won't decrease after sort failed
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Decrease sort memory counter after sorting procedure is complete.

### Why are the changes needed?
Fix incorrect sort memory counter.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
UT.

Closes apache#1766 from FMX/CELEBORN-845.

Authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
FMX authored and zwangsheng committed Aug 4, 2023
1 parent 4e49105 commit 5754aa2
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,7 @@ object CelebornConf extends Logging {
.doc("Reserved memory when sorting a shuffle file off-heap.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v < Int.MaxValue, "Reserved memory per partition must be less than 2GB.")
.createWithDefaultString("1mb")

val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ public PartitionFilesSorter(
try {
task.sort();
} catch (InterruptedException e) {
logger.warn(
"File sorter thread was interrupted when expanding padding buffer.");
logger.warn("File sorter thread was interrupted.");
} finally {
memoryManager.releaseSortMemory(reservedMemoryPerPartition);
}
});
}
Expand Down Expand Up @@ -578,8 +579,6 @@ public void sort() throws InterruptedException {
sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
}

memoryManager.releaseSortMemory(reserveMemory);

writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
deleteOriginFiles();
Expand Down

0 comments on commit 5754aa2

Please sign in to comment.