From c1abdcd14afa450b9ec7d487c06b1ba8e99f2989 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 5 Nov 2024 12:16:22 -0800
Subject: [PATCH] Close not writable ColumnVector after processing all rows in
 ColumnarToRowExec

---
 .../apache/spark/sql/vectorized/ColumnVector.java  | 12 ++++++++++++
 .../apache/spark/sql/vectorized/ColumnarBatch.java | 10 ++++++++++
 .../execution/vectorized/WritableColumnVector.java |  5 +++++
 .../org/apache/spark/sql/execution/Columnar.scala  |  5 +++++
 4 files changed, 32 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index cd3c30fa69335..bfb1833b731a7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -68,6 +68,18 @@ public abstract class ColumnVector implements AutoCloseable {
   @Override
   public abstract void close();
 
+  /**
+   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
+   * after this.
+   *
+   * If this is a writable column vector, it is a no-op.
+   */
+  public void closeIfNotWritable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable, it
+    // should override this method and do nothing.
+    close();
+  }
+
   /**
    * Returns true if this column vector contains any null values.
    */
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9e859e77644ac..52e4115af336a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -45,6 +45,16 @@ public void close() {
     }
   }
 
+  /**
+   * Called to close all the columns if they are not writable. This is used to clean up memory
+   * allocated during columnar processing.
+   */
+  public void closeIfNotWritable() {
+    for (ColumnVector c: columns) {
+      c.closeIfNotWritable();
+    }
+  }
+
   /**
    * Returns an iterator over the rows in this batch.
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 10594d6c5d340..696e20525cdac 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -96,6 +96,11 @@ public void close() {
     releaseMemory();
   }
 
+  @Override
+  public void closeIfNotWritable() {
+    // no-op
+  }
+
   public void reserveAdditional(int additionalCapacity) {
     reserve(elementsAppended + additionalCapacity);
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 111851094a69b..64163da50e13a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
        |    $shouldStop
        |  }
        |  $idx = $numRows;
+       |  $batch.closeIfNotWritable();
        |  $batch = null;
        |  $nextBatchFuncName();
        |}
+       |// clean up resources
+       |if ($batch != null) {
+       |  $batch.close();
+       |}
      """.stripMargin
  }
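
Usage note (not part of the diff): the new closeIfNotWritable() hook lets ColumnarToRowExec release the memory of read-only column vectors as soon as a batch has been fully converted to rows, while WritableColumnVector overrides the hook as a no-op so writable (reusable) vectors are left untouched. Below is a minimal, hedged sketch of how a row-by-row consumer could apply the hook; the drain method, the batches iterator, and the process callback are illustrative placeholders, not Spark APIs, and this is not the code the patch actually generates.

    import java.util.Iterator;
    import java.util.function.Consumer;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class ColumnarToRowSketch {
      // Illustrative sketch only: converts each batch to rows, then frees
      // non-writable vectors early instead of waiting for task completion.
      static void drain(Iterator<ColumnarBatch> batches, Consumer<InternalRow> process) {
        while (batches.hasNext()) {
          ColumnarBatch batch = batches.next();
          Iterator<InternalRow> rows = batch.rowIterator();
          while (rows.hasNext()) {
            process.accept(rows.next());
          }
          // All rows consumed: read-only vectors can be closed now. Writable
          // vectors survive because their closeIfNotWritable() is a no-op.
          batch.closeIfNotWritable();
        }
      }
    }

The code generated by ColumnarToRowExec follows the same shape, and the added cleanup block after the while loop additionally closes the in-flight batch when the produce loop exits early (for example under a limit), so no column vector is left unreleased.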