feat(hashjoin): Add fast path to list join result
tanjialiang committed Nov 20, 2024
1 parent c13b8ed commit 03bbefd
Showing 15 changed files with 648 additions and 148 deletions.
2 changes: 2 additions & 0 deletions velox/exec/GroupingSet.cpp
@@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill(
false,
false,
false,
false,
&pool_);

initializeAggregates(aggregates_, *mergeRows_, false);
@@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() {
false,
false,
false,
false,
&pool_);
initializeAggregates(aggregates_, *intermediateRows_, true);
table_.reset();
58 changes: 56 additions & 2 deletions velox/exec/HashProbe.cpp
@@ -292,6 +292,57 @@ void HashProbe::maybeSetupSpillInputReader(
inputSpillPartitionSet_.erase(iter);
}

std::optional<uint64_t> HashProbe::estimatedRowSize(
const std::vector<vector_size_t>& varSizedColumns,
uint64_t totalFixedColumnsBytes) {
static const double kToleranceRatio = 10.0;
std::vector<RowColumn::Stats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
}

uint64_t totalAvgBytes{totalFixedColumnsBytes};
uint64_t totalMaxBytes{totalFixedColumnsBytes};
for (const auto& stats : varSizeListColumnsStats) {
totalAvgBytes += stats.avgBytes();
totalMaxBytes += stats.maxBytes();
}
if (totalAvgBytes == 0) {
if (totalMaxBytes == 0) {
return 0;
}
// Return nullopt to prevent memory from exploding in extreme size-skew
// cases, e.g. one very large row while all other rows have size 0.
return std::nullopt;
}
if (totalMaxBytes / totalAvgBytes >= kToleranceRatio) {
return std::nullopt;
}
// Bound the total per-batch size by 2x 'outputBatchSize_': worst-case size
// = (outputBatchSize_ / estimated size) * totalMaxBytes.
return (totalMaxBytes + totalAvgBytes) / 2;
}
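For intuition, here is a minimal self-contained sketch of the same estimation rule; the function, numbers, and names are illustrative assumptions, not Velox API:

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Illustrative restatement of the estimation rule above, outside of Velox.
std::optional<uint64_t> estimateRowSize(uint64_t avgBytes, uint64_t maxBytes) {
  static constexpr double kToleranceRatio = 10.0;
  if (avgBytes == 0) {
    // All rows empty is fine; any other skew here is unbounded.
    return maxBytes == 0 ? std::optional<uint64_t>(0) : std::nullopt;
  }
  if (maxBytes / avgBytes >= kToleranceRatio) {
    return std::nullopt;
  }
  // (max + avg) / 2 >= max / 2, so a batch sized off this estimate overshoots
  // the byte budget by at most 2x even if every listed row is max-sized.
  return (maxBytes + avgBytes) / 2;
}

int main() {
  const uint64_t estimate = estimateRowSize(40, 200).value(); // (200+40)/2=120
  const uint64_t budget = 12'000;                  // per-batch byte budget
  const uint64_t rowsPerBatch = budget / estimate; // 100 rows
  std::cout << "worst case " << rowsPerBatch * 200 // 20'000 bytes
            << " <= " << 2 * budget << '\n';       // bounded by 24'000
}
```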

std::optional<RowColumn::Stats> HashProbe::columnStats(
int32_t columnIndex) const {
std::vector<RowColumn::Stats> columnStats;
const auto rowContainers = table_->allRows();
for (const auto* rowContainer : rowContainers) {
VELOX_CHECK_NOT_NULL(rowContainer);
auto statsOpt = rowContainer->columnStats(columnIndex);
if (!statsOpt.has_value()) {
return std::nullopt;
}
columnStats.push_back(statsOpt.value());
}
return RowColumn::Stats::merge(columnStats);
}
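`RowColumn::Stats::merge` folds the per-container stats into table-wide stats; a rough sketch of what such a merge could look like (the struct and its fields are assumptions for illustration, not the actual Velox definition):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Assumed shape of per-column stats; the real RowColumn::Stats differs.
struct ColumnStatsSketch {
  uint64_t minBytes{0};
  uint64_t maxBytes{0};
  uint64_t sumBytes{0};
  uint64_t numRows{0};
  uint64_t avgBytes() const {
    return numRows == 0 ? 0 : sumBytes / numRows;
  }
};

// Fold the stats of each RowContainer into table-wide stats: min/max take
// extrema, while sums and row counts add, so the merged average stays exact.
ColumnStatsSketch merge(const std::vector<ColumnStatsSketch>& parts) {
  ColumnStatsSketch out;
  out.minBytes = parts.empty() ? 0 : UINT64_MAX;
  for (const auto& s : parts) {
    out.minBytes = std::min(out.minBytes, s.minBytes);
    out.maxBytes = std::max(out.maxBytes, s.maxBytes);
    out.sumBytes += s.sumBytes;
    out.numRows += s.numRows;
  }
  return out;
}
```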

void HashProbe::initializeResultIter() {
VELOX_CHECK_NOT_NULL(table_);
if (resultIter_ != nullptr) {
@@ -312,8 +363,12 @@ void HashProbe::initializeResultIter() {
varSizeListColumns.push_back(column);
}
}

// TODO: Make tolerance ratio configurable if needed.
resultIter_ = std::make_unique<BaseHashTable::JoinResultIterator>(
std::move(varSizeListColumns), fixedSizeListColumnsSizeSum);
std::move(varSizeListColumns),
fixedSizeListColumnsSizeSum,
estimatedRowSize(varSizeListColumns, fixedSizeListColumnsSizeSum));
}
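Put together, the probe side now threads the estimate through to the iterator; a condensed, hypothetical call site (the values are stand-ins for what the surrounding code computes):

```cpp
// Hypothetical values standing in for the surrounding computation.
std::vector<vector_size_t> varSizeListColumns = {1, 3}; // var-size projections
uint64_t fixedSizeListColumnsSizeSum = 24;              // fixed-size bytes/row

auto iter = std::make_unique<BaseHashTable::JoinResultIterator>(
    std::move(varSizeListColumns),
    fixedSizeListColumnsSizeSum,
    /*estimatedRowSize=*/std::nullopt); // nullopt => measure rows one by one
// With a value and a duplicate-free table, listJoinResults() takes the fast
// path; with nullopt it falls back to per-row size accounting.
```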

void HashProbe::asyncWaitForHashTable() {
@@ -1987,5 +2042,4 @@ void HashProbe::clearBuffers() {
operatorCtx_->execCtx()->vectorPool()->clear();
filter_->clearCache();
}

} // namespace facebook::velox::exec
17 changes: 17 additions & 0 deletions velox/exec/HashProbe.h
@@ -312,6 +312,23 @@ class HashProbe : public Operator {
// memory reclamation or operator close.
void clearBuffers();

// Returns the estimated row size of the projected output columns. Returns
// nullopt if the column stats in 'table_' are insufficient, or if the row
// size variation is too large, i.e. the ratio of max row size to avg row
// size exceeds 'kToleranceRatio', which is set to 10.
std::optional<uint64_t> estimatedRowSize(
    const std::vector<vector_size_t>& varSizedColumns,
    uint64_t totalFixedColumnsBytes);

// Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns
// nullopt if the column stats are not available.
//
// NOTE: column stats are collected by default for the hash join table, but
// they can be invalidated by spilling. We never expect to use an invalidated
// table, though, as we always spill the entire table.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

// TODO: Define batch size as bytes based on RowContainer row sizes.
const vector_size_t outputBatchSize_;

36 changes: 21 additions & 15 deletions velox/exec/HashTable.cpp
@@ -73,6 +73,7 @@ HashTable<ignoreNullKeys>::HashTable(
isJoinBuild,
hasProbedFlag,
hashMode_ != HashMode::kHash,
isJoinBuild,
pool);
nextOffset_ = rows_->nextOffset();
}
@@ -1826,10 +1827,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
uint64_t maxBytes) {
VELOX_CHECK_LE(inputRows.size(), hits.size());

if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
// When there is no duplicates, and no variable length columns are selected
// to be projected, we are able to calculate fixed length columns total size
// directly and go through fast path.
if (iter.estimatedRowSize.has_value() && !hasDuplicates_) {
// When there are no duplicates and the row size is estimable, we can take
// the fast path.
return listJoinResultsFastPath(
iter, includeMisses, inputRows, hits, maxBytes);
}
@@ -1859,9 +1859,10 @@
hits[numOut] = hit;
numOut++;
iter.lastRowIndex++;
totalBytes +=
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
totalBytes += iter.estimatedRowSize.has_value()
? iter.estimatedRowSize.value()
: (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
const auto numRows = rows->size();
auto num =
@@ -1873,11 +1874,16 @@
num * sizeof(char*));
iter.lastDuplicateRowIndex += num;
numOut += num;
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
if (iter.estimatedRowSize.has_value()) {
totalBytes += iter.estimatedRowSize.value() * numRows;
} else {
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) +
iter.fixedSizeListColumnsSizeSum;
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
if (iter.lastDuplicateRowIndex >= numRows) {
iter.lastDuplicateRowIndex = 0;
iter.lastRowIndex++;
@@ -1900,8 +1906,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
int32_t numOut = 0;
const auto maxOut = std::min(
static_cast<uint64_t>(inputRows.size()),
(iter.fixedSizeListColumnsSizeSum != 0
? maxBytes / iter.fixedSizeListColumnsSizeSum
(iter.estimatedRowSize.value() != 0
? maxBytes / iter.estimatedRowSize.value()
: std::numeric_limits<uint64_t>::max()));
int32_t i = iter.lastRowIndex;
const auto numRows = iter.rows->size();
@@ -1912,8 +1918,8 @@
// We pass the pointers as int64_t's in 'hitWords'.
auto resultHits = reinterpret_cast<int64_t*>(hits.data());
auto resultRows = inputRows.data();
const auto outLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) {
const int64_t simdOutLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < simdOutLimit; i += kWidth) {
auto indices = simd::loadGatherIndices<int64_t, int32_t>(sourceRows + i);
auto hitWords = simd::gather(sourceHits, indices);
auto misses = includeMisses ? 0 : simd::toBitMask(hitWords == 0);
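The key win in the fast path is that the output cap can be computed once up front from the constant estimate, instead of accumulating per-row byte counts. A standalone sketch of that cap logic (the function name and values are illustrative, not production code):

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

// Cap the rows listed per call by the byte budget, given a constant per-row
// estimate. An estimate of 0 means listed rows carry no projected payload.
uint64_t computeMaxOut(
    uint64_t numInputRows,
    uint64_t maxBytes,
    uint64_t estimatedRowSize) {
  const uint64_t byBytes = estimatedRowSize != 0
      ? maxBytes / estimatedRowSize
      : std::numeric_limits<uint64_t>::max();
  return std::min(numInputRows, byBytes);
}
// computeMaxOut(1024, 12'000, 120) == 100: here the byte budget, not the
// input row count, is binding.
```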
32 changes: 18 additions & 14 deletions velox/exec/HashTable.h
@@ -142,8 +142,10 @@ class BaseHashTable {
struct JoinResultIterator {
JoinResultIterator(
std::vector<vector_size_t>&& _varSizeListColumns,
uint64_t _fixedSizeListColumnsSizeSum)
: varSizeListColumns(std::move(_varSizeListColumns)),
uint64_t _fixedSizeListColumnsSizeSum,
std::optional<uint64_t> _estimatedRowSize)
: estimatedRowSize(_estimatedRowSize),
varSizeListColumns(std::move(_varSizeListColumns)),
fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {}

void reset(const HashLookup& lookup) {
@@ -157,6 +159,8 @@
return !rows || lastRowIndex == rows->size();
}

/// The estimated row size of the projected output columns, if available.
const std::optional<uint64_t> estimatedRowSize;
/// The indexes of the build side projected columns that are variable sized.
const std::vector<vector_size_t> varSizeListColumns;
/// The per row total bytes of the build side projected columns that are
@@ -635,18 +639,6 @@ class HashTable : public BaseHashTable {
/// purpose.
void checkConsistency() const;

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

void extractColumn(
folly::Range<char* const*> rows,
int32_t columnIndex,
Expand All @@ -659,6 +651,18 @@ class HashTable : public BaseHashTable {
result);
}

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

private:
// Enables debug stats for collisions for debug build.
#ifdef NDEBUG
