[Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader #2714

Merged Mar 4, 2024 · 5 commits
Changes from 1 commit
@@ -15,58 +15,35 @@
*/
package io.delta.kernel.defaults.internal.parquet
Collaborator:

nice! you moved this file to the correct package.


import java.io.File
import java.math.{BigDecimal, RoundingMode}
import java.math.BigDecimal

import scala.collection.mutable.ArrayBuffer

import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
import io.delta.kernel.defaults.internal.DefaultKernelUtils.DateTimeConstants
import io.delta.golden.GoldenTableUtils.goldenTableFile
import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils}
import io.delta.kernel.types._
import org.scalatest.funsuite.AnyFunSuite

class ParquetFileReaderSuite extends AnyFunSuite
with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {

def getSingleParquetFile(directory: File): String = {
val parquetFiles = directory.listFiles().filter(_.getName.endsWith(".parquet"))
assert(parquetFiles.size == 1)
parquetFiles.head.getAbsolutePath
}

//////////////////////////////////////////////////////////////////////////////////
// Decimal type tests
//////////////////////////////////////////////////////////////////////////////////

private val DECIMAL_TYPES_DICT_FILE_V1 = getSingleParquetFile(
goldenTableFile("parquet-decimal-dictionaries-v1"))

private val DECIMAL_TYPES_DICT_FILE_V2 = getSingleParquetFile(
goldenTableFile("parquet-decimal-dictionaries-v2"))

test("decimals encoded using dictionary encoding ") {
val DECIMAL_DICT_FILE_V1 = goldenTableFile("parquet-decimal-dictionaries-v1").getAbsolutePath
Collaborator:

nit: can you convert the variable name to camelCase: decimalDictFileV1? Same for the other ones.

Collaborator:

Add a comment: the golden tables below contain three decimal columns, each stored in a different physical format: int32, int64 and fixed binary.

Contributor Author:

@vkorukanti TYSM, I have updated it.
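
For context, the int32 / int64 / fixed-binary split discussed in this thread follows the Parquet format spec's precision thresholds (the same ones the removed inline schema comments cite). A minimal illustrative sketch, not code from this PR:

// Physical type Parquet uses for DECIMAL(precision, scale), per the format spec:
// up to 9 digits fit in INT32, up to 18 in INT64, and larger values need
// FIXED_LEN_BYTE_ARRAY.
def physicalTypeForDecimal(precision: Int): String =
  if (precision <= 9) "INT32"
  else if (precision <= 18) "INT64"
  else "FIXED_LEN_BYTE_ARRAY"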

val DECIMAL_DICT_FILE_V2 = goldenTableFile("parquet-decimal-dictionaries-v2").getAbsolutePath

val expResult = (0 until 1000000).map { i =>
TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
}

val readSchema = new StructType()
.add("id", IntegerType.INTEGER)
.add("col1", new DecimalType(9, 0)) // INT32: 1 <= precision <= 9
.add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18
.add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY
val readSchema = tableSchema(DECIMAL_DICT_FILE_V1)

for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) {
val actResult = readParquetFilesUsingSpark(file, readSchema)
for (file <- Seq(DECIMAL_DICT_FILE_V1, DECIMAL_DICT_FILE_V2)) {
val actResult = readParquetFilesUsingKernel(file, readSchema)

checkAnswer(actResult, expResult)
}
}
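
The hand-built readSchema is replaced by tableSchema(...), which derives the schema from the golden table itself. Assuming that helper works off the Parquet footer, a hypothetical sketch with parquet-hadoop (the helper name and wiring are assumptions, not this suite's actual implementation):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Read the schema recorded in a Parquet file's footer.
def footerSchema(path: String): org.apache.parquet.schema.MessageType = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(path), new Configuration()))
  try reader.getFooter.getFileMetaData.getSchema
  finally reader.close()
}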

private val LARGE_SCALE_DECIMAL_TYPES_FILE = getSingleParquetFile(
goldenTableFile("parquet-decimal-type"))

test("large scale decimal type file") {
val LARGE_SCALE_DECIMAL_TYPES_FILE = goldenTableFile("parquet-decimal-type").getAbsolutePath

def expand(n: BigDecimal): BigDecimal = {
n.scaleByPowerOfTen(5).add(n)
@@ -77,11 +54,7 @@ class ParquetFileReaderSuite extends AnyFunSuite
val n = BigDecimal.valueOf(i)
TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5))
} else {
val negation = if (i % 33 == 0) {
-1
} else {
1
}
val negation = if (i % 33 == 0) -1 else 1
val n = BigDecimal.valueOf(i*negation)
TestRow(
i,
@@ -92,25 +65,21 @@ class ParquetFileReaderSuite extends AnyFunSuite
}
}

val readSchema = new StructType()
.add("id", IntegerType.INTEGER)
.add("col1", new DecimalType(5, 1)) // INT32: 1 <= precision <= 9
.add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18
.add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY
val readSchema = tableSchema(LARGE_SCALE_DECIMAL_TYPES_FILE)

val actResult = readParquetFilesUsingSpark(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema)
val actResult = readParquetFilesUsingKernel(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema)

checkAnswer(actResult, expResult)
}

private val ALL_TYPES_FILE = getSingleParquetFile(goldenTableFile("parquet-all-types"))
private val ALL_TYPES_FILE = goldenTableFile("parquet-all-types").getAbsolutePath

test("read all types of data") {
val readSchema = tableSchema(goldenTablePath("parquet-all-types"))
val readSchema = tableSchema(ALL_TYPES_FILE)

val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)

val expResult = generateRowsFromAllTypesFile(readSchema)
val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)

checkAnswer(actResult, expResult)
}
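
Note the refactor's recurring idiom: expected rows now come from Spark's Parquet reader instead of hand-generated data, so each test checks the Kernel reader against Spark as an oracle. A sketch of that pattern using this suite's own helpers (the wrapper itself is illustrative):

// Read the same file through the Kernel reader (under test) and through
// Spark (trusted oracle), then require the two row sets to match.
def checkKernelMatchesSpark(path: String, schema: StructType): Unit = {
  val actual = readParquetFilesUsingKernel(path, schema)
  val expected = readParquetFilesUsingSpark(path, schema)
  checkAnswer(actual, expected)
}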
@@ -126,9 +95,9 @@ class ParquetFileReaderSuite extends AnyFunSuite
.add("ac", new StructType().add("aca", IntegerType.INTEGER)))
.add("array_of_prims", new ArrayType(IntegerType.INTEGER, true))

val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)

val expResult = generateRowsFromAllTypesFile(readSchema)
val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)

checkAnswer(actResult, expResult)
}
@@ -144,9 +113,9 @@ class ParquetFileReaderSuite extends AnyFunSuite
.add("aa", StringType.STRING)
.add("ac", new StructType().add("aca", IntegerType.INTEGER)))

val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)

val expResult = generateRowsFromAllTypesFile(readSchema)
val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)

checkAnswer(actResult, expResult)
}
@@ -170,107 +139,4 @@ class ParquetFileReaderSuite extends AnyFunSuite

checkAnswer(actResult2, expResult2)
}

private def generateRowsFromAllTypesFile(readSchema: StructType): Seq[TestRow] = {
(0 until 200).map { rowId =>
val expRow = ArrayBuffer.empty[Any]
readSchema.fields.forEach { field =>
val name = field.getName.toLowerCase()
val expValue = name match {
case "booleantype" => if (rowId % 87 != 0) rowId % 2 == 0 else null
case "bytetype" => if (rowId % 72 != 0) rowId.toByte else null
case "shorttype" => if (rowId % 56 != 0) rowId.toShort else null
case "datetype" => if (rowId % 61 != 0) {
Math.floorDiv(rowId * 20000000L, DateTimeConstants.MILLIS_PER_DAY).toInt
} else null
case "integertype" => if (rowId % 23 != 0) rowId else null
case "longtype" => if (rowId % 25 != 0) rowId + 1L else null
case "floattype" => if (rowId % 28 != 0) {
new BigDecimal(rowId * 0.234f).setScale(3, RoundingMode.HALF_UP)
.stripTrailingZeros.floatValue
} else null
case "doubletype" => if (rowId % 54 != 0) rowId * 234234.23d else null
case "stringtype" => if (rowId % 57 != 0) rowId.toString else null
case "binarytype" => if (rowId % 59 != 0) rowId.toString.getBytes else null
case "timestamptype" =>
// Tests only for spark.sql.parquet.outputTimestampType = INT96; other formats
// are tested in end-to-end tests in DeltaTableReadsSuite
if (rowId % 62 != 0) 23423523L * rowId * 1000 else null
case "decimal" =>
// Value is rounded to scale=2 when written
if (rowId % 67 != 0) new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP)
else null
case "nested_struct" => generateNestedStructColumn(rowId)
case "array_of_prims" => if (rowId % 25 == 0) null
else if (rowId % 29 == 0) Vector.empty[Int]
else Vector(rowId, null, rowId + 1)
case "array_of_arrays" => generateArrayOfArraysColumn(rowId)
case "array_of_structs" => Vector(TestRow(rowId.toLong), null)
case "map_of_prims" => generateMapOfPrimsColumn(rowId)
case "map_of_rows" =>
Map(rowId + 1 -> (if (rowId % 10 == 0) TestRow(rowId * 20L) else null))
case "map_of_arrays" => generateMapOfArraysColumn(rowId)
case "missing_column_primitive" => null
case "missing_column_struct" => null
case _ => throw new IllegalArgumentException("unknown column: " + name)
}
expRow += expValue
}
TestRow(expRow: _*)
}
}

private def generateNestedStructColumn(rowId: Int): TestRow = {
if (rowId % 63 == 0) return null
if (rowId % 19 != 0 && rowId % 23 != 0) return TestRow(rowId.toString, TestRow(rowId))
if (rowId % 23 == 0) return TestRow(rowId.toString, null)
TestRow(null, null)
}

private def generateMapOfPrimsColumn(rowId: Int): Map[Int, Any] = {
if (rowId % 28 == 0) return null
if (rowId % 30 == 0) return Map.empty[Int, Any]
Map(
rowId -> (if (rowId % 29 == 0) null else rowId + 2L),
(if (rowId % 27 != 0) rowId + 2 else rowId + 3) -> (rowId + 9L)
)
}

private def generateArrayOfArraysColumn(rowId: Int): Seq[Seq[Any]] = {
if (rowId % 8 == 0) return null

val emptyArray = ArrayBuffer.empty[Int]
val singleElemArray = ArrayBuffer(rowId)
val doubleElemArray = ArrayBuffer(rowId + 10, rowId + 20)
val arrayWithNulls = ArrayBuffer(null, rowId + 200)
val singleElemNullArray = ArrayBuffer(null)

rowId % 7 match {
case 0 => Vector(singleElemArray, singleElemArray, arrayWithNulls)
case 1 => Vector(singleElemArray, doubleElemArray, emptyArray)
case 2 => Vector(arrayWithNulls)
case 3 => Vector(singleElemNullArray)
case 4 => Vector(null)
case 5 => Vector(emptyArray)
case 6 => Vector.empty[ArrayBuffer[Int]]
}
}

private def generateMapOfArraysColumn(rowId: Int): Map[Long, Seq[Any]] = {
if (rowId % 30 == 0) return null

val val1 = if (rowId % 4 == 0) ArrayBuffer[Any](rowId, null, rowId + 1)
else ArrayBuffer.empty[Any]
val val2 = if (rowId % 7 == 0) ArrayBuffer.empty[Any]
else ArrayBuffer[Any](null)
val expMap = if (rowId % 24 == 0) Map.empty[Long, ArrayBuffer[Any]]
else Map[Long, ArrayBuffer[Any]](rowId.toLong -> val1, rowId + 1L -> val2)

expMap
}

//////////////////////////////////////////////////////////////////////////////////
// Timestamp type tests
//////////////////////////////////////////////////////////////////////////////////
// TODO move over from DeltaTableReadsSuite once there is better testing infra
}