From cd69d4dd18cfaccf58bf64dde6268f7ea1d4415b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 23 Jun 2023 00:48:58 -0700 Subject: [PATCH] [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab ### What changes were proposed in this pull request? This PR aims to support a `Heap Histogram` column in the `Executors` tab. ### Why are the changes needed? Like the `Thread Dump` column, this is very helpful when we analyze executor live JVM status. ![Screenshot 2023-06-22 at 8 37 55 PM](https://github.com/apache/spark/assets/9700541/741c8deb-23ff-463d-8b1e-7c2e53d0b59f) ![Screenshot 2023-06-22 at 8 38 34 PM](https://github.com/apache/spark/assets/9700541/93f77f42-48b5-41fa-94ab-ea675f576331) ### Does this PR introduce _any_ user-facing change? Yes, but this is a new column and we provide `spark.ui.heapHistogramEnabled` configuration like `spark.ui.threadDumpsEnabled`. ### How was this patch tested? Manual review. Closes #41709 from dongjoon-hyun/SPARK-44153. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../ui/static/executorspage-template.html | 1 + .../apache/spark/ui/static/executorspage.js | 19 +++- .../scala/org/apache/spark/SparkContext.scala | 26 +++++- .../org/apache/spark/internal/config/UI.scala | 7 ++ .../spark/storage/BlockManagerMessages.scala | 5 ++ .../storage/BlockManagerStorageEndpoint.scala | 3 + .../ui/exec/ExecutorHeapHistogramPage.scala | 89 +++++++++++++++++++ .../apache/spark/ui/exec/ExecutorsTab.scala | 11 ++- .../scala/org/apache/spark/util/Utils.scala | 17 ++++ 9 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html index 37d56a06ded7f..ecda34b545a6a 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html +++ 
b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html @@ -128,6 +128,7 @@

Executors

Shuffle Write Logs Thread Dump + Heap Histogram Exec Loss Reason diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js index 92d75c18e4958..b52ece87ba125 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js @@ -20,17 +20,25 @@ /* global jQuery, setDataTableDefaults */ var threadDumpEnabled = false; +var heapHistogramEnabled = false; /* eslint-disable no-unused-vars */ function setThreadDumpEnabled(val) { threadDumpEnabled = val; } +function setHeapHistogramEnabled(val) { + heapHistogramEnabled = val; +} /* eslint-enable no-unused-vars */ function getThreadDumpEnabled() { return threadDumpEnabled; } +function getHeapHistogramEnabled() { + return heapHistogramEnabled; +} + function formatLossReason(removeReason) { if (removeReason) { return removeReason @@ -551,6 +559,12 @@ $(document).ready(function () { return type === 'display' ? ("Thread Dump" ) : data; } }, + { + name: 'heapHistogramCol', + data: 'id', render: function (data, type) { + return type === 'display' ? ("Heap Histogram") : data; + } + }, { data: 'removeReason', render: formatLossReason @@ -566,7 +580,7 @@ $(document).ready(function () { {"visible": false, "targets": 10}, {"visible": false, "targets": 13}, {"visible": false, "targets": 14}, - {"visible": false, "targets": 25} + {"visible": false, "targets": 26} ], "deferRender": true }; @@ -574,6 +588,7 @@ $(document).ready(function () { execDataTable = $(selector).DataTable(conf); execDataTable.column('executorLogsCol:name').visible(logsExist(response)); execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled()); + execDataTable.column('heapHistogramCol:name').visible(getHeapHistogramEnabled()); $('#active-executors [data-toggle="tooltip"]').tooltip(); // This section should be visible once API gives the response. 
@@ -721,7 +736,7 @@ $(document).ready(function () { "
Peak Pool Memory Direct / Mapped
" + "
Resources
" + "
Resource Profile Id
" + - "
Exec Loss Reason
" + + "
Exec Loss Reason
" + ""); reselectCheckboxesBasedOnTaskTableState(); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c32c674d64e0f..5aff6682bfdde 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -68,7 +68,7 @@ import org.apache.spark.shuffle.api.ShuffleDriverComponents import org.apache.spark.status.{AppStatusSource, AppStatusStore} import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.storage._ -import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump +import org.apache.spark.storage.BlockManagerMessages.{TriggerHeapHistogram, TriggerThreadDump} import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} import org.apache.spark.util._ import org.apache.spark.util.logging.DriverLogger @@ -750,6 +750,30 @@ class SparkContext(config: SparkConf) extends Logging { } } + /** + * Called by the web UI to obtain executor heap histogram. 
+ */ + private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = { + try { + if (executorId == SparkContext.DRIVER_IDENTIFIER) { + Some(Utils.getHeapHistogram()) + } else { + env.blockManager.master.getExecutorEndpointRef(executorId) match { + case Some(endpointRef) => + Some(endpointRef.askSync[Array[String]](TriggerHeapHistogram)) + case None => + logWarning(s"Executor $executorId might already have stopped and " + + "can not request heap histogram from it.") + None + } + } + } catch { + case e: Exception => + logError(s"Exception getting heap histogram from executor $executorId", e) + None + } + } + private[spark] def getLocalProperties: Properties = localProperties.get() private[spark] def setLocalProperties(props: Properties): Unit = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index a32e60de2a45c..d0db5a9085481 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -20,6 +20,8 @@ package org.apache.spark.internal.config import java.util.Locale import java.util.concurrent.TimeUnit +import org.apache.commons.lang3.{JavaVersion, SystemUtils} + import org.apache.spark.network.util.ByteUnit private[spark] object UI { @@ -97,6 +99,11 @@ private[spark] object UI { .booleanConf .createWithDefault(true) + val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled") + .version("3.5.0") + .booleanConf + .createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) + val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled") .internal() .doc("Expose executor metrics at /metrics/executors/prometheus. 
" + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 24d0f239f7310..7fb145556a118 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -56,6 +56,11 @@ private[spark] object BlockManagerMessages { */ case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint + /** + * Driver to Executor message to get a heap histogram. + */ + case object TriggerHeapHistogram extends ToBlockManagerMasterStorageEndpoint + ////////////////////////////////////////////////////////////////////////////////// // Messages from storage endpoints to the master. ////////////////////////////////////////////////////////////////////////////////// diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index d4c631e59a1a6..476be80e67df3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -78,6 +78,9 @@ class BlockManagerStorageEndpoint( case TriggerThreadDump => context.reply(Utils.getThreadDump()) + case TriggerHeapHistogram => + context.reply(Utils.getHeapHistogram()) + case ReplicateBlock(blockId, replicas, maxReplicas) => context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas)) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala new file mode 100644 index 0000000000000..6964711a7889c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.exec + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{Node, Text} + +import org.apache.spark.SparkContext +import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage} + +private[ui] class ExecutorHeapHistogramPage( + parent: SparkUITab, + sc: Option[SparkContext]) extends WebUIPage("heapHistogram") { + + // Match the lines containing object information + val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r + + def render(request: HttpServletRequest): Seq[Node] = { + val executorId = Option(request.getParameter("executorId")).map { executorId => + UIUtils.decodeURLParameter(executorId) + }.getOrElse { + throw new IllegalArgumentException(s"Missing executorId parameter") + } + val time = System.currentTimeMillis() + val maybeHeapHistogram = sc.get.getExecutorHeapHistogram(executorId) + + val content = maybeHeapHistogram.map { heapHistogram => + val rows = heapHistogram.map { row => + row match { + case pattern(rank, instances, bytes, name, module) => + + {rank} + {instances} + {bytes} + {name} + {module} + + case pattern(rank, instances, bytes, name) => + + {rank} + {instances} + {bytes} + {name} + + + case _ => + // Ignore the first two lines and the last line + // + // num 
#instances #bytes class name (module) + // ------------------------------------------------------- + // ... + // Total 1267867 72845688 + } + } +
+
+

Updated at {UIUtils.formatDate(time)}

+ + + + + + + + + {rows} +
RankInstancesBytesClass NameModule
+
+
+ }.getOrElse(Text("Error fetching heap histogram")) + UIUtils.headerSparkPage(request, s"Heap Histogram for Executor $executorId", content, parent) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 7a857e57ceeb1..b92c5e67989dd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -31,18 +31,24 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec private def init(): Unit = { val threadDumpEnabled = parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED) + val heapHistogramEnabled = + parent.sc.isDefined && parent.conf.get(UI_HEAP_HISTOGRAM_ENABLED) - attachPage(new ExecutorsPage(this, threadDumpEnabled)) + attachPage(new ExecutorsPage(this, threadDumpEnabled, heapHistogramEnabled)) if (threadDumpEnabled) { attachPage(new ExecutorThreadDumpPage(this, parent.sc)) } + if (heapHistogramEnabled) { + attachPage(new ExecutorHeapHistogramPage(this, parent.sc)) + } } } private[ui] class ExecutorsPage( parent: SparkUITab, - threadDumpEnabled: Boolean) + threadDumpEnabled: Boolean, + heapHistogramEnabled: Boolean) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { @@ -52,6 +58,7 @@ private[ui] class ExecutorsPage( ++ ++ + } UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6e8f2c496e8be..ee74eacb84f72 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2287,6 +2287,23 @@ private[spark] object Utils extends Logging with SparkClassUtils { }.map(threadInfoToThreadStackTrace) } + /** Return a heap histogram. 
Used to capture histograms for the web UI */ + def getHeapHistogram(): Array[String] = { + // From Java 9+, we can use 'ProcessHandle.current().pid()' + val pid = getProcessName().split("@").head + val builder = new ProcessBuilder("jmap", "-histo:live", pid) + builder.redirectErrorStream(true) + val p = builder.start() + val r = new BufferedReader(new InputStreamReader(p.getInputStream())) + val rows = ArrayBuffer.empty[String] + var line = "" + while (line != null) { + if (line.nonEmpty) rows += line + line = r.readLine() + } + rows.toArray + } + def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = { if (threadId <= 0) { None