[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom created nested structure Parquet parsing issue #11589

Open
Tracked by #11381
Feng-Jiang28 opened this issue Oct 11, 2024 · 0 comments
Labels
bug Something isn't working

Collaborator

Feng-Jiang28 commented Oct 11, 2024

Description:
Parquet files can be written by tools other than Spark. The SPARK-10136 test case defines a Parquet schema with a nested structure, a list of lists of integers, and then writes a file with that schema. The RAPIDS plugin fails to read this particular Parquet layout. To fix this, we need to handle this nested representation correctly. Spark reports the schema of the file as:

root
 |-- f: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)

The error occurs when trying to read the Parquet file:

java.lang.ClassCastException: repeated group f_tuple (LIST) {
  repeated int32 f_tuple_tuple;
} is not primitive

Expected output (Spark on CPU):

scala> val readDf = spark.read.parquet(PATH_TO_nested_tuples_in_arrays.parquet)
readDf: org.apache.spark.sql.DataFrame = [f: array<array<int>>]

scala> readDf.show()
+----------------+
|               f|
+----------------+
|[[4, 5], [6, 7]]|
|[[0, 1], [2, 3]]|
+----------------+

GPU:

scala> val readDf = spark.read.parquet(PATH_TO_nested_tuples_in_arrays.parquet)
readDf: org.apache.spark.sql.DataFrame = [f: array<array<int>>]

scala> readDf.show()
24/10/11 14:28:14 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(f#0 as string) AS f#3 will run on GPU
      *Expression <Cast> cast(f#0 as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/10/11 14:28:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ClassCastException: repeated group f_tuple (LIST) {
  repeated int32 f_tuple_tuple;
} is not primitive
	at org.apache.parquet.schema.Type.asPrimitiveType(Type.java:259)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkType(ParquetSchemaUtils.scala:358)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkArrayType(ParquetSchemaUtils.scala:374)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkType(ParquetSchemaUtils.scala:349)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkArrayType(ParquetSchemaUtils.scala:412)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkType(ParquetSchemaUtils.scala:349)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.updateField$1(ParquetSchemaUtils.scala:448)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.$anonfun$clipSparkStructType$6(ParquetSchemaUtils.scala:469)
	at scala.Option.map(Option.scala:230)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.matchCaseInsensitiveField$2(ParquetSchemaUtils.scala:462)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.$anonfun$clipSparkStructType$10(ParquetSchemaUtils.scala:500)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at org.apache.spark.sql.types.StructType.flatMap(StructType.scala:102)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkStructType(ParquetSchemaUtils.scala:494)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.clipSparkSchema(ParquetSchemaUtils.scala:338)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.$anonfun$evolveSchemaIfNeededAndClose$1(ParquetSchemaUtils.scala:513)
	at com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:98)
	at com.nvidia.spark.rapids.ParquetSchemaUtils$.evolveSchemaIfNeededAndClose(ParquetSchemaUtils.scala:512)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2722)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2668)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.$anonfun$apply$1(GpuDataProducer.scala:159)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.apply(GpuDataProducer.scala:156)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$4(GpuMultiFileReader.scala:1066)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapid

The Parquet file is provided in the attachment: nested_tuples_in_arrays.zip

P.S:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Define the schema
val schema = StructType(Seq(
  StructField("f", ArrayType(ArrayType(IntegerType)), nullable = false)
))

// Create the data
val data = Seq(
  Row(Seq(Seq(0, 1), Seq(2, 3))),
  Row(Seq(Seq(4, 5), Seq(6, 7)))
)

// Create DataFrame
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Write to Parquet
df.write.parquet("/home/fejiang/Desktop/temp36")

// Read back from the same path that was written above
val readDf = spark.read.parquet("/home/fejiang/Desktop/temp36")
readDf.show()
readDf.printSchema()

Running the code above on the GPU, the behaviour is as expected: a nested array that Spark itself writes to Parquet is read back correctly.

+----------------+
|               f|
+----------------+
|[[4, 5], [6, 7]]|
|[[0, 1], [2, 3]]|
+----------------+


scala> readDf.printSchema()
root
 |-- f: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)

This may be because Parquet files written by Spark differ slightly in layout from Parquet files written by other tools.
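
To make the suspected difference concrete, here is a minimal, self-contained sketch of the LIST backward-compatibility rules from the Parquet format specification, applied to two layouts. Everything in it is an assumption made for illustration: `PType`, `Prim`, `Group`, and `ListLayouts` are hypothetical names, not parquet-mr or spark-rapids classes. The `legacy` value is reconstructed from the error message above, with the outer `f` group assumed. The `standard` value is the three-level layout from the specification, which is presumably close to what Spark wrote in the P.S. example. This is not the plugin's implementation, only a model of the rules a reader has to apply.

```scala
// Toy model of a Parquet schema tree. All names are hypothetical.
sealed trait PType { def name: String; def repeated: Boolean }
final case class Prim(name: String, physical: String, repeated: Boolean = false) extends PType
final case class Group(name: String, fields: List[PType],
                       repeated: Boolean = false, isList: Boolean = false) extends PType

object ListLayouts {
  // Returns the element type of a LIST-annotated group, following the
  // backward-compatibility rules of the Parquet format specification.
  def elementOf(list: Group): PType = {
    require(list.isList && list.fields.size == 1 && list.fields.head.repeated,
      s"${list.name} is not a LIST group with a single repeated field")
    list.fields.head match {
      // Rule 1: a repeated primitive is itself the element.
      case p: Prim => p
      // Rule 2: a repeated group with several fields is the element (a struct).
      case g: Group if g.fields.size != 1 => g
      // Rule 3: a single-field repeated group named "array" or "<list>_tuple"
      // is the element (legacy two-level layout).
      case g: Group if g.name == "array" || g.name == list.name + "_tuple" => g
      // Rule 4: otherwise the repeated group is only a wrapper (standard
      // three-level layout) and its single field is the element.
      case g: Group => g.fields.head
    }
  }

  // Renders a type roughly the way Spark prints it, e.g. array<array<int>>.
  def sparkType(t: PType): String = t match {
    case Prim(_, "int32", _)  => "int"
    case Prim(_, other, _)    => other
    case g: Group if g.isList => s"array<${sparkType(elementOf(g))}>"
    case g: Group =>
      g.fields.map(f => s"${f.name}:${sparkType(f)}").mkString("struct<", ",", ">")
  }

  // Layout reconstructed from the error message (outer group `f` is assumed).
  val legacy: Group = Group("f", isList = true, fields = List(
    Group("f_tuple", repeated = true, isList = true, fields = List(
      Prim("f_tuple_tuple", "int32", repeated = true)))))

  // Standard three-level layout for the same logical type.
  val standard: Group = Group("f", isList = true, fields = List(
    Group("list", repeated = true, fields = List(
      Group("element", isList = true, fields = List(
        Group("list", repeated = true, fields = List(
          Prim("element", "int32")))))))))
}
```

In this model both `ListLayouts.sparkType(ListLayouts.legacy)` and `ListLayouts.sparkType(ListLayouts.standard)` evaluate to `array<array<int>>`. For the legacy layout, rule 3 makes the `f_tuple` group itself the element of `f`, so that element is a group and not a primitive. The stack trace shows `asPrimitiveType` being called on exactly that group, which suggests the failing code path expected a primitive at that position.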

Feng-Jiang28 changed the title from "SPARK-10136 list of primitive list" to "[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom nested structure Parquet representation issue" on Oct 11, 2024
Feng-Jiang28 changed the title from "[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom nested structure Parquet representation issue" to "[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom nested structure Parquet parsing issue" on Oct 11, 2024
Feng-Jiang28 added the labels "bug" (Something isn't working) and "? - Needs Triage" (Need team to review and classify) on Oct 11, 2024
Feng-Jiang28 changed the title from "[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom nested structure Parquet parsing issue" to "[BUG] Spark UT framework: SPARK-10136 list of primitive list, custom created nested structure Parquet parsing issue" on Oct 11, 2024
mattahrens removed the label "? - Needs Triage" (Need team to review and classify) on Oct 25, 2024