Back out "Move hash table overlap bits check earlier" (facebookincubator#10451)

Summary:
Pull Request resolved: facebookincubator#10451

Original commit changeset: 0e4a61d1f307

Original Phabricator Diff: D58967938

Queries are failing with:

```
com.facebook.presto.verifier.framework.PrestoQueryException: java.sql.SQLException: Query failed (#20240711_160619_00752_kznww): sizeBits_ - 1 < spillInputStartPartitionBit (13 vs. 1) Operator: PartialAggregation[1802] 3
```
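
For context, the failing assertion is the overlap check between the table's hash bits and the first spill partition bit; its condition can be read off the `checkHashBitsOverlap` definition that this backout removes from `velox/exec/HashTable.cpp` further down. Below is a minimal standalone sketch of that condition only; the free-function form, the `bool` return, the sentinel value, and the `main` driver are assumptions for illustration, not the actual Velox API or its `VELOX_CHECK_LT` macro.

```cpp
#include <cstdint>
#include <iostream>

// Sentinel meaning "input does not come from spill"; mirrors
// BaseHashTable::kNoSpillInputStartPartitionBit, whose value is assumed here.
constexpr int8_t kNoSpillInputStartPartitionBit = -1;

// Returns true when the table's hash bits [0, sizeBits) all lie strictly
// below the first bit reserved for spill partitioning, i.e. the two bit
// ranges do not overlap. Array mode is exempt, as in the original check.
bool hashBitsDoNotOverlap(
    int8_t sizeBits,
    int8_t spillInputStartPartitionBit,
    bool isArrayMode) {
  if (spillInputStartPartitionBit == kNoSpillInputStartPartitionBit ||
      isArrayMode) {
    return true;
  }
  return sizeBits - 1 < spillInputStartPartitionBit;
}

int main() {
  // The values from the failure above: sizeBits_ - 1 == 13 against
  // spillInputStartPartitionBit == 1, so the check fails (prints 0).
  std::cout << hashBitsDoNotOverlap(14, 1, /*isArrayMode=*/false) << "\n";
}
```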

Reviewed By: kgpai, tanjialiang

Differential Revision: D59640854

fbshipit-source-id: f0f267dce4602d065886c26f3eb2248e657834ed
amitkdutta authored and facebook-github-bot committed Jul 11, 2024
1 parent ec2abf8 commit 2518463
Showing 11 changed files with 127 additions and 178 deletions.
5 changes: 3 additions & 2 deletions velox/exec/GroupingSet.cpp
@@ -228,14 +228,15 @@ void GroupingSet::addInputForActiveRows(
*lookup_,
input,
activeRows_,
ignoreNullKeys_,
BaseHashTable::kNoSpillInputStartPartitionBit);
if (lookup_->rows.empty()) {
// No rows to probe. Can happen when ignoreNullKeys_ is true and all rows
// have null keys.
return;
}

table_->groupProbe(*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
table_->groupProbe(*lookup_);
masks_.addInput(input, activeRows_);

auto* groups = lookup_->hits.data();
@@ -399,7 +400,7 @@ void GroupingSet::createHashTable() {

lookup_ = std::make_unique<HashLookup>(table_->hashers());
if (!isAdaptive_ && table_->hashMode() != BaseHashTable::HashMode::kHash) {
table_->forceGenericHashMode(BaseHashTable::kNoSpillInputStartPartitionBit);
table_->forceGenericHashMode();
}
}

2 changes: 1 addition & 1 deletion velox/exec/GroupingSet.h
@@ -263,7 +263,7 @@ class GroupingSet {
// groups.
void extractSpillResult(const RowVectorPtr& result);

// Returns a list of accumulators for 'aggregates_', plus one more accumulator
// Return a list of accumulators for 'aggregates_', plus one more accumulator
// for 'sortedAggregations_', and one for each 'distinctAggregations_'. When
// 'excludeToIntermediate' is true, skip the functions that support
// 'toIntermediate'.
6 changes: 3 additions & 3 deletions velox/exec/HashBuild.cpp
@@ -752,10 +752,10 @@ bool HashBuild::finishHashBuild() {
CpuWallTimer cpuWallTimer{timing};
table_->prepareJoinTable(
std::move(otherTables),
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit,
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
: nullptr);
: nullptr,
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit);
}
stats_.wlock()->addRuntimeStat(
BaseHashTable::kBuildWallNanos,
90 changes: 35 additions & 55 deletions velox/exec/HashTable.cpp
@@ -450,9 +450,7 @@ void populateNormalizedKeys(HashLookup& lookup, int8_t sizeBits) {
} // namespace

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::groupProbe(
HashLookup& lookup,
int8_t spillInputStartPartitionBit) {
void HashTable<ignoreNullKeys>::groupProbe(HashLookup& lookup) {
incrementProbes(lookup.rows.size());

if (hashMode_ == HashMode::kArray) {
@@ -461,7 +459,7 @@ void HashTable<ignoreNullKeys>::groupProbe(
}
// Do size-based rehash before mixing hashes from normalized keys
// because the size of the table affects the mixing.
checkSize(lookup.rows.size(), false, spillInputStartPartitionBit);
checkSize(lookup.rows.size(), false);
if (hashMode_ == HashMode::kNormalizedKey) {
populateNormalizedKeys(lookup, sizeBits_);
groupNormalizedKeyProbe(lookup);
@@ -708,9 +706,7 @@ void HashTable<ignoreNullKeys>::joinNormalizedKeyProbe(HashLookup& lookup) {
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::allocateTables(
uint64_t size,
int8_t spillInputStartPartitionBit) {
void HashTable<ignoreNullKeys>::allocateTables(uint64_t size) {
VELOX_CHECK(bits::isPowerOfTwo(size), "Size is not a power of two: {}", size);
VELOX_CHECK_GT(size, 0);
capacity_ = size;
@@ -720,7 +716,6 @@ void HashTable<ignoreNullKeys>::allocateTables(
sizeMask_ = byteSize - 1;
numBuckets_ = byteSize / kBucketSize;
sizeBits_ = __builtin_popcountll(sizeMask_);
checkHashBitsOverlap(spillInputStartPartitionBit);
bucketOffsetMask_ = sizeMask_ & ~(kBucketSize - 1);
// The total size is 8 bytes per slot, in groups of 16 slots with 16 bytes of
// tags and 16 * 6 bytes of pointers and a padding of 16 bytes to round up the
@@ -759,8 +754,7 @@ void HashTable<ignoreNullKeys>::clear(bool freeTable) {
template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::checkSize(
int32_t numNew,
bool initNormalizedKeys,
int8_t spillInputStartPartitionBit) {
bool initNormalizedKeys) {
// NOTE: the way we decide the table size and trigger rehash, guarantees the
// table should always have free slots after the insertion.
VELOX_CHECK(
@@ -774,9 +768,9 @@ void HashTable<ignoreNullKeys>::checkSize(
const int64_t newNumDistincts = numNew + numDistinct_;
if (table_ == nullptr || capacity_ == 0) {
const auto newSize = newHashTableEntries(numDistinct_, numNew);
allocateTables(newSize, spillInputStartPartitionBit);
allocateTables(newSize);
if (numDistinct_ > 0) {
rehash(initNormalizedKeys, spillInputStartPartitionBit);
rehash(initNormalizedKeys);
}
// We are not always able to reuse a tombstone slot as a free one for hash
// collision handling purpose. For example, if all the table slots are
@@ -788,8 +782,8 @@ void HashTable<ignoreNullKeys>::checkSize(
// NOTE: we need to plus one here as number itself could be power of two.
const auto newCapacity = bits::nextPowerOfTwo(
std::max(newNumDistincts, capacity_ - numTombstones_) + 1);
allocateTables(newCapacity, spillInputStartPartitionBit);
rehash(initNormalizedKeys, spillInputStartPartitionBit);
allocateTables(newCapacity);
rehash(initNormalizedKeys);
}
}

@@ -1282,9 +1276,7 @@ void HashTable<ignoreNullKeys>::insertForJoin(
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::rehash(
bool initNormalizedKeys,
int8_t spillInputStartPartitionBit) {
void HashTable<ignoreNullKeys>::rehash(bool initNormalizedKeys) {
++numRehashes_;
constexpr int32_t kHashBatchSize = 1024;
if (canApplyParallelJoinBuild()) {
@@ -1307,18 +1299,15 @@ void HashTable<ignoreNullKeys>::rehash(
if (!insertBatch(
groups, numGroups, hashes, initNormalizedKeys || i != 0)) {
VELOX_CHECK_NE(hashMode_, HashMode::kHash);
setHashMode(HashMode::kHash, 0, spillInputStartPartitionBit);
setHashMode(HashMode::kHash, 0);
return;
}
} while (numGroups > 0);
}
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::setHashMode(
HashMode mode,
int32_t numNew,
int8_t spillInputStartPartitionBit) {
void HashTable<ignoreNullKeys>::setHashMode(HashMode mode, int32_t numNew) {
VELOX_CHECK_NE(hashMode_, HashMode::kHash);
TestValue::adjust("facebook::velox::exec::HashTable::setHashMode", &mode);
if (mode == HashMode::kArray) {
@@ -1328,7 +1317,7 @@ void HashTable<ignoreNullKeys>::setHashMode(
table_ = tableAllocation_.data<char*>();
memset(table_, 0, bytes);
hashMode_ = HashMode::kArray;
rehash(true, spillInputStartPartitionBit);
rehash(true);
} else if (mode == HashMode::kHash) {
hashMode_ = HashMode::kHash;
for (auto& hasher : hashers_) {
Expand All @@ -1337,12 +1326,12 @@ void HashTable<ignoreNullKeys>::setHashMode(
rows_->disableNormalizedKeys();
capacity_ = 0;
// Makes tables of the right size and rehashes.
checkSize(numNew, true, spillInputStartPartitionBit);
checkSize(numNew, true);
} else if (mode == HashMode::kNormalizedKey) {
hashMode_ = HashMode::kNormalizedKey;
capacity_ = 0;
// Makes tables of the right size and rehashes.
checkSize(numNew, true, spillInputStartPartitionBit);
checkSize(numNew, true);
}
}

@@ -1470,7 +1459,6 @@ void HashTable<ignoreNullKeys>::clearUseRange(std::vector<bool>& useRange) {
template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::decideHashMode(
int32_t numNew,
int8_t spillInputStartPartitionBit,
bool disableRangeArrayHash) {
std::vector<uint64_t> rangeSizes(hashers_.size());
std::vector<uint64_t> distinctSizes(hashers_.size());
@@ -1487,7 +1475,7 @@ void HashTable<ignoreNullKeys>::decideHashMode(
disableRangeArrayHash_ |= disableRangeArrayHash;
if (numDistinct_ && !isJoinBuild_) {
if (!analyze()) {
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kHash, numNew);
return;
}
}
@@ -1512,38 +1500,38 @@ void HashTable<ignoreNullKeys>::decideHashMode(
if (rangesWithReserve < kArrayHashMaxSize && !disableRangeArrayHash_) {
std::fill(useRange.begin(), useRange.end(), true);
capacity_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes);
setHashMode(HashMode::kArray, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kArray, numNew);
return;
}

if (bestWithReserve < kArrayHashMaxSize ||
(disableRangeArrayHash_ && bestWithReserve < numDistinct_ * 2)) {
capacity_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes);
setHashMode(HashMode::kArray, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kArray, numNew);
return;
}
if (rangesWithReserve != VectorHasher::kRangeTooLarge) {
std::fill(useRange.begin(), useRange.end(), true);
setHasherMode(hashers_, useRange, rangeSizes, distinctSizes);
setHashMode(HashMode::kNormalizedKey, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kNormalizedKey, numNew);
return;
}
if (hashers_.size() == 1 && distinctsWithReserve > 10000) {
// A single part group by that does not go by range or become an array
// does not make sense as a normalized key unless it is very small.
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kHash, numNew);
return;
}

if (distinctsWithReserve < kArrayHashMaxSize) {
clearUseRange(useRange);
capacity_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes);
setHashMode(HashMode::kArray, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kArray, numNew);
return;
}
if (distinctsWithReserve == VectorHasher::kRangeTooLarge &&
rangesWithReserve == VectorHasher::kRangeTooLarge) {
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kHash, numNew);
return;
}
// The key concatenation fits in 64 bits.
@@ -1553,7 +1541,7 @@ void HashTable<ignoreNullKeys>::decideHashMode(
clearUseRange(useRange);
}
setHasherMode(hashers_, useRange, rangeSizes, distinctSizes);
setHashMode(HashMode::kNormalizedKey, numNew, spillInputStartPartitionBit);
setHashMode(HashMode::kNormalizedKey, numNew);
}

template <bool ignoreNullKeys>
@@ -1567,15 +1555,6 @@ std::vector<RowContainer*> HashTable<ignoreNullKeys>::allRows() const {
return rowContainers;
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::checkHashBitsOverlap(
int8_t spillInputStartPartitionBit) {
if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit &&
hashMode() != HashMode::kArray) {
VELOX_CHECK_LT(sizeBits_ - 1, spillInputStartPartitionBit);
}
}

template <bool ignoreNullKeys>
std::string HashTable<ignoreNullKeys>::toString() {
std::stringstream out;
@@ -1703,8 +1682,8 @@ bool mayUseValueIds(const BaseHashTable& table) {
template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareJoinTable(
std::vector<std::unique_ptr<BaseHashTable>> tables,
int8_t spillInputStartPartitionBit,
folly::Executor* executor) {
folly::Executor* executor,
int8_t spillInputStartPartitionBit) {
buildExecutor_ = executor;
otherTables_.reserve(tables.size());
for (auto& table : tables) {
@@ -1740,13 +1719,14 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
if (!useValueIds) {
if (hashMode_ != HashMode::kHash) {
setHashMode(HashMode::kHash, 0, spillInputStartPartitionBit);
setHashMode(HashMode::kHash, 0);
} else {
checkSize(0, true, spillInputStartPartitionBit);
checkSize(0, true);
}
} else {
decideHashMode(0, spillInputStartPartitionBit);
decideHashMode(0);
}
checkHashBitsOverlap(spillInputStartPartitionBit);
}

template <bool ignoreNullKeys>
@@ -2101,11 +2081,11 @@ std::string BaseHashTable::RowsIterator::toString() const {
rowContainerIterator_.toString());
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareForGroupProbe(
void BaseHashTable::prepareForGroupProbe(
HashLookup& lookup,
const RowVectorPtr& input,
SelectivityVector& rows,
bool ignoreNullKeys,
int8_t spillInputStartPartitionBit) {
checkHashBitsOverlap(spillInputStartPartitionBit);
auto& hashers = lookup.hashers;
@@ -2115,7 +2095,7 @@ void HashTable<ignoreNullKeys>::prepareForGroupProbe(
hasher->decode(*key, rows);
}

if constexpr (ignoreNullKeys) {
if (ignoreNullKeys) {
// A null in any of the keys disables the row.
deselectRowsWithNulls(hashers, rows);
}
@@ -2137,19 +2117,19 @@ void HashTable<ignoreNullKeys>::prepareForGroupProbe(

if (rehash || capacity() == 0) {
if (mode != BaseHashTable::HashMode::kHash) {
decideHashMode(input->size(), spillInputStartPartitionBit);
decideHashMode(input->size());
// Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
// deselectRowsWithNulls.
prepareForGroupProbe(lookup, input, rows, spillInputStartPartitionBit);
prepareForGroupProbe(
lookup, input, rows, false, spillInputStartPartitionBit);
return;
}
}

populateLookupRows(rows, lookup.rows);
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareForJoinProbe(
void BaseHashTable::prepareForJoinProbe(
HashLookup& lookup,
const RowVectorPtr& input,
SelectivityVector& rows,