Skip to content

Commit

Permalink
Ensure ConnectorQueryCtx does not become a dangling pointer when SplitPreload is enabled (facebookincubator#9544)
Browse files Browse the repository at this point in the history

Summary:
Fixes: facebookincubator#9545
Store `shared_ptr<ConnectorQueryCtx>` instead of `ConnectorQueryCtx*` in `SplitReader`
to prevent a dangling pointer issue when split preloading is enabled.

Pull Request resolved: facebookincubator#9544

Reviewed By: pedroerp

Differential Revision: D56420990

Pulled By: Yuhta

fbshipit-source-id: c30f4b4556b8bdfb1120fe48331fbe93f83d902a
  • Loading branch information
zhli1142015 authored and facebook-github-bot committed Apr 25, 2024
1 parent d97ddb4 commit 61d718d
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 73 deletions.
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ void HiveDataSource::setFromDataSource(
source->scanSpec_->moveAdaptationFrom(*scanSpec_);
scanSpec_ = std::move(source->scanSpec_);
splitReader_ = std::move(source->splitReader_);
splitReader_->setConnectorQueryCtx(connectorQueryCtx_);
// New io will be accounted on the stats of 'source'. Add the existing
// balance to that.
source->ioStats_->merge(*ioStats_);
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ bool SplitReader::allPrefetchIssued() const {
return baseRowReader_ && baseRowReader_->allPrefetchIssued();
}

// Re-points this reader at a new ConnectorQueryCtx. Called from
// HiveDataSource::setFromDataSource when a preloaded split's reader is
// handed to another data source, so the stored pointer does not dangle
// after the original ctx is destroyed.
void SplitReader::setConnectorQueryCtx(
    const ConnectorQueryCtx* connectorQueryCtx) {
  connectorQueryCtx_ = connectorQueryCtx;
}

std::string SplitReader::toString() const {
std::string partitionKeys;
std::for_each(
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class SplitReader {

bool allPrefetchIssued() const;

void setConnectorQueryCtx(const ConnectorQueryCtx* connectorQueryCtx);

std::string toString() const;

protected:
Expand Down Expand Up @@ -142,7 +144,7 @@ class SplitReader {
const std::unordered_map<
std::string,
std::shared_ptr<HiveColumnHandle>>* const partitionKeys_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const ConnectorQueryCtx* connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;

const RowTypePtr readerOutputType_;
Expand Down
5 changes: 2 additions & 3 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
baseFilePath_(baseFilePath),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
ioStats_(ioStats),
pool_(connectorQueryCtx->memoryPool()),
Expand Down Expand Up @@ -89,7 +88,7 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
configureReaderOptions(
deleteReaderOpts,
hiveConfig_,
connectorQueryCtx_->sessionProperties(),
connectorQueryCtx->sessionProperties(),
deleteFileSchema,
deleteSplit_);

Expand All @@ -98,7 +97,7 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
auto deleteFileInput = createBufferedInput(
*deleteFileHandle,
deleteReaderOpts,
connectorQueryCtx_,
connectorQueryCtx,
ioStats_,
executor_);

Expand Down
1 change: 0 additions & 1 deletion velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class PositionalDeleteFileReader {
const std::string& baseFilePath_;
FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;
const std::shared_ptr<io::IoStatistics> ioStats_;
memory::MemoryPool* const pool_;
Expand Down
151 changes: 89 additions & 62 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,57 +37,25 @@ class HiveIcebergTest : public HiveConnectorTestBase {
void assertPositionalDeletes(
const std::vector<std::vector<int64_t>>& deleteRowsVec,
bool multipleBaseFiles = false) {
assertPositionalDeletes(
deleteRowsVec,
"SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRowsVec) +
")",
multipleBaseFiles);
assertPositionalDeletesInternal(
deleteRowsVec, getQuery(deleteRowsVec), multipleBaseFiles, 1, 0);
}

void assertPositionalDeletes(
const std::vector<std::vector<int64_t>>& deleteRowsVec,
std::string duckdbSql,
bool multipleBaseFiles = false) {
std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);

std::mt19937 gen{0};
int64_t numDeleteRowsBefore =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
int64_t numDeleteRowsAfter =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::vector<std::shared_ptr<TempFilePath>> deleteFilePaths;
std::vector<IcebergDeleteFile> deleteFiles;
deleteFilePaths.reserve(deleteRowsVec.size());
deleteFiles.reserve(deleteRowsVec.size());
for (auto const& deleteRows : deleteRowsVec) {
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->getPath(),
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);
auto path = deleteFilePath->getPath();
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->getPath(),
fileFomat_,
deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
testing::internal::GetFileSize(std::fopen(path.c_str(), "r")));
deleteFilePaths.emplace_back(deleteFilePath);
deleteFiles.emplace_back(deleteFile);
}

auto icebergSplit = makeIcebergSplit(dataFilePath->getPath(), deleteFiles);

auto plan = tableScanNode();
auto task = OperatorTestBase::assertQuery(plan, {icebergSplit}, duckdbSql);
assertPositionalDeletesInternal(
deleteRowsVec, duckdbSql, multipleBaseFiles, 1, 0);
}

auto planStats = toPlanStats(task->taskStats());
auto scanNodeId = plan->id();
auto it = planStats.find(scanNodeId);
ASSERT_TRUE(it != planStats.end());
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
  /// Overload for the multi-split case: runs 'splitCount' splits and
  /// preloads 'numPrefetchSplits' splits per driver. Always forces
  /// multipleBaseFiles = true in the shared helper.
  void assertPositionalDeletes(
      const std::vector<std::vector<int64_t>>& deleteRowsVec,
      std::string duckdbSql,
      int32_t splitCount,
      int32_t numPrefetchSplits) {
    assertPositionalDeletesInternal(
        deleteRowsVec, duckdbSql, true, splitCount, numPrefetchSplits);
  }

std::vector<int64_t> makeRandomDeleteRows(int32_t maxRowNumber) {
Expand Down Expand Up @@ -119,10 +87,66 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return deleteRows;
}

std::string getQuery(const std::vector<std::vector<int64_t>>& deleteRowsVec) {
return "SELECT * FROM tmp WHERE c0 NOT IN (" +
makeNotInList(deleteRowsVec) + ")";
}

const static int rowCount = 20000;

private:
std::shared_ptr<connector::ConnectorSplit> makeIcebergSplit(
void assertPositionalDeletesInternal(
const std::vector<std::vector<int64_t>>& deleteRowsVec,
std::string duckdbSql,
bool multipleBaseFiles,
int32_t splitCount,
int32_t numPrefetchSplits) {
auto dataFilePaths = writeDataFile(splitCount, rowCount);
std::vector<std::shared_ptr<ConnectorSplit>> splits;
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::vector<std::shared_ptr<TempFilePath>> deleteFilePaths;
for (const auto& dataFilePath : dataFilePaths) {
std::mt19937 gen{0};
int64_t numDeleteRowsBefore =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
int64_t numDeleteRowsAfter =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
std::vector<IcebergDeleteFile> deleteFiles;
deleteFiles.reserve(deleteRowsVec.size());
for (auto const& deleteRows : deleteRowsVec) {
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->getPath(),
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);
auto path = deleteFilePath->getPath();
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->getPath(),
fileFomat_,
deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
testing::internal::GetFileSize(std::fopen(path.c_str(), "r")));
deleteFilePaths.emplace_back(deleteFilePath);
deleteFiles.emplace_back(deleteFile);
}

splits.emplace_back(
makeIcebergSplit(dataFilePath->getPath(), deleteFiles));
}

auto plan = tableScanNode();
auto task = HiveConnectorTestBase::assertQuery(
plan, splits, duckdbSql, numPrefetchSplits);

auto planStats = toPlanStats(task->taskStats());
auto scanNodeId = plan->id();
auto it = planStats.find(scanNodeId);
ASSERT_TRUE(it != planStats.end());
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
Expand All @@ -148,7 +172,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::vector<RowVectorPtr> makeVectors(int32_t count, int32_t rowsPerVector) {
std::vector<RowVectorPtr> vectors;

for (int i = 0; i < count; i++) {
auto data = makeSequenceRows(rowsPerVector);
VectorPtr c0 = vectorMaker_.flatVector<int64_t>(data);
Expand All @@ -158,13 +181,18 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return vectors;
}

std::shared_ptr<TempFilePath> writeDataFile(uint64_t numRows) {
auto dataVectors = makeVectors(1, numRows);

auto dataFilePath = TempFilePath::create();
writeToFile(dataFilePath->getPath(), dataVectors);
std::vector<std::shared_ptr<TempFilePath>> writeDataFile(
int32_t splitCount,
uint64_t numRows) {
auto dataVectors = makeVectors(splitCount, numRows);
std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
dataFilePaths.reserve(dataVectors.size());
for (auto i = 0; i < dataVectors.size(); i++) {
dataFilePaths.emplace_back(TempFilePath::create());
writeToFile(dataFilePaths.back()->getPath(), dataVectors[i]);
}
createDuckDbTable(dataVectors);
return dataFilePath;
return dataFilePaths;
}

std::shared_ptr<TempFilePath> writePositionDeleteFile(
Expand Down Expand Up @@ -247,20 +275,10 @@ class HiveIcebergTest : public HiveConnectorTestBase {
});
}

std::shared_ptr<exec::Task> assertQuery(
const core::PlanNodePtr& plan,
std::shared_ptr<TempFilePath> dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles,
const std::string& duckDbSql) {
auto icebergSplit = makeIcebergSplit(dataFilePath->getPath(), deleteFiles);
return OperatorTestBase::assertQuery(plan, {icebergSplit}, duckDbSql);
}

  // Builds a single table-scan plan node over rowType_ (c0 BIGINT).
  core::PlanNodePtr tableScanNode() {
    return PlanBuilder(pool_.get()).tableScan(rowType_).planNode();
  }

private:
dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
Expand Down Expand Up @@ -320,4 +338,13 @@ TEST_F(HiveIcebergTest, baseFileMultiplePositionalDeletes) {
assertPositionalDeletes({{0}, {9999}, {10000}, {19999}});
}

// Scans many splits with split preloading enabled, exercising the
// ConnectorQueryCtx dangling-pointer scenario from facebookincubator#9545.
TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
  folly::SingletonVault::singleton()->registrationComplete();
  constexpr int32_t kSplitCount = 50;
  constexpr int32_t kNumPrefetchSplits = 10;
  const std::vector<std::vector<int64_t>> deletedRows = {{1}, {2}, {3, 4}};
  assertPositionalDeletes(
      deletedRows, getQuery(deletedRows), kSplitCount, kNumPrefetchSplits);
}

} // namespace facebook::velox::connector::hive::iceberg
8 changes: 2 additions & 6 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,8 @@ class TableScanTest : public virtual HiveConnectorTestBase {
const std::vector<std::shared_ptr<TempFilePath>>& filePaths,
const std::string& duckDbSql,
const int32_t numPrefetchSplit) {
return AssertQueryBuilder(plan, duckDbQueryRunner_)
.config(
core::QueryConfig::kMaxSplitPreloadPerDriver,
std::to_string(numPrefetchSplit))
.splits(makeHiveConnectorSplits(filePaths))
.assertResults(duckDbSql);
return HiveConnectorTestBase::assertQuery(
plan, makeHiveConnectorSplits(filePaths), duckDbSql, numPrefetchSplit);
}

// Run query with spill enabled.
Expand Down
14 changes: 14 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"

namespace facebook::velox::exec::test {

Expand Down Expand Up @@ -106,6 +107,19 @@ std::shared_ptr<exec::Task> HiveConnectorTestBase::assertQuery(
plan, makeHiveConnectorSplits(filePaths), duckDbSql);
}

// Runs 'plan' over the given splits with split preloading configured to
// 'numPrefetchSplit' splits per driver, and checks the result against
// 'duckDbSql' executed by the DuckDB query runner.
std::shared_ptr<Task> HiveConnectorTestBase::assertQuery(
    const core::PlanNodePtr& plan,
    const std::vector<std::shared_ptr<connector::ConnectorSplit>>& splits,
    const std::string& duckDbSql,
    const int32_t numPrefetchSplit) {
  AssertQueryBuilder builder(plan, duckDbQueryRunner_);
  builder
      .config(
          core::QueryConfig::kMaxSplitPreloadPerDriver,
          std::to_string(numPrefetchSplit))
      .splits(splits);
  return builder.assertResults(duckDbSql);
}

std::vector<std::shared_ptr<TempFilePath>> HiveConnectorTestBase::makeFilePaths(
int count) {
std::vector<std::shared_ptr<TempFilePath>> filePaths;
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class HiveConnectorTestBase : public OperatorTestBase {
const std::vector<std::shared_ptr<TempFilePath>>& filePaths,
const std::string& duckDbSql);

std::shared_ptr<Task> assertQuery(
const core::PlanNodePtr& plan,
const std::vector<std::shared_ptr<connector::ConnectorSplit>>& splits,
const std::string& duckDbSql,
const int32_t numPrefetchSplit);

static std::vector<std::shared_ptr<TempFilePath>> makeFilePaths(int count);

static std::vector<std::shared_ptr<connector::ConnectorSplit>>
Expand Down

0 comments on commit 61d718d

Please sign in to comment.