Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Sep 7, 2023
1 parent b1e3d66 commit fe1e0b8
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 0 deletions.
90 changes: 90 additions & 0 deletions assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,96 @@
"title": "metrics_PausePushDataAndReplicate_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 63
},
"id": 182,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_PausePushDataTime_Value",
"refId": "A"
}
],
"title": "Pause Push Data Time",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public class MemoryManager {
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
// Accumulated pause duration in milliseconds. It is increased when the serving state switches
// to NONE_PAUSED, by the wall-clock time elapsed since pauseStartTime.
private final LongAdder pausePushDataTime = new LongAdder();
private ServingState servingState = ServingState.NONE_PAUSED;
// System.currentTimeMillis() captured when push is paused starting from NONE_PAUSED;
// stays at the -1L sentinel until the first such pause.
// NOTE(review): the NONE_PAUSED branch subtracts this value unconditionally; if that branch
// can ever run while this is still -1L, the recorded duration would be roughly the current
// epoch time in millis - confirm the caller rules that out.
private long pauseStartTime = -1L;
private volatile boolean isPaused = false;

// For credit stream
Expand Down Expand Up @@ -262,6 +264,7 @@ protected void switchServingState() {
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
} else if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
Expand All @@ -272,6 +275,7 @@ protected void switchServingState() {
pausePushDataAndReplicateCounter.increment();
if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
Expand All @@ -283,6 +287,8 @@ protected void switchServingState() {
trimAllListeners();
break;
case NONE_PAUSED:
long pauseSpendMills = System.currentTimeMillis() - pauseStartTime;
pausePushDataTime.add(pauseSpendMills);
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
logger.info("Trigger action: RESUME REPLICATE");
memoryPressureListeners.forEach(
Expand Down Expand Up @@ -389,6 +395,10 @@ public int dispatchRequestsLength() {
return readBufferDispatcher.requestsLength();
}

/**
 * Returns the total pause time accumulated in {@code pausePushDataTime}.
 *
 * <p>The adder grows each time the serving state switches to {@code NONE_PAUSED}, by the number
 * of milliseconds elapsed since {@code pauseStartTime} was recorded.
 *
 * @return the current sum of the pause-duration adder, in milliseconds
 */
public long getPausePushDataTime() {
  final long totalPausedMillis = pausePushDataTime.sum();
  return totalPausedMillis;
}

public void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener listener) {
synchronized (readBufferTargetChangeListeners) {
readBufferTargetChangeListeners.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_SIZE) { () =>
storageManager.getActiveShuffleSize()
}
// Register a gauge named WorkerSource.PAUSE_PUSH_DATA_TIME whose value is read from
// memoryManager.getPausePushDataTime each time the gauge is sampled.
workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
memoryManager.getPausePushDataTime
}

private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ object WorkerSource {
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"

// flush
val TAKE_BUFFER_TIME = "TakeBufferTime"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
// [CELEBORN-882] Verify that the time spent with push paused is recorded
assert(memoryManager.getPausePushDataTime.longValue() > 0)
val lastPauseTime = memoryManager.getPausePushDataTime.longValue()

// NONE PAUSED -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1);
Expand All @@ -145,6 +148,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
}

class MockMemoryPressureListener(
Expand Down

0 comments on commit fe1e0b8

Please sign in to comment.