diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 48dafc21968..c2ff899eea0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -676,6 +676,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
   def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
   def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
+  def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
+    get(METRICS_WORKER_PAUSE_SPENT_TINE_FORCE_APPEND_THRESHOLD)
 
   // //////////////////////////////////////////////////////
   //                      Quota                         //
@@ -3609,6 +3611,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("10min")
 
+  val METRICS_WORKER_PAUSE_SPENT_TINE_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] =
+    buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
+      .categories("metrics")
+      .doc("Force append worker pause spent time even if worker still in pause serving state. "
+        + "Help user can find worker pause spent time increase, when worker always been pause state.")
+      .intConf
+      .createWithDefault(10)
+
   val QUOTA_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.quota.enabled")
       .categories("quota")
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index d59a8200ec4..0954103a0d1 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -48,6 +49,7 @@ public class MemoryManager {
   private final long pauseReplicateThreshold;
   private final long resumeThreshold;
   private final long maxSortMemory;
+  private final int forceAppendPauseSpentTimeThreshold;
   private final List memoryPressureListeners = new ArrayList<>();
 
   private final ScheduledExecutorService checkService =
@@ -69,6 +71,7 @@ public class MemoryManager {
   private ServingState servingState = ServingState.NONE_PAUSED;
   private long pauseStartTime = -1L;
   private volatile boolean isPaused = false;
+  private final AtomicInteger trimCounter = new AtomicInteger(0);
 
   // For credit stream
   private final AtomicLong readBufferCounter = new AtomicLong(0);
@@ -113,6 +116,7 @@ private MemoryManager(CelebornConf conf) {
     double readBufferTargetRatio = conf.readBufferTargetRatio();
     long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
     long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();
+    forceAppendPauseSpentTimeThreshold = conf.metricsWorkerForceAppendPauseSpentTimeThreshold();
 
     maxDirectorMemory =
         DynMethods.builder("maxDirectMemory")
@@ -248,6 +252,13 @@ protected void switchServingState() {
     servingState = currentServingState();
     if (lastState == servingState) {
       if (servingState != ServingState.NONE_PAUSED) {
+        // still paused: once the TRIM count reaches the threshold, flush the pause time so far
+        int trimCount = trimCounter.incrementAndGet();
+        if (trimCount >= forceAppendPauseSpentTimeThreshold) {
+          logger.debug(
+              "Trigger action: TRIM for {} times, force to append pause spent time.", trimCount);
+          appendPauseSpentTime();
+        }
         logger.debug("Trigger action: TRIM");
         trimAllListeners();
       }
@@ -287,8 +298,8 @@ protected void switchServingState() {
         trimAllListeners();
         break;
       case NONE_PAUSED:
-        long pauseSpendMills = System.currentTimeMillis() - pauseStartTime;
-        pausePushDataTime.add(pauseSpendMills);
+        // resume from paused mode, append pause spent time
+        appendPauseSpentTime();
         if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
           logger.info("Trigger action: RESUME REPLICATE");
           memoryPressureListeners.forEach(
@@ -399,6 +410,18 @@ public long getPausePushDataTime() {
     return pausePushDataTime.sum();
   }
 
+  /**
+   * Adds the time elapsed since {@code pauseStartTime} to {@code pausePushDataTime}, restarts the
+   * pause interval at the current time and resets {@code trimCounter} to zero.
+   */
+  private void appendPauseSpentTime() {
+    long nextPauseStartTime = System.currentTimeMillis();
+    pausePushDataTime.add(nextPauseStartTime - pauseStartTime);
+    pauseStartTime = nextPauseStartTime;
+    // restart the TRIM count for the next forced append
+    trimCounter.set(0);
+  }
+
   public void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener listener) {
     synchronized (readBufferTargetChangeListeners) {
       readBufferTargetChangeListeners.add(listener);