Skip to content

Commit

Permalink
Centralize join probe/build table spill to join bridge (facebookincubator#11294)
Browse files Browse the repository at this point in the history

Summary:
Consolidates table spill logic from hash probe and hash build to a centralized place, hash join bridge. This effort will reduce duplicate and unnecessary code, as well as making reclaiming from higher level possible (directly from join bridge under some circumstances).

Pull Request resolved: facebookincubator#11294

Reviewed By: xiaoxmeng

Differential Revision: D64568888

Pulled By: tanjialiang

fbshipit-source-id: b532f778b412362ccc3a33233eaa0eb0bfa404c3
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Oct 22, 2024
1 parent abc563c commit bb3b7ca
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 214 deletions.
64 changes: 11 additions & 53 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,12 @@ HashBuild::HashBuild(

const auto numKeys = joinNode_->rightKeys().size();
keyChannels_.reserve(numKeys);
std::vector<std::string> names;
names.reserve(inputType->size());
std::vector<TypePtr> types;
types.reserve(inputType->size());

for (int i = 0; i < numKeys; ++i) {
auto& key = joinNode_->rightKeys()[i];
auto channel = exprToChannel(key.get(), inputType);
keyChannelMap_[channel] = i;
keyChannels_.emplace_back(channel);
names.emplace_back(inputType->nameOf(channel));
types.emplace_back(inputType->childAt(channel));
}

// Identify the non-key build side columns and make a decoder for each.
Expand All @@ -106,12 +100,10 @@ HashBuild::HashBuild(
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}

tableType_ = ROW(std::move(names), std::move(types));
tableType_ = hashJoinTableType(joinNode_);
setupTable();
setupSpiller();
stateCleared_ = false;
Expand Down Expand Up @@ -1053,13 +1045,13 @@ void HashBuild::reclaim(
VELOX_CHECK_NOT_NULL(driver);
VELOX_CHECK(!nonReclaimableSection_);

const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
// 'canReclaim()' already checks the spill limit is not exceeding max, there
// is only a small chance from the time 'canReclaim()' is checked to the
// actual reclaim happens that the operator has spilled such that the spill
// level exceeds max.
const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
LOG(WARNING)
<< "Can't reclaim from hash build operator, exceeded maximum spill "
"level of "
Expand Down Expand Up @@ -1089,6 +1081,7 @@ void HashBuild::reclaim(
VELOX_CHECK(task->pauseRequested());
const std::vector<Operator*> operators =
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);

for (auto* op : operators) {
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
VELOX_CHECK_NOT_NULL(buildOp);
Expand All @@ -1106,53 +1099,18 @@ void HashBuild::reclaim(
}
}

struct SpillResult {
const std::exception_ptr error{nullptr};

explicit SpillResult(std::exception_ptr _error) : error(_error) {}
};

std::vector<std::shared_ptr<AsyncSource<SpillResult>>> spillTasks;
auto* spillExecutor = spillConfig()->executor;
std::vector<Spiller*> spillers;
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
spillTasks.push_back(
memory::createAsyncMemoryReclaimTask<SpillResult>([buildOp]() {
try {
buildOp->spiller_->spill();
buildOp->table_->clear(true);
// Release the minimum reserved memory.
buildOp->pool()->release();
return std::make_unique<SpillResult>(nullptr);
} catch (const std::exception& e) {
LOG(ERROR) << "Spill from hash build pool "
<< buildOp->pool()->name() << " failed: " << e.what();
// The exception is captured and thrown by the caller.
return std::make_unique<SpillResult>(std::current_exception());
}
}));
if ((operators.size() > 1) && (spillExecutor != nullptr)) {
spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
}
spillers.push_back(buildOp->spiller_.get());
}

auto syncGuard = folly::makeGuard([&]() {
for (auto& spillTask : spillTasks) {
// We consume the result for the pending tasks. This is a cleanup in the
// guard and must not throw. The first error is already captured before
// this runs.
try {
spillTask->move();
} catch (const std::exception&) {
}
}
});
spillHashJoinTable(spillers, config);

for (auto& spillTask : spillTasks) {
const auto result = spillTask->move();
if (result->error) {
std::rethrow_exception(result->error);
}
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
buildOp->table_->clear(true);
buildOp->pool()->release();
}
}

Expand Down
18 changes: 9 additions & 9 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

namespace facebook::velox::exec {

// Builds a hash table for use in HashProbe. This is the final
// Operator in a build side Driver. The build side pipeline has
// multiple Drivers, each with its own HashBuild. The build finishes
// when the last Driver of the build pipeline finishes. Hence finishHashBuild()
// has a barrier where the last one to enter gathers the data
// accumulated by the other Drivers and makes the join hash
// table. This table is then passed to the probe side pipeline via
// JoinBridge. After this, all build side Drivers finish and free
// their state.
/// Builds a hash table for use in HashProbe. This is the final
/// Operator in a build side Driver. The build side pipeline has
/// multiple Drivers, each with its own HashBuild. The build finishes
/// when the last Driver of the build pipeline finishes. Hence finishHashBuild()
/// has a barrier where the last one to enter gathers the data
/// accumulated by the other Drivers and makes the join hash
/// table. This table is then passed to the probe side pipeline via
/// JoinBridge. After this, all build side Drivers finish and free
/// their state.
class HashBuild final : public Operator {
public:
/// Define the internal execution state for hash build.
Expand Down
152 changes: 152 additions & 0 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,43 @@
*/

#include "velox/exec/HashJoinBridge.h"
#include "velox/common/memory/MemoryArbitrator.h"

namespace facebook::velox::exec {
namespace {
static const char* kSpillProbedFlagColumnName = "__probedFlag";
}

// Returns the row type used for the hash join table built from the build
// (right) side of 'joinNode': the join key columns first, in key order,
// followed by the remaining (dependent) build-side columns in their
// original input order.
RowTypePtr hashJoinTableType(
    const std::shared_ptr<const core::HashJoinNode>& joinNode) {
  const auto inputType = joinNode->sources()[1]->outputType();
  const auto numKeys = joinNode->rightKeys().size();

  std::vector<std::string> names;
  names.reserve(inputType->size());
  std::vector<TypePtr> types;
  types.reserve(inputType->size());
  std::unordered_set<uint32_t> keyChannelSet;
  // Only key channels are inserted, so reserve for the key count rather
  // than the full input width.
  keyChannelSet.reserve(numKeys);

  for (size_t i = 0; i < numKeys; ++i) {
    const auto& key = joinNode->rightKeys()[i];
    const auto channel = exprToChannel(key.get(), inputType);
    keyChannelSet.insert(channel);
    names.emplace_back(inputType->nameOf(channel));
    types.emplace_back(inputType->childAt(channel));
  }

  // Append the non-key (dependent) columns, preserving their input order.
  for (uint32_t i = 0; i < inputType->size(); ++i) {
    if (keyChannelSet.count(i) == 0) {
      names.emplace_back(inputType->nameOf(i));
      types.emplace_back(inputType->childAt(i));
    }
  }

  return ROW(std::move(names), std::move(types));
}

void HashJoinBridge::start() {
std::lock_guard<std::mutex> l(mutex_);
started_ = true;
Expand All @@ -33,6 +64,127 @@ void HashJoinBridge::addBuilder() {
++numBuilders_;
}

namespace {
// Constructs a spiller over one sub-table row container of 'table' so the
// overall table spill can run with one spiller per container in parallel.
// The spiller spills every row in the container; the caller collects the
// spilled partitions and stats from it afterwards.
std::unique_ptr<Spiller> createSpiller(
    RowContainer* subTableRows,
    core::JoinType joinType,
    const RowTypePtr& tableType,
    const HashBitRange& hashBitRange,
    const common::SpillConfig* spillConfig,
    folly::Synchronized<common::SpillStats>* stats) {
  const auto spillType = hashJoinTableSpillType(tableType, joinType);
  return std::make_unique<Spiller>(
      Spiller::Type::kHashJoinBuild,
      joinType,
      subTableRows,
      spillType,
      hashBitRange,
      spillConfig,
      stats);
}
} // namespace

// Spills all the row containers owned by 'spillers', one async task per
// spiller. Any spill error is captured inside the task; the first captured
// error is rethrown here only after every task has been drained, so no task
// is left running when this function exits.
std::vector<std::unique_ptr<HashJoinTableSpillResult>> spillHashJoinTable(
    const std::vector<Spiller*>& spillers,
    const common::SpillConfig* spillConfig) {
  VELOX_CHECK_NOT_NULL(spillConfig);
  auto spillExecutor = spillConfig->executor;
  std::vector<std::shared_ptr<AsyncSource<HashJoinTableSpillResult>>>
      spillTasks;
  for (auto* spiller : spillers) {
    spillTasks.push_back(
        memory::createAsyncMemoryReclaimTask<HashJoinTableSpillResult>(
            [spiller]() {
              try {
                spiller->spill();
                return std::make_unique<HashJoinTableSpillResult>(spiller);
              } catch (const std::exception& e) {
                LOG(ERROR) << "Spill from hash join bridge failed: "
                           << e.what();
                // The exception is captured and thrown by the caller.
                return std::make_unique<HashJoinTableSpillResult>(
                    std::current_exception());
              }
            }));
    // Offload tasks after the first to the spill executor when one is
    // configured; the first task (and all tasks, when there is no executor)
    // runs inline on this thread when its result is consumed below.
    if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) {
      spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
    }
  }

  // Ensure every task is drained on all exit paths, including when an error
  // below makes us rethrow before consuming the remaining tasks.
  auto syncGuard = folly::makeGuard([&]() {
    for (auto& spillTask : spillTasks) {
      // We consume the result for the pending tasks. This is a cleanup in the
      // guard and must not throw. The first error is already captured before
      // this runs.
      try {
        spillTask->move();
      } catch (const std::exception&) {
      }
    }
  });

  std::vector<std::unique_ptr<HashJoinTableSpillResult>> spillResults;
  for (auto& spillTask : spillTasks) {
    auto result = spillTask->move();
    if (result->error) {
      // Rethrow the first captured spill error; the guard drains the rest.
      std::rethrow_exception(result->error);
    }
    spillResults.push_back(std::move(result));
  }
  return spillResults;
}

// Spills the whole hash join 'table' and returns the resulting set of
// non-empty spill partitions. Returns an empty set when the build side has
// no rows to spill.
SpillPartitionSet spillHashJoinTable(
    std::shared_ptr<BaseHashTable> table,
    const HashBitRange& hashBitRange,
    const std::shared_ptr<const core::HashJoinNode>& joinNode,
    const common::SpillConfig* spillConfig,
    folly::Synchronized<common::SpillStats>* stats) {
  VELOX_CHECK_NOT_NULL(table);
  VELOX_CHECK_NOT_NULL(spillConfig);
  if (table->numDistinct() == 0) {
    // Empty build side.
    return {};
  }

  // One spiller per non-empty sub-table row container so the containers can
  // be spilled in parallel.
  const auto spillRowType = hashJoinTableType(joinNode);
  std::vector<std::unique_ptr<Spiller>> ownedSpillers;
  std::vector<Spiller*> rawSpillers;
  for (auto* rows : table->allRows()) {
    if (rows->numRows() == 0) {
      continue;
    }
    auto spiller = createSpiller(
        rows,
        joinNode->joinType(),
        spillRowType,
        hashBitRange,
        spillConfig,
        stats);
    rawSpillers.push_back(spiller.get());
    ownedSpillers.push_back(std::move(spiller));
  }
  if (ownedSpillers.empty()) {
    return {};
  }

  SpillPartitionSet spillPartitions;
  for (const auto& result : spillHashJoinTable(rawSpillers, spillConfig)) {
    VELOX_CHECK_NULL(result->error);
    result->spiller->finishSpill(spillPartitions);
  }

  // Remove the spilled partitions which are empty so as we don't need to
  // trigger unnecessary spilling at hash probe side.
  removeEmptyPartitions(spillPartitions);
  return spillPartitions;
}

void HashJoinBridge::setHashTable(
std::unique_ptr<BaseHashTable> table,
SpillPartitionSet spillPartitionSet,
Expand Down
Loading

0 comments on commit bb3b7ca

Please sign in to comment.