Skip to content

Commit

Permalink
Consolidate Operator's close and abort APIs (facebookincubator#8757)
Browse files Browse the repository at this point in the history
Summary:

The close() and abort() APIs share the same objective: to release
resources held by the operator. However, they are not currently
implemented uniformly across all operators. For example, HashBuild
and HashProbe have abort() implemented, but not close(). This
discrepancy can lead to inconsistencies in the expected effects of
these API calls, particularly in the context of their usage.
For instance, when a driver is destroyed, it calls close() on all its
operators before detaching itself from its parent task. All operators,
with the exception of HashBuild and HashProbe, would have their
resources released. The latter, however, would rely on their
destructor being called, which could occur at any later point.
The detachment of the driver from the task serves as a synchronization
point. If we now rely on the destructor being called later, this
introduces an element of indeterminism to the state of the resources.
This unpredictability makes it difficult for memory management to
make decisions during arbitration.
This change aims to eliminate the abort() API and consolidate its
functionality into close().

Additionally, it serializer access to HashBuild's internal state
(table and spiller) to handle the case where it can be concurrently
cleared by the task thread closing the operator and the being read by
the last hash build operator attempting to build the hash table by
fetching this internal state from all its peers.

Differential Revision: D53818809
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Feb 21, 2024
1 parent 032358e commit 371d4ba
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 41 deletions.
4 changes: 0 additions & 4 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,6 @@ void HashAggregation::close() {
groupingSet_.reset();
}

void HashAggregation::abort() {
close();
}

