@@ -69,14 +69,14 @@ public abstract class ColumnVector implements AutoCloseable {
public abstract void close();

/**
* Cleans up memory for this column vector if it's not writable. The column vector is not usable
* after this.
* Cleans up memory for this column vector if its resources are freeable between batches.
* The column vector is not usable after this.
*
* If this is a writable column vector, it is a no-op.
* If this is a writable column vector or constant column vector, it is a no-op.
*/
public void closeIfNotWritable() {
// By default, we just call close() for all column vectors. If a column vector is writable, it
// should override this method and do nothing.
public void closeIfFreeable() {
Member

Hmm, ColumnVector is actually an API. Can we avoid renaming the method directly here?

Contributor Author

The original PR that added this method (800faf0) was merged about a month ago. Is it too late to change the method name now?

If so, how do you suggest we go about this? I think closeIfNotWritable doesn't represent the purpose of the method well, so one option is to deprecate closeIfNotWritable and have it call closeIfFreeable directly. This is pretty ugly, though, so I'm open to any suggestions.

Member

Ah, okie dokie, then it should be fine.

Member

Yeah, it is a new API that was added recently and has not been released yet.
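
For reference, the deprecation option floated earlier in this thread could look roughly like the sketch below. It is only an illustration of that alternative, not what the PR does (the PR renames the method directly). `SketchColumnVector`, `SketchConstantVector`, and `DeprecationSketch` are hypothetical stand-ins, not the real Spark classes.

```java
// Hypothetical stand-in for ColumnVector; not the actual Spark class.
abstract class SketchColumnVector implements AutoCloseable {
    @Override
    public abstract void close();

    // New name: by default, free everything between batches.
    public void closeIfFreeable() {
        close();
    }

    /** @deprecated use {@link #closeIfFreeable()} instead. */
    @Deprecated
    public void closeIfNotWritable() {
        // Old name kept only as a thin forwarder to the new one.
        closeIfFreeable();
    }
}

// Hypothetical vector whose data must survive across batches.
class SketchConstantVector extends SketchColumnVector {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public void closeIfFreeable() {
        // no-op: resources are released only in close()
    }
}

public class DeprecationSketch {
    public static void main(String[] args) {
        SketchConstantVector v = new SketchConstantVector();
        v.closeIfNotWritable();        // forwards to the overridden no-op
        System.out.println(v.closed);  // false
        v.close();
        System.out.println(v.closed);  // true
    }
}
```

With this shape, existing callers of the old name keep working, and subclasses only need to override `closeIfFreeable`. The cost is carrying two public names for one behavior, which is the ugliness mentioned above.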

// By default, we just call close() for all column vectors. If a column vector is writable or
// constant, it should override this method and do nothing.
close();
}

@@ -46,12 +46,12 @@ public void close() {
}

/**
* Called to close all the columns if they are not writable. This is used to clean up memory
* allocated during columnar processing.
* Called to close all the columns if their resources are freeable between batches.
* This is used to clean up memory allocated during columnar processing.
*/
public void closeIfNotWritable() {
public void closeIfFreeable() {
for (ColumnVector c: columns) {
c.closeIfNotWritable();
c.closeIfFreeable();
}
}

@@ -77,6 +77,11 @@ public ConstantColumnVector(int numRows, DataType type) {
}
}

public void closeIfFreeable() {
Member

closeIfNotWritable?

Contributor Author

Ah yeah, my apologies. Do you think closeIfNotWritable still makes sense given that ConstantColumnVector is not a WritableColumnVector?

Do you think closeIfFreeable makes sense? If so, I'll change the method name in ColumnVector.java in this PR as well.

Or do you have a preference for a different name?

Member

OK for closeIfFreeable. I just wondered why you only changed it in ConstantColumnVector.
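
To make the intended semantics concrete, here is a minimal sketch of how a batch-level `closeIfFreeable` behaves when one column holds per-batch data and another holds a constant value reused across batches. All `Demo*` names and `BatchSketch` are hypothetical stand-ins written for illustration; they only mirror the shape of the methods shown in this diff.

```java
import java.util.List;

// Hypothetical stand-ins; none of these are the actual Spark classes.
abstract class DemoVector implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    // Default: the vector is freed between batches.
    public void closeIfFreeable() {
        close();
    }
}

// Per-batch data: the default behavior applies.
class DemoPerBatchVector extends DemoVector { }

// Constant data (for example a partition value) reused by every batch.
class DemoConstantVector extends DemoVector {
    @Override
    public void closeIfFreeable() {
        // no-op: freed at the end of execution in close()
    }
}

class DemoBatch implements AutoCloseable {
    final List<DemoVector> columns;

    DemoBatch(List<DemoVector> columns) {
        this.columns = columns;
    }

    // Called between batches: only freeable columns are released.
    public void closeIfFreeable() {
        for (DemoVector c : columns) {
            c.closeIfFreeable();
        }
    }

    // Called at the end of execution: everything is released.
    @Override
    public void close() {
        for (DemoVector c : columns) {
            c.close();
        }
    }
}

public class BatchSketch {
    public static void main(String[] args) {
        DemoVector data = new DemoPerBatchVector();
        DemoVector part = new DemoConstantVector();
        DemoBatch batch = new DemoBatch(List.of(data, part));

        batch.closeIfFreeable();
        System.out.println(data.closed + " " + part.closed); // true false

        batch.close();
        System.out.println(data.closed + " " + part.closed); // true true
    }
}
```

The name `closeIfFreeable` describes both overriding cases (writable and constant vectors), whereas `closeIfNotWritable` only described the first.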

// no-op: `ConstantColumnVector`s reuse the data backing their values across multiple batches
// and are freed at the end of execution in `close`.
}

@Override
public void close() {
stringData = null;
@@ -97,7 +97,7 @@ public void close() {
}

@Override
public void closeIfNotWritable() {
public void closeIfFreeable() {
// no-op
}

@@ -194,7 +194,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
| $shouldStop
| }
| $idx = $numRows;
| $batch.closeIfNotWritable();
| $batch.closeIfFreeable();
| $batch = null;
| $nextBatchFuncName();
|}
@@ -473,6 +473,26 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
}
}

test("SPARK-50463: Partition values can be read over multiple batches") {
  withTempDir { dir =>
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> "1") {
      val path = dir.getAbsolutePath
      spark.range(0, 5)
        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol", "id")
        .write
        .format("parquet")
        .mode("overwrite")
        .partitionBy("partCol").save(path)
      val df = spark.read.format("parquet").load(path).selectExpr("partCol")
      val expected = spark.range(0, 5)
        .selectExpr("concat(cast(id % 2 as string), 'a') as partCol")
        .collect()

      checkAnswer(df, expected)
    }
  }
}

test("SPARK-10301 requested schema clipping - same schema") {
withTempPath { dir =>
val path = dir.getCanonicalPath