Skip to content

Commit

Permalink
UT adjust override checkScanSchemata & enabling ut of exclude_by_suffix fea. (#11520)
Browse files Browse the repository at this point in the history

* override checkScanSchemata

Signed-off-by: fejiang <[email protected]>

* case added for pruning suite

Signed-off-by: fejiang <[email protected]>

* exclude by suffix added

Signed-off-by: fejiang <[email protected]>

* issue number assigned

Signed-off-by: fejiang <[email protected]>

* nit

Signed-off-by: fejiang <[email protected]>

---------

Signed-off-by: fejiang <[email protected]>
  • Loading branch information
Feng-Jiang28 authored Oct 25, 2024
1 parent 05f40b5 commit 910b64d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,33 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.suites

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruningSuite
import org.apache.spark.sql.rapids.GpuFileSourceScanExec
import org.apache.spark.sql.rapids.utils.RapidsSQLTestsBaseTrait

class RapidsParquetSchemaPruningSuite
    extends ParquetSchemaPruningSuite
    with RapidsSQLTestsBaseTrait {

  /**
   * Overrides the upstream schema-pruning check so GPU plans are inspected too.
   *
   * The inherited implementation only collects [[FileSourceScanExec]] nodes; when the
   * plugin replaces the scan with [[GpuFileSourceScanExec]], it would find zero file
   * sources and fail the size assertion. This version collects the pruned
   * (`requiredSchema`) schema from both node types.
   *
   * @param df the dataframe whose executed plan is inspected
   * @param expectedSchemaCatalogStrings catalog-string forms of the expected pruned
   *        schemas, one per file-source scan in the plan
   */
  override protected def checkScanSchemata(df: DataFrame,
      expectedSchemaCatalogStrings: String*): Unit = {
    // Collect the required (pruned) schema from CPU and GPU file-source scans alike.
    val fileSourceScanSchemata =
      collect(df.queryExecution.executedPlan) {
        case scan: FileSourceScanExec => scan.requiredSchema
        case gpuScan: GpuFileSourceScanExec => gpuScan.requiredSchema
      }
    assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
      s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
      s"but expected $expectedSchemaCatalogStrings")
    // Pairwise compare each scan's pruned schema against its expected parsed form,
    // using the suite's schema-specific equality (tolerates nullability differences).
    fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach {
      case (scanSchema, expectedScanSchemaCatalogString) =>
        val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString)
        implicit val equality = schemaEquality
        assert(scanSchema === expectedScanSchema)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ abstract class BackendTestSettings {
this
}

def excludeBySuffix(suffixes: String, reason: ExcludeReason): SuiteSettings = {
exclusion.add(ExcludeBySuffix(suffixes))
excludeReasons.add(reason)
this
}

def includeRapidsTestsByPrefix(prefixes: String*): SuiteSettings = {
inclusion.add(IncludeRapidsTestByPrefix(prefixes: _*))
this
Expand All @@ -152,13 +158,20 @@ abstract class BackendTestSettings {
this
}

def excludeRapidsTestsBySuffix(suffixes: String, reason: ExcludeReason): SuiteSettings = {
exclusion.add(ExcludeRadpisTestByPrefix(suffixes))
excludeReasons.add(reason)
this
}

def includeAllRapidsTests(): SuiteSettings = {
inclusion.add(IncludeByPrefix(RAPIDS_TEST))
this
}

def excludeAllRapidsTests(reason: ExcludeReason): SuiteSettings = {
exclusion.add(ExcludeByPrefix(RAPIDS_TEST))
exclusion.add(ExcludeBySuffix(RAPIDS_TEST))
excludeReasons.add(reason)
this
}
Expand Down Expand Up @@ -210,6 +223,15 @@ abstract class BackendTestSettings {
}
}

private case class ExcludeBySuffix(suffixes: String*) extends ExcludeBase {
override def isExcluded(testName: String): Boolean = {
if (suffixes.exists(suffix => testName.endsWith(suffix))) {
return true
}
false
}
}

private case class IncludeRapidsTestByPrefix(prefixes: String*) extends IncludeBase {
override def isIncluded(testName: String): Boolean = {
if (prefixes.exists(prefix => testName.startsWith(RAPIDS_TEST + prefix))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ class RapidsTestSettings extends BackendTestSettings {
.exclude("SPARK-31159: rebasing dates in write", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11480"))
.exclude("SPARK-35427: datetime rebasing in the EXCEPTION mode", ADJUST_UT("original test case inherited from Spark cannot find the needed local resources"))
enableSuite[RapidsParquetSchemaPruningSuite]
.excludeByPrefix("Spark vectorized reader", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11405"))
.excludeByPrefix("Non-vectorized reader", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11405"))
.excludeByPrefix("Case-insensitive parser", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11405"))
.excludeByPrefix("Case-sensitive parser", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11405"))
.excludeBySuffix("select a single complex field", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11619"))
.excludeBySuffix("select a single complex field and the partition column", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11620"))
.excludeBySuffix("select missing subfield", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11621"))
.excludeBySuffix("select explode of nested field of array of struct", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11653"))
.excludeBySuffix("empty schema intersection", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11627"))
.excludeBySuffix("select one deep nested complex field after join", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11628"))
.excludeBySuffix("select one deep nested complex field after outer join", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11629"))
enableSuite[RapidsParquetSchemaSuite]
.exclude("schema mismatch failure error message for parquet reader", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11434"))
.exclude("schema mismatch failure error message for parquet vectorized reader", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/11446"))
Expand Down

0 comments on commit 910b64d

Please sign in to comment.