Skip to content

Commit

Permalink
feat(hashjoin): Add fast path to list join result
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 20, 2024
1 parent c13b8ed commit 59e0e44
Show file tree
Hide file tree
Showing 42 changed files with 697 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

namespace facebook::velox::test {

class InsertTest : public test::VectorTestBase {
class InsertTest : public velox::test::VectorTestBase {
public:
void runInsertTest(
std::string_view outputDirectory,
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/tests/HivePartitionFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook;
using namespace facebook::velox;

class HivePartitionFunctionTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/tests/HivePartitionUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

#include "gtest/gtest.h"

using namespace facebook::velox::connector::hive;
using namespace facebook;
using namespace facebook::velox;
using namespace facebook::velox::connector::hive;
using namespace facebook::velox::dwio::catalog::fbhive;

class HivePartitionUtilTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
template <typename T>
VectorPtr makeDictionary(const std::vector<T>& data) {
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/tests/PartitionIdGeneratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace facebook::velox::connector::hive {

class PartitionIdGeneratorTest : public ::testing::Test,
public test::VectorTestBase {
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/ReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace {

using namespace facebook::velox::common;

class ReaderTest : public testing::Test, public test::VectorTestBase {
class ReaderTest : public testing::Test, public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/tests/ParquetTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@

namespace facebook::velox::parquet {

class ParquetTestBase : public testing::Test, public test::VectorTestBase {
class ParquetTestBase : public testing::Test,
public velox::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@

#include <folly/init/Init.h>

using namespace facebook;
using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
using namespace facebook::velox::parquet;

using dwio::common::MemorySink;

class E2EFilterTest : public E2EFilterTestBase, public test::VectorTestBase {
class E2EFilterTest : public E2EFilterTestBase,
public velox::test::VectorTestBase {
protected:
void SetUp() override {
E2EFilterTestBase::SetUp();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill(
false,
false,
false,
false,
&pool_);

initializeAggregates(aggregates_, *mergeRows_, false);
Expand Down Expand Up @@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() {
false,
false,
false,
false,
&pool_);
initializeAggregates(aggregates_, *intermediateRows_, true);
table_.reset();
Expand Down
58 changes: 56 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,57 @@ void HashProbe::maybeSetupSpillInputReader(
inputSpillPartitionSet_.erase(iter);
}

std::optional<uint64_t> HashProbe::estimatedRowSize(
const std::vector<vector_size_t>& varSizedColumns,
uint64_t totalFixedColumnsBytes) {
static const double kToleranceRatio = 10.0;
std::vector<RowColumn::Stats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizedColumns.size());
for (uint32_t i = 0; i < varSizedColumns.size(); ++i) {
auto statsOpt = columnStats(varSizedColumns[i]);
if (!statsOpt.has_value()) {
return std::nullopt;
}
varSizeListColumnsStats.push_back(statsOpt.value());
}

uint64_t totalAvgBytes{totalFixedColumnsBytes};
uint64_t totalMaxBytes{totalFixedColumnsBytes};
for (const auto& stats : varSizeListColumnsStats) {
totalAvgBytes += stats.avgBytes();
totalMaxBytes += stats.maxBytes();
}
if (totalAvgBytes == 0) {
if (totalMaxBytes == 0) {
return 0;
}
// Return nullopt to prevent memory exploding in extreme size skew cases:
// e.g. 1 row very large and all other rows of size 0.
return std::nullopt;
}
if (totalMaxBytes / totalAvgBytes >= kToleranceRatio) {
return std::nullopt;
}
// Make the total per batch size to be bounded by 2x 'outputBatchSize_':
// worst case size = (outputBatchSize_ / estimated size) * totalMaxBytes
return (totalMaxBytes + totalAvgBytes) / 2;
}

std::optional<RowColumn::Stats> HashProbe::columnStats(
int32_t columnIndex) const {
std::vector<RowColumn::Stats> columnStats;
const auto rowContainers = table_->allRows();
for (const auto* rowContainer : rowContainers) {
VELOX_CHECK_NOT_NULL(rowContainer);
auto statsOpt = rowContainer->columnStats(columnIndex);
if (!statsOpt.has_value()) {
return std::nullopt;
}
columnStats.push_back(statsOpt.value());
}
return RowColumn::Stats::merge(columnStats);
}

void HashProbe::initializeResultIter() {
VELOX_CHECK_NOT_NULL(table_);
if (resultIter_ != nullptr) {
Expand All @@ -312,8 +363,12 @@ void HashProbe::initializeResultIter() {
varSizeListColumns.push_back(column);
}
}

// TODO: Make tolerance ratio configurable if needed.
resultIter_ = std::make_unique<BaseHashTable::JoinResultIterator>(
std::move(varSizeListColumns), fixedSizeListColumnsSizeSum);
std::move(varSizeListColumns),
fixedSizeListColumnsSizeSum,
estimatedRowSize(varSizeListColumns, fixedSizeListColumnsSizeSum));
}

void HashProbe::asyncWaitForHashTable() {
Expand Down Expand Up @@ -1987,5 +2042,4 @@ void HashProbe::clearBuffers() {
operatorCtx_->execCtx()->vectorPool()->clear();
filter_->clearCache();
}

} // namespace facebook::velox::exec
17 changes: 17 additions & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ class HashProbe : public Operator {
// memory reclamation or operator close.
void clearBuffers();

// Returns the estimated row size of the projected output columns. nullopt
// will be returned if insufficient column stats is presented in 'table_', or
// the row size variation is too large. The row size is too large if ratio of
// max row size and avg row size is larger than 'kToleranceRatio' which is set
// to 10.
std::optional<uint64_t> estimatedRowSize(
const std::vector<vector_size_t>& varColumnsStats,
uint64_t totalFixedColumnsBytes);

// Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns
// nullopt if the column stats is not available.
//
// NOTE: The column stats is collected by default for hash join table but it
// could be invalidated in case of spilling. But we should never expect usage
// of an invalidated table as we always spill the entire table.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

// TODO: Define batch size as bytes based on RowContainer row sizes.
const vector_size_t outputBatchSize_;

Expand Down
36 changes: 21 additions & 15 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ HashTable<ignoreNullKeys>::HashTable(
isJoinBuild,
hasProbedFlag,
hashMode_ != HashMode::kHash,
isJoinBuild,
pool);
nextOffset_ = rows_->nextOffset();
}
Expand Down Expand Up @@ -1826,10 +1827,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
uint64_t maxBytes) {
VELOX_CHECK_LE(inputRows.size(), hits.size());

if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
// When there is no duplicates, and no variable length columns are selected
// to be projected, we are able to calculate fixed length columns total size
// directly and go through fast path.
if (iter.estimatedRowSize.has_value() && !hasDuplicates_) {
// When there is no duplicates, and row size is estimable, we are able to
// go through fast path.
return listJoinResultsFastPath(
iter, includeMisses, inputRows, hits, maxBytes);
}
Expand Down Expand Up @@ -1859,9 +1859,10 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
hits[numOut] = hit;
numOut++;
iter.lastRowIndex++;
totalBytes +=
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
totalBytes += iter.estimatedRowSize.has_value()
? iter.estimatedRowSize.value()
: (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
const auto numRows = rows->size();
auto num =
Expand All @@ -1873,11 +1874,16 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
num * sizeof(char*));
iter.lastDuplicateRowIndex += num;
numOut += num;
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
if (iter.estimatedRowSize.has_value()) {
totalBytes += iter.estimatedRowSize.value() * numRows;
} else {
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) +
iter.fixedSizeListColumnsSizeSum;
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
if (iter.lastDuplicateRowIndex >= numRows) {
iter.lastDuplicateRowIndex = 0;
iter.lastRowIndex++;
Expand All @@ -1900,8 +1906,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
int32_t numOut = 0;
const auto maxOut = std::min(
static_cast<uint64_t>(inputRows.size()),
(iter.fixedSizeListColumnsSizeSum != 0
? maxBytes / iter.fixedSizeListColumnsSizeSum
(iter.estimatedRowSize.value() != 0
? maxBytes / iter.estimatedRowSize.value()
: std::numeric_limits<uint64_t>::max()));
int32_t i = iter.lastRowIndex;
const auto numRows = iter.rows->size();
Expand All @@ -1912,8 +1918,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
// We pass the pointers as int64_t's in 'hitWords'.
auto resultHits = reinterpret_cast<int64_t*>(hits.data());
auto resultRows = inputRows.data();
const auto outLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < outLimit; i += kWidth) {
const int64_t simdOutLimit = maxOut - kWidth;
for (; i + kWidth <= numRows && numOut < simdOutLimit; i += kWidth) {
auto indices = simd::loadGatherIndices<int64_t, int32_t>(sourceRows + i);
auto hitWords = simd::gather(sourceHits, indices);
auto misses = includeMisses ? 0 : simd::toBitMask(hitWords == 0);
Expand Down
32 changes: 18 additions & 14 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ class BaseHashTable {
struct JoinResultIterator {
JoinResultIterator(
std::vector<vector_size_t>&& _varSizeListColumns,
uint64_t _fixedSizeListColumnsSizeSum)
: varSizeListColumns(std::move(_varSizeListColumns)),
uint64_t _fixedSizeListColumnsSizeSum,
std::optional<uint64_t> _estimatedRowSize)
: estimatedRowSize(_estimatedRowSize),
varSizeListColumns(std::move(_varSizeListColumns)),
fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {}

void reset(const HashLookup& lookup) {
Expand All @@ -157,6 +159,8 @@ class BaseHashTable {
return !rows || lastRowIndex == rows->size();
}

/// The row size estimation of the projected output columns, if applicable.
const std::optional<uint64_t> estimatedRowSize;
/// The indexes of the build side projected columns that are variable sized.
const std::vector<vector_size_t> varSizeListColumns;
/// The per row total bytes of the build side projected columns that are
Expand Down Expand Up @@ -635,18 +639,6 @@ class HashTable : public BaseHashTable {
/// purpose.
void checkConsistency() const;

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

void extractColumn(
folly::Range<char* const*> rows,
int32_t columnIndex,
Expand All @@ -659,6 +651,18 @@ class HashTable : public BaseHashTable {
result);
}

auto& testingOtherTables() const {
return otherTables_;
}

uint64_t testingRehashSize() const {
return rehashSize();
}

char** testingTable() const {
return table_;
}

private:
// Enables debug stats for collisions for debug build.
#ifdef NDEBUG
Expand Down
Loading

0 comments on commit 59e0e44

Please sign in to comment.