Skip to content

Commit

Permalink
Fix the bug in positional delete file traversal (#9023)
Browse files Browse the repository at this point in the history
Summary:
When a delete file has been read to the end and erased, the iter should not be incremented, otherwise the next delete file will be skipped.

Pull Request resolved: #9023

Reviewed By: pedroerp

Differential Revision: D54752335

Pulled By: Yuhta

fbshipit-source-id: 95540a6b15dc74f698080590037976dea9a4d2ac
  • Loading branch information
liujiayi771 authored and facebook-github-bot committed Mar 11, 2024
1 parent d43663c commit c5d3b93
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 33 deletions.
5 changes: 3 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ uint64_t IcebergSplitReader::next(int64_t size, VectorPtr& output) {
std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes);

for (auto iter = positionalDeleteFileReaders_.begin();
iter != positionalDeleteFileReaders_.end();
iter++) {
iter != positionalDeleteFileReaders_.end();) {
(*iter)->readDeletePositions(
baseReadOffset_, size, deleteBitmap_->asMutable<int8_t>());
if ((*iter)->endOfFile()) {
iter = positionalDeleteFileReaders_.erase(iter);
} else {
++iter;
}
}

Expand Down
105 changes: 74 additions & 31 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ namespace facebook::velox::connector::hive::iceberg {
class HiveIcebergTest : public HiveConnectorTestBase {
public:
void assertPositionalDeletes(
const std::vector<int64_t>& deleteRows,
const std::vector<std::vector<int64_t>>& deleteRowsVec,
bool multipleBaseFiles = false) {
assertPositionalDeletes(
deleteRows,
"SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + ")",
deleteRowsVec,
"SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRowsVec) +
")",
multipleBaseFiles);
}

void assertPositionalDeletes(
const std::vector<int64_t>& deleteRows,
const std::vector<std::vector<int64_t>>& deleteRowsVec,
std::string duckdbSql,
bool multipleBaseFiles = false) {
std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);
Expand All @@ -53,21 +55,30 @@ class HiveIcebergTest : public HiveConnectorTestBase {
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
int64_t numDeleteRowsAfter =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->path,
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);

IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->path,
fileFomat_,
deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
testing::internal::GetFileSize(
std::fopen(deleteFilePath->path.c_str(), "r")));
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::vector<std::shared_ptr<TempFilePath>> deleteFilePaths;
std::vector<IcebergDeleteFile> deleteFiles;
deleteFilePaths.reserve(deleteRowsVec.size());
deleteFiles.reserve(deleteRowsVec.size());
for (auto const& deleteRows : deleteRowsVec) {
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->path,
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->path,
fileFomat_,
deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
testing::internal::GetFileSize(
std::fopen(deleteFilePath->path.c_str(), "r")));
deleteFilePaths.emplace_back(deleteFilePath);
deleteFiles.emplace_back(deleteFile);
}

auto icebergSplit = makeIcebergSplit(dataFilePath->path, {deleteFile});
auto icebergSplit = makeIcebergSplit(dataFilePath->path, deleteFiles);

auto plan = tableScanNode();
auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql);
Expand Down Expand Up @@ -97,6 +108,17 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return deleteRows;
}

std::vector<std::vector<int64_t>> makeContinuousRows(
int32_t min,
int32_t max) {
std::vector<std::vector<int64_t>> deleteRows(
max - min + 1, std::vector<int64_t>(1));
for (auto i = min; i <= max; i++) {
deleteRows[i - min][0] = i;
}
return deleteRows;
}

const static int rowCount = 20000;

private:
Expand Down Expand Up @@ -200,7 +222,18 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return deleteFilePath;
}

std::string makeNotInList(const std::vector<int64_t>& deleteRows) {
std::string makeNotInList(
const std::vector<std::vector<int64_t>>& deleteRowsVec) {
std::vector<int64_t> deleteRows;
size_t totalSize = 0;
for (const auto& subVec : deleteRowsVec) {
totalSize += subVec.size();
}
deleteRows.reserve(totalSize);
for (const auto& subVec : deleteRowsVec) {
deleteRows.insert(deleteRows.end(), subVec.begin(), subVec.end());
}

if (deleteRows.empty()) {
return "";
}
Expand Down Expand Up @@ -240,41 +273,51 @@ TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) {
folly::SingletonVault::singleton()->registrationComplete();

// Delete row 0, 1, 2, 3 from the first batch out of two.
assertPositionalDeletes({0, 1, 2, 3});
assertPositionalDeletes({{0, 1, 2, 3}});
// Delete the first and last row in each batch (10000 rows per batch)
assertPositionalDeletes({0, 9999, 10000, 19999});
assertPositionalDeletes({{0, 9999, 10000, 19999}});
// Delete several rows in the second batch (10000 rows per batch)
assertPositionalDeletes({10000, 10002, 19999});
assertPositionalDeletes({{10000, 10002, 19999}});
// Delete random rows
assertPositionalDeletes(makeRandomDeleteRows(rowCount));
assertPositionalDeletes({makeRandomDeleteRows(rowCount)});
// Delete 0 rows
assertPositionalDeletes({}, "SELECT * FROM tmp", false);
// Delete all rows
assertPositionalDeletes(
makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false);
{makeSequenceRows(rowCount)}, "SELECT * FROM tmp WHERE 1 = 0", false);
// Delete rows that don't exist
assertPositionalDeletes({20000, 29999});
assertPositionalDeletes({{20000, 29999}});
}

// The positional delete file contains rows from multiple base files
TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) {
folly::SingletonVault::singleton()->registrationComplete();

// Delete row 0, 1, 2, 3 from the first batch out of two.
assertPositionalDeletes({0, 1, 2, 3}, true);
assertPositionalDeletes({{0, 1, 2, 3}}, true);
// Delete the first and last row in each batch (10000 rows per batch)
assertPositionalDeletes({0, 9999, 10000, 19999}, true);
assertPositionalDeletes({{0, 9999, 10000, 19999}}, true);
// Delete several rows in the second batch (10000 rows per batch)
assertPositionalDeletes({10000, 10002, 19999}, true);
assertPositionalDeletes({{10000, 10002, 19999}}, true);
// Delete random rows
assertPositionalDeletes(makeRandomDeleteRows(rowCount), true);
assertPositionalDeletes({makeRandomDeleteRows(rowCount)}, true);
// Delete 0 rows
assertPositionalDeletes({}, "SELECT * FROM tmp", true);
// Delete all rows
assertPositionalDeletes(
makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true);
{makeSequenceRows(rowCount)}, "SELECT * FROM tmp WHERE 1 = 0", true);
// Delete rows that don't exist
assertPositionalDeletes({20000, 29999}, true);
assertPositionalDeletes({{20000, 29999}}, true);
}

// The base file has multiple delete files.
TEST_F(HiveIcebergTest, baseFileMultiplePositionalDeletes) {
folly::SingletonVault::singleton()->registrationComplete();

// Delete row 0, 1, 2, 3 from the first batch out of two.
assertPositionalDeletes({{1}, {2}, {3}, {4}});
// Delete the first and last row in each batch (10000 rows per batch).
assertPositionalDeletes({{0}, {9999}, {10000}, {19999}});
}

} // namespace facebook::velox::connector::hive::iceberg

0 comments on commit c5d3b93

Please sign in to comment.