Skip to content

Commit

Permalink
Minor refactor in hash build (facebookincubator#8933)
Browse files Browse the repository at this point in the history
Summary:

Includes refactoring that addresses the follow-up review comments left on
facebookincubator#8757 after it
was merged.

Reviewed By: xiaoxmeng

Differential Revision: D54441115
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Mar 14, 2024
1 parent 8f95208 commit f3257d1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 30 deletions.
52 changes: 24 additions & 28 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ HashBuild::HashBuild(
tableType_ = ROW(std::move(names), std::move(types));
setupTable();
setupSpiller();
intermediateStateCleared_ = false;
stateCleared_ = false;
}

void HashBuild::initialize() {
Expand Down Expand Up @@ -672,11 +672,11 @@ bool HashBuild::finishHashBuild() {
}
}
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
std::lock_guard<std::mutex> l(build->mutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
!build->stateCleared_,
"Internal state for a peer is empty. It might have already"
" been closed.");
numRows += build->table_->rows()->numRows();
}
otherBuilds.push_back(build);
Expand All @@ -688,22 +688,22 @@ bool HashBuild::finishHashBuild() {
otherTables.reserve(peers.size());
SpillPartitionSet spillPartitions;
for (auto* build : otherBuilds) {
std::unique_ptr<Spiller> buildSpiller;
std::unique_ptr<Spiller> spiller;
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
std::lock_guard<std::mutex> l(build->mutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
build->intermediateStateCleared_ = true;
!build->stateCleared_,
"Internal state for a peer is empty. It might have already"
" been closed.");
build->stateCleared_ = true;
VELOX_CHECK_NOT_NULL(build->table_);
otherTables.push_back(std::move(build->table_));
buildSpiller = std::move(build->spiller_);
spiller = std::move(build->spiller_);
}
if (buildSpiller != nullptr) {
buildSpiller->finishSpill(spillPartitions);
if (spiller != nullptr) {
spiller->finishSpill(spillPartitions);
build->recordSpillStats(spiller.get());
}
build->recordSpillStats(buildSpiller.get());
}

if (spiller_ != nullptr) {
Expand Down Expand Up @@ -736,7 +736,7 @@ bool HashBuild::finishHashBuild() {
joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
if (spillEnabled()) {
intermediateStateCleared_ = true;
stateCleared_ = true;
}

// Release the unused memory reservation since we have finished the merged
Expand Down Expand Up @@ -826,7 +826,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {

setupTable();
setupSpiller(spillInput.spillPartition.get());
intermediateStateCleared_ = false;
stateCleared_ = false;

// Start to process spill input.
processSpillInput();
Expand Down Expand Up @@ -1009,10 +1009,7 @@ void HashBuild::reclaim(

TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

// can another thread call close() while hashbuild is in arbitration and
// reclaim is called on it?
if (exceededMaxSpillLevelLimit_) {
// NOTE: we might have reached to the max spill limit.
return;
}

Expand All @@ -1025,9 +1022,9 @@ void HashBuild::reclaim(
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], spiller_["
<< (intermediateStateCleared_ || spiller_->finalized()
? "finalized"
: "non-finalized")
<< (stateCleared_ ? "cleared"
: (spiller_->finalized() ? "finalized"
: "non-finalized"))
<< "] " << pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes());
return;
Expand Down Expand Up @@ -1110,13 +1107,12 @@ bool HashBuild::nonReclaimableState() const {
// 1) the hash table has been built by the last build thread (indicated by
// state_)
// 2) the last build operator has transferred ownership of 'this' operator's
// intermediate state (table_ and spiller_) to itself
// internal state (table_ and spiller_) to itself.
// 3) it has completed spilling before reaching either of the previous
// two states.
return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) &&
(state_ != State::kYield)) ||
nonReclaimableSection_ || intermediateStateCleared_ ||
spiller_->finalized();
nonReclaimableSection_ || !spiller_ || spiller_->finalized();
}

void HashBuild::close() {
Expand All @@ -1125,8 +1121,8 @@ void HashBuild::close() {
{
// Free up major memory usage. Gate access to 'table_' and 'spiller_' since
// they can be accessed by the last build thread that finishes building the
// hash table.
std::lock_guard<std::mutex> l(intermediateStateMutex_);
intermediateStateCleared_ = true;
std::lock_guard<std::mutex> l(mutex_);
stateCleared_ = true;
joinBridge_.reset();
spiller_.reset();
table_.reset();
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,13 @@ class HashBuild final : public Operator {
// concurrently, that is, when a thread tries to close the operator while
// another thread is building the hash table. Refer to 'close()' and
// 'finishHashBuild()' for more details.
std::mutex intermediateStateMutex_;
std::mutex mutex_;

// Indicates if the internal state ('table_' and 'spiller_') has
// been cleared. This can happen either when the operator is closed or when
// the last hash build operator transfers ownership of them to itself while
// building the final hash table.
bool intermediateStateCleared_{false};
bool stateCleared_{false};

// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;
Expand Down

0 comments on commit f3257d1

Please sign in to comment.