Commit f0c643d

[Gluten-7402] Code cleanup for GlutenPlugin (apache#7403)

beliefer authored Oct 9, 2024
1 parent d06bddf commit f0c643d
Showing 1 changed file with 55 additions and 73 deletions.

128 changes: 55 additions & 73 deletions gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten
 
 import org.apache.gluten.GlutenBuildInfo._
-import org.apache.gluten.GlutenConfig.GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY
+import org.apache.gluten.GlutenConfig._
 import org.apache.gluten.backend.Backend
 import org.apache.gluten.events.GlutenBuildInfoEvent
 import org.apache.gluten.exception.GlutenException
@@ -55,27 +55,26 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
 
   override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
     _sc = Some(sc)
+    val conf = pluginContext.conf()
+
     // Register Gluten listeners
     GlutenSQLAppStatusListener.register(sc)
+    postBuildInfoEvent(sc)
+    if (conf.getBoolean(GLUTEN_SOFT_AFFINITY_ENABLED, GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)) {
+      SoftAffinityListener.register(sc)
+    }
 
-    val conf = pluginContext.conf()
-    postBuildInfoEvent(sc)
-
     setPredefinedConfigs(sc, conf)
-    // Initialize Backends API.
+
+    // Initialize Backend.
     Backend.get().onDriverStart(sc, pluginContext)
-    if (
-      sc.getConf.getBoolean(
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED,
-        GlutenConfig.GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE)
-    ) {
-      SoftAffinityListener.register(sc)
-    }
 
     Collections.emptyMap()
   }
 
   override def registerMetrics(appId: String, pluginContext: PluginContext): Unit = {
-    if (pluginContext.conf().getBoolean(GlutenConfig.GLUTEN_UI_ENABLED, true)) {
+    if (pluginContext.conf().getBoolean(GLUTEN_UI_ENABLED, true)) {
       _sc.foreach {
         sc =>
           GlutenEventUtils.attachUI(sc)
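Side note: the reordered init above runs when Spark activates the plugin on the driver, and the cleanup reads the PluginContext conf once instead of going back through sc.getConf. For orientation, Gluten is enabled through Spark's standard plugin mechanism — a hedged sketch (the app name and session wiring are illustrative, not taken from this commit):

    import org.apache.spark.sql.SparkSession

    // Illustrative launch wiring: registering the plugin causes Spark to call
    // GlutenDriverPlugin.init on the driver and GlutenExecutorPlugin.init on executors.
    val spark = SparkSession.builder()
      .appName("gluten-demo") // hypothetical app name
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .getOrCreate()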
@@ -113,10 +112,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     val infoMap = glutenBuildInfo.toMap
     val loggingInfo = infoMap.toSeq
       .sortBy(_._1)
-      .map {
-        case (name, value) =>
-          s"$name: $value"
-      }
+      .map { case (name, value) => s"$name: $value" }
       .mkString(
         "Gluten build info:\n==============================================================\n",
         "\n",
@@ -152,8 +148,8 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
           minOffHeapSize))
     ) {
       throw new GlutenException(
-        s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
-          s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
+        s"Must set '$SPARK_OFFHEAP_ENABLED' to true " +
+          s"and set '$SPARK_OFFHEAP_SIZE_KEY' to be greater than $minOffHeapSize")
     }
 
     // Session's local time zone must be set. If not explicitly set by user, its default
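The check above enforces Gluten's off-heap requirement. In practice a job must be submitted with both Spark options set — a sketch, with the size value chosen purely for illustration (the literal key strings are Spark's standard off-heap options, which the Gluten constants alias):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true") // SPARK_OFFHEAP_ENABLED
      .set("spark.memory.offHeap.size", "4g")      // SPARK_OFFHEAP_SIZE_KEY; must exceed minOffHeapSize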
@@ -162,71 +158,59 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
 
     // Task slots.
     val taskSlots = SparkResourceUtil.getTaskSlots(conf)
-    conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
-
-    val onHeapSize: Long =
-      if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
-        conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
-      } else {
-        // 1GB default
-        1024 * 1024 * 1024
-      }
+    conf.set(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
+
+    val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 * 1024 * 1024)
 
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by the user if the
     // dynamic off-heap feature is enabled.
-    val offHeapSize: Long =
-      if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
-        // Since when dynamic off-heap sizing is enabled, we commingle on-heap
-        // and off-heap memory, we set the off-heap size to the usable on-heap size. We will
-        // size it with a memory fraction, which can be aggressively set, but the default
-        // is using the same way that Spark sizes on-heap memory:
-        //
-        // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
-        //   (spark.executor.memory - 300MB).
-        //
-        // We will be careful to use the same configuration settings as Spark to ensure
-        // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory.
-        // The 300MB value, unfortunately, is hard-coded in Spark code.
-        ((onHeapSize - (300 * 1024 * 1024)) *
-          conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
-      } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
-        // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
-        // memory pool, regardless of Spark option spark.memory.storageFraction.
-        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
-      } else {
-        // Default Spark Value.
-        0L
-      }
+    val offHeapSize: Long = if (conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+      // Since when dynamic off-heap sizing is enabled, we commingle on-heap
+      // and off-heap memory, we set the off-heap size to the usable on-heap size. We will
+      // size it with a memory fraction, which can be aggressively set, but the default
+      // is using the same way that Spark sizes on-heap memory:
+      //
+      // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
+      //   (spark.executor.memory - 300MB).
+      //
+      // We will be careful to use the same configuration settings as Spark to ensure
+      // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory.
+      // The 300MB value, unfortunately, is hard-coded in Spark code.
+      ((onHeapSize - (300 * 1024 * 1024)) *
+        conf.getDouble(GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
+    } else {
+      // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
+      // memory pool, regardless of Spark option spark.memory.storageFraction.
+      conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
+    }
 
-    conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
-    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+    conf.set(GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
+    conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
 
     val offHeapPerTask = offHeapSize / taskSlots
-    conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
+    conf.set(GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
 
     // If we are using dynamic off-heap sizing, we should also enable off-heap memory
     // officially.
-    if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
-      conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
+    if (conf.getBoolean(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
+      conf.set(SPARK_OFFHEAP_ENABLED, "true")
 
       // We already sized the off-heap per task in a conservative manner, so we can just
       // use it.
-      conf.set(
-        GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
-        offHeapPerTask.toString)
+      conf.set(GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
     } else {
       // Let's make sure this is set to false explicitly if it is not on as it
       // is looked up when throwing OOF exceptions.
-      conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
+      conf.set(GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false")
 
       // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory
       // determined by spark.memory.storageFraction was used.
       val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
       val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots
       conf.set(
-        GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
+        GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
         conservativeOffHeapPerTask.toString)
     }
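To make the sizing arithmetic in this hunk concrete, a worked example under assumed inputs (8 GiB executor on-heap, the 0.6 default fraction, 4 task slots); the numbers are illustrative, not defaults the code guarantees:

    val onHeapSize = 8L * 1024 * 1024 * 1024                            // 8 GiB
    // Dynamic off-heap sizing: a fraction of usable on-heap, mirroring Spark's
    // on-heap formula with its hard-coded 300 MB reservation.
    val offHeapSize = ((onHeapSize - 300L * 1024 * 1024) * 0.6).toLong  // ~4.62 GiB
    val taskSlots = 4
    val offHeapPerTask = offHeapSize / taskSlots                        // ~1.16 GiB per slot
    // Non-dynamic path: the conservative per-task size additionally discounts the
    // non-borrowable storage fraction (spark.memory.storageFraction = 0.5 here).
    val conservativeOffHeapPerTask =
      (offHeapSize * (1.0 - 0.5)).toLong / taskSlots                    // ~592 MiB per slot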

Expand All @@ -235,28 +219,28 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// https://github.com/apache/incubator-gluten/pull/1931 was merged?
if (
!conf.getBoolean(
GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key,
GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
VANILLA_VECTORIZED_READERS_ENABLED.key,
VANILLA_VECTORIZED_READERS_ENABLED.defaultValue.get)
) {
// FIXME Hongze 22/12/06
// BatchScan.scala in shim was not always loaded by class loader.
// The file should be removed and the "ClassCastException" issue caused by
// spark.sql.<format>.enableVectorizedReader=true should be fixed in another way.
// Before the issue is fixed we force the use of vanilla row reader by using
// the following statement.
conf.set("spark.sql.parquet.enableVectorizedReader", "false")
conf.set("spark.sql.orc.enableVectorizedReader", "false")
conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "false")
conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
conf.set(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key, "false")
}
// When the Velox cache is enabled, the Velox file handle cache should also be enabled.
// Otherwise, a 'reference id not found' error may occur.
if (
conf.getBoolean(GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, false) && !conf.getBoolean(
GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key,
false)
conf.getBoolean(COLUMNAR_VELOX_CACHE_ENABLED.key, false) &&
!conf.getBoolean(COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, false)
) {
throw new IllegalArgumentException(s"${GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
s"${GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled together.")
throw new IllegalArgumentException(
s"${COLUMNAR_VELOX_CACHE_ENABLED.key} and " +
s"${COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key} should be enabled together.")
}
}
}
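Given the guard above, the two Velox cache options must be toggled together. A hedged configuration sketch, resolving the literal key strings through GlutenConfig rather than hard-coding them:

    // Either enable both caches...
    conf.set(GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key, "true")
    conf.set(GlutenConfig.COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED.key, "true")
    // ...or leave both at their defaults; enabling only the data cache trips the
    // IllegalArgumentException constructed above.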
@@ -266,9 +250,7 @@ private[gluten] class GlutenExecutorPlugin extends ExecutorPlugin {
 
   /** Initialize the executor plugin. */
   override def init(ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
-    val conf = ctx.conf()
-
-    // Initialize Backends API.
+    // Initialize Backend.
     Backend.get().onExecutorStart(ctx)
   }
 
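For orientation, GlutenDriverPlugin and GlutenExecutorPlugin are the two halves of Spark's plugin API; a top-level SparkPlugin ties them together roughly like this (a structural sketch of the pattern, not a verbatim copy of the Gluten class):

    import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}

    class GlutenPlugin extends SparkPlugin {
      // Spark instantiates this on the driver and calls init(sc, pluginContext).
      override def driverPlugin(): DriverPlugin = new GlutenDriverPlugin()
      // Spark instantiates this on each executor and calls init(ctx, extraConf).
      override def executorPlugin(): ExecutorPlugin = new GlutenExecutorPlugin()
    }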
