Skip to content

Commit

Permalink
Clear join filter related cached buffers when hash probe spill finishes (
Browse files Browse the repository at this point in the history
…facebookincubator#11319)

Summary:
Pull Request resolved: facebookincubator#11319

An LBM stress test saw unexpected memory usage growth during hash probe spill, which caused the query
to fail. The unexpected memory growth comes from the join eval execution, which includes both the
reused result buffers in the hash probe operator and the buffers cached inside the eval expression.
This PR fixes this by clearing those buffers when hash probe spill finishes (HashProbe::clearBuffers).
The fix was also verified with the failed LBM query.

Reviewed By: tanjialiang

Differential Revision: D64490457

fbshipit-source-id: e29450d70b88b71b19833334eea08fab52de02bc
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 22, 2024
1 parent bb3b7ca commit 53a1374
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
10 changes: 10 additions & 0 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,16 @@ void HashProbe::clearBuffers() {
output_.reset();
nonSpillInputIndicesBuffer_.reset();
spillInputIndicesBuffers_.clear();
if (filter_ == nullptr) {
return;
}
filterResult_.clear();
filterResult_.resize(1);
filterTableInput_.reset();
filterTableResult_.clear();
filterTableResult_.resize(1);
operatorCtx_->execCtx()->vectorPool()->clear();
filter_->clearCache();
}

} // namespace facebook::velox::exec
4 changes: 4 additions & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class HashProbe : public Operator {
return exceededMaxSpillLevelLimit_;
}

// Test-only accessor: reports whether 'input_' is currently non-null.
bool testingHasPendingInput() const {
  const bool hasInput = (input_ != nullptr);
  return hasInput;
}

private:
// Indicates if the join type includes misses from the left side in the
// output.
Expand Down
47 changes: 46 additions & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,52 @@ TEST_P(MultiThreadedHashJoinTest, filter) {
.run();
}

// Runs a filtered hash join and triggers memory arbitration on the hash probe
// operator the first time Driver::runInternal reaches getOutput() with the
// probe holding pending input. Right after arbitration the probe operator's
// memory pool must report zero used and zero reserved bytes.
DEBUG_ONLY_TEST_P(MultiThreadedHashJoinTest, filterSpillOnFirstProbeInput) {
  auto spillDir = exec::test::TempDirectoryPath::create();

  // Set until the one-shot arbitration has been injected.
  std::atomic_bool spillPending{true};
  SCOPED_TESTVALUE_SET(
      "facebook::velox::exec::Driver::runInternal::getOutput",
      std::function<void(Operator*)>([&](Operator* op) {
        // Only operators backed by a hash probe memory pool are of interest.
        if (!isHashProbeMemoryPool(*op->pool())) {
          return;
        }
        auto* hashProbe = static_cast<HashProbe*>(op);
        // Wait for pending input first; only then consume the one-shot flag so
        // that the injection is not wasted on a call without input.
        if (!hashProbe->testingHasPendingInput() ||
            !spillPending.exchange(false)) {
          return;
        }
        testingRunArbitration(op->pool());
        ASSERT_EQ(op->pool()->usedBytes(), 0);
        ASSERT_EQ(op->pool()->reservedBytes(), 0);
      }));

  // NOTE(review): numDrivers() is called twice in this chain; presumably the
  // later numDrivers(1) takes effect -- confirm and drop the first call if it
  // is redundant.
  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
      .numDrivers(numDrivers_)
      .keyTypes({BIGINT()})
      .numDrivers(1)
      .probeVectors(1600, 5)
      .buildVectors(1500, 5)
      .injectSpill(false)
      .spillDirectory(spillDir->getPath())
      .joinFilter("((t_k0 % 100) + (u_k0 % 100)) % 40 < 20")
      .referenceQuery(
          "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t_k0 = u_k0 AND ((t_k0 % 100) + (u_k0 % 100)) % 40 < 20")
      .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
        const auto spillStats = taskSpilledStats(*task);
        // The first entry must show no spilling at all while the second must
        // show spilling (presumably build side vs. probe side -- confirm
        // against taskSpilledStats).
        const auto& firstStats = spillStats.first;
        const auto& secondStats = spillStats.second;

        ASSERT_EQ(firstStats.spilledRows, 0);
        ASSERT_EQ(firstStats.spilledBytes, 0);
        ASSERT_EQ(firstStats.spilledPartitions, 0);
        ASSERT_EQ(firstStats.spilledFiles, 0);

        ASSERT_GT(secondStats.spilledRows, 0);
        ASSERT_GT(secondStats.spilledBytes, 0);
        ASSERT_GT(secondStats.spilledPartitions, 0);
        ASSERT_GT(secondStats.spilledFiles, 0);
      })
      .run();
}

TEST_P(MultiThreadedHashJoinTest, nullAwareAntiJoinWithNull) {
struct {
double probeNullRatio;
Expand Down Expand Up @@ -7518,7 +7564,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, taskWaitTimeout) {

for (uint64_t timeoutMs : {1'000, 30'000}) {
SCOPED_TRACE(fmt::format("timeout {}", succinctMillis(timeoutMs)));
LOG(ERROR) << "timeout " << succinctMillis(timeoutMs);
auto memoryManager = createMemoryManager(512 << 20, 0, timeoutMs);
auto queryCtx =
newQueryCtx(memoryManager.get(), executor_.get(), queryMemoryCapacity);
Expand Down

0 comments on commit 53a1374

Please sign in to comment.