Skip to content

Commit

Permalink
Fix flaky reclaimFromCompletedJoinBuilder test (facebookincubator#6038)
Browse files Browse the repository at this point in the history
Summary:
Fix flaky reclaimFromCompletedJoinBuilder test by ensuring
the task has been destroyed.
The test passed 400 iterations internally.

Pull Request resolved: facebookincubator#6038

Reviewed By: pedroerp, mbasmanova

Differential Revision: D48167486

Pulled By: xiaoxmeng

fbshipit-source-id: 931978ba47ad25e0a67a42614b7863952c283144
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Aug 9, 2023
1 parent 0166160 commit 84f8551
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 61 deletions.
2 changes: 1 addition & 1 deletion velox/common/memory/tests/MemoryCapExceededTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ TEST_P(MemoryCapExceededTest, allocatorCapacityExceededError) {
<< "', but received '" << errorMessage << "'.";
}
}
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down
40 changes: 22 additions & 18 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromOrderBy) {

orderByThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -507,7 +507,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {

orderByThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
const auto newStats = arbitrator_->stats();
ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes);
}
Expand Down Expand Up @@ -577,7 +577,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedOrderBy) {

orderByThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -676,7 +676,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, DISABLED_reclaimFromAggregation) {

aggregationThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -764,7 +764,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {

aggregationThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();

const auto newStats = arbitrator_->stats();
ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes);
Expand Down Expand Up @@ -835,7 +835,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedAggregation) {

aggregationThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -944,7 +944,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromJoinBuilder) {

aggregationThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -1043,7 +1043,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {

joinThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();

const auto newStats = arbitrator_->stats();
ASSERT_GT(newStats.numReclaimedBytes, oldStats.numReclaimedBytes);
Expand All @@ -1061,6 +1061,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) {
for (bool sameQuery : sameQueries) {
SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery));
const auto spillDirectory = exec::test::TempDirectoryPath::create();
const uint64_t numCreatedTasks = Task::numCreatedTasks();
std::shared_ptr<core::QueryCtx> fakeMemoryQueryCtx =
newQueryCtx(kMemoryCapacity);
std::shared_ptr<core::QueryCtx> joinQueryCtx;
Expand Down Expand Up @@ -1107,6 +1108,9 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) {
.assertResults(
"SELECT c1 FROM tmp WHERE c0 NOT IN (SELECT c0 FROM tmp)");
waitForTaskCompletion(task.get());
task.reset();
// Make sure the join query task has been destroyed.
waitForAllTasksToBeDeleted(numCreatedTasks + 1, 3'000'000);
fakeAllocationWait.notify();
});

