Skip to content

Commit

Permalink
Minor refactor in hash build
Browse files Browse the repository at this point in the history
Summary:
Includes refactoring as per the follow up comments in
facebookincubator#8757 after it
was merged.

Differential Revision: D54441115
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Mar 2, 2024
1 parent d270ace commit fbe5f4c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 31 deletions.
52 changes: 24 additions & 28 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ HashBuild::HashBuild(
tableType_ = ROW(std::move(names), std::move(types));
setupTable();
setupSpiller();
intermediateStateCleared_ = false;
stateCleared_ = false;
}

void HashBuild::initialize() {
Expand Down Expand Up @@ -784,11 +784,11 @@ bool HashBuild::finishHashBuild() {
}
}
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
std::lock_guard<std::mutex> l(build->mutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
!build->stateCleared_,
"Internal state for a peer is empty. It might have already"
" been closed.");
numRows += build->table_->rows()->numRows();
}
otherBuilds.push_back(build);
Expand All @@ -800,22 +800,22 @@ bool HashBuild::finishHashBuild() {
otherTables.reserve(peers.size());
SpillPartitionSet spillPartitions;
for (auto* build : otherBuilds) {
std::unique_ptr<Spiller> buildSpiller;
std::unique_ptr<Spiller> spiller;
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
std::lock_guard<std::mutex> l(build->mutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
build->intermediateStateCleared_ = true;
!build->stateCleared_,
"Internal state for a peer is empty. It might have already"
" been closed.");
build->stateCleared_ = true;
VELOX_CHECK_NOT_NULL(build->table_);
otherTables.push_back(std::move(build->table_));
buildSpiller = std::move(build->spiller_);
spiller = std::move(build->spiller_);
}
if (buildSpiller != nullptr) {
buildSpiller->finishSpill(spillPartitions);
if (spiller != nullptr) {
spiller->finishSpill(spillPartitions);
build->recordSpillStats(spiller.get());
}
build->recordSpillStats(buildSpiller.get());
}

if (spiller_ != nullptr) {
Expand Down Expand Up @@ -847,7 +847,7 @@ bool HashBuild::finishHashBuild() {
addRuntimeStats();
if (joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
intermediateStateCleared_ = true;
stateCleared_ = true;
spillGroup_->restart();
}

Expand Down Expand Up @@ -938,7 +938,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {

setupTable();
setupSpiller(spillInput.spillPartition.get());
intermediateStateCleared_ = false;
stateCleared_ = false;

// Start to process spill input.
processSpillInput();
Expand Down Expand Up @@ -1126,10 +1126,7 @@ void HashBuild::reclaim(

TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

// can another thread call close() while hashbuild is in arbitration and
// reclaim is called on it?
if (exceededMaxSpillLevelLimit_) {
// NOTE: we might have reached to the max spill limit.
return;
}

Expand All @@ -1142,9 +1139,9 @@ void HashBuild::reclaim(
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], spiller_["
<< (intermediateStateCleared_ || spiller_->finalized()
? "finalized"
: "non-finalized")
<< (stateCleared_ ? "cleared"
: (spiller_->finalized() ? "finalized"
: "non-finalized"))
<< "] " << pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes());
return;
Expand Down Expand Up @@ -1227,13 +1224,12 @@ bool HashBuild::nonReclaimableState() const {
// 1) the hash table has been built by the last build thread (inidicated
// by state_)
// 2) the last build operator has transferred ownership of 'this' operator's
// intermediate state (table_ and spiller_) to itself
// internal state (table_ and spiller_) to itself.
// 3) it has completed spilling before reaching either of the previous
// two states.
return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) &&
(state_ != State::kYield)) ||
nonReclaimableSection_ || intermediateStateCleared_ ||
spiller_->finalized();
nonReclaimableSection_ || !spiller_ || spiller_->finalized();
}

void HashBuild::close() {
Expand All @@ -1242,8 +1238,8 @@ void HashBuild::close() {
{
// Free up major memory usage. Gate access to them as they can be accessed
// by the last build thread that finishes building the hash table.
std::lock_guard<std::mutex> l(intermediateStateMutex_);
intermediateStateCleared_ = true;
std::lock_guard<std::mutex> l(mutex_);
stateCleared_ = true;
joinBridge_.reset();
spiller_.reset();
table_.reset();
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,18 @@ class HashBuild final : public Operator {
// The row type used for hash table build and disk spilling.
RowTypePtr tableType_;

// Used to serialize access to intermediate state variables (like 'table_' and
// Used to serialize access to the internal state (like 'table_' and
// 'spiller_'). This is only required when variables are accessed
// concurrently, that is, when a thread tries to close the operator while
// another thread is building the hash table. Refer to 'close()' and
// finishHashBuild()' for more details.
std::mutex intermediateStateMutex_;
std::mutex mutex_;

// Indicates if the intermediate state ('table_' and 'spiller_') has
// been cleared. This can happen either when the operator is closed or when
// the last hash build operator transfers ownership of them to itself while
// building the final hash table.
bool intermediateStateCleared_{false};
bool stateCleared_{false};

// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;
Expand Down

0 comments on commit fbe5f4c

Please sign in to comment.