Skip to content

Commit

Permalink
Fix Iceberg read when positional delete files are unaligned (#10261)
Browse files Browse the repository at this point in the history
Summary:
When the base data file and positional delete files contains multiple
unaligned RowGroups, some of the bits at the end of
IcebergSplitReader::deleteBitmap_ could be mistakenly skipped, causing
wrong result. This commit fixes it by introducing an offset into this
deleteBitmap_ and shift the unused bits to the beginning for each batch.

Fixes #9856

Pull Request resolved: #10261

Reviewed By: xiaoxmeng

Differential Revision: D59350780

Pulled By: Yuhta

fbshipit-source-id: 368101d306d78c25a3616b8ae193c8f221330533
  • Loading branch information
yingsu00 authored and facebook-github-bot committed Jul 5, 2024
1 parent 8d5e317 commit 66aeca4
Show file tree
Hide file tree
Showing 8 changed files with 643 additions and 239 deletions.
46 changes: 37 additions & 9 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ IcebergSplitReader::IcebergSplitReader(
executor,
scanSpec),
baseReadOffset_(0),
splitOffset_(0) {}
splitOffset_(0),
deleteBitmap_(nullptr),
deleteBitmapBitOffset_(0) {}

void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
Expand Down Expand Up @@ -97,29 +99,55 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
mutation.randomSkip = baseReaderOpts_.randomSkip().get();
mutation.deletedRows = nullptr;

if (deleteBitmap_ && deleteBitmapBitOffset_ > 0) {
// There are unconsumed bits from last batch
if (deleteBitmapBitOffset_ < deleteBitmap_->size() * 8) {
bits::copyBits(
deleteBitmap_->as<uint64_t>(),
deleteBitmapBitOffset_,
deleteBitmap_->asMutable<uint64_t>(),
0,
deleteBitmap_->size() * 8 - deleteBitmapBitOffset_);

uint64_t newBitMapSizeInBytes =
deleteBitmap_->size() - deleteBitmapBitOffset_ / 8;
if (deleteBitmapBitOffset_ % 8 != 0) {
newBitMapSizeInBytes--;
}
deleteBitmap_->setSize(newBitMapSizeInBytes);
} else {
// All bits were consumed, reset to 0 for all bits
std::memset(
(void*)(deleteBitmap_->asMutable<int8_t>()),
0L,
deleteBitmap_->size());
}
}

if (!positionalDeleteFileReaders_.empty()) {
auto numBytes = bits::nbytes(size);
dwio::common::ensureCapacity<int8_t>(
deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool());
std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes);
deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), true, true);

for (auto iter = positionalDeleteFileReaders_.begin();
iter != positionalDeleteFileReaders_.end();) {
(*iter)->readDeletePositions(
baseReadOffset_, size, deleteBitmap_->asMutable<int8_t>());
if ((*iter)->endOfFile()) {
(*iter)->readDeletePositions(baseReadOffset_, size, deleteBitmap_);

if ((*iter)->noMoreData()) {
iter = positionalDeleteFileReaders_.erase(iter);
} else {
++iter;
}
}

deleteBitmap_->setSize(numBytes);
mutation.deletedRows = deleteBitmap_->as<uint64_t>();
}

mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0
? deleteBitmap_->as<uint64_t>()
: nullptr;

auto rowsScanned = baseRowReader_->next(size, output, &mutation);
baseReadOffset_ += rowsScanned;
deleteBitmapBitOffset_ = rowsScanned;

