Skip to content

Commit

Permalink
feat(hashjoin): Add fast row size estimation for hash probe (#11558)
Browse files Browse the repository at this point in the history
Summary:
* Add column stats for row container to collect aggregated column stats. The aggregated column stats will be used in hash probe to decide if row size estimation is applicable. If it is applicable, column stats will be used to compose a fast row size estimation to avoid memory exploding when probing and listing results. This added feature makes hash join more performant, and in some extreme skew cases that we've seen in Meta internal queries, it helped to decrease the query latency by >20x.
* The work of this feature also helped to discovered a bug in HashTable when using simd for fast path result listing -> when max number of rows is smaller than kWidth, the unsigned integer overflow bug will make the max number of rows be ignored. Fixed the bug and the new test covers that case.

Pull Request resolved: #11558

Reviewed By: xiaoxmeng

Differential Revision: D66064300

Pulled By: tanjialiang

fbshipit-source-id: 886cd943036350b1c1bf0b6741ebe7165883a30f
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Nov 22, 2024
1 parent 20b5728 commit 059337f
Show file tree
Hide file tree
Showing 45 changed files with 725 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace facebook::velox::test {

class InsertTest : public test::VectorTestBase {
class InsertTest : public velox::test::VectorTestBase {
public:
void runInsertTest(
std::string_view outputDirectory,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/tests/HivePartitionFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook;
using namespace facebook::velox;

class HivePartitionFunctionTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/tests/HivePartitionUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

#include "gtest/gtest.h"

using namespace facebook::velox::connector::hive;
using namespace facebook;
using namespace facebook::velox;
using namespace facebook::velox::connector::hive;
using namespace facebook::velox::dwio::catalog::fbhive;

class HivePartitionUtilTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
template <typename T>
VectorPtr makeDictionary(const std::vector<T>& data) {
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace facebook::velox::connector::hive {

class PartitionIdGeneratorTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace {

using namespace facebook::velox::common;

class ReaderTest : public testing::Test, public test::VectorTestBase {
class ReaderTest : public testing::Test, public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/tests/ParquetTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@

namespace facebook::velox::parquet {

class ParquetTestBase : public testing::Test, public test::VectorTestBase {
class ParquetTestBase : public testing::Test,
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@

#include <folly/init/Init.h>

using namespace facebook;
using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::parquet;

using dwio::common::MemorySink;

class E2EFilterTest : public E2EFilterTestBase, public test::VectorTestBase {
class E2EFilterTest : public E2EFilterTestBase,
public velox::test::VectorTestBase {
protected:
void SetUp() override {
E2EFilterTestBase::SetUp();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill(
false,
false,
false,
false,
&pool_);

initializeAggregates(aggregates_, *mergeRows_, false);
Expand Down Expand Up @@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() {
false,
false,
false,
false,
&pool_);
initializeAggregates(aggregates_, *intermediateRows_, true);
table_.reset();
Expand Down
60 changes: 58 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,57 @@ void HashProbe::maybeSetupSpillInputReader(
inputSpillPartitionSet_.erase(iter);
}

std::optional<uint64_t> HashProbe::estimatedRowSize(
const std::vector<vector_size_t>& varSizedColumns,
uint64_t totalFixedColumnsBytes) {
static const double kToleranceRatio = 10.0;
std::vector<RowColumn::Stats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
}

uint64_t totalAvgBytes{totalFixedColumnsBytes};
uint64_t totalMaxBytes{totalFixedColumnsBytes};
for (const auto& stats : varSizeListColumnsStats) {
totalAvgBytes += stats.avgBytes();
totalMaxBytes += stats.maxBytes();
}
if (totalAvgBytes == 0) {
if (totalMaxBytes == 0) {
return 0;
}
// Return nullopt to prevent memory exploding in extreme size skew cases:
// e.g. 1 row very large and all other rows of size 0.
return std::nullopt;
}
if (totalMaxBytes / totalAvgBytes >= kToleranceRatio) {
return std::nullopt;
}
// Make the total per batch size to be bounded by 2x 'outputBatchSize_':
// worst case size = (outputBatchSize_ / estimated size) * totalMaxBytes
return (totalMaxBytes + totalAvgBytes) / 2;
}

std::optional<RowColumn::Stats> HashProbe::columnStats(
int32_t columnIndex) const {
std::vector<RowColumn::Stats> columnStats;
const auto rowContainers = table_->allRows();
for (const auto* rowContainer : rowContainers) {
VELOX_CHECK_NOT_NULL(rowContainer);
auto statsOpt = rowContainer->columnStats(columnIndex);
if (!statsOpt.has_value()) {
return std::nullopt;
}
columnStats.push_back(statsOpt.value());
}
return RowColumn::Stats::merge(columnStats);
}

void HashProbe::initializeResultIter() {
VELOX_CHECK_NOT_NULL(table_);
if (resultIter_ != nullptr) {
Expand All @@ -312,8 +363,14 @@ void HashProbe::initializeResultIter() {
varSizeListColumns.push_back(column);
}
}

auto rowSizeEstimation =
estimatedRowSize(varSizeListColumns, fixedSizeListColumnsSizeSum);
// TODO: Make tolerance ratio configurable if needed.
resultIter_ = std::make_unique<BaseHashTable::JoinResultIterator>(
std::move(varSizeListColumns), fixedSizeListColumnsSizeSum);
std::move(varSizeListColumns),
fixedSizeListColumnsSizeSum,
rowSizeEstimation);
}

void HashProbe::asyncWaitForHashTable() {
Expand Down Expand Up @@ -1987,5 +2044,4 @@ void HashProbe::clearBuffers() {
operatorCtx_->execCtx()->vectorPool()->clear();
filter_->clearCache();
}

} // namespace facebook::velox::exec
17 changes: 17 additions & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ class HashProbe : public Operator {
// memory reclamation or operator close.
void clearBuffers();

// Returns the estimated row size of the projected output columns. nullopt
// will be returned if insufficient column stats is presented in 'table_', or
// the row size variation is too large. The row size is too large if ratio of
// max row size and avg row size is larger than 'kToleranceRatio' which is set
// to 10.
std::optional<uint64_t> estimatedRowSize(
const std::vector<vector_size_t>& varColumnsStats,
uint64_t totalFixedColumnsBytes);

// Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns
// nullopt if the column stats is not available.
//
// NOTE: The column stats is collected by default for hash join table but it
// could be invalidated in case of spilling. But we should never expect usage
// of an invalidated table as we always spill the entire table.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

// TODO: Define batch size as bytes based on RowContainer row sizes.
const vector_size_t outputBatchSize_;

Expand Down
36 changes: 21 additions & 15 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ HashTable<ignoreNullKeys>::HashTable(
isJoinBuild,
hasProbedFlag,
hashMode_ != HashMode::kHash,
isJoinBuild,
pool);
nextOffset_ = rows_->nextOffset();
}
Expand Down Expand Up @@ -1826,10 +1827,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
uint64_t maxBytes) {
VELOX_CHECK_LE(inputRows.size(), hits.size());

if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
// When there is no duplicates, and no variable length columns are selected
// to be projected, we are able to calculate fixed length columns total size
// directly and go through fast path.
if (iter.estimatedRowSize.has_value() && !hasDuplicates_) {
// When there is no duplicates, and row size is estimable, we are able to
// go through fast path.
return listJoinResultsFastPath(
iter, includeMisses, inputRows, hits, maxBytes);
}
Expand Down Expand Up @@ -1859,9 +1859,10 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
hits[numOut] = hit;
numOut++;
iter.lastRowIndex++;
totalBytes +=
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
totalBytes += iter.estimatedRowSize.has_value()
? iter.estimatedRowSize.value()
: (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
const auto numRows = rows->size();
auto num =
Expand All @@ -1873,11 +1874,16 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
num * sizeof(char*));
iter.lastDuplicateRowIndex += num;
numOut += num;
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
if (iter.estimatedRowSize.has_value()) {
totalBytes += iter.estimatedRowSize.value() * numRows;
} else {
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) +
iter.fixedSizeListColumnsSizeSum;
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
if (iter.lastDuplicateRowIndex >= numRows) {
iter.lastDuplicateRowIndex = 0;
iter.lastRowIndex++;
Expand All @@ -1900,8 +1906,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
int32_t numOut = 0;
const auto maxOut = std::min(
static_cast<uint64_t>(inputRows.size()),
(iter.fixedSizeListColumnsSizeSum != 0
? maxBytes / iter.fixedSizeListColumnsSizeSum
(iter.estimatedRowSize.value() != 0
? maxBytes / iter.estimatedRowSize.value()
: std::numeric_limits<uint64_t>::max()));
int32_t i = iter.lastRowIndex;
const auto numRows = iter.rows->size();
Expand All @@ -1912,8 +1918,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
// We pass the pointers as int64_t's in 'hitWords'.
auto resultHits = reinterpret_cast<int64_t*>(hits.data());
auto resultRows = inputRows.data();
const auto outLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) {
const int64_t simdOutLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < simdOutLimit; i += kWidth) {
auto indices = simd::loadGatherIndices<int64_t, int32_t>(sourceRows + i);
auto hitWords = simd::gather(sourceHits, indices);
auto misses = includeMisses ? 0 : simd::toBitMask(hitWords == 0);
Expand Down
32 changes: 18 additions & 14 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ class BaseHashTable {
struct JoinResultIterator {
JoinResultIterator(
std::vector<vector_size_t>&& _varSizeListColumns,
uint64_t _fixedSizeListColumnsSizeSum)
: varSizeListColumns(std::move(_varSizeListColumns)),
uint64_t _fixedSizeListColumnsSizeSum,
std::optional<uint64_t> _estimatedRowSize)
: estimatedRowSize(_estimatedRowSize),
varSizeListColumns(std::move(_varSizeListColumns)),
fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {}

void reset(const HashLookup& lookup) {
Expand All @@ -157,6 +159,8 @@ class BaseHashTable {
return !rows || lastRowIndex == rows->size();
}

/// The row size estimation of the projected output columns, if applicable.
const std::optional<uint64_t> estimatedRowSize;
/// The indexes of the build side projected columns that are variable sized.
const std::vector<vector_size_t> varSizeListColumns;
/// The per row total bytes of the build side projected columns that are
Expand Down Expand Up @@ -635,18 +639,6 @@ class HashTable : public BaseHashTable {
/// purpose.
void checkConsistency() const;

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

void extractColumn(
folly::Range<char* const*> rows,
int32_t columnIndex,
Expand All @@ -659,6 +651,18 @@ class HashTable : public BaseHashTable {
result);
}

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

private:
// Enables debug stats for collisions for debug build.
#ifdef NDEBUG
Expand Down
Loading

0 comments on commit 059337f

Please sign in to comment.