Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-882][WORKER][METRICS] Add Pause Push Data Time Count Metrics & Dashboard Panel #1800

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,96 @@
"title": "metrics_PausePushDataAndReplicate_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "ms"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 63
},
"id": 182,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_PausePushDataTime_Value",
"refId": "A"
}
],
"title": "Pause Push Data Time Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
/**
 * Value of `celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold`, as declared by
 * [[CelebornConf.METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD]].
 */
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int = {
  get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
}

// //////////////////////////////////////////////////////
// Quota //
Expand Down Expand Up @@ -3609,6 +3611,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10min")

// Threshold handed to the worker's MemoryManager: once its trim counter reaches this value
// while the worker is still paused, the pause time accumulated so far is appended to the
// PausePushDataTime metric instead of waiting for the worker to resume.
// The doc literals below are concatenated, so each fragment must carry its own
// trailing space; otherwise the sentences run together in the generated docs.
val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] =
  buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
    .categories("metrics")
    .doc("Force append worker pause spent time even if the worker is still in pause serving " +
      "state. This helps users see the pause spent time increase when the worker stays in " +
      "the pause state for a long time.")
    .intConf
    .createWithDefault(10)

val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ license: |
| celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus port. | 0.3.0 |
| celeborn.metrics.sample.rate | 1.0 | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 |
| celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size of timer metric. | 0.2.0 |
| celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | Force append worker pause spent time even if the worker is still in pause serving state. This helps users see the pause spent time increase when the worker stays in the pause state for a long time. | |
| celeborn.metrics.worker.prometheus.host | <localhost> | Worker's Prometheus host. | 0.3.0 |
| celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus port. | 0.3.0 |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class MemoryManager {
private final long pauseReplicateThreshold;
private final long resumeThreshold;
private final long maxSortMemory;
private final int forceAppendPauseSpentTimeThreshold;
private final List<MemoryPressureListener> memoryPressureListeners = new ArrayList<>();

private final ScheduledExecutorService checkService =
Expand All @@ -66,8 +67,10 @@ public class MemoryManager {
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
private ServingState servingState = ServingState.NONE_PAUSED;
// System.currentTimeMillis() timestamp at which the current pause measurement window
// started; stays at -1 until switchServingState() records the first pause.
private long pauseStartTime = -1L;
// Accumulated pause duration in milliseconds, grown by appendPauseSpentTime() and exposed
// through getPausePushDataTime().
// NOTE(review): this plain long is presumably written by the check thread and read by the
// metrics gauge on another thread; confirm whether it needs to be volatile.
private long pausePushDataTime = 0L;
// Incremented each time switchServingState() triggers TRIM while the worker stays paused;
// reset to 0 by appendPauseSpentTime().
private int trimCounter = 0;
// NOTE(review): no read or write of this flag is visible in this excerpt; confirm it is used.
private volatile boolean isPaused = false;

// For credit stream
private final AtomicLong readBufferCounter = new AtomicLong(0);
private long readBufferThreshold = 0;
Expand Down Expand Up @@ -111,6 +114,7 @@ private MemoryManager(CelebornConf conf) {
double readBufferTargetRatio = conf.readBufferTargetRatio();
long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
long readBufferTargetNotifyThreshold = conf.readBufferTargetNotifyThreshold();
forceAppendPauseSpentTimeThreshold = conf.metricsWorkerForceAppendPauseSpentTimeThreshold();

maxDirectorMemory =
DynMethods.builder("maxDirectMemory")
Expand Down Expand Up @@ -247,6 +251,13 @@ protected void switchServingState() {
if (lastState == servingState) {
if (servingState != ServingState.NONE_PAUSED) {
logger.debug("Trigger action: TRIM");
trimCounter += 1;
// force to append pause spent time even we are in pause state
if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
logger.debug(
"Trigger action: TRIM for {} times, force to append pause spent time.", trimCounter);
appendPauseSpentTime();
}
trimAllListeners();
}
return;
Expand All @@ -262,6 +273,7 @@ protected void switchServingState() {
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
} else if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
Expand All @@ -272,6 +284,7 @@ protected void switchServingState() {
pausePushDataAndReplicateCounter.increment();
if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pauseStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
Expand All @@ -283,6 +296,8 @@ protected void switchServingState() {
trimAllListeners();
break;
case NONE_PAUSED:
// resume from paused mode, append pause spent time
appendPauseSpentTime();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
logger.info("Trigger action: RESUME REPLICATE");
memoryPressureListeners.forEach(
Expand Down Expand Up @@ -389,6 +404,18 @@ public int dispatchRequestsLength() {
return readBufferDispatcher.requestsLength();
}

/**
 * Returns the pause time accumulated so far by {@code appendPauseSpentTime()}, in
 * milliseconds (it is a sum of {@code System.currentTimeMillis()} differences).
 *
 * <p>NOTE(review): the backing field is a plain {@code long}; if this getter is called from a
 * thread other than the one running {@code switchServingState()}, confirm visibility is
 * acceptable.
 */
public long getPausePushDataTime() {
return pausePushDataTime;
}

/**
 * Adds the time elapsed since {@code pauseStartTime} to {@code pausePushDataTime}, then
 * restarts the measurement window at the current time and clears {@code trimCounter}.
 *
 * <p>Invoked by {@code switchServingState()} when the worker returns to NONE_PAUSED, and also
 * while it stays paused once {@code trimCounter} reaches
 * {@code forceAppendPauseSpentTimeThreshold}.
 */
private void appendPauseSpentTime() {
  long now = System.currentTimeMillis();
  // pauseStartTime is -1 until a pause has been recorded; without this check a call made
  // before any pause would add the whole epoch offset to the metric.
  if (pauseStartTime >= 0) {
    // currentTimeMillis() is wall-clock time and may step backwards; clamp the delta so the
    // accumulated pause time never decreases.
    pausePushDataTime += Math.max(0L, now - pauseStartTime);
  }
  // The next window starts where this one ended.
  pauseStartTime = now;
  // Restart the count of TRIM rounds that triggers the next forced append.
  trimCounter = 0;
}

public void addReadBufferTargetChangeListener(ReadBufferTargetChangeListener listener) {
synchronized (readBufferTargetChangeListeners) {
readBufferTargetChangeListeners.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.ACTIVE_SHUFFLE_SIZE) { () =>
storageManager.getActiveShuffleSize()
}
// Expose the accumulated pause time tracked by MemoryManager as the PausePushDataTime gauge.
workerSource.addGauge(WorkerSource.PAUSE_PUSH_DATA_TIME) { () =>
memoryManager.getPausePushDataTime
}

private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ object WorkerSource {
val REPLICA_REGION_START_TIME = "ReplicaRegionStartTime"
val PRIMARY_REGION_FINISH_TIME = "PrimaryRegionFinishTime"
val REPLICA_REGION_FINISH_TIME = "ReplicaRegionFinishTime"
// Name of the gauge that reports MemoryManager.getPausePushDataTime (registered in Worker).
val PAUSE_PUSH_DATA_TIME = "PausePushDataTime"

// flush
val TAKE_BUFFER_TIME = "TakeBufferTime"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
// [CELEBORN-882] Test record pause push time
assert(memoryManager.getPausePushDataTime.longValue() > 0)
val lastPauseTime = memoryManager.getPausePushDataTime.longValue()

// NONE PAUSED -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1);
Expand All @@ -145,6 +148,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
assert(memoryManager.getPausePushDataTime.longValue() > lastPauseTime)
}

class MockMemoryPressureListener(
Expand Down
Loading