From 371d4baa7115e1e4cbb9d4db9ede6f24d2363ff4 Mon Sep 17 00:00:00 2001 From: Bikramjeet Vig Date: Wed, 21 Feb 2024 14:00:48 -0800 Subject: [PATCH] Consolidate Operator's close and abort APIs (#8757) Summary: The close() and abort() APIs share the same objective: to release resources held by the operator. However, they are not currently implemented uniformly across all operators. For example, HashBuild and HashProbe have abort() implemented, but not close(). This discrepancy can lead to inconsistencies in the expected effects of these API calls, particularly in the context of their usage. For instance, when a driver is destroyed, it calls close() on all its operators before detaching itself from its parent task. All operators, with the exception of HashBuild and HashProbe, would have their resources released. The latter, however, would rely on their destructor being called, which could occur at any later point. The detachment of the driver from the task serves as a synchronization point. If we now rely on the destructor being called later, this introduces an element of indeterminism to the state of the resources. This unpredictability makes it difficult for memory management to make decisions during arbitration. This change aims to eliminate the abort() API and consolidate its functionality into close(). Additionally, it serializes access to HashBuild's internal state (table and spiller) to handle the case where it can be concurrently cleared by the task thread closing the operator and being read by the last hash build operator attempting to build the hash table by fetching this internal state from all its peers. 
Differential Revision: D53818809 --- velox/exec/HashAggregation.cpp | 4 -- velox/exec/HashAggregation.h | 2 - velox/exec/HashBuild.cpp | 76 ++++++++++++++++++++++++++-------- velox/exec/HashBuild.h | 19 ++++++++- velox/exec/HashProbe.cpp | 4 +- velox/exec/HashProbe.h | 2 +- velox/exec/Operator.cpp | 4 +- velox/exec/Operator.h | 9 ---- velox/exec/OrderBy.cpp | 4 +- velox/exec/OrderBy.h | 2 +- 10 files changed, 85 insertions(+), 41 deletions(-) diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 41cdc7eb034b9..0e9c12690c9b8 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -450,10 +450,6 @@ void HashAggregation::close() { groupingSet_.reset(); } -void HashAggregation::abort() { - close(); -} - void HashAggregation::updateEstimatedOutputRowSize() { const auto optionalRowSize = groupingSet_->estimateOutputRowSize(); if (!optionalRowSize.has_value()) { diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index cfa83650ceed4..771dd2ca809d6 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -50,8 +50,6 @@ class HashAggregation : public Operator { void close() override; - void abort() override; - private: void updateRuntimeStats(); diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index bfeb1cd6ff4ee..5e89e0c4237fd 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -120,6 +120,7 @@ HashBuild::HashBuild( tableType_ = ROW(std::move(names), std::move(types)); setupTable(); setupSpiller(); + intermediateStateCleared_ = false; } void HashBuild::initialize() { @@ -782,7 +783,14 @@ bool HashBuild::finishHashBuild() { return true; } } - numRows += build->table_->rows()->numRows(); + { + std::lock_guard l(build->intermediateStateMutex_); + VELOX_CHECK( + !build->intermediateStateCleared_, + "Intermediate state for a peer is empty. 
It might have been " + "already closed."); + numRows += build->table_->rows()->numRows(); + } otherBuilds.push_back(build); } @@ -792,12 +800,22 @@ bool HashBuild::finishHashBuild() { otherTables.reserve(peers.size()); SpillPartitionSet spillPartitions; for (auto* build : otherBuilds) { - VELOX_CHECK_NOT_NULL(build->table_); - otherTables.push_back(std::move(build->table_)); - if (build->spiller_ != nullptr) { - build->spiller_->finishSpill(spillPartitions); + std::unique_ptr buildSpiller; + { + std::lock_guard l(build->intermediateStateMutex_); + VELOX_CHECK( + !build->intermediateStateCleared_, + "Intermediate state for a peer is empty. It might have been " + "already closed."); + build->intermediateStateCleared_ = true; + VELOX_CHECK_NOT_NULL(build->table_); + otherTables.push_back(std::move(build->table_)); + buildSpiller = std::move(build->spiller_); + } + if (buildSpiller != nullptr) { + buildSpiller->finishSpill(spillPartitions); } - build->recordSpillStats(); + build->recordSpillStats(buildSpiller.get()); } if (spiller_ != nullptr) { @@ -829,6 +847,7 @@ bool HashBuild::finishHashBuild() { addRuntimeStats(); if (joinBridge_->setHashTable( std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) { + intermediateStateCleared_ = true; spillGroup_->restart(); } @@ -839,8 +858,12 @@ bool HashBuild::finishHashBuild() { } void HashBuild::recordSpillStats() { - if (spiller_ != nullptr) { - const auto spillStats = spiller_->stats(); + recordSpillStats(spiller_.get()); +} + +void HashBuild::recordSpillStats(Spiller* spiller) { + if (spiller != nullptr) { + const auto spillStats = spiller->stats(); VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); Operator::recordSpillStats(spillStats); } else if (exceededMaxSpillLevelLimit_) { @@ -915,6 +938,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) { setupTable(); setupSpiller(spillInput.spillPartition.get()); + intermediateStateCleared_ = false; // Start to process spill input. 
processSpillInput(); @@ -1102,7 +1126,9 @@ void HashBuild::reclaim( TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this); - if (spiller_ == nullptr) { + // can another thread call close() while hashbuild is in arbitration and + // reclaim is called on it? + if (exceededMaxSpillLevelLimit_) { // NOTE: we might have reached to the max spill limit. return; } @@ -1116,7 +1142,9 @@ void HashBuild::reclaim( LOG(WARNING) << "Can't reclaim from hash build operator, state_[" << stateName(state_) << "], nonReclaimableSection_[" << nonReclaimableSection_ << "], spiller_[" - << (spiller_->finalized() ? "finalized" : "non-finalized") + << (intermediateStateCleared_ || spiller_->finalized() + ? "finalized" + : "non-finalized") << "] " << pool()->name() << ", usage: " << succinctBytes(pool()->currentBytes()); return; @@ -1194,17 +1222,31 @@ void HashBuild::reclaim( } bool HashBuild::nonReclaimableState() const { + // Apart from being in the nonReclaimable section, + // its also not reclaimable if: + // 1) the hash table has been built by the last build thread (inidicated + // by state_) + // 2) the last build operator has transferred ownership of 'this' operator's + // intermediate state (table_ and spiller_) to itself + // 3) it has completed spilling before reaching either of the previous + // two states. return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) && (state_ != State::kYield)) || - nonReclaimableSection_ || spiller_->finalized(); + nonReclaimableSection_ || intermediateStateCleared_ || + spiller_->finalized(); } -void HashBuild::abort() { - Operator::abort(); +void HashBuild::close() { + Operator::close(); - // Free up major memory usage. - joinBridge_.reset(); - spiller_.reset(); - table_.reset(); + { + // Free up major memory usage. Gate access to them as they can be accessed + // by the last build thread that finishes building the hash table. 
+ std::lock_guard l(intermediateStateMutex_); + intermediateStateCleared_ = true; + joinBridge_.reset(); + spiller_.reset(); + table_.reset(); + } } } // namespace facebook::velox::exec diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 1f09f85f6752a..6359569fccfdc 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -85,7 +85,7 @@ class HashBuild final : public Operator { void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) override; - void abort() override; + void close() override; private: void setState(State state); @@ -123,6 +123,7 @@ class HashBuild final : public Operator { } void recordSpillStats(); + void recordSpillStats(Spiller* spiller); // Indicates if the input is read from spill data or not. bool isInputFromSpill() const; @@ -267,6 +268,19 @@ class HashBuild final : public Operator { // The row type used for hash table build and disk spilling. RowTypePtr tableType_; + // Used to serialize access to intermediate state variables (like 'table_' and + // 'spiller_'). This is only required when variables are accessed + // concurrently, that is, when a thread tries to close the operator while + // another thread is building the hash table. Refer to 'close()' and + // finishHashBuild()' for more details. + std::mutex intermediateStateMutex_; + + // Indicates if the intermediate state ('table_' and 'spiller_') has + // been cleared. This can happen either when the operator is closed or when + // the last hash build operator transfers ownership of them to itself while + // building the final hash table. + bool intermediateStateCleared_{false}; + // Container for the rows being accumulated. 
std::unique_ptr table_; @@ -305,6 +319,9 @@ class HashBuild final : public Operator { uint64_t numSpillRows_{0}; uint64_t numSpillBytes_{0}; + // This can be nullptr if either spilling is not allowed or it has been + // trsnaferred to the last hash build operator while in kWaitForBuild state or + // it has been cleared to setup a new one for recursive spilling. std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 228358c350a9f..9cf5990a27dca 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -1429,8 +1429,8 @@ void HashProbe::setRunning() { setState(ProbeOperatorState::kRunning); } -void HashProbe::abort() { - Operator::abort(); +void HashProbe::close() { + Operator::close(); // Free up major memory usage. joinBridge_.reset(); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index afa0218797056..3f54f84e77a61 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -66,7 +66,7 @@ class HashProbe : public Operator { return false; } - void abort() override; + void close() override; void clearDynamicFilters() override; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 4f1804e372da7..88fecdf3f866e 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -642,7 +642,7 @@ void Operator::MemoryReclaimer::abort( driver->state().isTerminated); VELOX_CHECK(driver->task()->isCancelled()); - // Calls operator abort to free up major memory usage. - op_->abort(); + // Calls operator close to free up major memory usage. 
+ op_->close(); } } // namespace facebook::velox::exec diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 030e06fbc7647..b00dda0d958be 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -424,15 +424,6 @@ class Operator : public BaseRuntimeStatWriter { operatorCtx_->pool()->release(); } - /// Invoked by memory arbitrator to free up operator's resource immediately on - /// memory abort, and the query will stop running after this call. - /// - /// NOTE: we don't expect any access to this operator except close method - /// call. - virtual void abort() { - close(); - } - // Returns true if 'this' never has more output rows than input rows. virtual bool isFilter() const { return false; diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index f78b4517e6713..cdf96545b757d 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -104,8 +104,8 @@ RowVectorPtr OrderBy::getOutput() { return output; } -void OrderBy::abort() { - Operator::abort(); +void OrderBy::close() { + Operator::close(); sortBuffer_.reset(); } diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index 7f06d2228528c..850eda5742074 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -60,7 +60,7 @@ class OrderBy : public Operator { void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) override; - void abort() override; + void close() override; private: // Invoked to record the spilling stats in operator stats after processing all