[KYUUBI apache#6031] Add CollectMetricsPrettyDisplayListener
wForget committed Jan 31, 2024
1 parent 208354c commit cd23093
Showing 3 changed files with 152 additions and 0 deletions.
@@ -296,4 +296,12 @@ object KyuubiSQLConf {
.version("1.9.0")
.booleanConf
.createWithDefault(true)

val COLLECT_METRICS_PRETTY_DISPLAY_ENABLED =
buildConf("spark.sql.optimizer.collectMetricsPrettyDisplay.enabled")
.doc("Display more detailed information such as name and metrics results" +
" for CollectMetrics node in the SQL Tab of Spark UI")
.version("1.9.0")
.booleanConf
.createWithDefault(false)
}
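The new flag is a plain runtime boolean conf, so it can also be flipped per session. A minimal usage sketch, assuming a live SparkSession named spark with the Kyuubi extension already loaded:

  // Opt in to the detailed CollectMetrics display; the default is false.
  spark.conf.set("spark.sql.optimizer.collectMetricsPrettyDisplay.enabled", "true")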
@@ -18,6 +18,7 @@
package org.apache.kyuubi.sql

import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
import org.apache.spark.sql.execution.listener.CollectMetricsPrettyDisplayListener

import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}

@@ -43,5 +44,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {

    extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
    extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)

    CollectMetricsPrettyDisplayListener.register()
  }
}
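The listener is registered as a side effect of applying the extension, so no separate listener configuration is needed. A minimal sketch of the wiring, assuming the Kyuubi extension jar is already on the classpath (the builder code below is illustrative, not part of this commit):

  import org.apache.spark.sql.SparkSession

  // Applying KyuubiSparkSQLExtension runs its apply(), which calls
  // CollectMetricsPrettyDisplayListener.register() against the active SparkContext.
  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
    .getOrCreate()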
@@ -0,0 +1,141 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.listener

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphClusterWrapper, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.status.ElementTrackingStore

import org.apache.kyuubi.sql.KyuubiSQLConf.COLLECT_METRICS_PRETTY_DISPLAY_ENABLED

private class CollectMetricsPrettyDisplayListener extends SparkListener with SQLConfHelper {

  private def session: SparkSession = SparkSession.active
  private def kvstore: ElementTrackingStore =
    session.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]

  // On SparkListenerSQLExecutionEnd, rebuild the plan graph for executions that
  // carry observed metrics and overwrite the stored graph, so the SQL tab shows
  // the collected metric values instead of a bare CollectMetrics node.
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    if (conf.getConf(COLLECT_METRICS_PRETTY_DISPLAY_ENABLED)) {
      event match {
        case e: SparkListenerSQLExecutionEnd =>
          val qe = e.qe
          if (qe.observedMetrics.nonEmpty) {
            val executionId =
              Option(session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(
                _.toLong).getOrElse(e.executionId)

            val sparkPlanInfo = fromSparkPlan(qe.executedPlan)

            val planGraph = SparkPlanGraph(sparkPlanInfo)
            val graphToStore = new SparkPlanGraphWrapper(
              executionId,
              toStoredNodes(planGraph.nodes),
              planGraph.edges)
            kvstore.write(graphToStore)
          }
        case _ =>
      }
    }
  }

  // Rebuild SparkPlanInfo the same way Spark's SparkPlanInfo.fromSparkPlan does,
  // except that CollectMetricsExec nodes render their name and collected metric
  // values in the simple string.
  private def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
    val children = plan match {
      case ReusedExchangeExec(_, child) => child :: Nil
      case ReusedSubqueryExec(child) => child :: Nil
      case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
      case stage: QueryStageExec => stage.plan :: Nil
      case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
      case _ => plan.children ++ plan.subqueries
    }
    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
      new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
    }

    // dump the file scan metadata (e.g. file path) to the event log
    val metadata = plan match {
      case fileScan: FileSourceScanExec => fileScan.metadata
      case _ => Map[String, String]()
    }
    val simpleString = plan match {
      case c: CollectMetricsExec =>
        val metrics: Map[String, Any] =
          c.collectedMetrics.getValuesMap[Any](c.metricsSchema.fieldNames)
        val metricsString = redactMapString(metrics, SQLConf.get.maxToStringFields)
        s"CollectMetricsExec(${c.name}) $metricsString"
      case p => p.simpleString(SQLConf.get.maxToStringFields)
    }
    new SparkPlanInfo(
      plan.nodeName,
      simpleString,
      children.map(fromSparkPlan),
      metadata,
      metrics)
  }

  private def toStoredNodes(
      nodes: collection.Seq[SparkPlanGraphNode]): collection.Seq[SparkPlanGraphNodeWrapper] = {
    nodes.map {
      case cluster: SparkPlanGraphCluster =>
        val storedCluster = new SparkPlanGraphClusterWrapper(
          cluster.id,
          cluster.name,
          cluster.desc,
          toStoredNodes(cluster.nodes.toSeq),
          cluster.metrics)
        new SparkPlanGraphNodeWrapper(null, storedCluster)

      case node =>
        new SparkPlanGraphNodeWrapper(node, null)
    }
  }

  private def redactMapString(map: Map[String, Any], maxFields: Int): String = {
    // For security reasons, redact the map values whose keys match certain patterns
    val redactedMap = SQLConf.get.redactOptions(map)
    // construct the redacted map as strings of the format "key=value"
    val keyValuePairs = redactedMap.toSeq.map { item =>
      item._1 + "=" + item._2
    }
    truncatedString(keyValuePairs, "[", ", ", "]", maxFields)
  }
}

object CollectMetricsPrettyDisplayListener {

  private val registered = new AtomicBoolean(false)

  // The extension may be applied once per session, so guard with an AtomicBoolean
  // to attach the listener to the SparkContext at most once per JVM.
  def register(): Unit = {
    SparkContext.getActive.foreach { sc =>
      if (registered.compareAndSet(false, true)) {
        sc.addSparkListener(new CollectMetricsPrettyDisplayListener)
      }
    }
  }
}
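For context, CollectMetrics nodes are produced by Dataset.observe. A minimal end-to-end sketch of a query this listener affects; the session wiring, metric names, and data below are illustrative:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{count, lit, sum}

  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
    .config("spark.sql.optimizer.collectMetricsPrettyDisplay.enabled", "true")
    .getOrCreate()
  import spark.implicits._

  // observe() inserts a CollectMetrics node named "stats" into the plan.
  val df = Seq(1, 2, 3).toDF("v")
    .observe("stats", count(lit(1)).as("cnt"), sum("v").as("total"))
  df.collect()
  // After SparkListenerSQLExecutionEnd fires, the SQL tab should render the node
  // roughly as "CollectMetricsExec(stats) [cnt=3, total=6]" instead of the bare
  // node name.

Because the listener only rewrites the stored plan graph when QueryExecution.observedMetrics is non-empty, queries that never call observe() incur no extra work.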
