From b772970cd7683eca5406207378e655621f401e9f Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Tue, 3 Dec 2024 08:36:59 -0800 Subject: [PATCH] [SPARK-50463][SQL] Fix `ConstantColumnVector` with Columnar to Row conversion ### What changes were proposed in this pull request? https://github.com/apache/spark/commit/800faf0abfa368ad0a5ef1e0fa44b74dbaab724e frees column vector resources between batches in columnar to row conversion. However, like `WritableColumnVector`, `ConstantColumnVector` should not free resources between batches because the same data is used across batches ### Why are the changes needed? Without this change, ConstantColumnVectors with string values, for example, will fail if used with column->row conversion. For instance, reading a parquet table partitioned by a string column with multiple batches. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? added UT that failed before and now passes ### Was this patch authored or co-authored using generative AI tooling? no Closes #49021 from richardc-db/col_to_row_const_col_vec_fix. Authored-by: Richard Chen Signed-off-by: Dongjoon Hyun --- .../spark/sql/vectorized/ColumnVector.java | 12 +++++------ .../spark/sql/vectorized/ColumnarBatch.java | 8 ++++---- .../vectorized/ConstantColumnVector.java | 5 +++++ .../vectorized/WritableColumnVector.java | 2 +- .../apache/spark/sql/execution/Columnar.scala | 2 +- .../parquet/ParquetQuerySuite.scala | 20 +++++++++++++++++++ 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java index 7dc2d38144296..ea199e2685a54 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java @@ -68,14 +68,14 @@ public abstract class ColumnVector implements AutoCloseable { public abstract void close(); /** - * Cleans up memory for this column vector if it's not writable. The column vector is not usable - * after this. + * Cleans up memory for this column vector if it's resources are freeable between batches. + * The column vector is not usable after this. * - * If this is a writable column vector, it is a no-op. + * If this is a writable column vector or constant column vector, it is a no-op. */ - public void closeIfNotWritable() { - // By default, we just call close() for all column vectors. If a column vector is writable, it - // should override this method and do nothing. + public void closeIfFreeable() { + // By default, we just call close() for all column vectors. If a column vector is writable or + // constant, it should override this method and do nothing. close(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java index 52e4115af336a..7ef570a212292 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java @@ -46,12 +46,12 @@ public void close() { } /** - * Called to close all the columns if they are not writable. This is used to clean up memory - * allocated during columnar processing. + * Called to close all the columns if their resources are freeable between batches. + * This is used to clean up memory allocated during columnar processing. */ - public void closeIfNotWritable() { + public void closeIfFreeable() { for (ColumnVector c: columns) { - c.closeIfNotWritable(); + c.closeIfFreeable(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java index 5095e6b0c9c6b..9713998549c72 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java @@ -72,6 +72,11 @@ public ConstantColumnVector(int numRows, DataType type) { } } + public void closeIfFreeable() { + // no-op: `ConstantColumnVector`s reuse the data backing its value across multiple batches and + // are freed at the end of execution in `close`. + } + @Override public void close() { stringData = null; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 0fde85fd454c1..d23de1ff0cfe9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -88,7 +88,7 @@ public void close() { } @Override - public void closeIfNotWritable() { + public void closeIfFreeable() { // no-op } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index ea559efc45f13..bfb198adad501 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w | $shouldStop | } | $idx = $numRows; - | $batch.closeIfNotWritable(); + | $batch.closeIfFreeable(); | $batch = null; | $nextBatchFuncName(); |} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 29cb224c8787c..f6472ba3d9dbc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -482,6 +482,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } + test("SPARK-50463: Partition values can be read over multiple batches") { + withTempDir { dir => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") { + val path = dir.getAbsolutePath + spark.range(0, 5) + .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id") + .write + .format("parquet") + .mode("overwrite") + .partitionBy("partCol").save(path) + val df = spark.read.format("parquet").load(path).selectExpr("partCol") + val expected = spark.range(0, 5) + .selectExpr("concat(cast(id % 2 as string), 'a') as partCol") + .collect() + + checkAnswer(df, expected) + } + } + } + test("SPARK-10301 requested schema clipping - same schema") { withTempPath { dir => val path = dir.getCanonicalPath