
[SPARK-44153][CORE][UI] Support Heap Histogram column in `Executors` tab

### What changes were proposed in this pull request?

This PR aims to support a `Heap Histogram` column in the `Executors` tab.

### Why are the changes needed?

Like the `Thread Dump` column, this is very helpful when analyzing the live JVM status of an executor.

![Screenshot 2023-06-22 at 8 37 55 PM](https://github.com/apache/spark/assets/9700541/741c8deb-23ff-463d-8b1e-7c2e53d0b59f)

![Screenshot 2023-06-22 at 8 38 34 PM](https://github.com/apache/spark/assets/9700541/93f77f42-48b5-41fa-94ab-ea675f576331)

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new column, and we provide a `spark.ui.heapHistogramEnabled` configuration analogous to `spark.ui.threadDumpsEnabled`.
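
For illustration, a minimal sketch of how an application could set this configuration explicitly (the app name and master below are placeholders, not part of this PR):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: turn the new Heap Histogram column on (or off) explicitly,
// the same way spark.ui.threadDumpsEnabled is usually toggled.
// App name and master are placeholders.
val conf = new SparkConf()
  .setAppName("heap-histogram-ui-demo")
  .setMaster("local[2]")
  .set("spark.ui.heapHistogramEnabled", "true")
val sc = new SparkContext(conf)

// ... run jobs, then open the Executors tab of the web UI ...

sc.stop()
```

Note that the default value depends on the JVM version (see the `UI.scala` change below).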

### How was this patch tested?

Manual review.

Closes apache#41709 from dongjoon-hyun/SPARK-44153.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Jun 23, 2023 · parent 8eda2d8 · commit cd69d4d
Showing 9 changed files with 173 additions and 5 deletions.
@@ -128,6 +128,7 @@ <h4 class="title-table">Executors</h4>
Shuffle Write</span></th>
<th>Logs</th>
<th>Thread Dump</th>
<th>Heap Histogram</th>
<th>Exec Loss Reason</th>
</tr>
</thead>
@@ -20,17 +20,25 @@
/* global jQuery, setDataTableDefaults */

var threadDumpEnabled = false;
var heapHistogramEnabled = false;

/* eslint-disable no-unused-vars */
function setThreadDumpEnabled(val) {
threadDumpEnabled = val;
}
function setHeapHistogramEnabled(val) {
heapHistogramEnabled = val;
}
/* eslint-enable no-unused-vars */

function getThreadDumpEnabled() {
return threadDumpEnabled;
}

function getHeapHistogramEnabled() {
return heapHistogramEnabled;
}

function formatLossReason(removeReason) {
if (removeReason) {
return removeReason
@@ -551,6 +559,12 @@ $(document).ready(function () {
return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
}
},
{
name: 'heapHistogramCol',
data: 'id', render: function (data, type) {
return type === 'display' ? ("<a href='heapHistogram/?executorId=" + data + "'>Heap Histogram</a>") : data;
}
},
{
data: 'removeReason',
render: formatLossReason
@@ -566,14 +580,15 @@
{"visible": false, "targets": 10},
{"visible": false, "targets": 13},
{"visible": false, "targets": 14},
{"visible": false, "targets": 25}
{"visible": false, "targets": 26}
],
"deferRender": true
};

execDataTable = $(selector).DataTable(conf);
execDataTable.column('executorLogsCol:name').visible(logsExist(response));
execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled());
execDataTable.column('heapHistogramCol:name').visible(getHeapHistogramEnabled());
$('#active-executors [data-toggle="tooltip"]').tooltip();

// This section should be visible once API gives the response.
@@ -721,7 +736,7 @@ $(document).ready(function () {
"<div id='direct_mapped_pool_memory' class='direct_mapped_pool_memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'> Peak Pool Memory Direct / Mapped</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='13'> Resources</div>" +
"<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='14'> Resource Profile Id</div>" +
"<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='25'> Exec Loss Reason</div>" +
"<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='26'> Exec Loss Reason</div>" +
"</div>");

reselectCheckboxesBasedOnTaskTableState();
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -68,7 +68,7 @@ import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.status.{AppStatusSource, AppStatusStore}
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.storage.BlockManagerMessages.{TriggerHeapHistogram, TriggerThreadDump}
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
import org.apache.spark.util._
import org.apache.spark.util.logging.DriverLogger
@@ -750,6 +750,30 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/**
* Called by the web UI to obtain executor heap histogram.
*/
private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = {
try {
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
Some(Utils.getHeapHistogram())
} else {
env.blockManager.master.getExecutorEndpointRef(executorId) match {
case Some(endpointRef) =>
Some(endpointRef.askSync[Array[String]](TriggerHeapHistogram))
case None =>
logWarning(s"Executor $executorId might already have stopped and " +
"can not request heap histogram from it.")
None
}
}
} catch {
case e: Exception =>
logError(s"Exception getting heap histogram from executor $executorId", e)
None
}
}

private[spark] def getLocalProperties: Properties = localProperties.get()

private[spark] def setLocalProperties(props: Properties): Unit = {
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -20,6 +20,8 @@ package org.apache.spark.internal.config
import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.spark.network.util.ByteUnit

private[spark] object UI {
@@ -97,6 +99,11 @@ private[spark] object UI {
.booleanConf
.createWithDefault(true)

val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled")
.version("3.5.0")
.booleanConf
.createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))

val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled")
.internal()
.doc("Expose executor metrics at /metrics/executors/prometheus. " +
@@ -56,6 +56,11 @@ private[spark] object BlockManagerMessages {
*/
case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint

/**
* Driver to Executor message to get a heap histogram.
*/
case object TriggerHeapHistogram extends ToBlockManagerMasterStorageEndpoint

//////////////////////////////////////////////////////////////////////////////////
// Messages from storage endpoints to the master.
//////////////////////////////////////////////////////////////////////////////////
@@ -78,6 +78,9 @@ class BlockManagerStorageEndpoint(
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())

case TriggerHeapHistogram =>
context.reply(Utils.getHeapHistogram())

case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui.exec

import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Text}

import org.apache.spark.SparkContext
import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}

private[ui] class ExecutorHeapHistogramPage(
parent: SparkUITab,
sc: Option[SparkContext]) extends WebUIPage("heapHistogram") {

// Match the lines containing object information
val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r

def render(request: HttpServletRequest): Seq[Node] = {
val executorId = Option(request.getParameter("executorId")).map { executorId =>
UIUtils.decodeURLParameter(executorId)
}.getOrElse {
throw new IllegalArgumentException(s"Missing executorId parameter")
}
val time = System.currentTimeMillis()
val maybeHeapHistogram = sc.get.getExecutorHeapHistogram(executorId)

val content = maybeHeapHistogram.map { heapHistogram =>
val rows = heapHistogram.map { row =>
row match {
case pattern(rank, instances, bytes, name, module) =>
<tr class="accordion-heading">
<td>{rank}</td>
<td>{instances}</td>
<td>{bytes}</td>
<td>{name}</td>
<td>{module}</td>
</tr>
case pattern(rank, instances, bytes, name) =>
<tr class="accordion-heading">
<td>{rank}</td>
<td>{instances}</td>
<td>{bytes}</td>
<td>{name}</td>
<td></td>
</tr>
case _ =>
// Ignore the first two lines and the last line
//
// num #instances #bytes class name (module)
// -------------------------------------------------------
// ...
// Total 1267867 72845688
}
}
<div class="row">
<div class="col-12">
<p>Updated at {UIUtils.formatDate(time)}</p>
<table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
<thead>
<th>Rank</th>
<th>Instances</th>
<th>Bytes</th>
<th>Class Name</th>
<th>Module</th>
</thead>
<tbody>{rows}</tbody>
</table>
</div>
</div>
}.getOrElse(Text("Error fetching heap histogram"))
UIUtils.headerSparkPage(request, s"Heap Histogram for Executor $executorId", content, parent)
}
}
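
To make the regular expression above concrete, here is a standalone sketch (not part of the commit) showing how a histogram row would be parsed; the sample line below is fabricated:

```scala
// Sketch only: classify one line of `jmap -histo:live` output with the
// same pattern used by ExecutorHeapHistogramPage. The sample line is made up.
val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r

val sample = "   1:        250000       12345678  [B (java.base@11.0.19)"
sample match {
  case pattern(rank, instances, bytes, name, module) =>
    println(s"rank=$rank instances=$instances bytes=$bytes name=$name module=${module.trim}")
  case _ =>
    println("header/footer line, skipped") // e.g. the "num #instances #bytes ..." banner
}
```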
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -31,18 +31,24 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
private def init(): Unit = {
val threadDumpEnabled =
parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED)
val heapHistogramEnabled =
parent.sc.isDefined && parent.conf.get(UI_HEAP_HISTOGRAM_ENABLED)

attachPage(new ExecutorsPage(this, threadDumpEnabled))
attachPage(new ExecutorsPage(this, threadDumpEnabled, heapHistogramEnabled))
if (threadDumpEnabled) {
attachPage(new ExecutorThreadDumpPage(this, parent.sc))
}
if (heapHistogramEnabled) {
attachPage(new ExecutorHeapHistogramPage(this, parent.sc))
}
}

}

private[ui] class ExecutorsPage(
parent: SparkUITab,
threadDumpEnabled: Boolean)
threadDumpEnabled: Boolean,
heapHistogramEnabled: Boolean)
extends WebUIPage("") {

def render(request: HttpServletRequest): Seq[Node] = {
@@ -52,6 +58,7 @@ private[ui] class ExecutorsPage(
<script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
<script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
<script>setThreadDumpEnabled({threadDumpEnabled})</script>
<script>setHeapHistogramEnabled({heapHistogramEnabled})</script>
}

UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2287,6 +2287,23 @@ private[spark] object Utils extends Logging with SparkClassUtils {
}.map(threadInfoToThreadStackTrace)
}

/** Return a heap histogram. Used to capture histograms for the web UI */
def getHeapHistogram(): Array[String] = {
// From Java 9+, we can use 'ProcessHandle.current().pid()'
val pid = getProcessName().split("@").head
val builder = new ProcessBuilder("jmap", "-histo:live", pid)
builder.redirectErrorStream(true)
val p = builder.start()
val r = new BufferedReader(new InputStreamReader(p.getInputStream()))
val rows = ArrayBuffer.empty[String]
var line = ""
while (line != null) {
if (line.nonEmpty) rows += line
line = r.readLine()
}
rows.toArray
}

def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = {
if (threadId <= 0) {
None
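
As an aside, a self-contained sketch (not part of the commit) of the same capture logic using the Java 9+ `ProcessHandle` API mentioned in the comment above; it assumes `jmap` is available on the `PATH`:

```scala
import java.io.{BufferedReader, InputStreamReader}
import scala.collection.mutable.ArrayBuffer

// Sketch only: capture a class histogram of the current JVM by shelling out
// to jmap, resolving the pid via ProcessHandle (Java 9+). Assumes jmap is on PATH.
def heapHistogramSketch(): Array[String] = {
  val pid = ProcessHandle.current().pid().toString
  val builder = new ProcessBuilder("jmap", "-histo:live", pid)
  builder.redirectErrorStream(true)
  val reader = new BufferedReader(new InputStreamReader(builder.start().getInputStream))
  val rows = ArrayBuffer.empty[String]
  try {
    var line = reader.readLine()
    while (line != null) {
      if (line.nonEmpty) rows += line
      line = reader.readLine()
    }
  } finally {
    reader.close()
  }
  rows.toArray
}
```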
