diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
index d56347cf39bf..b7c1f6b52340 100644
--- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp
@@ -48,7 +48,9 @@ IcebergSplitReader::IcebergSplitReader(
           executor,
           scanSpec),
       baseReadOffset_(0),
-      splitOffset_(0) {}
+      splitOffset_(0),
+      deleteBitmap_(nullptr),
+      deleteBitmapBitOffset_(0) {}
 
 void IcebergSplitReader::prepareSplit(
     std::shared_ptr<common::MetadataFilter> metadataFilter,
@@ -97,29 +99,55 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
   mutation.randomSkip = baseReaderOpts_.randomSkip().get();
   mutation.deletedRows = nullptr;
 
+  if (deleteBitmap_ && deleteBitmapBitOffset_ > 0) {
+    // There are unconsumed bits left over from the last batch.
+    if (deleteBitmapBitOffset_ < deleteBitmap_->size() * 8) {
+      bits::copyBits(
+          deleteBitmap_->as<uint64_t>(),
+          deleteBitmapBitOffset_,
+          deleteBitmap_->asMutable<uint64_t>(),
+          0,
+          deleteBitmap_->size() * 8 - deleteBitmapBitOffset_);
+
+      uint64_t newBitMapSizeInBytes =
+          deleteBitmap_->size() - deleteBitmapBitOffset_ / 8;
+      if (deleteBitmapBitOffset_ % 8 != 0) {
+        newBitMapSizeInBytes--;
+      }
+      deleteBitmap_->setSize(newBitMapSizeInBytes);
+    } else {
+      // All bits were consumed; reset the whole bitmap to 0.
+      std::memset(
+          (void*)(deleteBitmap_->asMutable<int8_t>()),
+          0L,
+          deleteBitmap_->size());
+    }
+  }
+
   if (!positionalDeleteFileReaders_.empty()) {
     auto numBytes = bits::nbytes(size);
     dwio::common::ensureCapacity<int8_t>(
-        deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool());
-    std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes);
+        deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), true, true);
 
     for (auto iter = positionalDeleteFileReaders_.begin();
          iter != positionalDeleteFileReaders_.end();) {
-      (*iter)->readDeletePositions(
-          baseReadOffset_, size, deleteBitmap_->asMutable<int8_t>());
-      if ((*iter)->endOfFile()) {
+      (*iter)->readDeletePositions(baseReadOffset_, size, deleteBitmap_);
+
+      if ((*iter)->noMoreData()) {
        iter = positionalDeleteFileReaders_.erase(iter);
       } else {
         ++iter;
       }
     }
-
-    deleteBitmap_->setSize(numBytes);
-    mutation.deletedRows = deleteBitmap_->as<uint64_t>();
   }
 
+  mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0
+      ? deleteBitmap_->as<uint64_t>()
+      : nullptr;
+
   auto rowsScanned = baseRowReader_->next(size, output, &mutation);
   baseReadOffset_ += rowsScanned;
+  deleteBitmapBitOffset_ = rowsScanned;
   return rowsScanned;
 }
diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h
index 4e308f47b6c4..b5ab7da64480 100644
--- a/velox/connectors/hive/iceberg/IcebergSplitReader.h
+++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h
@@ -52,12 +52,13 @@ class IcebergSplitReader : public SplitReader {
   // The read offset to the beginning of the split in number of rows for the
   // current batch for the base data file
   uint64_t baseReadOffset_;
-
   // The file position for the first row in the split
   uint64_t splitOffset_;
-
   std::list<std::unique_ptr<PositionalDeleteFileReader>>
       positionalDeleteFileReaders_;
   BufferPtr deleteBitmap_;
+  // The offset in bits into deleteBitmap_ from which the bits shall be
+  // consumed
+  uint64_t deleteBitmapBitOffset_;
 };
 
 } // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
index 421d2f98bbf6..7414f9e26fe5 100644
--- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
+++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
@@ -49,12 +49,9 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
       deleteRowReader_(nullptr),
       deletePositionsOutput_(nullptr),
       deletePositionsOffset_(0),
-      endOfFile_(false) {
+      totalNumRowsScanned_(0) {
   VELOX_CHECK(deleteFile_.content == FileContent::kPositionalDeletes);
-
-  if (deleteFile_.recordCount == 0) {
-    return;
-  }
+  VELOX_CHECK(deleteFile_.recordCount);
 
   // TODO: check if the lowerbounds and upperbounds in deleteFile overlap with
   // this batch. If not, no need to proceed.
@@ -137,13 +134,15 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
 void PositionalDeleteFileReader::readDeletePositions(
     uint64_t baseReadOffset,
     uint64_t size,
-    int8_t* deleteBitmap) {
+    BufferPtr deleteBitmapBuffer) {
   // We are going to read to the row number up to the end of the batch. For the
   // same base file, the deleted rows are in ascending order in the same delete
-  // file
+  // file. rowNumberUpperBound is the exclusive upper bound for the row numbers
+  // in this batch.
   int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size;
 
-  // Finish unused delete positions from last batch
+  // Finish the unused delete positions from the last batch. Note that at this
+  // point we don't know yet how many rows the base row reader will scan.
   if (deletePositionsOutput_ &&
       deletePositionsOffset_ < deletePositionsOutput_->size()) {
     updateDeleteBitmap(
         std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
            ->childAt(0),
         baseReadOffset,
         rowNumberUpperBound,
-        deleteBitmap);
+        deleteBitmapBuffer);
 
     if (readFinishedForBatch(rowNumberUpperBound)) {
       return;
@@ -166,14 +165,15 @@ void PositionalDeleteFileReader::readDeletePositions(
   // and update the delete bitmap
   auto outputType = posColumn_->type;
-
   RowTypePtr outputRowType = ROW({posColumn_->name}, {posColumn_->type});
   if (!deletePositionsOutput_) {
     deletePositionsOutput_ = BaseVector::create(outputRowType, 0, pool_);
   }
 
-  while (!readFinishedForBatch(rowNumberUpperBound)) {
+  do {
     auto rowsScanned = deleteRowReader_->next(size, deletePositionsOutput_);
+    totalNumRowsScanned_ += rowsScanned;
+
     if (rowsScanned > 0) {
       VELOX_CHECK(
           !deletePositionsOutput_->mayHaveNulls(),
@@ -184,42 +184,57 @@ void PositionalDeleteFileReader::readDeletePositions(
           deletePositionsOutput_->loadedVector();
       deletePositionsOffset_ = 0;
 
+      // Convert the row numbers to set bits, up to rowNumberUpperBound.
+      // Beyond that the buffer of deleteBitmap is not available.
       updateDeleteBitmap(
           std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
              ->childAt(0),
           baseReadOffset,
           rowNumberUpperBound,
-          deleteBitmap);
+          deleteBitmapBuffer);
      }
     } else {
       // Reaching the end of the file
-      endOfFile_ = true;
       deleteSplit_.reset();
-      return;
+      break;
     }
-  }
+  } while (!readFinishedForBatch(rowNumberUpperBound));
 }
 
-bool PositionalDeleteFileReader::endOfFile() {
-  return endOfFile_;
+bool PositionalDeleteFileReader::noMoreData() {
+  return totalNumRowsScanned_ >= deleteFile_.recordCount &&
+      deletePositionsOutput_ &&
+      deletePositionsOffset_ >= deletePositionsOutput_->size();
 }
 
 void PositionalDeleteFileReader::updateDeleteBitmap(
     VectorPtr deletePositionsVector,
     uint64_t baseReadOffset,
     int64_t rowNumberUpperBound,
-    int8_t* deleteBitmap) {
+    BufferPtr deleteBitmapBuffer) {
+  auto deleteBitmap = deleteBitmapBuffer->asMutable<uint8_t>();
+
   // Convert the positions in file into positions relative to the start of the
   // split.
   const int64_t* deletePositions =
       deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
   int64_t offset = baseReadOffset + splitOffset_;
+
   while (deletePositionsOffset_ < deletePositionsVector->size() &&
          deletePositions[deletePositionsOffset_] < rowNumberUpperBound) {
     bits::setBit(
         deleteBitmap, deletePositions[deletePositionsOffset_] - offset);
     deletePositionsOffset_++;
   }
+
+  // There might be multiple delete files for a single base file. The size of
+  // deleteBitmapBuffer should cover the largest position among all delete files.
+  deleteBitmapBuffer->setSize(std::max(
+      (uint64_t)deleteBitmapBuffer->size(),
+      deletePositionsOffset_ == 0
+          ? 0
+          : bits::nbytes(
+                deletePositions[deletePositionsOffset_ - 1] + 1 - offset)));
 }
 
 bool PositionalDeleteFileReader::readFinishedForBatch(
@@ -231,9 +246,14 @@ bool PositionalDeleteFileReader::readFinishedForBatch(
   const int64_t* deletePositions =
       deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
 
-  if (deletePositionsOutput_->size() != 0 &&
-      deletePositionsOffset_ < deletePositionsVector->size() &&
-      deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) {
+  // We've read enough of the delete positions from this delete file when 1) we
+  // have reached the end of the file, or 2) the last delete position read is
+  // greater than the largest base file row number that is going to be read in
+  // this batch.
+  if (totalNumRowsScanned_ >= deleteFile_.recordCount ||
+      (deletePositionsVector->size() != 0 &&
+       (deletePositionsOffset_ < deletePositionsVector->size() &&
+        deletePositions[deletePositionsOffset_] >= rowNumberUpperBound))) {
     return true;
   }
   return false;
diff --git a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
index c83bedb9f62b..ba98845eb639 100644
--- a/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
+++ b/velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
@@ -50,16 +50,16 @@ class PositionalDeleteFileReader {
   void readDeletePositions(
       uint64_t baseReadOffset,
       uint64_t size,
-      int8_t* deleteBitmap);
+      BufferPtr deleteBitmap);
 
-  bool endOfFile();
+  bool noMoreData();
 
  private:
   void updateDeleteBitmap(
       VectorPtr deletePositionsVector,
       uint64_t baseReadOffset,
       int64_t rowNumberUpperBound,
-      int8_t* deleteBitmap);
+      BufferPtr deleteBitmapBuffer);
 
   bool readFinishedForBatch(int64_t rowNumberUpperBound);
 
@@ -77,9 +77,17 @@ class PositionalDeleteFileReader {
   std::shared_ptr<HiveConnectorSplit> deleteSplit_;
   std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
+  // The vector to hold the delete positions read from the positional delete
+  // file. These positions are relative to the start of the whole base data
+  // file.
   VectorPtr deletePositionsOutput_;
+  // The index into deletePositionsOutput_ that indicates up to where the
+  // delete positions have been converted into the bitmap.
   uint64_t deletePositionsOffset_;
-  bool endOfFile_;
+  // Total number of rows read from this positional delete file reader,
+  // including the rows filtered out by the filters on both filePathColumn_ and
+  // posColumn_.
+  uint64_t totalNumRowsScanned_;
 };
 
 } // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
index 2585d41b2737..798cf150aa1e 100644
--- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
+++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -34,107 +34,186 @@ namespace facebook::velox::connector::hive::iceberg {
 class HiveIcebergTest : public HiveConnectorTestBase {
  public:
-  void assertPositionalDeletes(
-      const std::vector<std::vector<int64_t>>& deleteRowsVec,
-      bool multipleBaseFiles = false) {
-    assertPositionalDeletesInternal(
-        deleteRowsVec, getQuery(deleteRowsVec), multipleBaseFiles, 1, 0);
+  HiveIcebergTest()
+      : config_{std::make_shared<dwrf::Config>()} {
+    // Make the writers flush per batch so that we can create non-aligned
+    // RowGroups between the base data files and delete files.
+    flushPolicyFactory_ = []() {
+      return std::make_unique<dwrf::LambdaFlushPolicy>([]() { return true; });
+    };
   }
 
-  void assertPositionalDeletes(
-      const std::vector<std::vector<int64_t>>& deleteRowsVec,
-      std::string duckdbSql,
-      bool multipleBaseFiles = false) {
-    assertPositionalDeletesInternal(
-        deleteRowsVec, duckdbSql, multipleBaseFiles, 1, 0);
+  /// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each.
+  /// Also create 1 delete file delete_file_1 which contains delete positions
+  /// for data_file_1.
+  void assertSingleBaseFileSingleDeleteFile(
+      const std::vector<int64_t>& deletePositionsVec) {
+    std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
+        {"data_file_1", {10000, 10000}}};
+    std::unordered_map<
+        std::string,
+        std::multimap<std::string, std::vector<int64_t>>>
+        deleteFilesForBaseDatafiles = {
+            {"delete_file_1", {{"data_file_1", deletePositionsVec}}}};
+
+    assertPositionalDeletes(
+        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
   }
 
-  void assertPositionalDeletes(
-      const std::vector<std::vector<int64_t>>& deleteRowsVec,
-      std::string duckdbSql,
+  /// Create 3 base data files, where the first file data_file_0 has 500 rows,
+  /// the second file data_file_1 contains 2 RowGroups of 10000 rows each, and
+  /// the third file data_file_2 contains 500 rows. It creates 1 positional
+  /// delete file delete_file_1, which contains delete positions for
+  /// data_file_1.
+  void assertMultipleBaseFileSingleDeleteFile(
+      const std::vector<int64_t>& deletePositionsVec) {
+    int64_t previousFileRowCount = 500;
+    int64_t afterFileRowCount = 500;
+
+    assertPositionalDeletes(
+        {
+            {"data_file_0", {previousFileRowCount}},
+            {"data_file_1", {10000, 10000}},
+            {"data_file_2", {afterFileRowCount}},
+        },
+        {{"delete_file_1", {{"data_file_1", deletePositionsVec}}}},
+        0);
+  }
+
+  /// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each.
+  /// Create multiple delete files named delete_file_0, delete_file_1, and so on.
+  void assertSingleBaseFileMultipleDeleteFiles(
+      const std::vector<std::vector<int64_t>>& deletePositionsVecs) {
+    std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles = {
+        {"data_file_1", {10000, 10000}}};
+
+    std::unordered_map<
+        std::string,
+        std::multimap<std::string, std::vector<int64_t>>>
+        deleteFilesForBaseDatafiles;
+    for (int i = 0; i < deletePositionsVecs.size(); i++) {
+      std::string deleteFileName = fmt::format("delete_file_{}", i);
+      deleteFilesForBaseDatafiles[deleteFileName] = {
+          {"data_file_1", deletePositionsVecs[i]}};
+    }
+    assertPositionalDeletes(
+        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, 0);
+  }
+
+  void assertMultipleSplits(
+      const std::vector<int64_t>& deletePositions,
       int32_t splitCount,
       int32_t numPrefetchSplits) {
-    assertPositionalDeletesInternal(
-        deleteRowsVec, duckdbSql, true, splitCount, numPrefetchSplits);
+    std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles;
+    for (int32_t i = 0; i < splitCount; i++) {
+      std::string dataFileName = fmt::format("data_file_{}", i);
+      rowGroupSizesForFiles[dataFileName] = {rowCount};
+    }
+
+    std::unordered_map<
+        std::string,
+        std::multimap<std::string, std::vector<int64_t>>>
+        deleteFilesForBaseDatafiles;
+    for (int i = 0; i < splitCount; i++) {
+      std::string deleteFileName = fmt::format("delete_file_{}", i);
+      deleteFilesForBaseDatafiles[deleteFileName] = {
+          {fmt::format("data_file_{}", i), deletePositions}};
+    }
+
+    assertPositionalDeletes(
+        rowGroupSizesForFiles, deleteFilesForBaseDatafiles, numPrefetchSplits);
   }
 
-  std::vector<int64_t> makeRandomDeleteRows(int32_t maxRowNumber) {
+  std::vector<int64_t> makeRandomIncreasingValues(int64_t begin, int64_t end) {
+    VELOX_CHECK(begin < end);
+
     std::mt19937 gen{0};
-    std::vector<int64_t> deleteRows;
-    for (int i = 0; i < maxRowNumber; i++) {
+    std::vector<int64_t> values;
+    values.reserve(end - begin);
+    for (int i = begin; i < end; i++) {
       if (folly::Random::rand32(0, 10, gen) > 8) {
-        deleteRows.push_back(i);
+        values.push_back(i);
       }
     }
-    return deleteRows;
+    return values;
   }
 
-  std::vector<int64_t> makeSequenceRows(int32_t maxRowNumber) {
-    std::vector<int64_t> deleteRows;
-    deleteRows.resize(maxRowNumber);
-    std::iota(deleteRows.begin(), deleteRows.end(), 0);
-    return deleteRows;
+  std::vector<int64_t> makeContinuousIncreasingValues(
+      int64_t begin,
+      int64_t end) {
+    std::vector<int64_t> values;
+    values.resize(end - begin);
+    std::iota(values.begin(), values.end(), begin);
+    return values;
   }
 
-  std::vector<std::vector<int64_t>> makeContinuousRows(
-      int32_t min,
-      int32_t max) {
-    std::vector<std::vector<int64_t>> deleteRows(
-        max - min + 1, std::vector<int64_t>(1));
-    for (auto i = min; i <= max; i++) {
-      deleteRows[i - min][0] = i;
-    }
-    return deleteRows;
-  }
+  /// @rowGroupSizesForFiles The key is the file name, and the value is a vector
+  /// of RowGroup sizes
+  /// @deleteFilesForBaseDatafiles The key is the delete file name, and the
+  /// value contains the information about the content of this delete file.
+  /// e.g. {
+  ///         "delete_file_1",
+  ///         {
+  ///             {"data_file_1", {1, 2, 3}},
+  ///             {"data_file_1", {4, 5, 6}},
+  ///             {"data_file_2", {0, 2, 4}}
+  ///         }
+  ///      }
+  /// represents one delete file called delete_file_1, which contains delete
+  /// positions for data_file_1 and data_file_2. There are 3 RowGroups in this
+  /// delete file; the first two contain positions for data_file_1, and the last
+  /// contains positions for data_file_2.
+  void assertPositionalDeletes(
+      const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
+      const std::unordered_map<
+          std::string,
+          std::multimap<std::string, std::vector<int64_t>>>&
+          deleteFilesForBaseDatafiles,
+      int32_t numPrefetchSplits = 0) {
+    // Keep the reference to the deleteFilePath, otherwise the corresponding
+    // file will be deleted.
+    std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths =
+        writeDataFiles(rowGroupSizesForFiles);
+    std::unordered_map<
+        std::string,
+        std::pair<int64_t, std::shared_ptr<TempFilePath>>>
+        deleteFilePaths = writePositionDeleteFiles(
+            deleteFilesForBaseDatafiles, dataFilePaths);
 
-  std::string getQuery(const std::vector<std::vector<int64_t>>& deleteRowsVec) {
-    return "SELECT * FROM tmp WHERE c0 NOT IN (" +
-        makeNotInList(deleteRowsVec) + ")";
-  }
+    std::vector<std::shared_ptr<ConnectorSplit>> splits;
 
-  const static int rowCount = 20000;
+    for (const auto& dataFile : dataFilePaths) {
+      std::string baseFileName = dataFile.first;
+      std::string baseFilePath = dataFile.second->getPath();
 
- private:
-  void assertPositionalDeletesInternal(
-      const std::vector<std::vector<int64_t>>& deleteRowsVec,
-      std::string duckdbSql,
-      bool multipleBaseFiles,
-      int32_t splitCount,
-      int32_t numPrefetchSplits) {
-    auto dataFilePaths = writeDataFile(splitCount, rowCount);
-    std::vector<std::shared_ptr<ConnectorSplit>> splits;
-    // Keep the reference to the deleteFilePath, otherwise the corresponding
-    // file will be deleted.
-    std::vector<std::shared_ptr<TempFilePath>> deleteFilePaths;
-    for (const auto& dataFilePath : dataFilePaths) {
-      std::mt19937 gen{0};
-      int64_t numDeleteRowsBefore =
-          multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
-      int64_t numDeleteRowsAfter =
-          multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
       std::vector<IcebergDeleteFile> deleteFiles;
-      deleteFiles.reserve(deleteRowsVec.size());
-      for (auto const& deleteRows : deleteRowsVec) {
-        std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
-            dataFilePath->getPath(),
-            deleteRows,
-            numDeleteRowsBefore,
-            numDeleteRowsAfter);
-        auto path = deleteFilePath->getPath();
-        IcebergDeleteFile deleteFile(
-            FileContent::kPositionalDeletes,
-            deleteFilePath->getPath(),
-            fileFomat_,
-            deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
-            testing::internal::GetFileSize(std::fopen(path.c_str(), "r")));
-        deleteFilePaths.emplace_back(deleteFilePath);
-        deleteFiles.emplace_back(deleteFile);
+
+      for (auto const& deleteFile : deleteFilesForBaseDatafiles) {
+        std::string deleteFileName = deleteFile.first;
+        std::multimap<std::string, std::vector<int64_t>> deleteFileContent =
+            deleteFile.second;
+
+        if (deleteFileContent.count(baseFileName) != 0) {
+          // If this delete file contains rows for the target base file, then
+          // add it to the split
+          auto deleteFilePath =
+              deleteFilePaths[deleteFileName].second->getPath();
+          IcebergDeleteFile deleteFile(
+              FileContent::kPositionalDeletes,
+              deleteFilePath,
+              fileFomat_,
+              deleteFilePaths[deleteFileName].first,
+              testing::internal::GetFileSize(
+                  std::fopen(deleteFilePath.c_str(), "r")));
+          deleteFiles.push_back(deleteFile);
+        }
       }
-      splits.emplace_back(
-          makeIcebergSplit(dataFilePath->getPath(), deleteFiles));
+      splits.emplace_back(makeIcebergSplit(baseFilePath, deleteFiles));
     }
 
+    std::string duckdbSql =
+        getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
     auto plan = tableScanNode();
     auto task = HiveConnectorTestBase::assertQuery(
         plan, splits, duckdbSql, numPrefetchSplits);
@@ -146,6 +225,116 @@ class HiveIcebergTest : public HiveConnectorTestBase {
     ASSERT_TRUE(it->second.peakMemoryBytes > 0);
   }
 
+  const static int rowCount = 20000;
+
+ private:
+  std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
+      std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
+    std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths;
+
+    std::vector<RowVectorPtr> dataVectorsJoined;
+    dataVectorsJoined.reserve(rowGroupSizesForFiles.size());
+
+    int64_t startingValue = 0;
+    for (auto& dataFile : rowGroupSizesForFiles) {
+      dataFilePaths[dataFile.first] = TempFilePath::create();
+
+      // Make the values continuously increasing, even across base data
+      // files. This is to make constructing the DuckDB queries easier.
+      std::vector<RowVectorPtr> dataVectors =
+          makeVectors(dataFile.second, startingValue);
+      writeToFile(
+          dataFilePaths[dataFile.first]->getPath(),
+          dataVectors,
+          config_,
+          flushPolicyFactory_);
+
+      for (int i = 0; i < dataVectors.size(); i++) {
+        dataVectorsJoined.push_back(dataVectors[i]);
+      }
+    }
+
+    createDuckDbTable(dataVectorsJoined);
+    return dataFilePaths;
+  }
+
+  /// Input is like <"deleteFile1", <"dataFile1", {pos_RG1, pos_RG2,..}>,
+  /// <"dataFile2", {pos_RG1, pos_RG2,..}>
+  std::unordered_map<
+      std::string,
+      std::pair<int64_t, std::shared_ptr<TempFilePath>>>
+  writePositionDeleteFiles(
+      const std::unordered_map<
+          std::string, // delete file name
+          std::multimap<
+              std::string,
+              std::vector<int64_t>>>&
+          deleteFilesForBaseDatafiles, // <base file name, delete positions>
+      std::map<std::string, std::shared_ptr<TempFilePath>> baseFilePaths) {
+    std::unordered_map<
+        std::string,
+        std::pair<int64_t, std::shared_ptr<TempFilePath>>>
+        deleteFilePaths;
+    deleteFilePaths.reserve(deleteFilesForBaseDatafiles.size());
+
+    for (auto& deleteFile : deleteFilesForBaseDatafiles) {
+      auto deleteFileName = deleteFile.first;
+      auto deleteFileContent = deleteFile.second;
+      auto deleteFilePath = TempFilePath::create();
+
+      std::vector<RowVectorPtr> deleteFileVectors;
+      int64_t totalPositionsInDeleteFile = 0;
+
+      for (auto& deleteFileRowGroup : deleteFileContent) {
+        auto baseFileName = deleteFileRowGroup.first;
+        auto baseFilePath = baseFilePaths[baseFileName]->getPath();
+        auto positionsInRowGroup = deleteFileRowGroup.second;
+
+        auto filePathVector = makeFlatVector<StringView>(
+            static_cast<vector_size_t>(positionsInRowGroup.size()),
+            [&](vector_size_t row) { return baseFilePath; });
+        auto deletePosVector = makeFlatVector<int64_t>(positionsInRowGroup);
+
+        RowVectorPtr deleteFileVector = makeRowVector(
+            {pathColumn_->name, posColumn_->name},
+            {filePathVector, deletePosVector});
+
+        deleteFileVectors.push_back(deleteFileVector);
+        totalPositionsInDeleteFile += positionsInRowGroup.size();
+      }
+
+      writeToFile(
+          deleteFilePath->getPath(),
+          deleteFileVectors,
+          config_,
+          flushPolicyFactory_);
+
+      deleteFilePaths[deleteFileName] =
+          std::make_pair(totalPositionsInDeleteFile, deleteFilePath);
+    }
+
+    return deleteFilePaths;
+  }
+
+  std::vector<RowVectorPtr> makeVectors(
+      std::vector<int64_t> vectorSizes,
+      int64_t& startingValue) {
+    std::vector<RowVectorPtr> vectors;
+    vectors.reserve(vectorSizes.size());
+
+    vectors.reserve(vectorSizes.size());
+    for (int j = 0; j < vectorSizes.size(); j++) {
+      auto data = makeContinuousIncreasingValues(
+          startingValue, startingValue + vectorSizes[j]);
+      VectorPtr c0 = vectorMaker_.flatVector<int64_t>(data);
+      vectors.push_back(makeRowVector({"c0"}, {c0}));
+      startingValue += vectorSizes[j];
+    }
+
+    return vectors;
+  }
+
   std::shared_ptr<ConnectorSplit> makeIcebergSplit(
       const std::string& dataFilePath,
       const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
@@ -170,106 +359,114 @@ class HiveIcebergTest : public HiveConnectorTestBase {
         deleteFiles);
   }
 
-  std::vector<RowVectorPtr> makeVectors(int32_t count, int32_t rowsPerVector) {
-    std::vector<RowVectorPtr> vectors;
-    for (int i = 0; i < count; i++) {
-      auto data = makeSequenceRows(rowsPerVector);
-      VectorPtr c0 = vectorMaker_.flatVector<int64_t>(data);
-      vectors.push_back(makeRowVector({"c0"}, {c0}));
+  std::string getDuckDBQuery(
+      const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
+      const std::unordered_map<
+          std::string,
+          std::multimap<std::string, std::vector<int64_t>>>&
+          deleteFilesForBaseDatafiles) {
+    int64_t totalNumRowsInAllBaseFiles = 0;
+    std::map<std::string, int64_t> baseFileSizes;
+    for (auto rowGroupSizesInFile : rowGroupSizesForFiles) {
+      // Sum up the row counts in all RowGroups in each base file
+      baseFileSizes[rowGroupSizesInFile.first] += std::accumulate(
+          rowGroupSizesInFile.second.begin(),
+          rowGroupSizesInFile.second.end(),
+          0LL);
+      totalNumRowsInAllBaseFiles += baseFileSizes[rowGroupSizesInFile.first];
     }
-    return vectors;
-  }
 
+    // Group the delete vectors by baseFileName
+    std::map<std::string, std::vector<std::vector<int64_t>>>
+        deletePosVectorsForAllBaseFiles;
+    for (auto deleteFile : deleteFilesForBaseDatafiles) {
+      auto deleteFileContent = deleteFile.second;
+      for (auto rowGroup : deleteFileContent) {
+        auto baseFileName = rowGroup.first;
+        deletePosVectorsForAllBaseFiles[baseFileName].push_back(
+            rowGroup.second);
+      }
+    }
 
-  std::vector<std::shared_ptr<TempFilePath>> writeDataFile(
-      int32_t splitCount,
-      uint64_t numRows) {
-    auto dataVectors = makeVectors(splitCount, numRows);
-    std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
-    dataFilePaths.reserve(dataVectors.size());
-    for (auto i = 0; i < dataVectors.size(); i++) {
-      dataFilePaths.emplace_back(TempFilePath::create());
-      writeToFile(dataFilePaths.back()->getPath(), dataVectors[i]);
+    // Flatten and deduplicate the delete position vectors in
+    // deletePosVectorsForAllBaseFiles from previous step, and count the total
+    // number of distinct delete positions for all base files
+    std::map<std::string, std::vector<int64_t>>
+        flattenedDeletePosVectorsForAllBaseFiles;
+    int64_t totalNumDeletePositions = 0;
+    for (auto deleteVectorsForBaseFile : deletePosVectorsForAllBaseFiles) {
+      auto baseFileName = deleteVectorsForBaseFile.first;
+      auto deletePositionVectors = deleteVectorsForBaseFile.second;
+      std::vector<int64_t> deletePositionVector =
+          flattenAndDedup(deletePositionVectors, baseFileSizes[baseFileName]);
+      flattenedDeletePosVectorsForAllBaseFiles[baseFileName] =
+          deletePositionVector;
+      totalNumDeletePositions += deletePositionVector.size();
     }
-    createDuckDbTable(dataVectors);
-    return dataFilePaths;
-  }
 
-  std::shared_ptr<TempFilePath> writePositionDeleteFile(
-      const std::string& dataFilePath,
-      const std::vector<int64_t>& deleteRows,
-      int64_t numRowsBefore = 0,
-      int64_t numRowsAfter = 0) {
-    // if containsMultipleDataFiles == true, we will write rows for other base
-    // files before and after the target base file
-    uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter;
-
-    std::string dataFilePathBefore = dataFilePath + "_before";
-    std::string dataFilePathAfter = dataFilePath + "_after";
-
-    auto filePathVector =
-        vectorMaker_.flatVector<StringView>(numDeleteRows, [&](auto row) {
-          if (row < numRowsBefore) {
-            return StringView(dataFilePathBefore);
-          } else if (
-              row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) {
-            return StringView(dataFilePath);
-          } else if (
-              row >= deleteRows.size() + numRowsBefore && row < numDeleteRows) {
-            return StringView(dataFilePathAfter);
-          } else {
-            return StringView();
+    // Now build the DuckDB queries
+    if (totalNumDeletePositions == 0) {
+      return "SELECT * FROM tmp";
+    } else if (totalNumDeletePositions >= totalNumRowsInAllBaseFiles) {
+      return "SELECT * FROM tmp WHERE 1 = 0";
+    } else {
+      // Convert the delete positions in all base files into column values
+      std::vector<int64_t> allDeleteValues;
+
+      int64_t numRowsInPreviousBaseFiles = 0;
+      for (auto baseFileSize : baseFileSizes) {
+        auto deletePositions =
+            flattenedDeletePosVectorsForAllBaseFiles[baseFileSize.first];
+
+        if (numRowsInPreviousBaseFiles > 0) {
+          for (int64_t& deleteValue : deletePositions) {
+            deleteValue += numRowsInPreviousBaseFiles;
          }
-        });
+
-    std::vector<int64_t> deleteRowsVec;
-    deleteRowsVec.reserve(numDeleteRows);
+        allDeleteValues.insert(
+            allDeleteValues.end(),
+            deletePositions.begin(),
+            deletePositions.end());
 
-    if (numRowsBefore > 0) {
-      auto rowsBefore = makeSequenceRows(numRowsBefore);
-      deleteRowsVec.insert(
-          deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end());
-    }
-    deleteRowsVec.insert(
-        deleteRowsVec.end(), deleteRows.begin(), deleteRows.end());
-    if (numRowsAfter > 0) {
-      auto rowsAfter = makeSequenceRows(numRowsAfter);
-      deleteRowsVec.insert(
-          deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end());
+        numRowsInPreviousBaseFiles += baseFileSize.second;
+      }
+
+      return fmt::format(
+          "SELECT * FROM tmp WHERE c0 NOT IN ({})",
+          makeNotInList(allDeleteValues));
     }
+  }
 
-    auto deletePositionsVector =
-        vectorMaker_.flatVector<int64_t>(deleteRowsVec);
-    RowVectorPtr deleteFileVectors = makeRowVector(
-        {pathColumn_->name, posColumn_->name},
-        {filePathVector, deletePositionsVector});
+  std::vector<int64_t> flattenAndDedup(
+      const std::vector<std::vector<int64_t>>& deletePositionVectors,
+      int64_t baseFileSize) {
+    std::vector<int64_t> deletePositionVector;
+    for (auto vec : deletePositionVectors) {
+      for (auto pos : vec) {
+        if (pos >= 0 && pos < baseFileSize) {
+          deletePositionVector.push_back(pos);
+        }
+      }
+    }
 
-    auto deleteFilePath = TempFilePath::create();
-    writeToFile(deleteFilePath->getPath(), deleteFileVectors);
+    std::sort(deletePositionVector.begin(), deletePositionVector.end());
+    auto last =
+        std::unique(deletePositionVector.begin(), deletePositionVector.end());
+    deletePositionVector.erase(last, deletePositionVector.end());
 
-    return deleteFilePath;
+    return deletePositionVector;
   }
 
-  std::string makeNotInList(
-      const std::vector<std::vector<int64_t>>& deleteRowsVec) {
-    std::vector<int64_t> deleteRows;
-    size_t totalSize = 0;
-    for (const auto& subVec : deleteRowsVec) {
-      totalSize += subVec.size();
-    }
-    deleteRows.reserve(totalSize);
-    for (const auto& subVec : deleteRowsVec) {
-      deleteRows.insert(deleteRows.end(), subVec.begin(), subVec.end());
-    }
-
-    if (deleteRows.empty()) {
+  std::string makeNotInList(const std::vector<int64_t>& deletePositionVector) {
+    if (deletePositionVector.empty()) {
       return "";
     }
 
     return std::accumulate(
-        deleteRows.begin() + 1,
-        deleteRows.end(),
-        std::to_string(deleteRows[0]),
+        deletePositionVector.begin() + 1,
+        deletePositionVector.end(),
+        std::to_string(deletePositionVector[0]),
         [](const std::string& a, int64_t b) {
           return a + ", " + std::to_string(b);
         });
@@ -280,6 +477,9 @@ class HiveIcebergTest : public HiveConnectorTestBase {
   }
 
   dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
+  std::shared_ptr<dwrf::Config> config_;
+  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
+
   RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
   std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
       IcebergMetadataColumn::icebergDeleteFilePathColumn();
@@ -287,64 +487,174 @@
       IcebergMetadataColumn::icebergDeletePosColumn();
 };
 
-TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) {
+/// This test creates one single data file and one delete file. The parameter
+/// passed to assertSingleBaseFileSingleDeleteFile is the delete positions.
+TEST_F(HiveIcebergTest, singleBaseFileSinglePositionalDeleteFile) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  // Delete row 0, 1, 2, 3 from the first batch out of two.
-  assertPositionalDeletes({{0, 1, 2, 3}});
+  assertSingleBaseFileSingleDeleteFile({{0, 1, 2, 3}});
   // Delete the first and last row in each batch (10000 rows per batch)
-  assertPositionalDeletes({{0, 9999, 10000, 19999}});
+  assertSingleBaseFileSingleDeleteFile({{0, 9999, 10000, 19999}});
   // Delete several rows in the second batch (10000 rows per batch)
-  assertPositionalDeletes({{10000, 10002, 19999}});
+  assertSingleBaseFileSingleDeleteFile({{10000, 10002, 19999}});
   // Delete random rows
-  assertPositionalDeletes({makeRandomDeleteRows(rowCount)});
+  assertSingleBaseFileSingleDeleteFile({makeRandomIncreasingValues(0, 20000)});
   // Delete 0 rows
-  assertPositionalDeletes({}, "SELECT * FROM tmp", false);
+  assertSingleBaseFileSingleDeleteFile({});
   // Delete all rows
-  assertPositionalDeletes(
-      {makeSequenceRows(rowCount)}, "SELECT * FROM tmp WHERE 1 = 0", false);
+  assertSingleBaseFileSingleDeleteFile(
+      {makeContinuousIncreasingValues(0, 20000)});
   // Delete rows that don't exist
-  assertPositionalDeletes({{20000, 29999}});
+  assertSingleBaseFileSingleDeleteFile({{20000, 29999}});
 }
 
-// The positional delete file contains rows from multiple base files
-TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) {
+/// This test creates 3 base data files, only the middle one of which has
+/// corresponding delete positions. The parameter passed to
+/// assertMultipleBaseFileSingleDeleteFile is the delete positions for the
+/// middle base file.
+TEST_F(HiveIcebergTest, MultipleBaseFilesSinglePositionalDeleteFile) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  // Delete row 0, 1, 2, 3 from the first batch out of two.
-  assertPositionalDeletes({{0, 1, 2, 3}}, true);
-  // Delete the first and last row in each batch (10000 rows per batch)
-  assertPositionalDeletes({{0, 9999, 10000, 19999}}, true);
-  // Delete several rows in the second batch (10000 rows per batch)
-  assertPositionalDeletes({{10000, 10002, 19999}}, true);
-  // Delete random rows
-  assertPositionalDeletes({makeRandomDeleteRows(rowCount)}, true);
-  // Delete 0 rows
-  assertPositionalDeletes({}, "SELECT * FROM tmp", true);
-  // Delete all rows
-  assertPositionalDeletes(
-      {makeSequenceRows(rowCount)}, "SELECT * FROM tmp WHERE 1 = 0", true);
-  // Delete rows that don't exist
-  assertPositionalDeletes({{20000, 29999}}, true);
+  assertMultipleBaseFileSingleDeleteFile({0, 1, 2, 3});
+  assertMultipleBaseFileSingleDeleteFile({0, 9999, 10000, 19999});
+  assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
+  assertMultipleBaseFileSingleDeleteFile({10000, 10002, 19999});
+  assertMultipleBaseFileSingleDeleteFile(
+      makeRandomIncreasingValues(0, rowCount));
+  assertMultipleBaseFileSingleDeleteFile({});
+  assertMultipleBaseFileSingleDeleteFile(
+      makeContinuousIncreasingValues(0, rowCount));
 }
 
-// The base file has multiple delete files.
-TEST_F(HiveIcebergTest, baseFileMultiplePositionalDeletes) {
+/// This test creates one base data file/split with multiple delete files. The
+/// parameter passed to assertSingleBaseFileMultipleDeleteFiles is a vector of
+/// delete position vectors; each inner vector represents the delete positions
+/// in one delete file.
+TEST_F(HiveIcebergTest, singleBaseFileMultiplePositionalDeleteFiles) {
  folly::SingletonVault::singleton()->registrationComplete();
 
   // Delete row 0, 1, 2, 3 from the first batch out of two.
-  assertPositionalDeletes({{1}, {2}, {3}, {4}});
+  assertSingleBaseFileMultipleDeleteFiles({{1}, {2}, {3}, {4}});
   // Delete the first and last row in each batch (10000 rows per batch).
-  assertPositionalDeletes({{0}, {9999}, {10000}, {19999}});
+  assertSingleBaseFileMultipleDeleteFiles({{0}, {9999}, {10000}, {19999}});
+
+  assertSingleBaseFileMultipleDeleteFiles(
+      {makeRandomIncreasingValues(0, 10000),
+       makeRandomIncreasingValues(10000, 20000),
+       makeRandomIncreasingValues(5000, 15000)});
+
+  assertSingleBaseFileMultipleDeleteFiles(
+      {makeContinuousIncreasingValues(0, 10000),
+       makeContinuousIncreasingValues(10000, 20000)});
+
+  assertSingleBaseFileMultipleDeleteFiles(
+      {makeContinuousIncreasingValues(0, 10000),
+       makeContinuousIncreasingValues(10000, 20000),
+       makeRandomIncreasingValues(5000, 15000)});
+
+  assertSingleBaseFileMultipleDeleteFiles(
+      {makeContinuousIncreasingValues(0, 20000),
+       makeContinuousIncreasingValues(0, 20000)});
+
+  assertSingleBaseFileMultipleDeleteFiles(
+      {makeRandomIncreasingValues(0, 20000),
+       {},
+       makeRandomIncreasingValues(5000, 15000)});
+
+  assertSingleBaseFileMultipleDeleteFiles({{}, {}});
+}
+
+/// This test creates 2 base data files, and 1 or 2 delete files, with unaligned
+/// RowGroup boundaries
+TEST_F(HiveIcebergTest, multipleBaseFileMultiplePositionalDeleteFiles) {
+  folly::SingletonVault::singleton()->registrationComplete();
+
+  std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles;
+  std::unordered_map<
+      std::string,
+      std::multimap<std::string, std::vector<int64_t>>>
+      deleteFilesForBaseDatafiles;
+
+  // Create two data files, each with two RowGroups
+  rowGroupSizesForFiles["data_file_1"] = {100, 85};
+  rowGroupSizesForFiles["data_file_2"] = {99, 1};
+
+  // Delete 3 rows from the first RowGroup in data_file_1
+  deleteFilesForBaseDatafiles["delete_file_1"] = {{"data_file_1", {0, 1, 99}}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Delete 3 rows from the second RowGroup in data_file_1
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", {100, 101, 184}}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Delete random rows from both RowGroups in data_file_1
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", makeRandomIncreasingValues(0, 185)}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Delete all rows in data_file_1
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", makeContinuousIncreasingValues(0, 185)}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Delete non-existent rows from data_file_1
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", makeRandomIncreasingValues(186, 300)}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Delete several rows from both RowGroups in both data files
+  deleteFilesForBaseDatafiles.clear();
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", {0, 100, 102, 184}}, {"data_file_2", {1, 98, 99}}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // The delete file delete_file_1 contains 5 RowGroups itself, with the first 3
+  // deleting some repeating rows in data_file_1, and the last 2 RowGroups
+  // deleting some repeating rows in data_file_2
+  deleteFilesForBaseDatafiles.clear();
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", {0, 1, 2, 3}},
+      {"data_file_1", {1, 2, 3, 4}},
+      {"data_file_1", makeRandomIncreasingValues(0, 185)},
+      {"data_file_2", {1, 3, 5, 7}},
+      {"data_file_2", makeRandomIncreasingValues(0, 100)}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // delete_file_2 contains non-overlapping delete rows for each data file in
+  // each RowGroup
+  deleteFilesForBaseDatafiles.clear();
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", {0, 1, 2, 3}}, {"data_file_2", {1, 3, 5, 7}}};
+  deleteFilesForBaseDatafiles["delete_file_2"] = {
+      {"data_file_1", {1, 2, 3, 4}},
+      {"data_file_1", {98, 99, 100, 101, 184}},
+      {"data_file_2", {3, 5, 7, 9}},
+      {"data_file_2", {98, 99, 100}}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
+
+  // Two delete files each containing overlapping delete rows for both data
+  // files
+  deleteFilesForBaseDatafiles.clear();
+  deleteFilesForBaseDatafiles["delete_file_1"] = {
+      {"data_file_1", makeRandomIncreasingValues(0, 185)},
+      {"data_file_2", makeRandomIncreasingValues(0, 100)}};
+  deleteFilesForBaseDatafiles["delete_file_2"] = {
+      {"data_file_1", makeRandomIncreasingValues(10, 120)},
+      {"data_file_2", makeRandomIncreasingValues(50, 100)}};
+  assertPositionalDeletes(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
 }
 
 TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
   folly::SingletonVault::singleton()->registrationComplete();
 
-  constexpr int32_t splitCount = 50;
-  constexpr int32_t numPrefetchSplits = 10;
-  std::vector<std::vector<int64_t>> deletedRows = {{1}, {2}, {3, 4}};
-  assertPositionalDeletes(
-      deletedRows, getQuery(deletedRows), splitCount, numPrefetchSplits);
+
+  assertMultipleSplits({1, 2, 3, 4}, 10, 5);
+  assertMultipleSplits({1, 2, 3, 4}, 10, 0);
+  assertMultipleSplits({1, 2, 3, 4}, 10, 10);
+  assertMultipleSplits({0, 9999, 10000, 19999}, 10, 3);
+  assertMultipleSplits(makeRandomIncreasingValues(0, 20000), 10, 3);
+  assertMultipleSplits(makeContinuousIncreasingValues(0, 20000), 10, 3);
+  assertMultipleSplits({}, 10, 3);
 }
 
 } // namespace facebook::velox::connector::hive::iceberg
diff --git a/velox/dwio/common/BufferUtil.h b/velox/dwio/common/BufferUtil.h
index 92b468b0e5bc..38c5a89bc692 100644
--- a/velox/dwio/common/BufferUtil.h
+++ b/velox/dwio/common/BufferUtil.h
@@ -24,10 +24,31 @@
 template <typename T>
 inline void ensureCapacity(
     BufferPtr& data,
     size_t capacity,
-    velox::memory::MemoryPool* pool) {
-  if (!data || !data->isMutable() ||
-      data->capacity() < BaseVector::byteSize<T>(capacity)) {
+    velox::memory::MemoryPool* pool,
+    bool preserveOldData = false,
+    bool clearBits = false) {
+  size_t oldSize = 0;
+  if (!data) {
     data = AlignedBuffer::allocate<T>(capacity, pool);
+  } else {
+    oldSize = data->size();
+    if (!data->isMutable() ||
+        data->capacity() < BaseVector::byteSize<T>(capacity)) {
+      oldSize = data->size();
+      auto newData = AlignedBuffer::allocate<T>(capacity, pool);
+      if (preserveOldData) {
+        std::memcpy(
+            newData->template asMutable<T>(),
+            data->as<T>(),
+            data->size());
+      }
+      data = newData;
+    }
+  }
+
+  if (clearBits) {
+    std::memset(
+        (void*)(data->asMutable<T>() + oldSize), 0L, capacity - oldSize);
   }
 }
diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp
index 5d492d7cfaa7..d83770714c4c 100644
--- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp
+++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp
@@ -68,15 +68,24 @@ void HiveConnectorTestBase::writeToFile(
 void HiveConnectorTestBase::writeToFile(
     const std::string& filePath,
     const std::vector<RowVectorPtr>& vectors,
-    std::shared_ptr<dwrf::Config> config) {
-  writeToFile(filePath, vectors, std::move(config), vectors[0]->type());
+    std::shared_ptr<dwrf::Config> config,
+    const std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()>&
+        flushPolicyFactory) {
+  writeToFile(
+      filePath,
+      vectors,
+      std::move(config),
+      vectors[0]->type(),
+      flushPolicyFactory);
 }
 
 void HiveConnectorTestBase::writeToFile(
     const std::string& filePath,
     const std::vector<RowVectorPtr>& vectors,
     std::shared_ptr<dwrf::Config> config,
-    const TypePtr& schema) {
+    const TypePtr& schema,
+    const std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()>&
+        flushPolicyFactory) {
   velox::dwrf::WriterOptions options;
   options.config = config;
   options.schema = schema;
@@ -85,6 +94,8 @@ void HiveConnectorTestBase::writeToFile(
       std::move(localWriteFile), filePath);
   auto childPool = rootPool_->addAggregateChild("HiveConnectorTestBase.Writer");
   options.memoryPool = childPool.get();
+  options.flushPolicyFactory = flushPolicyFactory;
+
   facebook::velox::dwrf::Writer writer{std::move(sink), options};
   for (size_t i = 0; i < vectors.size(); ++i) {
     writer.write(vectors[i]);
diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h
index 7237b8e78ea3..83666d17a748 100644
--- a/velox/exec/tests/utils/HiveConnectorTestBase.h
+++ b/velox/exec/tests/utils/HiveConnectorTestBase.h
@@ -20,6 +20,7 @@
 #include "velox/connectors/hive/HiveDataSink.h"
 #include "velox/connectors/hive/TableHandle.h"
 #include "velox/dwio/dwrf/common/Config.h"
+#include "velox/dwio/dwrf/writer/FlushPolicy.h"
 #include "velox/exec/Operator.h"
 #include "velox/exec/tests/utils/OperatorTestBase.h"
 #include "velox/exec/tests/utils/TempFilePath.h"
@@ -47,13 +48,17 @@ class HiveConnectorTestBase : public OperatorTestBase {
       const std::string& filePath,
       const std::vector<RowVectorPtr>& vectors,
       std::shared_ptr<dwrf::Config> config =
-          std::make_shared<dwrf::Config>());
+          std::make_shared<dwrf::Config>(),
+      const std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()>&
+          flushPolicyFactory = nullptr);
 
   void writeToFile(
       const std::string& filePath,
       const std::vector<RowVectorPtr>& vectors,
       std::shared_ptr<dwrf::Config> config,
-      const TypePtr& schema);
+      const TypePtr& schema,
+      const std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()>&
+          flushPolicyFactory = nullptr);
 
   std::vector<RowVectorPtr> makeVectors(
       const RowTypePtr& rowType,
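
Editorial note (not part of the patch): the core idea behind the IcebergSplitReader::next() changes above is that the delete bitmap is sized for the number of rows requested from the base reader, but the reader may return fewer rows than requested. The unconsumed tail of the bitmap therefore has to be shifted down to bit 0 before the next batch, and when the buffer is grown the old contents must be preserved and the new tail zeroed (the new preserveOldData/clearBits flags of ensureCapacity). Below is a minimal, self-contained sketch of that carry-over, written with a plain std::vector<uint8_t> and hand-rolled bit helpers rather than Velox's BufferPtr, bits::copyBits, or ensureCapacity; kBatchSize and kRowsScanned are illustrative values only, not names from the patch.

#include <cstdint>
#include <iostream>
#include <vector>

// Toy bit helpers standing in for velox::bits; illustrative only.
bool isSet(const std::vector<uint8_t>& b, uint64_t i) {
  return (b[i / 8] >> (i % 8)) & 1;
}
void setBit(std::vector<uint8_t>& b, uint64_t i) {
  b[i / 8] |= uint8_t(1) << (i % 8);
}

int main() {
  constexpr uint64_t kBatchSize = 16; // rows requested from the base reader
  constexpr uint64_t kRowsScanned = 10; // rows the base reader actually returned

  // Grow-with-preserve-and-clear, analogous to
  // ensureCapacity(..., preserveOldData = true, clearBits = true): keep the
  // existing bytes and zero-fill the newly added tail.
  std::vector<uint8_t> bitmap(1, 0); // old bitmap covering 8 rows
  setBit(bitmap, 3);
  bitmap.resize((kBatchSize + 7) / 8, 0);

  // Two more deleted rows reported by a delete file for this batch.
  setBit(bitmap, 9);
  setBit(bitmap, 12);

  // Carry-over: only kRowsScanned rows were consumed, so bit 12 belongs to the
  // next batch. Shift the unconsumed range [kRowsScanned, kBatchSize) down to
  // bit 0, which is what IcebergSplitReader::next() does with bits::copyBits.
  std::vector<uint8_t> next((kBatchSize + 7) / 8, 0);
  for (uint64_t i = kRowsScanned; i < kBatchSize; ++i) {
    if (isSet(bitmap, i)) {
      setBit(next, i - kRowsScanned);
    }
  }

  // Row 12 of the previous range is row 2 of the next batch.
  std::cout << isSet(next, 2) << "\n"; // prints 1
}

The sketch spells the shift out bit by bit only for clarity; in the patch the same move is a single bits::copyBits call followed by setSize() to drop the bytes that were fully consumed.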