
[GLUTEN-3559][VL] Fix failing UTs with spark 3.4 #4023

Merged · 13 commits · Dec 26, 2023
15 changes: 15 additions & 0 deletions gluten-ut/spark34/pom.xml
@@ -118,8 +118,23 @@
</exclusion>
</exclusions>
</dependency>
<!-- log4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<slf4j.version>2.0.6</slf4j.version>
<log4j.version>2.19.0</log4j.version>
<clickhouse.lib.path></clickhouse.lib.path>
</properties>
</profile>
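For context (not part of the diff): slf4j 2.x locates its backend through a provider on the classpath, so the log4j-slf4j2-impl test dependency above is presumably what lets the LogAppender-based tests re-enabled later in this PR capture log output; without a binding, slf4j 2.x falls back to a no-op logger. A minimal, hypothetical sketch for checking the binding on the test classpath (names are illustrative):

import org.slf4j.LoggerFactory

object Slf4jBindingCheck extends App {
  val logger = LoggerFactory.getLogger(getClass)
  // With log4j-slf4j2-impl present this should be a Log4j-backed logger
  // (e.g. org.apache.logging.slf4j.Log4jLogger), not slf4j's NOPLogger.
  println(logger.getClass.getName)
  logger.info("slf4j 2.x is bound to log4j 2")
}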
@@ -114,7 +114,6 @@ class VeloxTestSettings extends BackendTestSettings {
"INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates")
.exclude("FAILED_EXECUTE_UDF: execute user defined function")
.exclude("UNRECOGNIZED_SQL_TYPE: unrecognized SQL type -100")
.exclude("INVALID_BUCKET_FILE: error if there exists any malformed bucket files")
.excludeByPrefix("SCALAR_SUBQUERY_TOO_MANY_ROWS:")
.excludeByPrefix("UNSUPPORTED_FEATURE.MULTI_ACTION_ALTER:")
enableSuite[GlutenQueryParsingErrorsSuite]
@@ -183,7 +182,7 @@ class VeloxTestSettings extends BackendTestSettings {
.includeByPrefix(
"gluten",
"SPARK-29906",
// "SPARK-30291",
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
"SPARK-31384",
@@ -196,7 +195,7 @@
"SPARK-35585",
"SPARK-32932",
"SPARK-33494",
// "SPARK-33933",
"SPARK-33933",
"SPARK-31220",
"SPARK-35874",
"SPARK-39551"
@@ -987,7 +986,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCsvFunctionsSuite]
enableSuite[GlutenCTEHintSuite]
.exclude("Resolve join hint in CTE")
enableSuite[GlutenCTEInlineSuiteAEOff]
enableSuite[GlutenCTEInlineSuiteAEOn]
enableSuite[GlutenDataFrameAggregateSuite]
@@ -815,7 +815,7 @@ class GlutenAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQL
}
}

ignore("gluten Logging plan changes for AQE") {
test("gluten Logging plan changes for AQE") {
val testAppender = new LogAppender("plan changes")
withLogAppender(testAppender) {
withSQLConf(
@@ -1450,7 +1450,7 @@ class GlutenAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQL
}
}

ignore("gluten test log level") {
test("gluten test log level") {
def verifyLog(expectedLevel: Level): Unit = {
val logAppender = new LogAppender("adaptive execution")
logAppender.setThreshold(expectedLevel)
@@ -19,6 +19,12 @@ package org.apache.spark.sql.execution.exchange
import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Literal}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.statsEstimation.StatsTestPlan
import org.apache.spark.sql.execution.{DummySparkPlan, SortExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf

class GlutenEnsureRequirementsSuite extends EnsureRequirementsSuite with GlutenSQLTestsBaseTrait {
@@ -42,4 +48,57 @@ class GlutenEnsureRequirementsSuite extends EnsureRequirementsSuite with GlutenS
assert(res.queryExecution.executedPlan.collect { case s: ShuffleExchangeLike => s }.size == 2)
}
}

test(GLUTEN_TEST + "SPARK-41986: Introduce shuffle on SinglePartition") {
Review comment (Contributor): @ayushi-agarwal What is the reason behind rewriting this suite? Is it solely due to the addition of the configuration SQLConf.SHUFFLE_PARTITIONS.key -> "5"?

ayushi-agarwal (Contributor, Author) — Dec 17, 2023: Yes, the test above this was also rewritten because of it. I think both test cases can now be removed from this file, since the config has been added at line 34.

ayushi-agarwal (Contributor, Author): @JkSelf I have removed it.

val filesMaxPartitionBytes = conf.filesMaxPartitionBytes
val exprA = Literal(1)
val exprC = Literal(3)
val EnsureRequirements = new EnsureRequirements()
withSQLConf(
SQLConf.MAX_SINGLE_PARTITION_BYTES.key -> filesMaxPartitionBytes.toString,
SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
Seq(filesMaxPartitionBytes, filesMaxPartitionBytes + 1).foreach {
size =>
val logicalPlan = StatsTestPlan(Nil, 1L, AttributeMap.empty, Some(size))
val left = DummySparkPlan(outputPartitioning = SinglePartition)
left.setLogicalLink(logicalPlan)

val right = DummySparkPlan(outputPartitioning = SinglePartition)
right.setLogicalLink(logicalPlan)
val smjExec = SortMergeJoinExec(exprA :: Nil, exprC :: Nil, Inner, None, left, right)

if (size <= filesMaxPartitionBytes) {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(
leftKeys,
rightKeys,
_,
_,
SortExec(_, _, _: DummySparkPlan, _),
SortExec(_, _, _: DummySparkPlan, _),
_) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
case other => fail(other.toString)
}
} else {
EnsureRequirements.apply(smjExec) match {
case SortMergeJoinExec(
leftKeys,
rightKeys,
_,
_,
SortExec(_, _, ShuffleExchangeExec(left: HashPartitioning, _, _), _),
SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
_) =>
assert(leftKeys === Seq(exprA))
assert(rightKeys === Seq(exprC))
assert(left.numPartitions == 5)
assert(right.numPartitions == 5)
case other => fail(other.toString)
}
}
}
}
}
}
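The review discussion above refers to setting spark.sql.shuffle.partitions once for the whole suite instead of duplicating the upstream tests. A minimal sketch of that idea, assuming a suite-level sparkConf hook in the style of Spark's shared-session test traits (names here are illustrative, not the exact Gluten change):

import org.apache.spark.SparkConf

class ShufflePartitionsConfSketch {
  // Setting the partition count here makes every test in the suite see 5 shuffle
  // partitions, which is what the copied SPARK-41986 test asserts via
  // left.numPartitions == 5 / right.numPartitions == 5.
  def sparkConf: SparkConf =
    new SparkConf()
      .set("spark.sql.shuffle.partitions", "5")
}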
@@ -30,7 +30,7 @@ import scala.collection.mutable.ArrayBuffer

class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {

ignore("test fallback logging") {
test("test fallback logging") {
val testAppender = new LogAppender("fallback reason")
withLogAppender(testAppender) {
withSQLConf(
@@ -41,13 +41,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
sql("SELECT * FROM t").collect()
}
}
val msgRegex = """Validation failed for plan: Scan parquet default\.t\[QueryId=[0-9]+\],""" +
""" due to: columnar FileScan is not enabled in FileSourceScanExec\."""
val msgRegex =
"""Validation failed for plan: Scan parquet spark_catalog.default\.t\[QueryId=[0-9]+\],""" +
""" due to: columnar FileScan is not enabled in FileSourceScanExec\."""
assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
}
}

ignore("test fallback event") {
test("test fallback event") {
val kvStore = spark.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
val glutenStore = new GlutenSQLAppStatusStore(kvStore)
assert(glutenStore.buildInfo().info.find(_._1 == "Gluten Version").exists(_._2 == VERSION))
@@ -77,7 +78,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
val id = runExecution("SELECT * FROM t")
val execution = glutenStore.execution(id)
assert(execution.isDefined)
assert(execution.get.numGlutenNodes == 2)
assert(execution.get.numGlutenNodes == 1)
assert(execution.get.numFallbackNodes == 0)
assert(execution.get.fallbackNodeToReason.isEmpty)

@@ -86,9 +87,9 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
val execution = glutenStore.execution(id)
assert(execution.isDefined)
assert(execution.get.numGlutenNodes == 0)
assert(execution.get.numFallbackNodes == 2)
assert(execution.get.numFallbackNodes == 1)
val fallbackReason = execution.get.fallbackNodeToReason.head
assert(fallbackReason._1.contains("Scan parquet default.t"))
assert(fallbackReason._1.contains("Scan parquet spark_catalog.default.t"))
assert(fallbackReason._2.contains("columnar FileScan is not enabled in FileSourceScanExec"))
}
}
@@ -116,7 +116,7 @@ class Spark34Shims extends SparkShims {
f =>
BucketingUtils
.getBucketId(f.toPath.getName)
.getOrElse(throw invalidBucketFile(f.toPath.getName))
.getOrElse(throw invalidBucketFile(f.urlEncodedPath))
}
}

@@ -148,10 +148,11 @@
}
}

// https://issues.apache.org/jira/browse/SPARK-40400
private def invalidBucketFile(path: String): Throwable = {
new SparkException(
errorClass = "INVALID_BUCKET_FILE",
messageParameters = Map("error" -> path),
messageParameters = Map("path" -> path),
cause = null)
}
}
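For context (not part of the diff): the INVALID_BUCKET_FILE error class in Spark 3.4 substitutes a <path> placeholder in its message template, which is presumably why the parameter key changes from "error" to "path" and why the full urlEncodedPath is passed instead of the bare file name. A minimal sketch of constructing the error (the file path below is hypothetical):

import org.apache.spark.SparkException

object InvalidBucketFileSketch extends App {
  val ex = new SparkException(
    errorClass = "INVALID_BUCKET_FILE",
    messageParameters = Map("path" -> "file:/tmp/warehouse/t/part-00000"),
    cause = null)
  assert(ex.getErrorClass == "INVALID_BUCKET_FILE")
  // getMessage interpolates the "path" parameter into the error template.
  println(ex.getMessage)
}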