return rowsScanned;
}
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ class IcebergSplitReader : public SplitReader {
// The read offset to the beginning of the split in number of rows for the
// current batch for the base data file
uint64_t baseReadOffset_;

// The file position for the first row in the split
uint64_t splitOffset_;

std::list<std::unique_ptr<PositionalDeleteFileReader>>
positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;
// The offset in bits of the deleteBitmap_ starting from where the bits shall
// be consumed
uint64_t deleteBitmapBitOffset_;
};
} // namespace facebook::velox::connector::hive::iceberg
62 changes: 41 additions & 21 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,9 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteRowReader_(nullptr),
deletePositionsOutput_(nullptr),
deletePositionsOffset_(0),
endOfFile_(false) {
totalNumRowsScanned_(0) {
VELOX_CHECK(deleteFile_.content == FileContent::kPositionalDeletes);

if (deleteFile_.recordCount == 0) {
return;
}
VELOX_CHECK(deleteFile_.recordCount);

// TODO: check if the lowerbounds and upperbounds in deleteFile overlap with
// this batch. If not, no need to proceed.
Expand Down Expand Up @@ -137,21 +134,23 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
void PositionalDeleteFileReader::readDeletePositions(
uint64_t baseReadOffset,
uint64_t size,
int8_t* deleteBitmap) {
BufferPtr deleteBitmapBuffer) {
// We are going to read to the row number up to the end of the batch. For the
// same base file, the deleted rows are in ascending order in the same delete
// file
// file. rowNumberUpperBound is the upperbound for the row number in this
// batch, excluding boundaries
int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size;

// Finish unused delete positions from last batch
// Finish unused delete positions from last batch. Note that at this point we
// don't know how many rows the base row reader would scan yet.
if (deletePositionsOutput_ &&
deletePositionsOffset_ < deletePositionsOutput_->size()) {
updateDeleteBitmap(
std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
->childAt(0),
baseReadOffset,
rowNumberUpperBound,
deleteBitmap);
deleteBitmapBuffer);

if (readFinishedForBatch(rowNumberUpperBound)) {
return;
Expand All @@ -166,14 +165,15 @@ void PositionalDeleteFileReader::readDeletePositions(
// and update the delete bitmap

auto outputType = posColumn_->type;

RowTypePtr outputRowType = ROW({posColumn_->name}, {posColumn_->type});
if (!deletePositionsOutput_) {
deletePositionsOutput_ = BaseVector::create(outputRowType, 0, pool_);
}

while (!readFinishedForBatch(rowNumberUpperBound)) {
do {
auto rowsScanned = deleteRowReader_->next(size, deletePositionsOutput_);
totalNumRowsScanned_ += rowsScanned;

if (rowsScanned > 0) {
VELOX_CHECK(
!deletePositionsOutput_->mayHaveNulls(),
Expand All @@ -184,42 +184,57 @@ void PositionalDeleteFileReader::readDeletePositions(
deletePositionsOutput_->loadedVector();
deletePositionsOffset_ = 0;

// Convert the row numbers to set bits, up to rowNumberUpperBound.
// Beyond that the buffer of deleteBitMap is not available.
updateDeleteBitmap(
std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
->childAt(0),
baseReadOffset,
rowNumberUpperBound,
deleteBitmap);
deleteBitmapBuffer);
}
} else {
// Reaching the end of the file
endOfFile_ = true;
deleteSplit_.reset();
return;
break;
}
}
} while (!readFinishedForBatch(rowNumberUpperBound));
}

bool PositionalDeleteFileReader::endOfFile() {
return endOfFile_;
bool PositionalDeleteFileReader::noMoreData() {
return totalNumRowsScanned_ >= deleteFile_.recordCount &&
deletePositionsOutput_ &&
deletePositionsOffset_ >= deletePositionsOutput_->size();
}

void PositionalDeleteFileReader::updateDeleteBitmap(
VectorPtr deletePositionsVector,
uint64_t baseReadOffset,
int64_t rowNumberUpperBound,
int8_t* deleteBitmap) {
BufferPtr deleteBitmapBuffer) {
auto deleteBitmap = deleteBitmapBuffer->asMutable<uint8_t>();

// Convert the positions in file into positions relative to the start of the
// split.
const int64_t* deletePositions =
deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
int64_t offset = baseReadOffset + splitOffset_;

while (deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] < rowNumberUpperBound) {
bits::setBit(
deleteBitmap, deletePositions[deletePositionsOffset_] - offset);
deletePositionsOffset_++;
}

// There might be multiple delete files for a single base file. The size of
// the deleteBitmapBuffer should be the largest position among all delte files
deleteBitmapBuffer->setSize(std::max(
(uint64_t)deleteBitmapBuffer->size(),
deletePositionsOffset_ == 0
? 0
: bits::nbytes(
deletePositions[deletePositionsOffset_ - 1] + 1 - offset)));
}

bool PositionalDeleteFileReader::readFinishedForBatch(
Expand All @@ -231,9 +246,14 @@ bool PositionalDeleteFileReader::readFinishedForBatch(
const int64_t* deletePositions =
deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();

if (deletePositionsOutput_->size() != 0 &&
deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) {
// We've read enough of the delete positions from this delete file when 1) it
// reaches the end of the file, or 2) the last read delete position is greater
// than the largest base file row number that is going to be read in this
// batch
if (totalNumRowsScanned_ >= deleteFile_.recordCount ||
(deletePositionsVector->size() != 0 &&
(deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] >= rowNumberUpperBound))) {
return true;
}
return false;
Expand Down
16 changes: 12 additions & 4 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ class PositionalDeleteFileReader {
void readDeletePositions(
uint64_t baseReadOffset,
uint64_t size,
int8_t* deleteBitmap);
BufferPtr deleteBitmap);

bool endOfFile();
bool noMoreData();

private:
void updateDeleteBitmap(
VectorPtr deletePositionsVector,
uint64_t baseReadOffset,
int64_t rowNumberUpperBound,
int8_t* deleteBitmap);
BufferPtr deleteBitmapBuffer);

bool readFinishedForBatch(int64_t rowNumberUpperBound);

Expand All @@ -77,9 +77,17 @@ class PositionalDeleteFileReader {

std::shared_ptr<HiveConnectorSplit> deleteSplit_;
std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
// The vector to hold the delete positions read from the positional delete
// file. These positions are relative to the start of the whole base data
// file.
VectorPtr deletePositionsOutput_;
// The index of deletePositionsOutput_ that indicates up to where the delete
// positions have been converted into the bitmap
uint64_t deletePositionsOffset_;
bool endOfFile_;
// Total number of rows read from this positional delete file reader,
// including the rows filtered out from filters on both filePathColumn_ and
// posColumn_.
uint64_t totalNumRowsScanned_;
};

} // namespace facebook::velox::connector::hive::iceberg
Loading

0 comments on commit 66aeca4

Please sign in to comment.