Skip to content

Commit

Permalink
Fix flaky reclaimFromHashJoinBuildInWaitForTableBuild (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#10032)

Summary:
The test is flaky because there are two child pools from the query root pool. One is the
query task pool and the other is fake memory pool. And the test is designed to allocate all
the memory from the memory arbitrator and expect the memory arbitration reclaim all
the memory from the hash build operator after one of the hash build operator finishes
processing the input. And it expects the fake memory allocation succeed but the query
task fails with query OOM (the fake memory pool is not reclaimable). However the test is
recently found crashed in (github [issue](facebookincubator#10027)). The reason is because after reclaim from the hash join
query task, the task is resumed and there is a short time window that the hash build
operators start allocating more memory and before fake memory pool proceeds with the
rest of the memory arbitration process and the fake memory allocation fails with throw.

This PR deflaky this test by triggering the memory arbitration within the last the hash build
operator context to avoid the above flakiness. The meta internal test passed 1000 iterations:
https://www.internalfb.com/intern/testinfra/testrun/7318349614536715

Pull Request resolved: facebookincubator#10032

Reviewed By: tanjialiang

Differential Revision: D58115774

Pulled By: xiaoxmeng

fbshipit-source-id: 32eb56b66ca923d81de2dff682f32eed4947550d
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Jun 4, 2024
1 parent 61c3379 commit aff91ed
Showing 1 changed file with 50 additions and 79 deletions.
129 changes: 50 additions & 79 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6848,93 +6848,64 @@ DEBUG_ONLY_TEST_F(
2);
}

DEBUG_ONLY_TEST_F(HashJoinTest, reclaimFromHashJoinBuildInWaitForTableBuild) {
std::unique_ptr<memory::MemoryManager> memoryManager = createMemoryManager();
const auto& arbitrator = memoryManager->arbitrator();
auto rowType = ROW({
{"c0", INTEGER()},
{"c1", INTEGER()},
{"c2", VARCHAR()},
});
const auto vectors = createVectors(rowType, 32 << 20, fuzzerOpts_);
const int numDrivers = 4;
const auto expectedResult =
runHashJoinTask(vectors, nullptr, numDrivers, pool(), false).data;
std::shared_ptr<core::QueryCtx> queryCtx =
newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity);

folly::EventCount arbitrationWait;
std::atomic_bool arbitrationWaitFlag{true};
folly::EventCount taskPauseWait;
std::atomic_bool taskPauseWaitFlag{true};

std::atomic_int blockedBuildOperators{0};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal",
std::function<void(Driver*)>(([&](Driver* driver) {
// Check if the driver is from hash join build.
if (driver->driverCtx()->pipelineId != 1) {
return;
}
DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringTableBuild) {
VectorFuzzer fuzzer({.vectorSize = 1000}, pool());
const int32_t numBuildVectors = 5;
std::vector<RowVectorPtr> buildVectors;
for (int32_t i = 0; i < numBuildVectors; ++i) {
buildVectors.push_back(fuzzer.fuzzRow(buildType_));
}
const int32_t numProbeVectors = 5;
std::vector<RowVectorPtr> probeVectors;
for (int32_t i = 0; i < numProbeVectors; ++i) {
probeVectors.push_back(fuzzer.fuzzRow(probeType_));
}

if (++blockedBuildOperators > numDrivers - 1) {
return;
}
createDuckDbTable("t", probeVectors);
createDuckDbTable("u", buildVectors);

taskPauseWait.await([&]() { return !taskPauseWaitFlag.load(); });
})));
core::PlanNodeId probeScanId;
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.values(probeVectors, false)
.hashJoin(
{"t_k1"},
{"u_k1"},
PlanBuilder(planNodeIdGenerator)
.values(buildVectors, false)
.planNode(),
"",
concat(probeType_->names(), buildType_->names()))
.planNode();

std::atomic_bool injectNoMoreInputOnce{true};
std::atomic_bool injectSpillOnce{true};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal::noMoreInput",
std::function<void(Operator*)>(([&](Operator* op) {
if (op->operatorType() != "HashBuild") {
return;
}

if (!injectNoMoreInputOnce.exchange(false)) {
"facebook::velox::exec::HashBuild::finishHashBuild",
std::function<void(Operator*)>([&](Operator* op) {
if (!injectSpillOnce.exchange(false)) {
return;
}

arbitrationWaitFlag = false;
arbitrationWait.notifyAll();
taskPauseWait.await([&]() { return !taskPauseWaitFlag.load(); });
})));

SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Task::requestPauseLocked",
std::function<void(Task*)>([&](Task* /*unused*/) {
taskPauseWaitFlag = false;
taskPauseWait.notifyAll();
Operator::ReclaimableSectionGuard guard(op);
testingRunArbitration(op->pool());
}));

std::thread joinThread([&]() {
VELOX_ASSERT_THROW(
runHashJoinTask(
vectors, queryCtx, numDrivers, pool(), true, expectedResult),
"Exceeded memory pool cap of");
});

arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); });
auto fakePool = queryCtx->pool()->addLeafChild(
"fakePool", true, FakeMemoryReclaimer::create());
void* fakeBuffer{nullptr};
arbitrationWait.await([&]() { return !arbitrationWaitFlag.load(); });
// Let the first hash build operator reaches to wait for table build state.
std::this_thread::sleep_for(std::chrono::seconds(1));
fakeBuffer = fakePool->allocate(kMemoryCapacity);

joinThread.join();

// We expect the reclaimed bytes from hash build.
ASSERT_GT(arbitrator->stats().numReclaimedBytes, 0);

// This test uses on-demand created memory manager instead of the global
// one. We need to make sure any used memory got cleaned up before exiting
// the scope
waitForAllTasksToBeDeleted();
ASSERT_TRUE(fakeBuffer != nullptr);
fakePool->free(fakeBuffer, kMemoryCapacity);
auto tempDirectory = exec::test::TempDirectoryPath::create();
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.numDrivers(4)
.planNode(plan)
.injectSpill(false)
.maxSpillLevel(0)
.spillDirectory(tempDirectory->getPath())
.referenceQuery(
"SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
.config(core::QueryConfig::kSpillStartPartitionBit, "29")
.verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
auto opStats = toOperatorStats(task->taskStats());
ASSERT_GT(
opStats.at("HashBuild").runtimeStats[Operator::kSpillWrites].sum,
0);
})
.run();
}

DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredDuringParallelJoinBuild) {
Expand Down

0 comments on commit aff91ed

Please sign in to comment.