diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 41cdc7eb034b9..0e9c12690c9b8 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -450,10 +450,6 @@ void HashAggregation::close() { groupingSet_.reset(); } -void HashAggregation::abort() { - close(); -} - void HashAggregation::updateEstimatedOutputRowSize() { const auto optionalRowSize = groupingSet_->estimateOutputRowSize(); if (!optionalRowSize.has_value()) { diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index cfa83650ceed4..771dd2ca809d6 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -50,8 +50,6 @@ class HashAggregation : public Operator { void close() override; - void abort() override; - private: void updateRuntimeStats(); diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index bfeb1cd6ff4ee..5e89e0c4237fd 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -120,6 +120,7 @@ HashBuild::HashBuild( tableType_ = ROW(std::move(names), std::move(types)); setupTable(); setupSpiller(); + intermediateStateCleared_ = false; } void HashBuild::initialize() { @@ -782,7 +783,14 @@ bool HashBuild::finishHashBuild() { return true; } } - numRows += build->table_->rows()->numRows(); + { + std::lock_guard l(build->intermediateStateMutex_); + VELOX_CHECK( + !build->intermediateStateCleared_, + "Intermediate state for a peer is empty. It might have been " + "already closed."); + numRows += build->table_->rows()->numRows(); + } otherBuilds.push_back(build); } @@ -792,12 +800,22 @@ bool HashBuild::finishHashBuild() { otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; for (auto* build : otherBuilds) { - VELOX_CHECK_NOT_NULL(build->table_); - otherTables.push_back(std::move(build->table_)); - if (build->spiller_ != nullptr) { - build->spiller_->finishSpill(spillPartitions); + std::unique_ptr buildSpiller; + { + std::lock_guard l(build->intermediateStateMutex_); + VELOX_CHECK( + !build->intermediateStateCleared_, + "Intermediate state for a peer is empty. It might have been " + "already closed."); + build->intermediateStateCleared_ = true; + VELOX_CHECK_NOT_NULL(build->table_); + otherTables.push_back(std::move(build->table_)); + buildSpiller = std::move(build->spiller_); + } + if (buildSpiller != nullptr) { + buildSpiller->finishSpill(spillPartitions); } - build->recordSpillStats(); + build->recordSpillStats(buildSpiller.get()); } if (spiller_ != nullptr) { @@ -829,6 +847,7 @@ bool HashBuild::finishHashBuild() { addRuntimeStats(); if (joinBridge_->setHashTable( std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) { + intermediateStateCleared_ = true; spillGroup_->restart(); } @@ -839,8 +858,12 @@ bool HashBuild::finishHashBuild() { } void HashBuild::recordSpillStats() { - if (spiller_ != nullptr) { - const auto spillStats = spiller_->stats(); + recordSpillStats(spiller_.get()); +} + +void HashBuild::recordSpillStats(Spiller* spiller) { + if (spiller != nullptr) { + const auto spillStats = spiller->stats(); VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); Operator::recordSpillStats(spillStats); } else if (exceededMaxSpillLevelLimit_) { @@ -915,6 +938,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) { setupTable(); setupSpiller(spillInput.spillPartition.get()); + intermediateStateCleared_ = false; // Start to process spill input. processSpillInput(); @@ -1102,7 +1126,9 @@ void HashBuild::reclaim( TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this); - if (spiller_ == nullptr) { + // can another thread call close() while hashbuild is in arbitration and + // reclaim is called on it? + if (exceededMaxSpillLevelLimit_) { // NOTE: we might have reached to the max spill limit. return; } @@ -1116,7 +1142,9 @@ void HashBuild::reclaim( LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(state_) << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], spiller_[" - << (spiller_->finalized() ? "finalized" : "non-finalized") + << (intermediateStateCleared_ || spiller_->finalized() + ? "finalized" + : "non-finalized") << "] " << pool()->name() << ", usage: " << succinctBytes(pool()->currentBytes()); return; @@ -1194,17 +1222,31 @@ void HashBuild::reclaim( } bool HashBuild::nonReclaimableState() const { + // Apart from being in the nonReclaimable section, + // its also not reclaimable if: + // 1) the hash table has been built by the last build thread (inidicated + // by state_) + // 2) the last build operator has transferred ownership of 'this' operator's + // intermediate state (table_ and spiller_) to itself + // 3) it has completed spilling before reaching either of the previous + // two states. return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) && (state_ != State::kYield)) || - nonReclaimableSection_ || spiller_->finalized(); + nonReclaimableSection_ || intermediateStateCleared_ || + spiller_->finalized(); } -void HashBuild::abort() { - Operator::abort(); +void HashBuild::close() { + Operator::close(); - // Free up major memory usage. - joinBridge_.reset(); - spiller_.reset(); - table_.reset(); + { + // Free up major memory usage. Gate access to them as they can be accessed + // by the last build thread that finishes building the hash table. + std::lock_guard l(intermediateStateMutex_); + intermediateStateCleared_ = true; + joinBridge_.reset(); + spiller_.reset(); + table_.reset(); + } } } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 1f09f85f6752a..6359569fccfdc 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -85,7 +85,7 @@ class HashBuild final : public Operator { void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) override; - void abort() override; + void close() override; private: void setState(State state); @@ -123,6 +123,7 @@ class HashBuild final : public Operator { } void recordSpillStats(); + void recordSpillStats(Spiller* spiller); // Indicates if the input is read from spill data or not. bool isInputFromSpill() const; @@ -267,6 +268,19 @@ class HashBuild final : public Operator { // The row type used for hash table build and disk spilling. RowTypePtr tableType_; + // Used to serialize access to intermediate state variables (like 'table_' and + // 'spiller_'). This is only required when variables are accessed + // concurrently, that is, when a thread tries to close the operator while + // another thread is building the hash table. Refer to 'close()' and + // finishHashBuild()' for more details. + std::mutex intermediateStateMutex_; + + // Indicates if the intermediate state ('table_' and 'spiller_') has + // been cleared. This can happen either when the operator is closed or when + // the last hash build operator transfers ownership of them to itself while + // building the final hash table. + bool intermediateStateCleared_{false}; + // Container for the rows being accumulated. std::unique_ptr table_; @@ -305,6 +319,9 @@ class HashBuild final : public Operator { uint64_t numSpillRows_{0}; uint64_t numSpillBytes_{0}; + // This can be nullptr if either spilling is not allowed or it has been + // trsnaferred to the last hash build operator while in kWaitForBuild state or + // it has been cleared to setup a new one for recursive spilling. std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 228358c350a9f..9cf5990a27dca 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -1429,8 +1429,8 @@ void HashProbe::setRunning() { setState(ProbeOperatorState::kRunning); } -void HashProbe::abort() { - Operator::abort(); +void HashProbe::close() { + Operator::close(); // Free up major memory usage. joinBridge_.reset(); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index afa0218797056..3f54f84e77a61 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -66,7 +66,7 @@ class HashProbe : public Operator { return false; } - void abort() override; + void close() override; void clearDynamicFilters() override; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 4f1804e372da7..88fecdf3f866e 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -642,7 +642,7 @@ void Operator::MemoryReclaimer::abort( driver->state().isTerminated); VELOX_CHECK(driver->task()->isCancelled()); - // Calls operator abort to free up major memory usage. - op_->abort(); + // Calls operator close to free up major memory usage. + op_->close(); } } // namespace facebook::velox::exec diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 030e06fbc7647..b00dda0d958be 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -424,15 +424,6 @@ class Operator : public BaseRuntimeStatWriter { operatorCtx_->pool()->release(); } - /// Invoked by memory arbitrator to free up operator's resource immediately on - /// memory abort, and the query will stop running after this call. - /// - /// NOTE: we don't expect any access to this operator except close method - /// call. - virtual void abort() { - close(); - } - // Returns true if 'this' never has more output rows than input rows. virtual bool isFilter() const { return false; diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index f78b4517e6713..cdf96545b757d 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -104,8 +104,8 @@ RowVectorPtr OrderBy::getOutput() { return output; } -void OrderBy::abort() { - Operator::abort(); +void OrderBy::close() { + Operator::close(); sortBuffer_.reset(); } diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index 7f06d2228528c..850eda5742074 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -60,7 +60,7 @@ class OrderBy : public Operator { void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) override; - void abort() override; + void close() override; private: // Invoked to record the spilling stats in operator stats after processing all