
Commit 800faf0

viirya authored and yaooqinn committed
[SPARK-50235][SQL] Clean up ColumnVector resource after processing all rows in ColumnarToRowExec
### What changes were proposed in this pull request?

This patch cleans up ColumnVector resources after processing all rows in ColumnarToRowExec. It only covers the codegen implementation of ColumnarToRowExec. The non-codegen path should be relatively rare in practice, and no good approach for it has been proposed yet, so it is left to a follow-up.

### Why are the changes needed?

Currently we only assign null to the ColumnarBatch object, which does not release the resources held by the vectors in the batch. For OnHeapColumnVector, the Java arrays may eventually be collected by the JVM, but for OffHeapColumnVector the allocated off-heap memory is leaked. For custom ColumnVector implementations such as Arrow-based ones, it can also cause memory-safety issues if the underlying buffers are reused across batches, because when ColumnarToRowExec begins to fill values for the next batch, the arrays of the previous batch are still held.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48767 from viirya/close_if_not_writable.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
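The contract introduced by this commit can be sketched in plain Java: the base vector's `closeIfNotWritable()` defaults to releasing resources, while a writable vector overrides it as a no-op because its buffers are reused for the next batch. All names below (`SimpleVector`, `ReadOnlyVector`, `WritableVector`, `closed`) are hypothetical stand-ins, not Spark's actual classes:

```java
// Minimal sketch of the closeIfNotWritable pattern; names are illustrative.
abstract class SimpleVector implements AutoCloseable {
  boolean closed = false;

  @Override
  public void close() { closed = true; }  // stands in for freeing buffers

  // Default: release resources after a batch is fully consumed. Writable
  // vectors override this as a no-op since their buffers are reused.
  public void closeIfNotWritable() { close(); }
}

class ReadOnlyVector extends SimpleVector { }

class WritableVector extends SimpleVector {
  @Override
  public void closeIfNotWritable() { /* no-op: buffers reused next batch */ }
}

public class CloseDemo {
  public static void main(String[] args) {
    SimpleVector ro = new ReadOnlyVector();
    SimpleVector w = new WritableVector();
    // After processing all rows of a batch:
    ro.closeIfNotWritable();
    w.closeIfNotWritable();
    // prints readOnlyClosed=true writableClosed=false
    System.out.println("readOnlyClosed=" + ro.closed
        + " writableClosed=" + w.closed);
  }
}
```

This is why the commit overrides the method only in WritableColumnVector: read-only vectors (including off-heap ones whose memory would otherwise leak) get released eagerly, while writable vectors keep their buffers alive.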
1 parent 737a65e commit 800faf0

File tree

4 files changed: +32 −0 lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

Lines changed: 12 additions & 0 deletions
@@ -68,6 +68,18 @@ public abstract class ColumnVector implements AutoCloseable {
   @Override
   public abstract void close();
 
+  /**
+   * Cleans up memory for this column vector if it's not writable. The column vector is not usable
+   * after this.
+   *
+   * If this is a writable column vector, it is a no-op.
+   */
+  public void closeIfNotWritable() {
+    // By default, we just call close() for all column vectors. If a column vector is writable, it
+    // should override this method and do nothing.
+    close();
+  }
+
   /**
    * Returns true if this column vector contains any null values.
    */

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java

Lines changed: 10 additions & 0 deletions
@@ -45,6 +45,16 @@ public void close() {
     }
   }
 
+  /**
+   * Called to close all the columns if they are not writable. This is used to clean up memory
+   * allocated during columnar processing.
+   */
+  public void closeIfNotWritable() {
+    for (ColumnVector c: columns) {
+      c.closeIfNotWritable();
+    }
+  }
+
   /**
    * Returns an iterator over the rows in this batch.
    */

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 5 additions & 0 deletions
@@ -96,6 +96,11 @@ public void close() {
     releaseMemory();
   }
 
+  @Override
+  public void closeIfNotWritable() {
+    // no-op
+  }
+
   public void reserveAdditional(int additionalCapacity) {
     reserve(elementsAppended + additionalCapacity);
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 5 additions & 0 deletions
@@ -194,9 +194,14 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
        | $shouldStop
        | }
        | $idx = $numRows;
+       | $batch.closeIfNotWritable();
        | $batch = null;
        | $nextBatchFuncName();
        |}
+       |// clean up resources
+       |if ($batch != null) {
+       |  $batch.close();
+       |}
      """.stripMargin
   }

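The Scala template above emits generated Java, which is hard to read in string-interpolation form. The control flow it produces can be sketched as ordinary Java with mock classes (all names — `Batch`, `process`, `released` — are assumptions for illustration, not Spark's actual generated code): after all rows of a batch are consumed, `closeIfNotWritable()` releases the read-only column buffers before the next batch is fetched, and any batch still held when iteration stops is closed at the end.

```java
import java.util.Iterator;
import java.util.List;

public class LoopSketch {
  // Mock batch standing in for ColumnarBatch; tracks whether it was released.
  static class Batch {
    final int numRows;
    boolean released = false;
    Batch(int numRows) { this.numRows = numRows; }
    void closeIfNotWritable() { released = true; }  // eager per-batch cleanup
    void close() { released = true; }               // final cleanup
  }

  static int process(Iterator<Batch> it) {
    int consumed = 0;
    Batch batch = null;
    while (true) {
      if (batch == null) {
        if (!it.hasNext()) break;   // no more batches
        batch = it.next();          // corresponds to $nextBatchFuncName()
      }
      consumed += batch.numRows;    // consume all rows of the batch
      batch.closeIfNotWritable();   // new in this commit: release vectors now
      batch = null;                 // previously this was the only cleanup
    }
    // clean up resources: close a batch still held on early exit
    if (batch != null) batch.close();
    return consumed;
  }

  public static void main(String[] args) {
    List<Batch> batches = List.of(new Batch(3), new Batch(2));
    System.out.println(process(batches.iterator()));  // prints 5
    System.out.println(batches.get(0).released
        && batches.get(1).released);                  // prints true
  }
}
```

Before this change, only the `batch = null` assignment ran, so off-heap buffers of the finished batch stayed allocated until GC or task end; the eager release is what fixes the leak described in the commit message.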