[GLUTEN-7727][CORE] Unify the variable name of GlutenConfig with glutenConf (apache#7728)
beliefer authored Nov 4, 2024
1 parent 2bf912b commit a053338
Showing 9 changed files with 54 additions and 49 deletions.
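In short, the commit replaces the mixed local names conf, columnarConf, and glutenConfig with the single name glutenConf wherever a GlutenConfig value is held. A condensed before/after sketch of the convention (abridged from the hunks below, not standalone code):

// Before: the same GlutenConfig value went by several names depending on the file.
val columnarConf: GlutenConfig = GlutenConfig.getConf
val conf: GlutenConfig = new GlutenConfig(session.sessionState.conf)

// After: one name everywhere, for fields, constructor parameters, and the rule-call context member.
val glutenConf: GlutenConfig = GlutenConfig.getConf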
@@ -86,7 +86,8 @@ private object CHRuleApi {
injector.injectTransform(
c =>
intercept(
- SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session)))
+ SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
+   c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
@@ -98,14 +99,15 @@ private object CHRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
- injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
+ injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
injector.injectTransform(
c =>
- intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session)))
+ intercept(
+   SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
- injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
+ injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

@@ -38,7 +38,7 @@ import scala.util.control.Breaks.{break, breakable}
// queryStagePrepRules.
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- val columnarConf: GlutenConfig = GlutenConfig.getConf
+ val glutenConf: GlutenConfig = GlutenConfig.getConf
plan.foreach {
case bhj: BroadcastHashJoinExec =>
val buildSidePlan = bhj.buildSide match {
@@ -53,8 +53,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
val isTransformable =
if (
- !columnarConf.enableColumnarBroadcastExchange ||
- !columnarConf.enableColumnarBroadcastJoin
+ !glutenConf.enableColumnarBroadcastExchange ||
+ !glutenConf.enableColumnarBroadcastJoin
) {
ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
@@ -38,9 +38,9 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession)
extends Rule[SparkPlan]
with Logging {

- val columnarConf: GlutenConfig = GlutenConfig.getConf
- val scanOnly: Boolean = columnarConf.enableScanOnly
- val enableColumnarHashAgg: Boolean = !scanOnly && columnarConf.enableColumnarHashAgg
+ val glutenConf: GlutenConfig = GlutenConfig.getConf
+ val scanOnly: Boolean = glutenConf.enableScanOnly
+ val enableColumnarHashAgg: Boolean = !scanOnly && glutenConf.enableColumnarHashAgg
val replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg

private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: BaseAggregateExec): Boolean = {
@@ -80,11 +80,11 @@ private object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
- injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
+ injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))

// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
- injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session))
+ injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
}

@@ -116,9 +116,9 @@ private object VeloxRuleApi {
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.inject(c => each(c.session)))
- injector.inject(c => ColumnarCollapseTransformStages(c.conf))
+ injector.inject(c => ColumnarCollapseTransformStages(c.glutenConf))
injector.inject(c => RemoveGlutenTableCacheColumnarToRow(c.session))
- injector.inject(c => GlutenFallbackReporter(c.conf, c.session))
+ injector.inject(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.inject(_ => RemoveFallbackTagRule())
}
}
@@ -31,7 +31,7 @@ object ColumnarRuleApplier {
val session: SparkSession,
val ac: AdaptiveContext,
val outputsColumnar: Boolean) {
- val conf: GlutenConfig = {
+ val glutenConf: GlutenConfig = {
new GlutenConfig(session.sessionState.conf)
}
}
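The renamed context member above is what the injected rules read. A minimal usage sketch, reusing only calls that appear elsewhere in this diff (the injector value and the rule-call parameter c are as in the CHRuleApi and VeloxRuleApi hunks):

// Rules built from the rule-call context now receive the config as c.glutenConf ...
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
// ... while rules that resolve the config themselves bind it under the same name:
val glutenConf: GlutenConfig = GlutenConfig.getConf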
@@ -169,9 +169,9 @@ case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] {
}

case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] {
- lazy val columnarConf: GlutenConfig = GlutenConfig.getConf
- lazy val physicalJoinOptimize = columnarConf.enablePhysicalJoinOptimize
- lazy val optimizeLevel: Integer = columnarConf.physicalJoinOptimizationThrottle
+ lazy val glutenConf: GlutenConfig = GlutenConfig.getConf
+ lazy val physicalJoinOptimize = glutenConf.enablePhysicalJoinOptimize
+ lazy val optimizeLevel: Integer = glutenConf.physicalJoinOptimizationThrottle

def existsMultiCodegens(plan: SparkPlan, count: Int = 0): Boolean =
plan match {
@@ -149,47 +149,50 @@ object Validators {
}
}

- private class FallbackByUserOptions(conf: GlutenConfig) extends Validator {
+ private class FallbackByUserOptions(glutenConf: GlutenConfig) extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = plan match {
- case p: SortExec if !conf.enableColumnarSort => fail(p)
- case p: WindowExec if !conf.enableColumnarWindow => fail(p)
- case p: SortMergeJoinExec if !conf.enableColumnarSortMergeJoin => fail(p)
- case p: BatchScanExec if !conf.enableColumnarBatchScan => fail(p)
- case p: FileSourceScanExec if !conf.enableColumnarFileScan => fail(p)
- case p: ProjectExec if !conf.enableColumnarProject => fail(p)
- case p: FilterExec if !conf.enableColumnarFilter => fail(p)
- case p: UnionExec if !conf.enableColumnarUnion => fail(p)
- case p: ExpandExec if !conf.enableColumnarExpand => fail(p)
- case p: SortAggregateExec if !conf.forceToUseHashAgg => fail(p)
- case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin => fail(p)
- case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p)
- case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange => fail(p)
- case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !conf.enableColumnarLimit => fail(p)
- case p: GenerateExec if !conf.enableColumnarGenerate => fail(p)
- case p: CoalesceExec if !conf.enableColumnarCoalesce => fail(p)
- case p: CartesianProductExec if !conf.cartesianProductTransformerEnabled => fail(p)
+ case p: SortExec if !glutenConf.enableColumnarSort => fail(p)
+ case p: WindowExec if !glutenConf.enableColumnarWindow => fail(p)
+ case p: SortMergeJoinExec if !glutenConf.enableColumnarSortMergeJoin => fail(p)
+ case p: BatchScanExec if !glutenConf.enableColumnarBatchScan => fail(p)
+ case p: FileSourceScanExec if !glutenConf.enableColumnarFileScan => fail(p)
+ case p: ProjectExec if !glutenConf.enableColumnarProject => fail(p)
+ case p: FilterExec if !glutenConf.enableColumnarFilter => fail(p)
+ case p: UnionExec if !glutenConf.enableColumnarUnion => fail(p)
+ case p: ExpandExec if !glutenConf.enableColumnarExpand => fail(p)
+ case p: SortAggregateExec if !glutenConf.forceToUseHashAgg => fail(p)
+ case p: ShuffledHashJoinExec if !glutenConf.enableColumnarShuffledHashJoin => fail(p)
+ case p: ShuffleExchangeExec if !glutenConf.enableColumnarShuffle => fail(p)
+ case p: BroadcastExchangeExec if !glutenConf.enableColumnarBroadcastExchange => fail(p)
+ case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !glutenConf.enableColumnarLimit =>
+   fail(p)
+ case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)
+ case p: CoalesceExec if !glutenConf.enableColumnarCoalesce => fail(p)
+ case p: CartesianProductExec if !glutenConf.cartesianProductTransformerEnabled => fail(p)
case p: TakeOrderedAndProjectExec
- if !(conf.enableTakeOrderedAndProject && conf.enableColumnarSort &&
-   conf.enableColumnarShuffle && conf.enableColumnarProject) =>
+ if !(glutenConf.enableTakeOrderedAndProject && glutenConf.enableColumnarSort &&
+   glutenConf.enableColumnarShuffle && glutenConf.enableColumnarProject) =>
fail(p)
- case p: BroadcastHashJoinExec if !conf.enableColumnarBroadcastJoin =>
+ case p: BroadcastHashJoinExec if !glutenConf.enableColumnarBroadcastJoin =>
fail(p)
case p: BroadcastNestedLoopJoinExec
- if !(conf.enableColumnarBroadcastJoin &&
-   conf.broadcastNestedLoopJoinTransformerTransformerEnabled) =>
+ if !(glutenConf.enableColumnarBroadcastJoin &&
+   glutenConf.broadcastNestedLoopJoinTransformerTransformerEnabled) =>
fail(p)
case p @ (_: HashAggregateExec | _: SortAggregateExec | _: ObjectHashAggregateExec)
- if !conf.enableColumnarHashAgg =>
+ if !glutenConf.enableColumnarHashAgg =>
fail(p)
case p
if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(
- plan) && !conf.enableColumnarWindowGroupLimit =>
+ plan) && !glutenConf.enableColumnarWindowGroupLimit =>
fail(p)
case p
- if HiveTableScanExecTransformer.isHiveTableScan(p) && !conf.enableColumnarHiveTableScan =>
+ if HiveTableScanExecTransformer.isHiveTableScan(
+   p) && !glutenConf.enableColumnarHiveTableScan =>
fail(p)
case p: SampleExec
- if !(conf.enableColumnarSample && BackendsApiManager.getSettings.supportSampleExec()) =>
+ if !(glutenConf.enableColumnarSample && BackendsApiManager.getSettings
+   .supportSampleExec()) =>
fail(p)
case _ => pass()
}
@@ -114,7 +114,7 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
* generate/compile code.
*/
case class ColumnarCollapseTransformStages(
- glutenConfig: GlutenConfig,
+ glutenConf: GlutenConfig,
transformStageCounter: AtomicInteger = ColumnarCollapseTransformStages.transformStageCounter)
extends Rule[SparkPlan] {

@@ -31,12 +31,12 @@ import org.apache.spark.sql.execution.ui.GlutenEventUtils
* This rule is used to collect all fallback reason.
* 1. print fallback reason for each plan node 2. post all fallback reason using one event
*/
- case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSession)
+ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)
extends Rule[SparkPlan]
with LogLevelUtil {

override def apply(plan: SparkPlan): SparkPlan = {
- if (!glutenConfig.enableFallbackReport) {
+ if (!glutenConf.enableFallbackReport) {
return plan
}
printFallbackReason(plan)
@@ -52,7 +52,7 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio
}

private def printFallbackReason(plan: SparkPlan): Unit = {
- val validationLogLevel = glutenConfig.validationLogLevel
+ val validationLogLevel = glutenConf.validationLogLevel
plan.foreachUp {
case _: GlutenPlan => // ignore
case p: SparkPlan if FallbackTags.nonEmpty(p) =>