From f3257d1c74f304b8cb002b508579738feebbe679 Mon Sep 17 00:00:00 2001 From: Bikramjeet Vig Date: Thu, 14 Mar 2024 16:10:07 -0700 Subject: [PATCH] Minor refactor in hash build (#8933) Summary: Includes refactoring as per the follow up comments in https://github.com/facebookincubator/velox/pull/8757 after it was merged. Reviewed By: xiaoxmeng Differential Revision: D54441115 --- velox/exec/HashBuild.cpp | 52 +++++++++++++++++++--------------------- velox/exec/HashBuild.h | 4 ++-- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 198bb424fd20..99fed38b4dd2 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -113,7 +113,7 @@ HashBuild::HashBuild( tableType_ = ROW(std::move(names), std::move(types)); setupTable(); setupSpiller(); - intermediateStateCleared_ = false; + stateCleared_ = false; } void HashBuild::initialize() { @@ -672,11 +672,11 @@ bool HashBuild::finishHashBuild() { } } { - std::lock_guard l(build->intermediateStateMutex_); + std::lock_guard l(build->mutex_); VELOX_CHECK( - !build->intermediateStateCleared_, - "Intermediate state for a peer is empty. It might have been " - "already closed."); + !build->stateCleared_, + "Internal state for a peer is empty. It might have already" + " been closed."); numRows += build->table_->rows()->numRows(); } otherBuilds.push_back(build); @@ -688,22 +688,22 @@ bool HashBuild::finishHashBuild() { otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; for (auto* build : otherBuilds) { - std::unique_ptr buildSpiller; + std::unique_ptr spiller; { - std::lock_guard l(build->intermediateStateMutex_); + std::lock_guard l(build->mutex_); VELOX_CHECK( - !build->intermediateStateCleared_, - "Intermediate state for a peer is empty. It might have been " - "already closed."); - build->intermediateStateCleared_ = true; + !build->stateCleared_, + "Internal state for a peer is empty. It might have already" + " been closed."); + build->stateCleared_ = true; VELOX_CHECK_NOT_NULL(build->table_); otherTables.push_back(std::move(build->table_)); - buildSpiller = std::move(build->spiller_); + spiller = std::move(build->spiller_); } - if (buildSpiller != nullptr) { - buildSpiller->finishSpill(spillPartitions); + if (spiller != nullptr) { + spiller->finishSpill(spillPartitions); + build->recordSpillStats(spiller.get()); } - build->recordSpillStats(buildSpiller.get()); } if (spiller_ != nullptr) { @@ -736,7 +736,7 @@ bool HashBuild::finishHashBuild() { joinBridge_->setHashTable( std::move(table_), std::move(spillPartitions), joinHasNullKeys_); if (spillEnabled()) { - intermediateStateCleared_ = true; + stateCleared_ = true; } // Release the unused memory reservation since we have finished the merged @@ -826,7 +826,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) { setupTable(); setupSpiller(spillInput.spillPartition.get()); - intermediateStateCleared_ = false; + stateCleared_ = false; // Start to process spill input. processSpillInput(); @@ -1009,10 +1009,7 @@ void HashBuild::reclaim( TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this); - // can another thread call close() while hashbuild is in arbitration and - // reclaim is called on it? if (exceededMaxSpillLevelLimit_) { - // NOTE: we might have reached to the max spill limit. return; } @@ -1025,9 +1022,9 @@ void HashBuild::reclaim( LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(state_) << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], spiller_[" - << (intermediateStateCleared_ || spiller_->finalized() - ? "finalized" - : "non-finalized") + << (stateCleared_ ? "cleared" + : (spiller_->finalized() ? "finalized" + : "non-finalized")) << "] " << pool()->name() << ", usage: " << succinctBytes(pool()->currentBytes()); return; @@ -1110,13 +1107,12 @@ bool HashBuild::nonReclaimableState() const { // 1) the hash table has been built by the last build thread (indicated by // state_) // 2) the last build operator has transferred ownership of 'this operator's - // intermediate state (table_ and spiller_) to itself + // internal state (table_ and spiller_) to itself. // 3) it has completed spilling before reaching either of the previous // two states. return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) && (state_ != State::kYield)) || - nonReclaimableSection_ || intermediateStateCleared_ || - spiller_->finalized(); + nonReclaimableSection_ || !spiller_ || spiller_->finalized(); } void HashBuild::close() { @@ -1125,8 +1121,8 @@ void HashBuild::close() { { // Free up major memory usage. Gate access to them as they can be accessed // by the last build thread that finishes building the hash table. - std::lock_guard l(intermediateStateMutex_); - intermediateStateCleared_ = true; + std::lock_guard l(mutex_); + stateCleared_ = true; joinBridge_.reset(); spiller_.reset(); table_.reset(); diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 75a3eb8bdec9..83a26c9fc71c 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -224,13 +224,13 @@ class HashBuild final : public Operator { // concurrently, that is, when a thread tries to close the operator while // another thread is building the hash table. Refer to 'close()' and // finishHashBuild()' for more details. - std::mutex intermediateStateMutex_; + std::mutex mutex_; // Indicates if the intermediate state ('table_' and 'spiller_') has // been cleared. This can happen either when the operator is closed or when // the last hash build operator transfers ownership of them to itself while // building the final hash table. - bool intermediateStateCleared_{false}; + bool stateCleared_{false}; // Container for the rows being accumulated. std::unique_ptr table_;