void HashAggregation::updateEstimatedOutputRowSize() {
const auto optionalRowSize = groupingSet_->estimateOutputRowSize();
if (!optionalRowSize.has_value()) {
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class HashAggregation : public Operator {

void close() override;

void abort() override;

private:
void updateRuntimeStats();

Expand Down
76 changes: 59 additions & 17 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ HashBuild::HashBuild(
tableType_ = ROW(std::move(names), std::move(types));
setupTable();
setupSpiller();
intermediateStateCleared_ = false;
}

void HashBuild::initialize() {
Expand Down Expand Up @@ -782,7 +783,14 @@ bool HashBuild::finishHashBuild() {
return true;
}
}
numRows += build->table_->rows()->numRows();
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
numRows += build->table_->rows()->numRows();
}
otherBuilds.push_back(build);
}

Expand All @@ -792,12 +800,22 @@ bool HashBuild::finishHashBuild() {
otherTables.reserve(peers.size());
SpillPartitionSet spillPartitions;
for (auto* build : otherBuilds) {
VELOX_CHECK_NOT_NULL(build->table_);
otherTables.push_back(std::move(build->table_));
if (build->spiller_ != nullptr) {
build->spiller_->finishSpill(spillPartitions);
std::unique_ptr<Spiller> buildSpiller;
{
std::lock_guard<std::mutex> l(build->intermediateStateMutex_);
VELOX_CHECK(
!build->intermediateStateCleared_,
"Intermediate state for a peer is empty. It might have been "
"already closed.");
build->intermediateStateCleared_ = true;
VELOX_CHECK_NOT_NULL(build->table_);
otherTables.push_back(std::move(build->table_));
buildSpiller = std::move(build->spiller_);
}
if (buildSpiller != nullptr) {
buildSpiller->finishSpill(spillPartitions);
}
build->recordSpillStats();
build->recordSpillStats(buildSpiller.get());
}

if (spiller_ != nullptr) {
Expand Down Expand Up @@ -829,6 +847,7 @@ bool HashBuild::finishHashBuild() {
addRuntimeStats();
if (joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
intermediateStateCleared_ = true;
spillGroup_->restart();
}

Expand All @@ -839,8 +858,12 @@ bool HashBuild::finishHashBuild() {
}

void HashBuild::recordSpillStats() {
if (spiller_ != nullptr) {
const auto spillStats = spiller_->stats();
recordSpillStats(spiller_.get());
}

void HashBuild::recordSpillStats(Spiller* spiller) {
if (spiller != nullptr) {
const auto spillStats = spiller->stats();
VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0);
Operator::recordSpillStats(spillStats);
} else if (exceededMaxSpillLevelLimit_) {
Expand Down Expand Up @@ -915,6 +938,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {

setupTable();
setupSpiller(spillInput.spillPartition.get());
intermediateStateCleared_ = false;

// Start to process spill input.
processSpillInput();
Expand Down Expand Up @@ -1102,7 +1126,9 @@ void HashBuild::reclaim(

TestValue::adjust("facebook::velox::exec::HashBuild::reclaim", this);

if (spiller_ == nullptr) {
// can another thread call close() while hashbuild is in arbitration and
// reclaim is called on it?
if (exceededMaxSpillLevelLimit_) {
// NOTE: we might have reached to the max spill limit.
return;
}
Expand All @@ -1116,7 +1142,9 @@ void HashBuild::reclaim(
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], spiller_["
<< (spiller_->finalized() ? "finalized" : "non-finalized")
<< (intermediateStateCleared_ || spiller_->finalized()
? "finalized"
: "non-finalized")
<< "] " << pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes());
return;
Expand Down Expand Up @@ -1194,17 +1222,31 @@ void HashBuild::reclaim(
}

bool HashBuild::nonReclaimableState() const {
// Apart from being in the nonReclaimable section,
// its also not reclaimable if:
// 1) the hash table has been built by the last build thread (inidicated
// by state_)
// 2) the last build operator has transferred ownership of 'this' operator's
// intermediate state (table_ and spiller_) to itself
// 3) it has completed spilling before reaching either of the previous
// two states.
return ((state_ != State::kRunning) && (state_ != State::kWaitForBuild) &&
(state_ != State::kYield)) ||
nonReclaimableSection_ || spiller_->finalized();
nonReclaimableSection_ || intermediateStateCleared_ ||
spiller_->finalized();
}

void HashBuild::abort() {
Operator::abort();
void HashBuild::close() {
Operator::close();

// Free up major memory usage.
joinBridge_.reset();
spiller_.reset();
table_.reset();
{
// Free up major memory usage. Gate access to them as they can be accessed
// by the last build thread that finishes building the hash table.
std::lock_guard<std::mutex> l(intermediateStateMutex_);
intermediateStateCleared_ = true;
joinBridge_.reset();
spiller_.reset();
table_.reset();
}
}
} // namespace facebook::velox::exec
19 changes: 18 additions & 1 deletion velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class HashBuild final : public Operator {
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

void abort() override;
void close() override;

private:
void setState(State state);
Expand Down Expand Up @@ -123,6 +123,7 @@ class HashBuild final : public Operator {
}

void recordSpillStats();
void recordSpillStats(Spiller* spiller);

// Indicates if the input is read from spill data or not.
bool isInputFromSpill() const;
Expand Down Expand Up @@ -267,6 +268,19 @@ class HashBuild final : public Operator {
// The row type used for hash table build and disk spilling.
RowTypePtr tableType_;

// Used to serialize access to intermediate state variables (like 'table_' and
// 'spiller_'). This is only required when variables are accessed
// concurrently, that is, when a thread tries to close the operator while
// another thread is building the hash table. Refer to 'close()' and
// finishHashBuild()' for more details.
std::mutex intermediateStateMutex_;

// Indicates if the intermediate state ('table_' and 'spiller_') has
// been cleared. This can happen either when the operator is closed or when
// the last hash build operator transfers ownership of them to itself while
// building the final hash table.
bool intermediateStateCleared_{false};

// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;

Expand Down Expand Up @@ -305,6 +319,9 @@ class HashBuild final : public Operator {
uint64_t numSpillRows_{0};
uint64_t numSpillBytes_{0};

// This can be nullptr if either spilling is not allowed or it has been
// trsnaferred to the last hash build operator while in kWaitForBuild state or
// it has been cleared to setup a new one for recursive spilling.
std::unique_ptr<Spiller> spiller_;

// Used to read input from previously spilled data for restoring.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1429,8 +1429,8 @@ void HashProbe::setRunning() {
setState(ProbeOperatorState::kRunning);
}

void HashProbe::abort() {
Operator::abort();
void HashProbe::close() {
Operator::close();

// Free up major memory usage.
joinBridge_.reset();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class HashProbe : public Operator {
return false;
}

void abort() override;
void close() override;

void clearDynamicFilters() override;

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ void Operator::MemoryReclaimer::abort(
driver->state().isTerminated);
VELOX_CHECK(driver->task()->isCancelled());

// Calls operator abort to free up major memory usage.
op_->abort();
// Calls operator close to free up major memory usage.
op_->close();
}
} // namespace facebook::velox::exec
9 changes: 0 additions & 9 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,6 @@ class Operator : public BaseRuntimeStatWriter {
operatorCtx_->pool()->release();
}

/// Invoked by memory arbitrator to free up operator's resource immediately on
/// memory abort, and the query will stop running after this call.
///
/// NOTE: we don't expect any access to this operator except close method
/// call.
virtual void abort() {
close();
}

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
return false;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ RowVectorPtr OrderBy::getOutput() {
return output;
}

void OrderBy::abort() {
Operator::abort();
void OrderBy::close() {
Operator::close();
sortBuffer_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/OrderBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OrderBy : public Operator {
void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats)
override;

void abort() override;
void close() override;

private:
// Invoked to record the spilling stats in operator stats after processing all
Expand Down

0 comments on commit 371d4ba

Please sign in to comment.