Stream input row to hash table when addInput for left semi and anti join
Add benchmark
liujiayi771 committed Nov 24, 2024
1 parent 78d761b commit b6cff3d
Showing 12 changed files with 712 additions and 26 deletions.
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
@@ -756,7 +756,7 @@ DEBUG_ONLY_TEST_P(
folly::EventCount taskPauseWait;
auto taskPauseWaitKey = taskPauseWait.prepareWait();

const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);

std::atomic<bool> injectAllocationOnce{true};
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
@@ -125,6 +125,12 @@ class QueryConfig {
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
"abandon_partial_topn_row_number_min_pct";

static constexpr const char* kAbandonBuildNoDupHashMinRows =
"abandon_build_no_dup_hash_min_rows";

static constexpr const char* kAbandonBuildNoDupHashMinPct =
"abandon_build_no_dup_hash_min_pct";

/// The maximum number of bytes to buffer in PartitionedOutput operator to
/// avoid creating tiny SerializedPages.
///
@@ -475,6 +481,14 @@ class QueryConfig {
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
}

int32_t abandonBuildNoDupHashMinRows() const {
return get<int32_t>(kAbandonBuildNoDupHashMinRows, 100'000);
}

int32_t abandonBuildNoDupHashMinPct() const {
return get<int32_t>(kAbandonBuildNoDupHashMinPct, 80);
}

uint64_t maxSpillRunRows() const {
static constexpr uint64_t kDefault = 12UL << 20;
return get<uint64_t>(kMaxSpillRunRows, kDefault);
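As a usage sketch (not part of the diff), the two new getters can be exercised through the string-keyed map that QueryConfig is constructed from in its unit tests; the constructor form below is an assumption based on that usage, and the override values are made up.

#include "velox/core/QueryConfig.h"

using facebook::velox::core::QueryConfig;

// Override the new thresholds for a query; unset keys keep their defaults.
std::unordered_map<std::string, std::string> values{
    {QueryConfig::kAbandonBuildNoDupHashMinRows, "50000"},
    {QueryConfig::kAbandonBuildNoDupHashMinPct, "90"}};
QueryConfig config{std::move(values)};
// config.abandonBuildNoDupHashMinRows() -> 50000 (default 100'000)
// config.abandonBuildNoDupHashMinPct()  -> 90    (default 80)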
8 changes: 8 additions & 0 deletions velox/docs/configs.rst
@@ -47,6 +47,14 @@ Generic Configuration
- integer
- 80
- Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
* - abandon_build_no_dup_hash_min_rows
- integer
- 100,000
- Number of input rows to receive before starting to check whether to abandon building a HashTable without duplicates in HashBuild for left semi/anti join.
* - abandon_build_no_dup_hash_min_pct
- integer
- 80
- Abandons building a HashTable without duplicates in HashBuild for left semi/anti join if the percentage of distinct keys in the HashTable exceeds this threshold.
* - session_timezone
- string
-
68 changes: 54 additions & 14 deletions velox/exec/HashBuild.cpp
@@ -67,7 +67,11 @@ HashBuild::HashBuild(
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())),
keyChannelMap_(joinNode_->rightKeys().size()) {
keyChannelMap_(joinNode_->rightKeys().size()),
abandonBuildNoDupHashMinRows_(
driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
abandonBuildNoDupHashMinPct_(
driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

@@ -85,9 +89,11 @@ HashBuild::HashBuild(
keyChannels_.emplace_back(channel);
}

dropDuplicates_ = canDropDuplicates(joinNode_);

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
if (numDependents > 0) {
if (!dropDuplicates_ && numDependents > 0) {
// Number of join keys (numKeys) may exceed the number of input columns
// (inputType->size()), in which case numDependents is negative and cannot be
// used to call 'reserve'. This happens when we join different probe side
Expand All @@ -96,10 +102,14 @@ HashBuild::HashBuild(
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
if (!dropDuplicates_) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
}
}
}

@@ -147,11 +157,6 @@ void HashBuild::setupTable() {
.minTableRowsForParallelJoinBuild(),
pool());
} else {
// (Left) semi and anti join with no extra filter only needs to know whether
// there is a match. Hence, no need to store entries with duplicate keys.
const bool dropDuplicates = !joinNode_->filter() &&
(joinNode_->isLeftSemiFilterJoin() ||
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -160,7 +165,7 @@
table_ = HashTable<false>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
@@ -171,14 +176,16 @@
table_ = HashTable<true>::createForJoin(
std::move(keyHashers),
dependentTypes,
!dropDuplicates, // allowDuplicates
!dropDuplicates_, // allowDuplicates
needProbedFlag, // hasProbedFlag
operatorCtx_->driverCtx()
->queryConfig()
.minTableRowsForParallelJoinBuild(),
pool());
}
}
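// Lookup state used by addInput() to stream input rows into 'table_' via
// groupProbe() when duplicate build keys can be dropped.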
lookup_ = std::make_unique<HashLookup>(table_->hashers());
lookup_->reset(1);
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
}

@@ -373,6 +380,31 @@ void HashBuild::addInput(RowVectorPtr input) {
return;
}

if (dropDuplicates_ && !abandonBuildNoDupHash_) {
const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
numInputRows_ += activeRows_.countSelected();
if (abandonEarly) {
// From now on the hash table is no longer built directly in addInput. The
// rows previously inserted into the hash table are already stored in the
// RowContainer, so nothing is lost by abandoning.
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
abandonBuildNoDupHash_ = true;
table_->joinTableMayHaveDuplicates();
} else {
table_->prepareForGroupProbe(
*lookup_,
input,
activeRows_,
BaseHashTable::kNoSpillInputStartPartitionBit);
if (lookup_->rows.empty()) {
return;
}
table_->groupProbe(
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
return;
}
}

if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
hashes_.resize(activeRows_.end());
}
@@ -751,7 +783,8 @@ bool HashBuild::finishHashBuild() {
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit,
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
: nullptr);
: nullptr,
dropDuplicates_);
}
stats_.wlock()->addRuntimeStat(
BaseHashTable::kBuildWallNanos,
@@ -867,6 +900,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
setupTable();
setupSpiller(spillInput.spillPartition.get());
stateCleared_ = false;
numInputRows_ = 0;

// Start to process spill input.
processSpillInput();
@@ -1149,4 +1183,10 @@ void HashBuild::close() {
table_.reset();
}
}

bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
VELOX_CHECK(dropDuplicates_);
return numInputRows_ > abandonBuildNoDupHashMinRows_ &&
100 * numDistinct / numInputRows_ >= abandonBuildNoDupHashMinPct_;
}
} // namespace facebook::velox::exec
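For illustration only, a self-contained sketch of the abandon heuristic above with the default thresholds (100,000 rows, 80%); shouldAbandonNoDupHash is a hypothetical name, the formula mirrors HashBuild::abandonBuildNoDupHashEarly(). While the heuristic has not fired, each batch is streamed into the table via prepareForGroupProbe()/groupProbe(), which only insert previously-unseen keys, so duplicate build keys collapse to a single row.

#include <cstdint>

// Hypothetical standalone version of the check in
// HashBuild::abandonBuildNoDupHashEarly().
bool shouldAbandonNoDupHash(
    int64_t numInputRows,
    int64_t numDistinct,
    int32_t minRows = 100'000,
    int32_t minPct = 80) {
  // Give up deduplicating in addInput once enough rows have been seen and
  // most of them turned out to be distinct anyway.
  return numInputRows > minRows && 100 * numDistinct / numInputRows >= minPct;
}

// Example: 200'000 rows with 170'000 distinct keys -> 85% unique, so the
// build abandons the no-duplicates table and only accumulates rows in the
// RowContainer until finishHashBuild().
//   shouldAbandonNoDupHash(200'000, 170'000) == true
// Example: 200'000 rows with 40'000 distinct keys -> 20% unique, keep deduping.
//   shouldAbandonNoDupHash(200'000, 40'000) == false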
22 changes: 22 additions & 0 deletions velox/exec/HashBuild.h
@@ -202,6 +202,11 @@ class HashBuild final : public Operator {
// not.
bool nonReclaimableState() const;

// True if we have seen enough rows and too few duplicate join keys, i.e. more
// than 'abandonBuildNoDupHashMinRows_' rows and at least
// 'abandonBuildNoDupHashMinPct_' % of them are unique.
bool abandonBuildNoDupHashEarly(int64_t numDistinct) const;

const std::shared_ptr<const core::HashJoinNode> joinNode_;

const core::JoinType joinType_;
@@ -239,6 +244,7 @@

// Container for the rows being accumulated.
std::unique_ptr<BaseHashTable> table_;
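// Hash lookup used by addInput() to stream input rows into 'table_' when
// duplicate build keys can be dropped.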
std::unique_ptr<HashLookup> lookup_;

// Key channels in 'input_'
std::vector<column_index_t> keyChannels_;
@@ -267,6 +273,11 @@
// at least one entry with null join keys.
bool joinHasNullKeys_{false};

// Indicates whether to drop duplicate rows. Rows with duplicate join keys can
// be dropped for left semi and anti joins.
bool dropDuplicates_{false};
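// True if the operator has given up building the no-duplicates hash table in
// addInput and falls back to building the table in finishHashBuild().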
bool abandonBuildNoDupHash_{false};

// The type used to spill hash table which might attach a boolean column to
// record the probed flag if 'needProbedFlagSpill_' is true.
RowTypePtr spillType_;
@@ -305,6 +316,17 @@

// Maps key channel in 'input_' to channel in key.
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;

// Number of input rows received so far; reset when a new spill input is set up.
int64_t numInputRows_ = 0;

// Minimum number of input rows to observe before deciding whether to give up
// building the no-duplicates hash table.
const int32_t abandonBuildNoDupHashMinRows_;
// Minimum percentage of unique rows at which to give up building the
// no-duplicates hash table. If at least this percentage of rows is unique,
// building the hash table during the addInput phase is not worthwhile.
const int32_t abandonBuildNoDupHashMinPct_;
};

inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {
Expand Down
21 changes: 17 additions & 4 deletions velox/exec/HashJoinBridge.cpp
@@ -42,10 +42,14 @@ RowTypePtr hashJoinTableType(
types.emplace_back(inputType->childAt(channel));
}

for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelSet.find(i) == keyChannelSet.end()) {
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
if (!canDropDuplicates(joinNode)) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelSet.find(i) == keyChannelSet.end()) {
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}
}

@@ -377,6 +381,15 @@ bool isLeftNullAwareJoinWithFilter(
joinNode->isNullAware() && (joinNode->filter() != nullptr);
}

bool canDropDuplicates(
const std::shared_ptr<const core::HashJoinNode>& joinNode) {
// Left semi and anti join with no extra filter only needs to know whether
// there is a match. Hence, no need to store entries with duplicate keys.
return !joinNode->filter() &&
(joinNode->isLeftSemiFilterJoin() || joinNode->isLeftSemiProjectJoin() ||
joinNode->isAntiJoin());
}
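A small illustration of the combined effect of hashJoinTableType() and canDropDuplicates() above, using a hypothetical build schema not taken from the diff:

// Assume a build input of ROW({"k", "v"}, {BIGINT(), VARCHAR()}) with join key "k".
// For a left semi or anti join with no extra filter, canDropDuplicates(joinNode)
// returns true, so hashJoinTableType(joinNode) keeps only the key column,
// ROW({"k"}, {BIGINT()}); the dependent column "v" is never stored in the table.
// For any other join type, or when a filter is present, "v" is still appended
// and the table type remains ROW({"k", "v"}, {BIGINT(), VARCHAR()}).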

uint64_t HashJoinMemoryReclaimer::reclaim(
memory::MemoryPool* pool,
uint64_t targetBytes,
5 changes: 5 additions & 0 deletions velox/exec/HashJoinBridge.h
@@ -171,6 +171,11 @@ class HashJoinBridge : public JoinBridge {
bool isLeftNullAwareJoinWithFilter(
const std::shared_ptr<const core::HashJoinNode>& joinNode);

// Indicates whether 'joinNode' can drop duplicate rows with the same join key.
// For left semi and anti joins, it is not necessary to store duplicate rows.
bool canDropDuplicates(
const std::shared_ptr<const core::HashJoinNode>& joinNode);

class HashJoinMemoryReclaimer final : public MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
26 changes: 23 additions & 3 deletions velox/exec/HashTable.cpp
@@ -55,7 +55,8 @@ HashTable<ignoreNullKeys>::HashTable(
memory::MemoryPool* pool)
: BaseHashTable(std::move(hashers)),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild) {
isJoinBuild_(isJoinBuild),
joinBuildNoDuplicates_(!allowDuplicates) {
std::vector<TypePtr> keys;
for (auto& hasher : hashers_) {
keys.push_back(hasher->type());
@@ -1527,7 +1528,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
return;
}
disableRangeArrayHash_ |= disableRangeArrayHash;
if (numDistinct_ && !isJoinBuild_) {
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
// For left semi and anti joins, allowDuplicates_ is false and the join build
// constructs the hash table while adding input rows.
if (!analyze()) {
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
return;
@@ -1746,8 +1749,20 @@ template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareJoinTable(
std::vector<std::unique_ptr<BaseHashTable>> tables,
int8_t spillInputStartPartitionBit,
folly::Executor* executor) {
folly::Executor* executor,
bool dropDuplicates) {
buildExecutor_ = executor;
if (dropDuplicates) {
if (table_ != nullptr) {
// Reset table_ and capacity_ to trigger rehash.
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;
capacity_ = 0;
}
// Call analyze() to insert all unique values from the row container into
// the table hashers' uniqueValues_.
analyze();
}
otherTables_.reserve(tables.size());
for (auto& table : tables) {
otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>(
@@ -1777,6 +1792,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
if (useValueIds) {
for (auto& other : otherTables_) {
if (dropDuplicates) {
// Before merging with the current hashers, all values in the other table's
// row container need to be inserted into its hashers' uniqueValues_.
other->analyze();
}
for (auto i = 0; i < hashers_.size(); ++i) {
hashers_[i]->merge(*other->hashers_[i]);
if (!hashers_[i]->mayUseValueIds()) {