From 0dba35b44fd0b069c2d3079b6570bd9deec1552f Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 15:59:32 -0800
Subject: [PATCH 1/7] init

---
 .../vectorized/ConstantColumnVector.java        |  5 +++++
 .../parquet/ParquetQuerySuite.scala             | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index 8b24973ad3d87..64c7feeaa4d1f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -77,6 +77,11 @@ public ConstantColumnVector(int numRows, DataType type) {
     }
   }
 
+  public void closeIfFreeable() {
+    // no-op: `ConstantColumnVector`s reuse the data backing its value across multiple batches and
+    // is freed at the end of execution in `close`.
+  }
+
   @Override
   public void close() {
     stringData = null;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 22a02447e720f..7f9521de38129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -473,6 +473,24 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-50463: Partition values can be read over multiple batches") {
+    withTempDir { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(0, 100000)
+        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
+        .write
+        .format("parquet")
+        .mode("overwrite").
+        partitionBy("partCol").save(path)
+      val df = spark.read.format("parquet").load(path).selectExpr("partCol")
+      val expected = spark.range(0, 100000)
+        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
+        .collect()
+
+      checkAnswer(df, expected)
+    }
+  }
+
   test("SPARK-10301 requested schema clipping - same schema") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath

From 7e1b471bd9f8c2f0f2489ac65b8ade72355347b2 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 16:10:10 -0800
Subject: [PATCH 2/7] comment

---
 .../spark/sql/execution/vectorized/ConstantColumnVector.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index 64c7feeaa4d1f..cd2a821698853 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -79,7 +79,7 @@ public ConstantColumnVector(int numRows, DataType type) {
 
   public void closeIfFreeable() {
     // no-op: `ConstantColumnVector`s reuse the data backing its value across multiple batches and
-    // is freed at the end of execution in `close`.
+    // are freed at the end of execution in `close`.
   }
 
   @Override

From 20e5b284991f505b3c1dc14878e480941f648013 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 16:17:47 -0800
Subject: [PATCH 3/7] update test

---
 .../parquet/ParquetQuerySuite.scala             | 28 ++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7f9521de38129..dbc2d363e74d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -475,19 +475,21 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 
   test("SPARK-50463: Partition values can be read over multiple batches") {
     withTempDir { dir =>
-      val path = dir.getAbsolutePath
-      spark.range(0, 100000)
-        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
-        .write
-        .format("parquet")
-        .mode("overwrite").
-        partitionBy("partCol").save(path)
-      val df = spark.read.format("parquet").load(path).selectExpr("partCol")
-      val expected = spark.range(0, 100000)
-        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
-        .collect()
-
-      checkAnswer(df, expected)
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") {
+        val path = dir.getAbsolutePath
+        spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
+          .write
+          .format("parquet")
+          .mode("overwrite").
+          partitionBy("partCol").save(path)
+        val df = spark.read.format("parquet").load(path).selectExpr("partCol")
+        val expected = spark.range(0, 5)
+          .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
+          .collect()
+
+        checkAnswer(df, expected)
+      }
     }
   }

From 0043dee2bb15e1909c4ce90e7034f906a6b8ba34 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 16:22:20 -0800
Subject: [PATCH 4/7] style

---
 .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index dbc2d363e74d0..8a7377b896374 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -481,8 +481,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
           .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
           .write
          .format("parquet")
-          .mode("overwrite").
-          partitionBy("partCol").save(path)
+          .mode("overwrite")
+          .partitionBy("partCol").save(path)
         val df = spark.read.format("parquet").load(path).selectExpr("partCol")
         val expected = spark.range(0, 5)
           .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")

From 08fc7ed07b85d63f32a6565dcb16e03274a17fa9 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 16:46:25 -0800
Subject: [PATCH 5/7] style

---
 .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 8a7377b896374..bba71f1c48dec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -487,7 +487,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
         val expected = spark.range(0, 5)
           .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
           .collect()
-
+
         checkAnswer(df, expected)
       }
     }

From 9e453bf8f887e23e90bf47737bdff3346055ca90 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 18:55:39 -0800
Subject: [PATCH 6/7] rename

---
 .../apache/spark/sql/vectorized/ColumnVector.java  | 12 ++++++------
 .../apache/spark/sql/vectorized/ColumnarBatch.java |  8 ++++----
 .../execution/vectorized/WritableColumnVector.java |  2 +-
 .../org/apache/spark/sql/execution/Columnar.scala  |  2 +-
 4 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index bfb1833b731a7..d5fd7e61897a1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -69,14 +69,14 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract void close();
 
   /**
-   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
-   * after this.
+   * Cleans up memory for this column vector if its resources are freeable bewteen batches.
+   * The column vector is not usable after this.
    *
-   * If this is a writable column vector, it is a no-op.
+   * If this is a writable column vector or constant column vector, it is a no-op.
    */
-  public void closeIfNotWritable() {
-    // By default, we just call close() for all column vectors. If a column vector is writable, it
-    // should override this method and do nothing.
+  public void closeIfFreeable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable or
+    // constant, it should override this method and do nothing.
     close();
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 52e4115af336a..7ef570a212292 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -46,12 +46,12 @@ public void close() {
   }
 
   /**
-   * Called to close all the columns if they are not writable. This is used to clean up memory
-   * allocated during columnar processing.
+   * Called to close all the columns if their resources are freeable between batches.
+   * This is used to clean up memory allocated during columnar processing.
    */
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     for (ColumnVector c: columns) {
-      c.closeIfNotWritable();
+      c.closeIfFreeable();
     }
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 696e20525cdac..fc465e73006be 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -97,7 +97,7 @@ public void close() {
   }
 
   @Override
-  public void closeIfNotWritable() {
+  public void closeIfFreeable() {
     // no-op
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 64163da50e13a..a67648f24b4c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
        |    $shouldStop
        |  }
        |  $idx = $numRows;
-       |  $batch.closeIfNotWritable();
+       |  $batch.closeIfFreeable();
        |  $batch = null;
        |  $nextBatchFuncName();
        |}

From ca1d5d4db9c86d2965c75198106ee511f6b9e159 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Sun, 1 Dec 2024 19:25:01 -0800
Subject: [PATCH 7/7] spelling

---
 .../main/java/org/apache/spark/sql/vectorized/ColumnVector.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index d5fd7e61897a1..54b62c00283fa 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -69,7 +69,7 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract void close();
 
   /**
-   * Cleans up memory for this column vector if its resources are freeable bewteen batches.
+   * Cleans up memory for this column vector if its resources are freeable between batches.
    * The column vector is not usable after this.
    *
    * If this is a writable column vector or constant column vector, it is a no-op.
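Reviewer note (illustration only, not part of the patches above): a minimal sketch of the
lifecycle contract this series establishes, written as a hypothetical consumer loop. The
class, the `drain` method, and the `closeVectors` callback are illustrative stand-ins; in
Spark, the code generated by `ColumnarToRowExec` plays the role of the loop and a
task-completion listener performs the final cleanup. Only `ColumnarBatch.closeIfFreeable()`
and `close()` are APIs actually touched by this series.

import java.util.Iterator;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class BatchLifecycleSketch {
  // Drains `batches`, releasing per-batch memory between batches while keeping
  // long-lived vectors (writable and constant ones) alive for reuse.
  static void drain(Iterator<ColumnarBatch> batches, Runnable closeVectors) {
    while (batches.hasNext()) {
      ColumnarBatch batch = batches.next();
      // ... consume rows from `batch` ...
      // Free only per-batch memory. After this series, ConstantColumnVector
      // (used for partition values) overrides closeIfFreeable() as a no-op, so
      // its backing data survives into the next batch instead of being freed
      // mid-query.
      batch.closeIfFreeable();
    }
    // End of execution: the owner of the vectors closes each one exactly once
    // via close().
    closeVectors.run();
  }
}

This is also why the updated test pins PARQUET_VECTORIZED_READER_BATCH_SIZE to "1": with
one row per batch, the partition-value ConstantColumnVector must survive repeated
closeIfFreeable() calls across many batches for the query to return correct results.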