Expand All @@ -1125,7 +1129,7 @@ TEST_F(SharedArbitrationTest, reclaimFromCompletedJoinBuilder) {

joinThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -1250,7 +1254,7 @@ DEBUG_ONLY_TEST_F(
});
joinThread.join();
memThread.join();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -1408,7 +1412,7 @@ DEBUG_ONLY_TEST_F(
memThread.join();
// We only expect to reclaim from one hash build operator once.
ASSERT_EQ(numHashBuildReclaims, 1);
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(
Expand Down Expand Up @@ -1561,7 +1565,7 @@ DEBUG_ONLY_TEST_F(

// We only expect to reclaim from one hash build operator once.
ASSERT_EQ(numHashBuildReclaims, 1);
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(
Expand Down Expand Up @@ -1614,7 +1618,7 @@ DEBUG_ONLY_TEST_F(
.assertResults(
"SELECT t.c1 FROM tmp as t, tmp AS u WHERE t.c0 == u.c1 AND t.c1 == u.c0");
ASSERT_TRUE(parallelBuildTriggered);
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(
Expand Down Expand Up @@ -1695,7 +1699,7 @@ DEBUG_ONLY_TEST_F(
.assertResults(
"SELECT t.c1 FROM tmp as t, tmp AS u WHERE t.c0 == u.c1 AND t.c1 == u.c0");
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
ASSERT_EQ(injectAllocations.size(), 2);
}

Expand Down Expand Up @@ -1788,7 +1792,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimDuringJoinTableBuild) {
memThread.join();
queryThread.join();

Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) {
Expand Down Expand Up @@ -1919,7 +1923,7 @@ DEBUG_ONLY_TEST_F(
queryThread.join();
fakeAllocation.free();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) {
Expand Down Expand Up @@ -1984,7 +1988,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) {
queryThread.join();
fakeAllocation.free();
injectAllocation->free();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) {
Expand Down Expand Up @@ -2059,7 +2063,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) {
fakeAllocation.free();

task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

TEST_F(SharedArbitrationTest, concurrentArbitration) {
Expand Down
23 changes: 0 additions & 23 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2377,29 +2377,6 @@ std::shared_ptr<SpillOperatorGroup> Task::getSpillOperatorGroupLocked(
return group;
}

// static
void Task::testingWaitForAllTasksToBeDeleted(uint64_t maxWaitUs) {
const uint64_t numCreatedTasks = Task::numCreatedTasks();
uint64_t numDeletedTasks = Task::numDeletedTasks();
uint64_t waitUs = 0;
while (numCreatedTasks > numDeletedTasks) {
constexpr uint64_t kWaitInternalUs = 1'000;
std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs));
waitUs += kWaitInternalUs;
numDeletedTasks = Task::numDeletedTasks();
if (waitUs >= maxWaitUs) {
break;
}
}
VELOX_CHECK_EQ(
numDeletedTasks,
numCreatedTasks,
"{} tasks have been created while only {} have been deleted after waiting for {} us",
numCreatedTasks,
numDeletedTasks,
waitUs);
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
std::lock_guard<std::mutex> l(mutex_);
for (int i = 0; i < drivers_.size(); ++i) {
Expand Down
6 changes: 0 additions & 6 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,12 +576,6 @@ class Task : public std::enable_shared_from_this<Task> {
return numDriversInPartitionedOutput_ > 0;
}

/// Invoked to wait for all the tasks created by the test to be deleted.
///
/// NOTE: it is assumed that there is no more task to be created after or
/// during this wait call. This is for testing purpose for now.
static void testingWaitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000);

/// Invoked to run provided 'callback' on each alive driver of the task.
void testingVisitDrivers(const std::function<void(Driver*)>& callback);

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2397,7 +2397,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringOutputProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -2492,7 +2492,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, abortDuringInputgProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5298,7 +5298,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -5403,7 +5403,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputgProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -5508,7 +5508,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeAbortDuringInputProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}
} // namespace
4 changes: 2 additions & 2 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}

Expand Down Expand Up @@ -1219,6 +1219,6 @@ DEBUG_ONLY_TEST_F(OrderByTest, abortDuringInputgProcessing) {
driverWait.notify();
taskThread.join();
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}
}
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ void OperatorTestBase::registerVectorSerde() {
}

OperatorTestBase::~OperatorTestBase() {
// Wait for all the tasks to be deleted.
exec::test::waitForAllTasksToBeDeleted();
// Revert to default process-wide MemoryAllocator.
memory::MemoryAllocator::setDefaultInstance(nullptr);
}
Expand All @@ -53,7 +55,7 @@ void OperatorTestBase::SetUpTestCase() {
}

void OperatorTestBase::TearDownTestCase() {
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();
}

void OperatorTestBase::SetUp() {
Expand Down Expand Up @@ -172,7 +174,7 @@ core::TypedExprPtr OperatorTestBase::parseExpr(

// Wait for the task to go.
task.reset();
Task::testingWaitForAllTasksToBeDeleted();
waitForAllTasksToBeDeleted();

// If a spilling directory was set, ensure it was removed after the task is
// gone.
Expand Down
45 changes: 45 additions & 0 deletions velox/exec/tests/utils/QueryAssertions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,51 @@ bool waitForTaskDriversToFinish(exec::Task* task, uint64_t maxWaitMicros) {
return task->numFinishedDrivers() == task->numTotalDrivers();
}

void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) {
const uint64_t numCreatedTasks = Task::numCreatedTasks();
uint64_t numDeletedTasks = Task::numDeletedTasks();
uint64_t waitUs = 0;
while (numCreatedTasks > numDeletedTasks) {
constexpr uint64_t kWaitInternalUs = 1'000;
std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs));
waitUs += kWaitInternalUs;
numDeletedTasks = Task::numDeletedTasks();
if (waitUs >= maxWaitUs) {
break;
}
}
VELOX_CHECK_EQ(
numDeletedTasks,
numCreatedTasks,
"{} tasks have been created while only {} have been deleted after waiting for {} us",
numCreatedTasks,
numDeletedTasks,
waitUs);
}

void waitForAllTasksToBeDeleted(
uint64_t expectedDeletedTasks,
uint64_t maxWaitUs) {
uint64_t numDeletedTasks = Task::numDeletedTasks();
uint64_t waitUs = 0;
while (expectedDeletedTasks > numDeletedTasks) {
constexpr uint64_t kWaitInternalUs = 1'000;
std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs));
waitUs += kWaitInternalUs;
numDeletedTasks = Task::numDeletedTasks();
if (waitUs >= maxWaitUs) {
break;
}
}
VELOX_CHECK_EQ(
numDeletedTasks,
expectedDeletedTasks,
"expected {} tasks to be deleted but only {} have been deleted after waiting for {} us",
expectedDeletedTasks,
numDeletedTasks,
waitUs);
}

std::shared_ptr<Task> assertQuery(
const core::PlanNodePtr& plan,
std::function<void(exec::Task*)> addSplits,
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/tests/utils/QueryAssertions.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ bool waitForTaskDriversToFinish(
exec::Task* task,
uint64_t maxWaitMicros = 1'000'000);

/// Invoked to wait for all the tasks created by the test to be deleted.
///
/// NOTE: it is assumed that there is no more task to be created after or
/// during this wait call. This is for testing purpose for now.
void waitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000);

/// Similar to above test utility except waiting for a specific number of
/// tasks to be deleted.
void waitForAllTasksToBeDeleted(
uint64_t expectedDeletedTasks,
uint64_t maxWaitUs);

std::shared_ptr<Task> assertQuery(
const core::PlanNodePtr& plan,
const std::string& duckDbSql,
Expand Down
Loading

0 comments on commit 84f8551

Please sign in to comment.