Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
FMX committed Oct 10, 2024
1 parent 332de4a commit 843f009
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class MemoryManager {
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private ServingState servingState = ServingState.NONE_PAUSED;
public ServingState servingState = ServingState.NONE_PAUSED;
private long pausePushDataStartTime = -1L;
private long pausePushDataTime = 0L;
private long pausePushDataAndReplicateStartTime = -1L;
Expand Down Expand Up @@ -286,7 +286,7 @@ private MemoryManager(CelebornConf conf, StorageManager storageManager) {
}

public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double evictRatio) {
return currentServingState() != ServingState.NONE_PAUSED
return servingState != ServingState.NONE_PAUSED
&& (aggressiveMemoryFileEvictEnabled
|| (memoryFileStorageCounter.sum() >= evictRatio * memoryFileStorageThreshold));
}
Expand Down Expand Up @@ -405,8 +405,7 @@ public void reserveSortMemory(long fileLen) {

public boolean sortMemoryReady() {
return bypassMemoryCheckForSortingFiles
|| (currentServingState() == ServingState.NONE_PAUSED
&& sortMemoryCounter.get() < maxSortMemory);
|| (servingState == ServingState.NONE_PAUSED && sortMemoryCounter.get() < maxSortMemory);
}

public void releaseSortMemory(long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ private[celeborn] class Worker(
}

private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
(memoryManager.servingState, conf.workerActiveConnectionMax) match {
case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
case (ServingState.PUSH_PAUSED, _) => true
case (_, Some(activeConnectionMax)) =>
Expand Down

0 comments on commit 843f009

Please sign in to comment.