[Spark] Fix resource leaking when a deletion vector file is not found
vkorukanti committed Sep 27, 2023
1 parent 7f8fe8b commit 9190330
Showing 1 changed file with 20 additions and 8 deletions.
@@ -164,14 +164,26 @@ case class DeltaParquetFileFormat(
     val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
     (partitionedFile: PartitionedFile) => {
       val rowIteratorFromParquet = parquetDataReader(partitionedFile)
-      val iterToReturn =
-        iteratorWithAdditionalMetadataColumns(
-          partitionedFile,
-          rowIteratorFromParquet,
-          isRowDeletedColumn,
-          useOffHeapBuffers = useOffHeapBuffers,
-          rowIndexColumn = rowIndexColumn)
-      iterToReturn.asInstanceOf[Iterator[InternalRow]]
+      try {
+        val iterToReturn =
+          iteratorWithAdditionalMetadataColumns(
+            partitionedFile,
+            rowIteratorFromParquet,
+            isRowDeletedColumn,
+            useOffHeapBuffers = useOffHeapBuffers,
+            rowIndexColumn = rowIndexColumn)
+        iterToReturn.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case NonFatal(e) =>
+          // Close the iterator if it is a closeable resource. `ParquetFileFormat` opens
+          // the file and returns a `RecordReaderIterator` instance (which implements both
+          // `AutoCloseable` and `Iterator`) as a plain `Iterator`.
+          rowIteratorFromParquet match {
+            case resource: AutoCloseable => closeQuietly(resource)
+            case _ => // do nothing
+          }
+          throw e
+      }
     }
   }
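The catch block delegates to a `closeQuietly` helper so that a failure while closing the Parquet reader cannot mask the original exception (here, the missing deletion vector file). The diff does not show that helper's definition; the sketch below is a minimal illustration of the pattern, and its exact name and signature are assumptions rather than the commit's actual code:

    import scala.util.control.NonFatal

    // Illustrative sketch only; the helper in the actual commit may differ.
    // Closes a resource while suppressing any non-fatal close failure, so the
    // caller can rethrow the root-cause exception unchanged.
    private def closeQuietly(resource: AutoCloseable): Unit = {
      try {
        resource.close()
      } catch {
        case NonFatal(_) => // ignore; the caller rethrows the original error
      }
    }

Swallowing close failures here is deliberate: the exception worth surfacing is the one that aborted the read, not a secondary error raised during cleanup.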

