[CELEBORN-882][WORKER][METRICS] Add Pause Push Data Time Count
Metrics & Dashboard Panel
#1800
Codecov Report
@@ Coverage Diff @@
## main #1800 +/- ##
==========================================
+ Coverage 46.49% 46.51% +0.03%
==========================================
Files 164 164
Lines 10222 10228 +6
Branches 936 936
==========================================
+ Hits 4752 4757 +5
- Misses 5156 5157 +1
Partials 314 314
... and 2 files with indirect coverage changes
@@ -65,7 +65,9 @@ public class MemoryManager {
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private final AtomicLong pausePushDataTime = new AtomicLong(0);
LongAdder
fixed
@@ -163,6 +166,7 @@ private MemoryManager(CelebornConf conf) {
logger.info("Trigger action: RESUME PUSH and REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener -> memoryPressureListener.onResume("all"));
pausePushDataTime.addAndGet(System.currentTimeMillis() - lastPauseTime);
the state machine does not look correct to me.
RESUME is always triggered after PAUSE_PUSH.
what if the worker goes into PUSH_AND_REPLICATE_PAUSED?
Marked as draft; some of the logic should be optimized before this.
@@ -140,6 +142,7 @@ private MemoryManager(CelebornConf conf) {
if (lastState != servingState) {
logger.info("Serving state transformed from {} to {}", lastState, servingState);
if (servingState == ServingState.PUSH_PAUSED) {
lastPauseTime = System.currentTimeMillis();
PUSH_AND_REPLICATE_PAUSED also pauses push data, and a Celeborn worker might change to PUSH_AND_REPLICATE_PAUSED directly if memory pressure is high.
Got it. Once the logic here is cleared up in #1835, I will continue with this PR.
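The concern raised above is that pause time must be measured from the first entry into either pause state, not only from PUSH_PAUSED. A minimal sketch of that bookkeeping follows. It is not the Celeborn implementation: the class `PauseTimeTracker`, the method `onState`, and the state name `NONE_PAUSED` are assumptions made for illustration, and the clock value is passed in so the logic can be checked deterministically.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical sketch; names mirror the discussion but this is not the Celeborn code. */
final class PauseTimeTracker {
  // NONE_PAUSED is an assumed name for the "not paused" state.
  enum ServingState { NONE_PAUSED, PUSH_PAUSED, PUSH_AND_REPLICATE_PAUSED }

  private final LongAdder pausePushDataTime = new LongAdder();
  private ServingState lastState = ServingState.NONE_PAUSED;
  private long pauseStartTime = -1L;

  /** Called sequentially with the new state and a caller-supplied clock reading. */
  void onState(ServingState newState, long nowMillis) {
    boolean wasPaused = lastState != ServingState.NONE_PAUSED;
    boolean isPaused = newState != ServingState.NONE_PAUSED;
    if (!wasPaused && isPaused) {
      // Entering any pause state, including a direct jump to PUSH_AND_REPLICATE_PAUSED.
      pauseStartTime = nowMillis;
    } else if (wasPaused && !isPaused) {
      // Leaving the paused region: record the whole paused interval once.
      pausePushDataTime.add(nowMillis - pauseStartTime);
      pauseStartTime = -1L;
    }
    // Moving between the two pause states keeps the original start time.
    lastState = newState;
  }

  long totalPausedMillis() {
    return pausePushDataTime.sum();
  }
}
```

With this shape, a worker that goes straight to PUSH_AND_REPLICATE_PAUSED, then to PUSH_PAUSED, then resumes is charged for the full interval rather than only the PUSH_PAUSED part.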
5e75474 to 200fb00
The CI failure is related to #1844; will rebase and test again.
200fb00 to 1c766b3
long pauseSpendMills = System.currentTimeMillis() - pauseStartTime;
logger.info(
"Trigger action: RESUME PUSH and REPLICATE, pause push spent: " + pauseSpendMills);
pausePushDataTime.add(pauseSpendMills);
Consider the corner case that a worker enters the pause state but never goes out, then pausePushDataTime will never increase. Maybe we can increase pausePushDataTime every N successive pause states, as well as state changes from pause to non_pause.
Also, I think we should consider worker under high load only when the pause time exceeds some threshold, instead of whenever it enters pause state, as #1840 does, cc @pan3793
Consider the corner case that a worker enters the pause state but never goes out, then pausePushDataTime will never increase. Maybe we can increase pausePushDataTime every N successive pause states, as well as state changes from pause to non_pause.
This is a good idea.
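The suggestion above (flush the elapsed pause time every N successive pause states, and also on the transition out of pause) can be sketched as follows. This is an illustration under stated assumptions, not the merged code: the class `ForceAppendPauseTimer`, its method names, and the boolean `paused` input are hypothetical, and calls are assumed to arrive sequentially from a single thread, so plain fields are used.

```java
/** Hypothetical sketch of "append every N successive pause checks"; not the Celeborn code. */
final class ForceAppendPauseTimer {
  private final int forceAppendThreshold; // the "N" from the review comment
  private long pausePushDataTime = 0L;
  private long pauseStartTime = -1L; // -1 means "not currently paused"
  private int successivePauseChecks = 0;

  ForceAppendPauseTimer(int forceAppendThreshold) {
    this.forceAppendThreshold = forceAppendThreshold;
  }

  /** One call per state check; the clock reading is supplied by the caller. */
  void onCheck(boolean paused, long nowMillis) {
    if (paused) {
      if (pauseStartTime < 0) {
        // First check of a new pause period.
        pauseStartTime = nowMillis;
        successivePauseChecks = 0;
      } else if (++successivePauseChecks >= forceAppendThreshold) {
        // Still paused after N more checks: flush what has accumulated
        // and restart the window so the metric keeps growing.
        flush(nowMillis);
        pauseStartTime = nowMillis;
      }
    } else if (pauseStartTime >= 0) {
      // Transition from pause to non-pause: flush the remainder.
      flush(nowMillis);
      pauseStartTime = -1L;
    }
  }

  private void flush(long nowMillis) {
    pausePushDataTime += nowMillis - pauseStartTime;
    successivePauseChecks = 0;
  }

  long totalPausedMillis() {
    return pausePushDataTime;
  }
}
```

Because each flush restarts the window at the current clock reading, no interval is counted twice, and a worker that never leaves the pause state still reports a growing total.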
1c766b3 to f6cd42f
@waitinfuture Following your comment, the current PR implementation now does both of the things mentioned above. If you have any other ideas, please let me know.
@@ -676,6 +676,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TINE_FORCE_APPEND_THRESHOLD)
Typo: this should be METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD.
if (trimCounter.incrementAndGet() >= forceAppendPauseSpentTimeThreshold) {
logger.debug(
"Trigger action: TRIM for {} times, force to append pause spent time.",
trimCounter.incrementAndGet());
get
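The one-word comment above points at a side effect: `AtomicInteger.incrementAndGet()` mutates the counter, so calling it a second time just to produce a log argument bumps the counter again, while `get()` only reads it. The sketch below isolates that difference. The class and method names are hypothetical, and the returned value stands in for what would be passed to the logger.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch contrasting incrementAndGet() and get() for a log argument. */
final class TrimCounterLogging {
  /** Mirrors the reviewed hunk: the log argument increments the counter a second time. */
  static int loggedValueWithIncrement(AtomicInteger trimCounter, int threshold) {
    if (trimCounter.incrementAndGet() >= threshold) {
      return trimCounter.incrementAndGet(); // side effect: counter advances again
    }
    return -1; // nothing would be logged
  }

  /** Reviewer's suggestion: read the current value without modifying it. */
  static int loggedValueWithGet(AtomicInteger trimCounter, int threshold) {
    if (trimCounter.incrementAndGet() >= threshold) {
      return trimCounter.get(); // pure read
    }
    return -1; // nothing would be logged
  }
}
```

Starting from zero with a threshold of 1, the first variant reports 2 and leaves the counter at 2, whereas the second reports 1 and leaves the counter at 1.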
@@ -65,8 +67,11 @@ public class MemoryManager {
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private final LongAdder pausePushDataTime = new LongAdder();
Since switchServingState will be called sequentially in a single thread, I think it's safe to use long/int for pausePushDataTime and trimCounter.
58d0f48 to ada12a2
9760414 to 525bdd4
@waitinfuture Thanks for your review. I have made the corrections, PTAL.
private volatile boolean isPaused = false;
private final AtomicInteger trimCounter = new AtomicInteger(0);
Does trimCounter need to be atomic?
Addressed.
LGTM, thanks! Merging to main/0.3
…ics & Dashboard Panel

### What changes were proposed in this pull request?
Add `PausePushDataTime` Metrics

### Why are the changes needed?
Count each celeborn worker pause time.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster Test

Closes #1800 from zwangsheng/CELEBORN-882.

Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: zwangsheng <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 03a3981)
Signed-off-by: zky.zhoukeyong <[email protected]>
What changes were proposed in this pull request?
Add the PausePushDataTime metric.
Why are the changes needed?
To count the pause time of each Celeborn worker.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Cluster Test