From 188688d1744731c3f90ecbacb4d35151d665201c Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Sat, 24 Feb 2024 22:51:51 +0700 Subject: [PATCH 1/5] [Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader Signed-off-by: Tai Le Manh --- .../parquet/ParquetFileReaderSuite.scala | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala new file mode 100644 index 00000000000..b2b459f93c2 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala @@ -0,0 +1,163 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.parquet + +import java.io.File +import java.lang.{Double => DoubleJ, Float => FloatJ} +import java.math.BigDecimal + +import io.delta.golden.GoldenTableUtils.goldenTableFile +import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils} +import io.delta.kernel.types.{DecimalType, IntegerType, StructType} +import org.apache.hadoop.conf.Configuration +import org.scalatest.funsuite.AnyFunSuite + +class ParquetFileReaderSuite extends AnyFunSuite + with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils { + + def getSingleParquetFile(directory: File): String = { + val parquetFiles = directory.listFiles().filter(_.getName.endsWith(".parquet")) + assert(parquetFiles.size == 1) + parquetFiles.head.getAbsolutePath + } + + ////////////////////////////////////////////////////////////////////////////////// + // Decimal type tests + ////////////////////////////////////////////////////////////////////////////////// + + private val DECIMAL_TYPES_DICT_FILE_V1 = getSingleParquetFile( + goldenTableFile("parquet-decimal-dictionaries-v1")) + + private val DECIMAL_TYPES_DICT_FILE_V2 = getSingleParquetFile( + goldenTableFile("parquet-decimal-dictionaries-v2")) + + test("decimals encoded using dictionary encoding ") { + val expectedResult = (0 until 1000000).map { i => + TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2)) + } + + val readSchema = new StructType() + .add("id", IntegerType.INTEGER) + .add("col1", new DecimalType(9, 0)) // INT32: 1 <= precision <= 9 + .add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18 + .add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY + + val batchReader = new ParquetBatchReader(new Configuration()) + for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) { + val batches = batchReader.read(file, readSchema) + val result = batches.toSeq.flatMap(_.getRows.toSeq) + checkAnswer(result, expectedResult) + } 
+ } + + private val LARGE_SCALE_DECIMAL_TYPES_FILE = getSingleParquetFile( + goldenTableFile("parquet-decimal-type")) + + test("large scale decimal type file") { + + def expand(n: BigDecimal): BigDecimal = { + n.scaleByPowerOfTen(5).add(n) + } + + val expectedResult = (0 until 99998).map { i => + if (i % 85 == 0) { + val n = BigDecimal.valueOf(i) + TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5)) + } else { + val negation = if (i % 33 == 0) { + -1 + } else { + 1 + } + val n = BigDecimal.valueOf(i*negation) + TestRow( + i, + n.movePointLeft(1), + expand(n).movePointLeft(5), + expand(expand(expand(n))).movePointLeft(5) + ) + } + } + + val readSchema = new StructType() + .add("id", IntegerType.INTEGER) + .add("col1", new DecimalType(5, 1)) // INT32: 1 <= precision <= 9 + .add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18 + .add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY + + val batchReader = new ParquetBatchReader(new Configuration()) + val batches = batchReader.read(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) + + val result = batches.toSeq.flatMap(_.getRows.toSeq) + checkAnswer(result, expectedResult) + } + + ////////////////////////////////////////////////////////////////////////////////// + // Timestamp type tests + ////////////////////////////////////////////////////////////////////////////////// + // TODO move over from DeltaTableReadsSuite once there is better testing infra + + test("read all types of data") { + Seq( + ( // no stats collection as NaN is present + Seq(Float.NegativeInfinity, Float.MinValue, -1.0f, + -0.0f, 0.0f, 1.0f, null, Float.MaxValue, Float.PositiveInfinity, Float.NaN), + Seq(Double.NegativeInfinity, Double.MinValue, -1.0d, + -0.0d, 0.0d, 1.0d, null, Double.MaxValue, Double.PositiveInfinity, Double.NaN), + 10, + (null, null, null), + (null, null, null) + ), + ( // Min and max are infinities + Seq(Float.NegativeInfinity, Float.MinValue, -1.0f, + -0.0f, 0.0f, 1.0f, null, Float.MaxValue, Float.PositiveInfinity), + Seq(Double.NegativeInfinity, Double.MinValue, -1.0d, + -0.0d, 0.0d, 1.0d, null, Double.MaxValue, Double.PositiveInfinity), + 9, + (Float.NegativeInfinity, Float.PositiveInfinity, 1L), + (Double.NegativeInfinity, Double.PositiveInfinity, 1L) + ) + ).foreach { + case (floats: Seq[FloatJ], doubles: Seq[DoubleJ], expRowCount, expFltStats, expDblStats) => + withTempDir { tempPath => + val targetDir = tempPath.getAbsolutePath + val testBatch = columnarBatch(floatVector(floats), doubleVector(doubles)) + val dataToWrite = Seq(testBatch.toFiltered) + + val writeOutput = + writeToParquetUsingKernel( + dataToWrite, + targetDir, + statsColumns = Seq(col("col_0"), col("col_1"))) + + assert(parquetFileRowCount(targetDir) == expRowCount) + verifyContent(targetDir, dataToWrite) + + val stats = writeOutput.head.getStatistics.get() + + def getStats(column: String): (Object, Object, Object) = + ( + Option(stats.getMinValues.get(col(column))).map(_.getValue).orNull, + Option(stats.getMaxValues.get(col(column))).map(_.getValue).orNull, + Option(stats.getNullCounts.get(col(column))).orNull + ) + + assert(getStats("col_0") === expFltStats) + assert(getStats("col_1") === expDblStats) + } + } + } +} From b153a3765e92491f8a144975cbdbd5c1f22e3054 Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Sat, 2 Mar 2024 01:25:43 +0700 Subject: [PATCH 2/5] [Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader Signed-off-by: Tai Le Manh --- .../parquet/ParquetFileReaderSuite.scala | 233 +++++++++++++----- 
.../internal/parquet/ParquetSuiteBase.scala | 2 +- 2 files changed, 168 insertions(+), 67 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala index b2b459f93c2..e5f3c52b12c 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala @@ -16,13 +16,14 @@ package io.delta.kernel.defaults.internal.parquet import java.io.File -import java.lang.{Double => DoubleJ, Float => FloatJ} -import java.math.BigDecimal +import java.math.{BigDecimal, RoundingMode} -import io.delta.golden.GoldenTableUtils.goldenTableFile +import scala.collection.mutable.ArrayBuffer + +import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath} +import io.delta.kernel.defaults.internal.DefaultKernelUtils.DateTimeConstants import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils} -import io.delta.kernel.types.{DecimalType, IntegerType, StructType} -import org.apache.hadoop.conf.Configuration +import io.delta.kernel.types._ import org.scalatest.funsuite.AnyFunSuite class ParquetFileReaderSuite extends AnyFunSuite @@ -45,7 +46,7 @@ class ParquetFileReaderSuite extends AnyFunSuite goldenTableFile("parquet-decimal-dictionaries-v2")) test("decimals encoded using dictionary encoding ") { - val expectedResult = (0 until 1000000).map { i => + val expResult = (0 until 1000000).map { i => TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2)) } @@ -55,11 +56,10 @@ class ParquetFileReaderSuite extends AnyFunSuite .add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18 .add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY - val batchReader = new ParquetBatchReader(new Configuration()) for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) { - val batches = batchReader.read(file, readSchema) - val result = batches.toSeq.flatMap(_.getRows.toSeq) - checkAnswer(result, expectedResult) + val actResult = readParquetFilesUsingSpark(file, readSchema) + + checkAnswer(actResult, expResult) } } @@ -72,7 +72,7 @@ class ParquetFileReaderSuite extends AnyFunSuite n.scaleByPowerOfTen(5).add(n) } - val expectedResult = (0 until 99998).map { i => + val expResult = (0 until 99998).map { i => if (i % 85 == 0) { val n = BigDecimal.valueOf(i) TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5)) @@ -98,66 +98,167 @@ class ParquetFileReaderSuite extends AnyFunSuite .add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18 .add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY - val batchReader = new ParquetBatchReader(new Configuration()) - val batches = batchReader.read(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) + val actResult = readParquetFilesUsingSpark(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) - val result = batches.toSeq.flatMap(_.getRows.toSeq) - checkAnswer(result, expectedResult) + checkAnswer(actResult, expResult) } - ////////////////////////////////////////////////////////////////////////////////// - // Timestamp type tests - ////////////////////////////////////////////////////////////////////////////////// - // TODO move over from DeltaTableReadsSuite once there is better testing infra + private val ALL_TYPES_FILE = 
getSingleParquetFile(goldenTableFile("parquet-all-types")) test("read all types of data") { - Seq( - ( // no stats collection as NaN is present - Seq(Float.NegativeInfinity, Float.MinValue, -1.0f, - -0.0f, 0.0f, 1.0f, null, Float.MaxValue, Float.PositiveInfinity, Float.NaN), - Seq(Double.NegativeInfinity, Double.MinValue, -1.0d, - -0.0d, 0.0d, 1.0d, null, Double.MaxValue, Double.PositiveInfinity, Double.NaN), - 10, - (null, null, null), - (null, null, null) - ), - ( // Min and max are infinities - Seq(Float.NegativeInfinity, Float.MinValue, -1.0f, - -0.0f, 0.0f, 1.0f, null, Float.MaxValue, Float.PositiveInfinity), - Seq(Double.NegativeInfinity, Double.MinValue, -1.0d, - -0.0d, 0.0d, 1.0d, null, Double.MaxValue, Double.PositiveInfinity), - 9, - (Float.NegativeInfinity, Float.PositiveInfinity, 1L), - (Double.NegativeInfinity, Double.PositiveInfinity, 1L) - ) - ).foreach { - case (floats: Seq[FloatJ], doubles: Seq[DoubleJ], expRowCount, expFltStats, expDblStats) => - withTempDir { tempPath => - val targetDir = tempPath.getAbsolutePath - val testBatch = columnarBatch(floatVector(floats), doubleVector(doubles)) - val dataToWrite = Seq(testBatch.toFiltered) - - val writeOutput = - writeToParquetUsingKernel( - dataToWrite, - targetDir, - statsColumns = Seq(col("col_0"), col("col_1"))) - - assert(parquetFileRowCount(targetDir) == expRowCount) - verifyContent(targetDir, dataToWrite) - - val stats = writeOutput.head.getStatistics.get() - - def getStats(column: String): (Object, Object, Object) = - ( - Option(stats.getMinValues.get(col(column))).map(_.getValue).orNull, - Option(stats.getMaxValues.get(col(column))).map(_.getValue).orNull, - Option(stats.getNullCounts.get(col(column))).orNull - ) - - assert(getStats("col_0") === expFltStats) - assert(getStats("col_1") === expDblStats) - } + val readSchema = tableSchema(goldenTablePath("parquet-all-types")) + + val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema) + + val expResult = (0 until 200).map { i => + val expRow = generateRowFromAllTypesFile(readSchema, i) + TestRow(expRow: _*) + } + + checkAnswer(actResult, expResult) + } + + test("read subset of columns") { + val readSchema = new StructType() + .add("byteType", ByteType.BYTE) + .add("booleanType", BooleanType.BOOLEAN) + .add("stringType", StringType.STRING) + .add("dateType", DateType.DATE) + .add("nested_struct", new StructType() + .add("aa", StringType.STRING) + .add("ac", new StructType().add("aca", IntegerType.INTEGER))) + .add("array_of_prims", new ArrayType(IntegerType.INTEGER, true)) + + val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema) + + val expResult = (0 until 200).map { i => + val expRow = generateRowFromAllTypesFile(readSchema, i) + TestRow(expRow: _*) } + + checkAnswer(actResult, expResult) + } + + test("request row indices") { + val readSchema = new StructType() + .add("id", LongType.LONG) + .add(StructField.METADATA_ROW_INDEX_COLUMN) + + val path = getTestResourceFilePath("parquet-basic-row-indexes") + val actResult1 = readParquetFilesUsingKernel(path, readSchema) + val expResult1 = (0 until 30) + .map(i => TestRow(i, if (i < 10) i else if (i < 20) i - 10 else i - 20)) + + checkAnswer(actResult1, expResult1) + + // File with multiple row-groups [0, 20000) where rowIndex = id + val filePath = getTestResourceFilePath("parquet/") + val actResult2 = readParquetFilesUsingKernel(filePath, readSchema) + val expResult2 = (0 until 20000).map(i => TestRow(i, i)) + + checkAnswer(actResult2, expResult2) } + + private def 
generateRowFromAllTypesFile(readSchema: StructType, rowId: Int): Seq[Any] = { + val expRow = ArrayBuffer.empty[Any] + readSchema.fields.forEach { field => + val name = field.getName.toLowerCase() + val expValue = name match { + case "booleantype" => if (rowId % 87 != 0) rowId % 2 == 0 else null + case "bytetype" => if (rowId % 72 != 0) rowId.toByte else null + case "shorttype" => if (rowId % 56 != 0) rowId.toShort else null + case "datetype" => if (rowId % 61 != 0) { + Math.floorDiv(rowId * 20000000L, DateTimeConstants.MILLIS_PER_DAY).toInt + } else null + case "integertype" => if (rowId % 23 != 0) rowId else null + case "longtype" => if (rowId % 25 != 0) rowId + 1L else null + case "floattype" => if (rowId % 28 != 0) { + new BigDecimal(rowId * 0.234f).setScale(3, RoundingMode.HALF_UP) + .stripTrailingZeros.floatValue + } else null + case "doubletype" => if (rowId % 54 != 0) rowId * 234234.23d else null + case "stringtype" => if (rowId % 57 != 0) rowId.toString else null + case "binarytype" => if (rowId % 59 != 0) rowId.toString.getBytes else null + case "timestamptype" => + // Tests only for spark.sql.parquet.outputTimestampTyp = INT96, other formats + // are tested in end-to-end tests in DeltaTableReadsSuite + if (rowId % 62 != 0) 23423523L * rowId * 1000 else null + case "decimal" => + // Value is rounded to scale=2 when written + if (rowId % 67 != 0) new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP) + else null + case "nested_struct" => generateNestedStructColumn(rowId) + case "array_of_prims" => if (rowId % 25 == 0) null + else if (rowId % 29 == 0) Vector.empty + else Vector(rowId, null, rowId + 1) + case "array_of_arrays" => generateArrayOfArraysColumn(rowId) + case "array_of_structs" => Vector(TestRow(rowId), null) + case "map_of_prims" => generateMapOfPrimsColumn(rowId) + case "map_of_rows" => Map(rowId + 1 -> (if (rowId % 10 == 0) TestRow(rowId*20) else null)) + case "map_of_arrays" => generateMapOfArraysColumn(rowId) + case "missing_column_primitive" => null + case "missing_column_struct" => null + case _ => throw new IllegalArgumentException("unknown column: " + name) + } + expRow += expValue + } + expRow + } + + private def generateNestedStructColumn(rowId: Int): TestRow = { + if (rowId % 63 == 0) return null + if (rowId % 19 != 0 && rowId % 23 != 0) return TestRow(rowId, TestRow(rowId)) + if (rowId % 23 == 0) return TestRow(rowId, null) + TestRow(null, null) + } + + private def generateMapOfPrimsColumn(rowId: Int): Map[Int, Any] = { + if (rowId % 28 == 0) return null + if (rowId % 30 == 0) return Map.empty[Int, Any] + Map( + rowId -> (if (rowId % 29 == 0) null else rowId + 2L), + (if (rowId % 27 != 0) rowId + 2 else rowId + 3) -> (rowId + 9L) + ) + } + + private def generateArrayOfArraysColumn(rowId: Int): Vector[ArrayBuffer[Any]] = { + if (rowId % 8 == 0) { + return null + } + val emptyArray = ArrayBuffer.empty[Any] + val singleElemArray = ArrayBuffer[Any](rowId) + val doubleElemArray = ArrayBuffer[Any](rowId + 10, rowId + 20) + val arrayWithNulls = ArrayBuffer[Any](null, rowId + 200) + val singleElemNullArray = ArrayBuffer[Any](null) + + rowId % 7 match { + case 0 => Vector(singleElemArray, singleElemArray, arrayWithNulls) + case 1 => Vector(singleElemArray, doubleElemArray, emptyArray) + case 2 => Vector(arrayWithNulls) + case 3 => Vector(singleElemNullArray) + case 4 => Vector(null) + case 5 => Vector(emptyArray) + case 6 => Vector.empty[ArrayBuffer[Any]] + } + } + + private def generateMapOfArraysColumn(rowId: Int): Map[Int, ArrayBuffer[Any]] = { + if 
(rowId % 30 == 0) { + return null + } + val val1 = if (rowId % 4 == 0) ArrayBuffer(rowId, null, rowId + 1) else ArrayBuffer.empty[Any] + val val2 = if (rowId % 7 == 0) ArrayBuffer.empty[Any] else ArrayBuffer[Any](null) + val expMap = if (rowId % 24 == 0) { + Map.empty[Int, ArrayBuffer[Any]] + } else { + Map[Int, ArrayBuffer[Any]](rowId -> val1, rowId + 1 -> val2) + } + + expMap + } + + ////////////////////////////////////////////////////////////////////////////////// + // Timestamp type tests + ////////////////////////////////////////////////////////////////////////////////// + // TODO move over from DeltaTableReadsSuite once there is better testing infra } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala index 9cdc0972047..f221aeeb4d7 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala @@ -250,7 +250,7 @@ trait ParquetSuiteBase extends TestUtils { } // Read the parquet files in actionFileDir using Spark Parquet reader - private def readParquetFilesUsingSpark( + def readParquetFilesUsingSpark( actualFileDir: String, readSchema: StructType): Seq[TestRow] = { spark.read .format("parquet") From 3ba2b41843d4cb8aae64fa6383f793ff8a543fc6 Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Sun, 3 Mar 2024 20:33:52 +0700 Subject: [PATCH 3/5] [Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader Signed-off-by: Tai Le Manh --- .../parquet/TestParquetBatchReader.java | 589 ------------------ .../defaults/ParquetBatchReaderSuite.scala | 111 ---- .../parquet/ParquetFileReaderSuite.scala | 164 ++--- 3 files changed, 88 insertions(+), 776 deletions(-) delete mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java delete mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java deleted file mode 100644 index 5eac022e2b3..00000000000 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Copyright (2023) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.delta.kernel.defaults.internal.parquet; - -import java.io.File; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.net.URI; -import java.util.*; -import java.util.stream.Collectors; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; -import static io.delta.golden.GoldenTableUtils.goldenTableFile; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; - -import io.delta.kernel.data.*; -import io.delta.kernel.types.*; -import io.delta.kernel.utils.CloseableIterator; - -import io.delta.kernel.internal.util.Tuple2; -import io.delta.kernel.internal.util.VectorUtils; - -import io.delta.kernel.defaults.utils.DefaultKernelTestUtils; - -import io.delta.kernel.defaults.internal.DefaultKernelUtils; - -public class TestParquetBatchReader { - /** - * Test reads data from a Parquet file with data of various combinations of data types supported - * by the Delta Lake table protocol. - */ - private static final String ALL_TYPES_FILE = - Arrays.stream(goldenTableFile("parquet-all-types").listFiles()) - .filter(file -> file.getName().endsWith(".parquet")) - .map(File::getAbsolutePath) - .findFirst() - .get(); - - private static final StructType ALL_TYPES_FILE_SCHEMA = new StructType() - .add("byteType", ByteType.BYTE) - .add("shortType", ShortType.SHORT) - .add("integerType", IntegerType.INTEGER) - .add("longType", LongType.LONG) - .add("floatType", FloatType.FLOAT) - .add("doubleType", DoubleType.DOUBLE) - .add("decimal", new DecimalType(10, 2)) - .add("booleanType", BooleanType.BOOLEAN) - .add("stringType", StringType.STRING) - .add("binaryType", BinaryType.BINARY) - .add("dateType", DateType.DATE) - .add("timestampType", TimestampType.TIMESTAMP) - .add("nested_struct", - new StructType() - .add("aa", StringType.STRING) - .add("ac", new StructType().add("aca", IntegerType.INTEGER))) - .add("array_of_prims", - new ArrayType(IntegerType.INTEGER, true)) - .add("array_of_arrays", - new ArrayType(new ArrayType(IntegerType.INTEGER, true), true)) - .add("array_of_structs", - new ArrayType(new StructType().add("ab", LongType.LONG), true)) - .add("map_of_prims", new MapType(IntegerType.INTEGER, LongType.LONG, true)) - .add("map_of_rows", new MapType( - IntegerType.INTEGER, - new StructType().add("ab", LongType.LONG), - true)) - .add("map_of_arrays", new MapType( - LongType.LONG, - new ArrayType(IntegerType.INTEGER, true), - true)); - - @Test - public void readAllTypesOfData() - throws Exception { - readAndVerify(ALL_TYPES_FILE_SCHEMA, 90 /* readBatchSize */); - } - - @Test - public void readSubsetOfColumns() - throws Exception { - StructType readSchema = new StructType() - .add("byteType", ByteType.BYTE) - .add("booleanType", BooleanType.BOOLEAN) - .add("stringType", StringType.STRING) - .add("dateType", DateType.DATE) - .add("nested_struct", - new StructType() - .add("aa", StringType.STRING) - .add("ac", new StructType().add("aca", IntegerType.INTEGER))) - .add("array_of_prims", - new ArrayType(IntegerType.INTEGER, true)); - - readAndVerify(readSchema, 73 /* readBatchSize */); - } - - @Test - public void readSubsetOfColumnsWithMissingColumnsInFile() - throws Exception { - StructType readSchema = new StructType() - .add("booleanType", BooleanType.BOOLEAN) - .add("integerType", IntegerType.INTEGER) - 
.add("missing_column_struct", - new StructType().add("ab", IntegerType.INTEGER)) - .add("longType", LongType.LONG) - .add("missing_column_primitive", DateType.DATE) - .add("nested_struct", - new StructType() - .add("aa", StringType.STRING) - .add("ac", new StructType().add("aca", IntegerType.INTEGER)) - ); - - readAndVerify(readSchema, 23 /* readBatchSize */); - } - - @Test - public void requestRowIndices() throws IOException { - String path = DefaultKernelTestUtils.getTestResourceFilePath("parquet-basic-row-indexes"); - File dir = new File(URI.create(path).getPath()); - List parquetFiles = Arrays.stream(Objects.requireNonNull(dir.listFiles())) - .filter(file -> file.getName().endsWith(".parquet")) - .map(File::getAbsolutePath) - .collect(Collectors.toList()); - - StructType readSchema = new StructType() - .add("id", LongType.LONG) - .add(StructField.METADATA_ROW_INDEX_COLUMN); - - Configuration conf = new Configuration(); - // Set the batch size small enough so there will be multiple batches - conf.setInt("delta.kernel.default.parquet.reader.batch-size", 2); - ParquetBatchReader reader = new ParquetBatchReader(conf); - - for (String filePath : parquetFiles) { - try (CloseableIterator iter = reader.read(filePath, readSchema)) { - while (iter.hasNext()) { - ColumnarBatch batch = iter.next(); - for (int i = 0; i < batch.getSize(); i++) { - long id = batch.getColumnVector(0).getLong(i); - long rowIndex = batch.getColumnVector(1).getLong(i); - assertEquals(id % 10, rowIndex); - } - } - } - } - - // File with multiple row-groups [0, 20000) where rowIndex = id - String filePath = DefaultKernelTestUtils.getTestResourceFilePath( - "parquet/row_index_multiple_row_groups.parquet"); - reader = new ParquetBatchReader(new Configuration()); - try (CloseableIterator iter = reader.read(filePath, readSchema)) { - while (iter.hasNext()) { - ColumnarBatch batch = iter.next(); - for (int i = 0; i < batch.getSize(); i++) { - long id = batch.getColumnVector(0).getLong(i); - long rowIndex = batch.getColumnVector(1).getLong(i); - assertEquals(id, rowIndex); - } - } - } - } - - private static Configuration newConf(Optional batchSize) { - Configuration conf = new Configuration(); - if (batchSize.isPresent()) { - conf.set("delta.kernel.default.parquet.reader.batch-size", batchSize.get().toString()); - } - return conf; - } - - private static void readAndVerify(StructType readSchema, int readBatchSize) - throws Exception { - ParquetBatchReader batchReader = - new ParquetBatchReader(newConf(Optional.of(readBatchSize))); - List batches = - readAsBatches(batchReader, ALL_TYPES_FILE, readSchema); - - for (int rowId = 0; rowId < 200; rowId++) { - verifyRowFromAllTypesFile(readSchema, batches, rowId); - } - } - - private static List readAsBatches( - ParquetBatchReader parquetReader, - String path, - StructType readSchema) throws Exception { - List batches = new ArrayList<>(); - try (CloseableIterator dataIter = parquetReader.read(path, readSchema)) { - while (dataIter.hasNext()) { - batches.add(dataIter.next()); - } - } - return batches; - } - - private static void verifyRowFromAllTypesFile( - StructType readSchema, - List batches, - int rowId) { - Tuple2 batchWithIdx = getBatchForRowId(batches, rowId); - int ordinal = 0; - for (StructField structField : readSchema.fields()) { - String name = structField.getName().toLowerCase(); - ColumnVector vector = batchWithIdx._1.getColumnVector(ordinal); - switch (name) { - case "booleantype": { - Boolean expValue = (rowId % 87 != 0) ? 
rowId % 2 == 0 : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.booleanValue(), vector.getBoolean(batchWithIdx._2)); - } - break; - } - case "bytetype": { - Byte expValue = (rowId % 72 != 0) ? (byte) rowId : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.byteValue(), vector.getByte(batchWithIdx._2)); - } - break; - } - case "shorttype": { - Short expValue = (rowId % 56 != 0) ? (short) rowId : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.shortValue(), vector.getShort(batchWithIdx._2)); - } - break; - } - case "datetype": { - Integer expValue = (rowId % 61 != 0) ? - (int) Math.floorDiv( - rowId * 20000000L, - DefaultKernelUtils.DateTimeConstants.MILLIS_PER_DAY) : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.intValue(), vector.getInt(batchWithIdx._2)); - } - break; - } - case "integertype": { - Integer expValue = (rowId % 23 != 0) ? rowId : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.intValue(), vector.getInt(batchWithIdx._2)); - } - break; - } - case "longtype": { - Long expValue = (rowId % 25 != 0) ? rowId + 1L : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.longValue(), vector.getLong(batchWithIdx._2)); - } - break; - } - case "floattype": { - Float expValue = (rowId % 28 != 0) ? (rowId * 0.234f) : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.floatValue(), vector.getFloat(batchWithIdx._2), 0.02); - } - break; - } - case "doubletype": { - Double expValue = (rowId % 54 != 0) ? (rowId * 234234.23d) : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.doubleValue(), vector.getDouble(batchWithIdx._2), - 0.02); - } - break; - } - case "stringtype": { - String expValue = (rowId % 57 != 0) ? Integer.toString(rowId) : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue, vector.getString(batchWithIdx._2)); - } - break; - } - case "binarytype": { - byte[] expValue = (rowId % 59 != 0) ? Integer.toString(rowId).getBytes() : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertArrayEquals(expValue, vector.getBinary(batchWithIdx._2)); - } - break; - } - case "timestamptype": { - // Tests only for spark.sql.parquet.outputTimestampTyp = INT96, other formats - // are tested in end-to-end tests in DeltaTableReadsSuite - Long expValue = (rowId % 62 != 0) ? 23423523L * rowId * 1000 : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue.longValue(), vector.getLong(batchWithIdx._2)); - } - break; - } - case "decimal": { - BigDecimal expValue = (rowId % 67 != 0) ? 
- // Value is rounded to scale=2 when written - new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP) : null; - if (expValue == null) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - } else { - assertEquals(expValue, vector.getDecimal(batchWithIdx._2)); - } - break; - } - case "nested_struct": - validateNestedStructColumn(vector, batchWithIdx._2, rowId); - break; - case "array_of_prims": { - boolean expIsNull = rowId % 25 == 0; - if (expIsNull) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - assertNull(vector.getArray(batchWithIdx._2)); - } else if (rowId % 29 == 0) { - checkArrayValue(vector.getArray(batchWithIdx._2), IntegerType.INTEGER, - Collections.emptyList()); - } else { - List expArray = Arrays.asList(rowId, null, rowId + 1); - checkArrayValue(vector.getArray(batchWithIdx._2), IntegerType.INTEGER, - expArray); - } - break; - } - case "array_of_arrays": - validateArrayOfArraysColumn(vector, batchWithIdx._2, rowId); - break; - case "array_of_structs": { - assertFalse(vector.isNullAt(batchWithIdx._2)); - ArrayValue arrayValue = vector.getArray(batchWithIdx._2); - ColumnVector elementVector = arrayValue.getElements(); - assertEquals(2, arrayValue.getSize()); - assertEquals(2, elementVector.getSize()); - assertTrue(elementVector.getDataType() instanceof StructType); - assertEquals(rowId, elementVector.getChild(0).getLong(0)); - assertTrue(elementVector.isNullAt(1)); - break; - } - case "map_of_prims": { - boolean expIsNull = rowId % 28 == 0; - if (expIsNull) { - assertTrue(vector.isNullAt(batchWithIdx._2)); - assertNull(vector.getMap(batchWithIdx._2)); - } else if (rowId % 30 == 0) { - checkMapValue( - vector.getMap(batchWithIdx._2), - IntegerType.INTEGER, - LongType.LONG, - Collections.emptyMap() - ); - } else { - Map expValue = new HashMap() { - { - put(rowId, (rowId % 29 == 0) ? null : (rowId + 2L)); - put((rowId % 27 != 0) ? 
(rowId + 2) : (rowId + 3), rowId + 9L); - - } - }; - checkMapValue( - vector.getMap(batchWithIdx._2), - IntegerType.INTEGER, - LongType.LONG, - expValue - ); - } - break; - } - case "map_of_rows": { - // Map(i + 1 -> (if (i % 10 == 0) Row((i*20).longValue()) else null)) - assertFalse(vector.isNullAt(batchWithIdx._2)); - MapValue mapValue = vector.getMap(batchWithIdx._2); - Map actValue = VectorUtils.toJavaMap(mapValue); - - // entry 0: key = rowId - Integer key0 = rowId + 1; - boolean expValue0IsNull = rowId % 10 != 0; - Row actValue0 = actValue.get(key0); - if (expValue0IsNull) { - assertNull(actValue0); - } else { - Long actValue0Member = actValue0.getLong(0); - Long expValue0Member = rowId * 20L; - assertEquals(expValue0Member, actValue0Member); - } - break; - } - case "map_of_arrays": - validateMapOfArraysColumn(vector, batchWithIdx._2, rowId); - break; - case "missing_column_primitive": - case "missing_column_struct": { - assertTrue(vector.isNullAt(batchWithIdx._2)); - break; - } - default: - throw new IllegalArgumentException("unknown column: " + name); - } - ordinal++; - } - } - - private static void validateNestedStructColumn( - ColumnVector vector, int batchRowId, int tableRowId) { - boolean expNull = tableRowId % 63 == 0; - if (expNull) { - assertTrue(vector.isNullAt(batchRowId)); - return; - } - - boolean expAaValNull = tableRowId % 19 == 0; - boolean expAcValNull = tableRowId % 19 == 0 || tableRowId % 23 == 0; - final int aaColOrdinal = 0; - final int acColOrdinal = 1; - - assertEquals(vector.getChild(aaColOrdinal).isNullAt(batchRowId), expAaValNull); - assertEquals(vector.getChild(acColOrdinal).isNullAt(batchRowId), expAcValNull); - - if (!expAaValNull) { - String aaVal = vector.getChild(aaColOrdinal).getString(batchRowId); - assertEquals(Integer.toString(tableRowId), aaVal); - } - if (!expAcValNull) { - int actAcaVal = vector.getChild(acColOrdinal).getChild(0).getInt(batchRowId); - assertEquals(tableRowId, actAcaVal); - } - } - - private static void validateArrayOfArraysColumn( - ColumnVector vector, int batchRowId, int tableRowId) { - boolean expIsNull = tableRowId % 8 == 0; - if (expIsNull) { - assertTrue(vector.isNullAt(batchRowId)); - return; - } - - List singleElemArray = Arrays.asList(tableRowId); - List doubleElemArray = Arrays.asList(tableRowId + 10, tableRowId + 20); - List arrayWithNulls = Arrays.asList(null, tableRowId + 200); - List singleElemNullArray = Collections.singletonList(null); - List emptyArray = Collections.emptyList(); - - List> expArray = null; - switch (tableRowId % 7) { - case 0: - expArray = Arrays.asList(singleElemArray, singleElemArray, arrayWithNulls); - break; - case 1: - expArray = Arrays.asList(singleElemArray, doubleElemArray, emptyArray); - break; - case 2: - expArray = Arrays.asList(arrayWithNulls); - break; - case 3: - expArray = Arrays.asList(singleElemNullArray); - break; - case 4: - expArray = Collections.singletonList(null); - break; - case 5: - expArray = Collections.singletonList(emptyArray); - break; - case 6: - expArray = Collections.emptyList(); - break; - } - DataType expDataType = new ArrayType(IntegerType.INTEGER, true); - checkArrayValue(vector.getArray(batchRowId), expDataType, expArray); - } - - private static void validateMapOfArraysColumn( - ColumnVector vector, int batchRowId, int tableRowId) { - boolean expIsNull = tableRowId % 30 == 0; - if (expIsNull) { - assertTrue(vector.isNullAt(batchRowId)); - return; - } - - final List val1; - if (tableRowId % 4 == 0) { - val1 = Arrays.asList(tableRowId, null, tableRowId + 1); - 
} else { - val1 = Collections.emptyList(); - } - final List val2; - if (tableRowId % 7 == 0) { - val2 = Collections.emptyList(); - } else { - val2 = Collections.singletonList(null); - } - - Map> expMap = Collections.emptyMap(); - if (tableRowId % 24 != 0) { - expMap = new HashMap>() { - { - put((long) tableRowId, val1); - put(tableRowId + 1L, val2); - } - }; - } - checkMapValue( - vector.getMap(batchRowId), - LongType.LONG, - new ArrayType(IntegerType.INTEGER, true), - expMap - ); - } - - private static Tuple2 getBatchForRowId( - List batches, int rowId) { - int indexStart = 0; - for (ColumnarBatch batch : batches) { - if (indexStart <= rowId && rowId < indexStart + batch.getSize()) { - return new Tuple2<>(batch, rowId - indexStart); - } - indexStart += batch.getSize(); - } - - throw new IllegalArgumentException("row id is not found: " + rowId); - } - - private static void checkArrayValue( - ArrayValue arrayValue, DataType expDataType, List expList) { - int size = expList.size(); - ColumnVector elementVector = arrayValue.getElements(); - // Check the size is as expected and arrayValue.getSize == elementVector.getSize - assertEquals(size, arrayValue.getSize()); - assertEquals(size, elementVector.getSize()); - // Check the element vector has the correct data type - assertEquals(elementVector.getDataType(), expDataType); - // Check the elements are correct - assertEquals(expList, VectorUtils.toJavaList(arrayValue)); - assertThrows(IllegalArgumentException.class, - () -> DefaultKernelTestUtils.getValueAsObject(elementVector, size + 1)); - } - - private static void checkMapValue( - MapValue mapValue, DataType keyDataType, DataType valueDataType, Map expMap) { - int size = expMap.size(); - ColumnVector keyVector = mapValue.getKeys(); - ColumnVector valueVector = mapValue.getValues(); - // Check the size mapValue.getSize == keyVector.getSize == valueVector.getSize - assertEquals(size, mapValue.getSize()); - assertEquals(size, keyVector.getSize()); - assertEquals(size, valueVector.getSize()); - // Check the key and value vector has the correct data type - assertEquals(keyVector.getDataType(), keyDataType); - assertEquals(valueVector.getDataType(), valueDataType); - // Check the elements are correct - assertEquals(expMap, VectorUtils.toJavaMap(mapValue)); - assertThrows(IllegalArgumentException.class, - () -> DefaultKernelTestUtils.getValueAsObject(keyVector, size + 1)); - assertThrows(IllegalArgumentException.class, - () -> DefaultKernelTestUtils.getValueAsObject(valueVector, size + 1)); - } -} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala deleted file mode 100644 index abfaf127d75..00000000000 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (2021) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.delta.kernel.defaults - -import java.io.File -import java.math.BigDecimal - -import io.delta.golden.GoldenTableUtils.goldenTableFile -import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader -import io.delta.kernel.defaults.utils.{TestRow, TestUtils} -import io.delta.kernel.types.{DecimalType, IntegerType, StructType} -import org.apache.hadoop.conf.Configuration -import org.scalatest.funsuite.AnyFunSuite - -class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils { - - def getSingleParquetFile(directory: File): String = { - val parquetFiles = directory.listFiles().filter(_.getName.endsWith(".parquet")) - assert(parquetFiles.size == 1) - parquetFiles.head.getAbsolutePath - } - - ////////////////////////////////////////////////////////////////////////////////// - // Decimal type tests - ////////////////////////////////////////////////////////////////////////////////// - - private val DECIMAL_TYPES_DICT_FILE_V1 = getSingleParquetFile( - goldenTableFile("parquet-decimal-dictionaries-v1")) - - private val DECIMAL_TYPES_DICT_FILE_V2 = getSingleParquetFile( - goldenTableFile("parquet-decimal-dictionaries-v2")) - - test("decimals encoded using dictionary encoding ") { - val expectedResult = (0 until 1000000).map { i => - TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2)) - } - - val readSchema = new StructType() - .add("id", IntegerType.INTEGER) - .add("col1", new DecimalType(9, 0)) // INT32: 1 <= precision <= 9 - .add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18 - .add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY - - val batchReader = new ParquetBatchReader(new Configuration()) - for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) { - val batches = batchReader.read(file, readSchema) - val result = batches.toSeq.flatMap(_.getRows.toSeq) - checkAnswer(result, expectedResult) - } - } - - private val LARGE_SCALE_DECIMAL_TYPES_FILE = getSingleParquetFile( - goldenTableFile("parquet-decimal-type")) - - test("large scale decimal type file") { - - def expand(n: BigDecimal): BigDecimal = { - n.scaleByPowerOfTen(5).add(n) - } - - val expectedResult = (0 until 99998).map { i => - if (i % 85 == 0) { - val n = BigDecimal.valueOf(i) - TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5)) - } else { - val negation = if (i % 33 == 0) { - -1 - } else { - 1 - } - val n = BigDecimal.valueOf(i*negation) - TestRow( - i, - n.movePointLeft(1), - expand(n).movePointLeft(5), - expand(expand(expand(n))).movePointLeft(5) - ) - } - } - - val readSchema = new StructType() - .add("id", IntegerType.INTEGER) - .add("col1", new DecimalType(5, 1)) // INT32: 1 <= precision <= 9 - .add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18 - .add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY - - val batchReader = new ParquetBatchReader(new Configuration()) - val batches = batchReader.read(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) - - val result = batches.toSeq.flatMap(_.getRows.toSeq) - checkAnswer(result, expectedResult) - } - - ////////////////////////////////////////////////////////////////////////////////// - // Timestamp type tests - ////////////////////////////////////////////////////////////////////////////////// - // TODO move over from DeltaTableReadsSuite once there is better testing infra -} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala 
b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala index e5f3c52b12c..6152b1b83f6 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala @@ -110,10 +110,7 @@ class ParquetFileReaderSuite extends AnyFunSuite val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema) - val expResult = (0 until 200).map { i => - val expRow = generateRowFromAllTypesFile(readSchema, i) - TestRow(expRow: _*) - } + val expResult = generateRowsFromAllTypesFile(readSchema) checkAnswer(actResult, expResult) } @@ -131,10 +128,25 @@ class ParquetFileReaderSuite extends AnyFunSuite val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema) - val expResult = (0 until 200).map { i => - val expRow = generateRowFromAllTypesFile(readSchema, i) - TestRow(expRow: _*) - } + val expResult = generateRowsFromAllTypesFile(readSchema) + + checkAnswer(actResult, expResult) + } + + test("read subset of columns with missing columns in file") { + val readSchema = new StructType() + .add("booleanType", BooleanType.BOOLEAN) + .add("integerType", IntegerType.INTEGER) + .add("missing_column_struct", new StructType().add("ab", IntegerType.INTEGER)) + .add("longType", LongType.LONG) + .add("missing_column_primitive", DateType.DATE) + .add("nested_struct", new StructType() + .add("aa", StringType.STRING) + .add("ac", new StructType().add("aca", IntegerType.INTEGER))) + + val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema) + + val expResult = generateRowsFromAllTypesFile(readSchema) checkAnswer(actResult, expResult) } @@ -146,69 +158,72 @@ class ParquetFileReaderSuite extends AnyFunSuite val path = getTestResourceFilePath("parquet-basic-row-indexes") val actResult1 = readParquetFilesUsingKernel(path, readSchema) - val expResult1 = (0 until 30) - .map(i => TestRow(i, if (i < 10) i else if (i < 20) i - 10 else i - 20)) + val expResult1 = (0L until 30L) + .map(i => TestRow(i, if (i < 10) i else if (i < 20) i - 10L else i - 20L)) checkAnswer(actResult1, expResult1) // File with multiple row-groups [0, 20000) where rowIndex = id val filePath = getTestResourceFilePath("parquet/") val actResult2 = readParquetFilesUsingKernel(filePath, readSchema) - val expResult2 = (0 until 20000).map(i => TestRow(i, i)) + val expResult2 = (0L until 20000L).map(i => TestRow(i, i)) checkAnswer(actResult2, expResult2) } - private def generateRowFromAllTypesFile(readSchema: StructType, rowId: Int): Seq[Any] = { - val expRow = ArrayBuffer.empty[Any] - readSchema.fields.forEach { field => - val name = field.getName.toLowerCase() - val expValue = name match { - case "booleantype" => if (rowId % 87 != 0) rowId % 2 == 0 else null - case "bytetype" => if (rowId % 72 != 0) rowId.toByte else null - case "shorttype" => if (rowId % 56 != 0) rowId.toShort else null - case "datetype" => if (rowId % 61 != 0) { - Math.floorDiv(rowId * 20000000L, DateTimeConstants.MILLIS_PER_DAY).toInt - } else null - case "integertype" => if (rowId % 23 != 0) rowId else null - case "longtype" => if (rowId % 25 != 0) rowId + 1L else null - case "floattype" => if (rowId % 28 != 0) { - new BigDecimal(rowId * 0.234f).setScale(3, RoundingMode.HALF_UP) - .stripTrailingZeros.floatValue - } else null - case "doubletype" => if (rowId % 54 != 0) rowId * 234234.23d else null - case "stringtype" => if (rowId % 57 != 0) 
rowId.toString else null - case "binarytype" => if (rowId % 59 != 0) rowId.toString.getBytes else null - case "timestamptype" => - // Tests only for spark.sql.parquet.outputTimestampTyp = INT96, other formats - // are tested in end-to-end tests in DeltaTableReadsSuite - if (rowId % 62 != 0) 23423523L * rowId * 1000 else null - case "decimal" => - // Value is rounded to scale=2 when written - if (rowId % 67 != 0) new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP) - else null - case "nested_struct" => generateNestedStructColumn(rowId) - case "array_of_prims" => if (rowId % 25 == 0) null - else if (rowId % 29 == 0) Vector.empty - else Vector(rowId, null, rowId + 1) - case "array_of_arrays" => generateArrayOfArraysColumn(rowId) - case "array_of_structs" => Vector(TestRow(rowId), null) - case "map_of_prims" => generateMapOfPrimsColumn(rowId) - case "map_of_rows" => Map(rowId + 1 -> (if (rowId % 10 == 0) TestRow(rowId*20) else null)) - case "map_of_arrays" => generateMapOfArraysColumn(rowId) - case "missing_column_primitive" => null - case "missing_column_struct" => null - case _ => throw new IllegalArgumentException("unknown column: " + name) + private def generateRowsFromAllTypesFile(readSchema: StructType): Seq[TestRow] = { + (0 until 200).map { rowId => + val expRow = ArrayBuffer.empty[Any] + readSchema.fields.forEach { field => + val name = field.getName.toLowerCase() + val expValue = name match { + case "booleantype" => if (rowId % 87 != 0) rowId % 2 == 0 else null + case "bytetype" => if (rowId % 72 != 0) rowId.toByte else null + case "shorttype" => if (rowId % 56 != 0) rowId.toShort else null + case "datetype" => if (rowId % 61 != 0) { + Math.floorDiv(rowId * 20000000L, DateTimeConstants.MILLIS_PER_DAY).toInt + } else null + case "integertype" => if (rowId % 23 != 0) rowId else null + case "longtype" => if (rowId % 25 != 0) rowId + 1L else null + case "floattype" => if (rowId % 28 != 0) { + new BigDecimal(rowId * 0.234f).setScale(3, RoundingMode.HALF_UP) + .stripTrailingZeros.floatValue + } else null + case "doubletype" => if (rowId % 54 != 0) rowId * 234234.23d else null + case "stringtype" => if (rowId % 57 != 0) rowId.toString else null + case "binarytype" => if (rowId % 59 != 0) rowId.toString.getBytes else null + case "timestamptype" => + // Tests only for spark.sql.parquet.outputTimestampTyp = INT96, other formats + // are tested in end-to-end tests in DeltaTableReadsSuite + if (rowId % 62 != 0) 23423523L * rowId * 1000 else null + case "decimal" => + // Value is rounded to scale=2 when written + if (rowId % 67 != 0) new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP) + else null + case "nested_struct" => generateNestedStructColumn(rowId) + case "array_of_prims" => if (rowId % 25 == 0) null + else if (rowId % 29 == 0) Vector.empty[Int] + else Vector(rowId, null, rowId + 1) + case "array_of_arrays" => generateArrayOfArraysColumn(rowId) + case "array_of_structs" => Vector(TestRow(rowId.toLong), null) + case "map_of_prims" => generateMapOfPrimsColumn(rowId) + case "map_of_rows" => + Map(rowId + 1 -> (if (rowId % 10 == 0) TestRow(rowId * 20L) else null)) + case "map_of_arrays" => generateMapOfArraysColumn(rowId) + case "missing_column_primitive" => null + case "missing_column_struct" => null + case _ => throw new IllegalArgumentException("unknown column: " + name) + } + expRow += expValue } - expRow += expValue + TestRow(expRow: _*) } - expRow } private def generateNestedStructColumn(rowId: Int): TestRow = { if (rowId % 63 == 0) return null - if (rowId % 19 
!= 0 && rowId % 23 != 0) return TestRow(rowId, TestRow(rowId)) - if (rowId % 23 == 0) return TestRow(rowId, null) + if (rowId % 19 != 0 && rowId % 23 != 0) return TestRow(rowId.toString, TestRow(rowId)) + if (rowId % 23 == 0) return TestRow(rowId.toString, null) TestRow(null, null) } @@ -221,15 +236,14 @@ class ParquetFileReaderSuite extends AnyFunSuite ) } - private def generateArrayOfArraysColumn(rowId: Int): Vector[ArrayBuffer[Any]] = { - if (rowId % 8 == 0) { - return null - } - val emptyArray = ArrayBuffer.empty[Any] - val singleElemArray = ArrayBuffer[Any](rowId) - val doubleElemArray = ArrayBuffer[Any](rowId + 10, rowId + 20) - val arrayWithNulls = ArrayBuffer[Any](null, rowId + 200) - val singleElemNullArray = ArrayBuffer[Any](null) + private def generateArrayOfArraysColumn(rowId: Int): Seq[Seq[Any]] = { + if (rowId % 8 == 0) return null + + val emptyArray = ArrayBuffer.empty[Int] + val singleElemArray = ArrayBuffer(rowId) + val doubleElemArray = ArrayBuffer(rowId + 10, rowId + 20) + val arrayWithNulls = ArrayBuffer(null, rowId + 200) + val singleElemNullArray = ArrayBuffer(null) rowId % 7 match { case 0 => Vector(singleElemArray, singleElemArray, arrayWithNulls) @@ -238,21 +252,19 @@ class ParquetFileReaderSuite extends AnyFunSuite case 3 => Vector(singleElemNullArray) case 4 => Vector(null) case 5 => Vector(emptyArray) - case 6 => Vector.empty[ArrayBuffer[Any]] + case 6 => Vector.empty[ArrayBuffer[Int]] } } - private def generateMapOfArraysColumn(rowId: Int): Map[Int, ArrayBuffer[Any]] = { - if (rowId % 30 == 0) { - return null - } - val val1 = if (rowId % 4 == 0) ArrayBuffer(rowId, null, rowId + 1) else ArrayBuffer.empty[Any] - val val2 = if (rowId % 7 == 0) ArrayBuffer.empty[Any] else ArrayBuffer[Any](null) - val expMap = if (rowId % 24 == 0) { - Map.empty[Int, ArrayBuffer[Any]] - } else { - Map[Int, ArrayBuffer[Any]](rowId -> val1, rowId + 1 -> val2) - } + private def generateMapOfArraysColumn(rowId: Int): Map[Long, Seq[Any]] = { + if (rowId % 30 == 0) return null + + val val1 = if (rowId % 4 == 0) ArrayBuffer[Any](rowId, null, rowId + 1) + else ArrayBuffer.empty[Any] + val val2 = if (rowId % 7 == 0) ArrayBuffer.empty[Any] + else ArrayBuffer[Any](null) + val expMap = if (rowId % 24 == 0) Map.empty[Long, ArrayBuffer[Any]] + else Map[Long, ArrayBuffer[Any]](rowId.toLong -> val1, rowId + 1L -> val2) expMap } From a688dee173851380320695338266e6b08d28a54b Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Tue, 5 Mar 2024 01:04:56 +0700 Subject: [PATCH 4/5] [Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader Signed-off-by: Tai Le Manh --- .../parquet/ParquetFileReaderSuite.scala | 174 ++---------------- 1 file changed, 20 insertions(+), 154 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala index 6152b1b83f6..2f393882cc7 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala @@ -15,13 +15,9 @@ */ package io.delta.kernel.defaults.internal.parquet -import java.io.File -import java.math.{BigDecimal, RoundingMode} +import java.math.BigDecimal -import scala.collection.mutable.ArrayBuffer - -import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath} -import 
io.delta.kernel.defaults.internal.DefaultKernelUtils.DateTimeConstants +import io.delta.golden.GoldenTableUtils.goldenTableFile import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils} import io.delta.kernel.types._ import org.scalatest.funsuite.AnyFunSuite @@ -29,44 +25,25 @@ import org.scalatest.funsuite.AnyFunSuite class ParquetFileReaderSuite extends AnyFunSuite with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils { - def getSingleParquetFile(directory: File): String = { - val parquetFiles = directory.listFiles().filter(_.getName.endsWith(".parquet")) - assert(parquetFiles.size == 1) - parquetFiles.head.getAbsolutePath - } - - ////////////////////////////////////////////////////////////////////////////////// - // Decimal type tests - ////////////////////////////////////////////////////////////////////////////////// - - private val DECIMAL_TYPES_DICT_FILE_V1 = getSingleParquetFile( - goldenTableFile("parquet-decimal-dictionaries-v1")) - - private val DECIMAL_TYPES_DICT_FILE_V2 = getSingleParquetFile( - goldenTableFile("parquet-decimal-dictionaries-v2")) - test("decimals encoded using dictionary encoding ") { + val DECIMAL_DICT_FILE_V1 = goldenTableFile("parquet-decimal-dictionaries-v1").getAbsolutePath + val DECIMAL_DICT_FILE_V2 = goldenTableFile("parquet-decimal-dictionaries-v2").getAbsolutePath + val expResult = (0 until 1000000).map { i => TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2)) } - val readSchema = new StructType() - .add("id", IntegerType.INTEGER) - .add("col1", new DecimalType(9, 0)) // INT32: 1 <= precision <= 9 - .add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18 - .add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY + val readSchema = tableSchema(DECIMAL_DICT_FILE_V1) - for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) { - val actResult = readParquetFilesUsingSpark(file, readSchema) + for (file <- Seq(DECIMAL_DICT_FILE_V1, DECIMAL_DICT_FILE_V2)) { + val actResult = readParquetFilesUsingKernel(file, readSchema) checkAnswer(actResult, expResult) } } - private val LARGE_SCALE_DECIMAL_TYPES_FILE = getSingleParquetFile( - goldenTableFile("parquet-decimal-type")) - test("large scale decimal type file") { + val LARGE_SCALE_DECIMAL_TYPES_FILE = goldenTableFile("parquet-decimal-type").getAbsolutePath def expand(n: BigDecimal): BigDecimal = { n.scaleByPowerOfTen(5).add(n) @@ -77,11 +54,7 @@ class ParquetFileReaderSuite extends AnyFunSuite val n = BigDecimal.valueOf(i) TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5)) } else { - val negation = if (i % 33 == 0) { - -1 - } else { - 1 - } + val negation = if (i % 33 == 0) -1 else 1 val n = BigDecimal.valueOf(i*negation) TestRow( i, @@ -92,25 +65,21 @@ class ParquetFileReaderSuite extends AnyFunSuite } } - val readSchema = new StructType() - .add("id", IntegerType.INTEGER) - .add("col1", new DecimalType(5, 1)) // INT32: 1 <= precision <= 9 - .add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18 - .add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY + val readSchema = tableSchema(LARGE_SCALE_DECIMAL_TYPES_FILE) - val actResult = readParquetFilesUsingSpark(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) + val actResult = readParquetFilesUsingKernel(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema) checkAnswer(actResult, expResult) } - private val ALL_TYPES_FILE = getSingleParquetFile(goldenTableFile("parquet-all-types")) + private val ALL_TYPES_FILE = 
 
   test("read all types of data") {
-    val readSchema = tableSchema(goldenTablePath("parquet-all-types"))
+    val readSchema = tableSchema(ALL_TYPES_FILE)
 
-    val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
 
-    val expResult = generateRowsFromAllTypesFile(readSchema)
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
 
     checkAnswer(actResult, expResult)
   }
@@ -126,9 +95,9 @@ class ParquetFileReaderSuite extends AnyFunSuite
         .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
       .add("array_of_prims", new ArrayType(IntegerType.INTEGER, true))
 
-    val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
 
-    val expResult = generateRowsFromAllTypesFile(readSchema)
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
 
     checkAnswer(actResult, expResult)
   }
@@ -144,9 +113,9 @@ class ParquetFileReaderSuite extends AnyFunSuite
         .add("aa", StringType.STRING)
        .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
 
-    val actResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
 
-    val expResult = generateRowsFromAllTypesFile(readSchema)
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
 
     checkAnswer(actResult, expResult)
   }
@@ -170,107 +139,4 @@ class ParquetFileReaderSuite extends AnyFunSuite
 
     checkAnswer(actResult2, expResult2)
   }
-
-  private def generateRowsFromAllTypesFile(readSchema: StructType): Seq[TestRow] = {
-    (0 until 200).map { rowId =>
-      val expRow = ArrayBuffer.empty[Any]
-      readSchema.fields.forEach { field =>
-        val name = field.getName.toLowerCase()
-        val expValue = name match {
-          case "booleantype" => if (rowId % 87 != 0) rowId % 2 == 0 else null
-          case "bytetype" => if (rowId % 72 != 0) rowId.toByte else null
-          case "shorttype" => if (rowId % 56 != 0) rowId.toShort else null
-          case "datetype" => if (rowId % 61 != 0) {
-            Math.floorDiv(rowId * 20000000L, DateTimeConstants.MILLIS_PER_DAY).toInt
-          } else null
-          case "integertype" => if (rowId % 23 != 0) rowId else null
-          case "longtype" => if (rowId % 25 != 0) rowId + 1L else null
-          case "floattype" => if (rowId % 28 != 0) {
-            new BigDecimal(rowId * 0.234f).setScale(3, RoundingMode.HALF_UP)
-              .stripTrailingZeros.floatValue
-          } else null
-          case "doubletype" => if (rowId % 54 != 0) rowId * 234234.23d else null
-          case "stringtype" => if (rowId % 57 != 0) rowId.toString else null
-          case "binarytype" => if (rowId % 59 != 0) rowId.toString.getBytes else null
-          case "timestamptype" =>
-            // Tests only for spark.sql.parquet.outputTimestampTyp = INT96, other formats
-            // are tested in end-to-end tests in DeltaTableReadsSuite
-            if (rowId % 62 != 0) 23423523L * rowId * 1000 else null
-          case "decimal" =>
-            // Value is rounded to scale=2 when written
-            if (rowId % 67 != 0) new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP)
-            else null
-          case "nested_struct" => generateNestedStructColumn(rowId)
-          case "array_of_prims" => if (rowId % 25 == 0) null
-            else if (rowId % 29 == 0) Vector.empty[Int]
-            else Vector(rowId, null, rowId + 1)
-          case "array_of_arrays" => generateArrayOfArraysColumn(rowId)
-          case "array_of_structs" => Vector(TestRow(rowId.toLong), null)
-          case "map_of_prims" => generateMapOfPrimsColumn(rowId)
-          case "map_of_rows" =>
-            Map(rowId + 1 -> (if (rowId % 10 == 0) TestRow(rowId * 20L) else null))
-          case "map_of_arrays" => generateMapOfArraysColumn(rowId)
-          case "missing_column_primitive" => null
-          case "missing_column_struct" => null
-          case _ => throw new IllegalArgumentException("unknown column: " + name)
-        }
-        expRow += expValue
-      }
-      TestRow(expRow: _*)
-    }
-  }
-
-  private def generateNestedStructColumn(rowId: Int): TestRow = {
-    if (rowId % 63 == 0) return null
-    if (rowId % 19 != 0 && rowId % 23 != 0) return TestRow(rowId.toString, TestRow(rowId))
-    if (rowId % 23 == 0) return TestRow(rowId.toString, null)
-    TestRow(null, null)
-  }
-
-  private def generateMapOfPrimsColumn(rowId: Int): Map[Int, Any] = {
-    if (rowId % 28 == 0) return null
-    if (rowId % 30 == 0) return Map.empty[Int, Any]
-    Map(
-      rowId -> (if (rowId % 29 == 0) null else rowId + 2L),
-      (if (rowId % 27 != 0) rowId + 2 else rowId + 3) -> (rowId + 9L)
-    )
-  }
-
-  private def generateArrayOfArraysColumn(rowId: Int): Seq[Seq[Any]] = {
-    if (rowId % 8 == 0) return null
-
-    val emptyArray = ArrayBuffer.empty[Int]
-    val singleElemArray = ArrayBuffer(rowId)
-    val doubleElemArray = ArrayBuffer(rowId + 10, rowId + 20)
-    val arrayWithNulls = ArrayBuffer(null, rowId + 200)
-    val singleElemNullArray = ArrayBuffer(null)
-
-    rowId % 7 match {
-      case 0 => Vector(singleElemArray, singleElemArray, arrayWithNulls)
-      case 1 => Vector(singleElemArray, doubleElemArray, emptyArray)
-      case 2 => Vector(arrayWithNulls)
-      case 3 => Vector(singleElemNullArray)
-      case 4 => Vector(null)
-      case 5 => Vector(emptyArray)
-      case 6 => Vector.empty[ArrayBuffer[Int]]
-    }
-  }
-
-  private def generateMapOfArraysColumn(rowId: Int): Map[Long, Seq[Any]] = {
-    if (rowId % 30 == 0) return null
-
-    val val1 = if (rowId % 4 == 0) ArrayBuffer[Any](rowId, null, rowId + 1)
-      else ArrayBuffer.empty[Any]
-    val val2 = if (rowId % 7 == 0) ArrayBuffer.empty[Any]
-      else ArrayBuffer[Any](null)
-    val expMap = if (rowId % 24 == 0) Map.empty[Long, ArrayBuffer[Any]]
-      else Map[Long, ArrayBuffer[Any]](rowId.toLong -> val1, rowId + 1L -> val2)
-
-    expMap
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////
-  // Timestamp type tests
-  //////////////////////////////////////////////////////////////////////////////////
-
-  // TODO move over from DeltaTableReadsSuite once there is better testing infra
 
 }
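With the handwritten generateRowsFromAllTypesFile path removed, every converted test in this suite follows the same read-and-compare shape: derive the read schema from the golden file, read it through the Kernel Parquet reader, read it again through Spark as the oracle, and compare the row sets. A minimal sketch of that shape is shown below; it assumes readParquetFilesUsingKernel, readParquetFilesUsingSpark, tableSchema and checkAnswer are provided by ParquetSuiteBase and the mixed-in test utilities, exactly as the hunks above use them.

    import io.delta.golden.GoldenTableUtils.goldenTableFile
    import io.delta.kernel.defaults.utils.{ExpressionTestUtils, VectorTestUtils}
    import org.scalatest.funsuite.AnyFunSuite

    class GoldenTableReadSketch extends AnyFunSuite
      with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {

      test("kernel read matches the Spark oracle (sketch)") {
        val file = goldenTableFile("parquet-all-types").getAbsolutePath

        // The read schema is derived from the file instead of being spelled out by hand.
        val readSchema = tableSchema(file)

        val actResult = readParquetFilesUsingKernel(file, readSchema) // reader under test
        val expResult = readParquetFilesUsingSpark(file, readSchema)  // Spark as the oracle

        checkAnswer(actResult, expResult)
      }
    }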

From aff2f564aba5462dd7c2ee27c9faa051841cb3ad Mon Sep 17 00:00:00 2001
From: Tai Le Manh
Date: Tue, 5 Mar 2024 01:28:07 +0700
Subject: [PATCH 5/5] [Kernel][Java to Scala test conversion] Convert and refactor TestParquetBatchReader

Signed-off-by: Tai Le Manh
---
 .../parquet/ParquetFileReaderSuite.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
index 2f393882cc7..8b19a8422db 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
@@ -26,16 +26,18 @@ class ParquetFileReaderSuite extends AnyFunSuite
   with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
 
   test("decimals encoded using dictionary encoding ") {
-    val DECIMAL_DICT_FILE_V1 = goldenTableFile("parquet-decimal-dictionaries-v1").getAbsolutePath
-    val DECIMAL_DICT_FILE_V2 = goldenTableFile("parquet-decimal-dictionaries-v2").getAbsolutePath
+    // The golden tables below contain three decimal columns,
+    // each stored in a different physical format: int32, int64 and fixed binary
+    val decimalDictFileV1 = goldenTableFile("parquet-decimal-dictionaries-v1").getAbsolutePath
+    val decimalDictFileV2 = goldenTableFile("parquet-decimal-dictionaries-v2").getAbsolutePath
 
     val expResult = (0 until 1000000).map { i =>
       TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
     }
 
-    val readSchema = tableSchema(DECIMAL_DICT_FILE_V1)
+    val readSchema = tableSchema(decimalDictFileV1)
 
-    for (file <- Seq(DECIMAL_DICT_FILE_V1, DECIMAL_DICT_FILE_V2)) {
+    for (file <- Seq(decimalDictFileV1, decimalDictFileV2)) {
       val actResult = readParquetFilesUsingKernel(file, readSchema)
 
       checkAnswer(actResult, expResult)
@@ -43,7 +45,7 @@ class ParquetFileReaderSuite extends AnyFunSuite
   }
 
   test("large scale decimal type file") {
-    val LARGE_SCALE_DECIMAL_TYPES_FILE = goldenTableFile("parquet-decimal-type").getAbsolutePath
+    val largeScaleDecimalTypesFile = goldenTableFile("parquet-decimal-type").getAbsolutePath
 
     def expand(n: BigDecimal): BigDecimal = {
       n.scaleByPowerOfTen(5).add(n)
@@ -65,9 +67,9 @@ class ParquetFileReaderSuite extends AnyFunSuite
       }
     }
 
-    val readSchema = tableSchema(LARGE_SCALE_DECIMAL_TYPES_FILE)
+    val readSchema = tableSchema(largeScaleDecimalTypesFile)
 
-    val actResult = readParquetFilesUsingKernel(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema)
+    val actResult = readParquetFilesUsingKernel(largeScaleDecimalTypesFile, readSchema)
 
     checkAnswer(actResult, expResult)
   }
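The comment added above summarises what the two dictionary-encoded golden tables cover: the same decimal data written in each of Parquet's physical layouts for DECIMAL. The earlier version of this test (removed in PATCH 4/5 above) made that coverage explicit in the read schema; the following sketch restates it with the Kernel types API exactly as it appears in the removed lines, for reference only.

    import io.delta.kernel.types.{DecimalType, IntegerType, StructType}

    // One decimal column per physical encoding used by these golden tables:
    //   col1: DecimalType(9, 0)  -> stored as INT32  (1 <= precision <= 9)
    //   col2: DecimalType(12, 0) -> stored as INT64  (10 <= precision <= 18)
    //   col3: DecimalType(25, 0) -> stored as FIXED_LEN_BYTE_ARRAY
    val explicitReadSchema = new StructType()
      .add("id", IntegerType.INTEGER)
      .add("col1", new DecimalType(9, 0))
      .add("col2", new DecimalType(12, 0))
      .add("col3", new DecimalType(25, 0))

The refactored test instead calls tableSchema(decimalDictFileV1), which keeps the expected schema in sync with the golden table while the comment records the physical-format intent.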