Skip to content

Commit

Permalink
Add subfield filter for the delete file path column
Browse files Browse the repository at this point in the history
  • Loading branch information
yingsu00 committed Nov 15, 2023
1 parent daeeae6 commit 2abbfc3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
8 changes: 7 additions & 1 deletion velox/connectors/hive/iceberg/DeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/type/Filter.h"

namespace facebook::velox::connector::hive::iceberg {
DeleteFileReader::DeleteFileReader(
Expand Down Expand Up @@ -82,7 +83,12 @@ void DeleteFileReader::createPositionalDeleteDataSource(
// Build a filter on the path column: filePathColumn = baseFilePath_
// TODO: Build filters on the positionsColumn:
// positionsColumn >= baseReadOffset_ + splitOffsetInFile
SubfieldFilters subfieldFilters = {};
SubfieldFilters subfieldFilters;
std::vector<std::string> values = {baseFilePath_};
std::unique_ptr<common::Filter> pathFilter =
std::make_unique<common::BytesValues>(values, false);
subfieldFilters[common::Subfield(filePathColumn->name)] =
std::move(pathFilter);

auto deleteTableHandle = std::make_shared<HiveTableHandle>(
connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr);
Expand Down
55 changes: 47 additions & 8 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,20 @@ class HiveIcebergTest : public HiveConnectorTestBase {
}
void assertPositionalDeletes(
const std::vector<int64_t>& deleteRows,
std::string duckdbSql) {
std::string duckdbSql,
bool multipleBaseFiles = false) {
std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);
std::shared_ptr<TempFilePath> deleteFilePath =
writePositionDeleteFile(dataFilePath->path, deleteRows);

std::mt19937 gen{0};
int64_t numDeleteRowsBefore =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
int64_t numDeleteRowsAfter =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->path,
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);

IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
Expand Down Expand Up @@ -135,14 +145,43 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::shared_ptr<TempFilePath> writePositionDeleteFile(
const std::string& dataFilePath,
const std::vector<int64_t>& deleteRows) {
uint32_t numDeleteRows = deleteRows.size();
const std::vector<int64_t>& deleteRows,
int64_t numRowsBefore = 0,
int64_t numRowsAfter = 0) {
// If numRowsBefore or numRowsAfter is non-zero, we will write rows for other
// base files before and after the rows for the target base file.
uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter;

auto child = vectorMaker_.flatVector<int64_t>(std::vector<int64_t>{1UL});

auto filePathVector = vectorMaker_.flatVector<StringView>(
numDeleteRows, [&](auto row) { return StringView(dataFilePath); });
auto deletePositionsVector = vectorMaker_.flatVector<int64_t>(deleteRows);
auto filePathVector =
vectorMaker_.flatVector<StringView>(numDeleteRows, [&](auto row) {
if (row < numRowsBefore) {
std::string dataFilePathBefore = dataFilePath + "_before";
return StringView(dataFilePathBefore);
} else if (
row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) {
return StringView(dataFilePath);
} else if (row >= deleteRows.size() + numRowsBefore) {
std::string dataFilePathAfter = dataFilePath + "_after";
return StringView(dataFilePathAfter);
}
});

std::vector<int64_t> deleteRowsVec;
deleteRowsVec.resize(numDeleteRows);

if (numRowsBefore > 0) {
auto rowsBefore = makeRandomDeleteRows(numRowsBefore);
deleteRowsVec.insert(deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end());
}
deleteRowsVec.insert(deleteRowsVec.end(), deleteRows.begin(), deleteRows.end());
if (numRowsAfter > 0) {
auto rowsAfter = makeRandomDeleteRows(numRowsAfter);
deleteRowsVec.insert(deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end());
}

auto deletePositionsVector = vectorMaker_.flatVector<int64_t>(deleteRowsVec);
RowVectorPtr deleteFileVectors = makeRowVector(
{pathColumn_->name, posColumn_->name},
{filePathVector, deletePositionsVector});
Expand Down

0 comments on commit 2abbfc3

Please sign in to comment.