Improve 'insertForJoin' function performance by group prefetching (#9731)

Summary:
Apply the group-prefetching optimization already used for the join probe to the 'insertForJoin' function to improve its performance.
Fixes: #9732
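
The speedup comes from probing the table in groups of 64 rows: a first pass computes each row's slot and issues a software prefetch, and a second pass performs the actual comparison once those cache lines are already on their way, so the cache misses overlap instead of being paid one at a time. Below is a minimal standalone sketch of that group-prefetching pattern; it uses a toy open-addressing table, treats the key itself as the hash, and does no collision handling, unlike Velox's real ProbeState/HashTable code shown in the diff.

#include <cstdint>
#include <vector>

// Toy open-addressing table slot: a key plus a payload pointer.
struct Slot {
  uint64_t key = 0;
  void* value = nullptr;
};

// Same group size the commit uses for join build and probe.
constexpr int32_t kPrefetchSize = 64;

// Looks up 'keys' in 'table' (whose size must be a power of two) and writes a
// payload pointer (or nullptr) per key into 'hits'.
void probeWithGroupPrefetch(
    const std::vector<Slot>& table,
    const std::vector<uint64_t>& keys,
    std::vector<void*>& hits) {
  const uint64_t mask = table.size() - 1;
  hits.resize(keys.size());
  uint64_t indices[kPrefetchSize];
  int32_t i = 0;
  for (; i + kPrefetchSize <= static_cast<int32_t>(keys.size());
       i += kPrefetchSize) {
    // Pass 1: compute slot indices and issue prefetches for the whole group.
    for (int32_t j = 0; j < kPrefetchSize; ++j) {
      indices[j] = keys[i + j] & mask;
      __builtin_prefetch(&table[indices[j]]);
    }
    // Pass 2: the slots are likely in cache by now; do the real comparisons.
    for (int32_t j = 0; j < kPrefetchSize; ++j) {
      const Slot& slot = table[indices[j]];
      hits[i + j] = slot.key == keys[i + j] ? slot.value : nullptr;
    }
  }
  // Tail: probe any remaining keys one at a time.
  for (; i < static_cast<int32_t>(keys.size()); ++i) {
    const Slot& slot = table[keys[i] & mask];
    hits[i] = slot.key == keys[i] ? slot.value : nullptr;
  }
}

In the actual change, ProbeState::preProbe plays the role of the first pass and firstProbe plus buildFullProbe the second, with a scalar tail loop for the leftover rows, mirroring what joinNormalizedKeyProbe already did on the probe side.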

Pull Request resolved: #9731

Reviewed By: kagamiori

Differential Revision: D57585922

Pulled By: pedroerp

fbshipit-source-id: c0f08ad85afc2c175d2cf5a0d576eb7bbf0f429e
zhli1142015 authored and facebook-github-bot committed May 21, 2024
1 parent 9c9712e commit 0f15532
Showing 4 changed files with 397 additions and 16 deletions.
73 changes: 57 additions & 16 deletions velox/exec/HashTable.cpp
@@ -416,6 +416,9 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::fullProbe(
}

namespace {
// Group prefetch size for join build & probe.
constexpr int32_t kPrefetchSize = 64;

// Normalized keys have non-random bits. Bits need to be propagated
// up to make a tag byte and down so that non-lowest bits of
// normalized key affect the hash table index.
@@ -676,22 +679,21 @@ void HashTable<ignoreNullKeys>::joinNormalizedKeyProbe(HashLookup& lookup) {
int32_t probeIndex = 0;
int32_t numProbes = lookup.rows.size();
const vector_size_t* rows = lookup.rows.data();
constexpr int32_t groupSize = 64;
ProbeState states[groupSize];
ProbeState states[kPrefetchSize];
const uint64_t* keys = lookup.normalizedKeys.data();
const uint64_t* hashes = lookup.hashes.data();
char** hits = lookup.hits.data();
constexpr int32_t kKeyOffset =
-static_cast<int32_t>(sizeof(normalized_key_t));
for (; probeIndex + groupSize <= numProbes; probeIndex += groupSize) {
for (int32_t i = 0; i < groupSize; ++i) {
for (; probeIndex + kPrefetchSize <= numProbes; probeIndex += kPrefetchSize) {
for (int32_t i = 0; i < kPrefetchSize; ++i) {
int32_t row = rows[probeIndex + i];
states[i].preProbe(*this, hashes[row], row);
}
for (int32_t i = 0; i < groupSize; ++i) {
for (int32_t i = 0; i < kPrefetchSize; ++i) {
states[i].firstProbe(*this, kKeyOffset);
}
for (int32_t i = 0; i < groupSize; ++i) {
for (int32_t i = 0; i < kPrefetchSize; ++i) {
hits[states[i].row()] = states[i].joinNormalizedKeyFullProbe(*this, keys);
}
}
@@ -1152,13 +1154,16 @@ void HashTable<ignoreNullKeys>::pushNext(
}

template <bool ignoreNullKeys>
template <bool isNormailizedKeyMode>
FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
RowContainer* rows,
ProbeState& state,
uint64_t hash,
char* inserted,
bool extraCheck,
TableInsertPartitionInfo* partitionInfo) {
constexpr int32_t kKeyOffset =
-static_cast<int32_t>(sizeof(normalized_key_t));
auto insertFn = [&](int32_t /*row*/, PartitionBoundIndexType index) {
if (partitionInfo != nullptr && !partitionInfo->inRange(index)) {
partitionInfo->addOverflow(inserted);
@@ -1167,11 +1172,10 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
storeRowPointer(index, hash, inserted);
return nullptr;
};

if (hashMode_ == HashMode::kNormalizedKey) {
if constexpr (isNormailizedKeyMode) {
state.fullProbe<ProbeState::Operation::kInsert>(
*this,
-static_cast<int32_t>(sizeof(normalized_key_t)),
kKeyOffset,
[&](char* group, int32_t /*row*/) {
if (RowContainer::normalizedKey(group) ==
RowContainer::normalizedKey(inserted)) {
@@ -1206,6 +1210,44 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe(
}
}

template <bool ignoreNullKeys>
template <bool isNormailizedKeyMode>
FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::insertForJoinWithPrefetch(
RowContainer* rows,
char** groups,
uint64_t* hashes,
int32_t numGroups,
TableInsertPartitionInfo* partitionInfo) {
auto i = 0;
ProbeState states[kPrefetchSize];
constexpr int32_t kKeyOffset =
-static_cast<int32_t>(sizeof(normalized_key_t));
int32_t keyOffset = 0;
if constexpr (isNormailizedKeyMode) {
keyOffset = kKeyOffset;
}
for (; i + kPrefetchSize <= numGroups; i += kPrefetchSize) {
for (int32_t j = 0; j < kPrefetchSize; ++j) {
auto index = i + j;
states[j].preProbe(*this, hashes[index], index);
}
for (int32_t j = 0; j < kPrefetchSize; ++j) {
states[j].firstProbe(*this, keyOffset);
}
for (int32_t j = 0; j < kPrefetchSize; ++j) {
auto index = i + j;
buildFullProbe<isNormailizedKeyMode>(
rows, states[j], hashes[index], groups[index], j != 0, partitionInfo);
}
}
for (; i < numGroups; ++i) {
states[0].preProbe(*this, hashes[i], i);
states[0].firstProbe(*this, keyOffset);
buildFullProbe<isNormailizedKeyMode>(
rows, states[0], hashes[i], groups[i], false, partitionInfo);
}
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::insertForJoin(
RowContainer* rows,
@@ -1223,13 +1265,12 @@ void HashTable<ignoreNullKeys>::insertForJoin(
}
return;
}

ProbeState state;
for (auto i = 0; i < numGroups; ++i) {
state.preProbe(*this, hashes[i], i);
state.firstProbe(*this, 0);

buildFullProbe(rows, state, hashes[i], groups[i], i, partitionInfo);
if (hashMode_ == HashMode::kNormalizedKey) {
insertForJoinWithPrefetch<true>(
rows, groups, hashes, numGroups, partitionInfo);
} else {
insertForJoinWithPrefetch<false>(
rows, groups, hashes, numGroups, partitionInfo);
}
}

9 changes: 9 additions & 0 deletions velox/exec/HashTable.h
@@ -855,6 +855,7 @@ class HashTable : public BaseHashTable {
// Finishes inserting an entry into a join hash table. If 'partitionInfo' is
// not null and the insert falls outside of the partition range, then the
// insert is not made and the row is instead added to 'overflow' in
// 'partitionInfo'.
template <bool isNormailizedKeyMode>
void buildFullProbe(
RowContainer* rows,
ProbeState& state,
@@ -863,6 +864,14 @@
bool extraCheck,
TableInsertPartitionInfo* partitionInfo);

template <bool isNormailizedKeyMode>
void insertForJoinWithPrefetch(
RowContainer* rows,
char** groups,
uint64_t* hashes,
int32_t numGroups,
TableInsertPartitionInfo* partitionInfo);

// Updates 'hashers_' to correspond to the keys in the
// content. Returns true if all hashers offer a mapping to value ids
// for array or normalized key.
7 changes: 7 additions & 0 deletions velox/exec/benchmarks/CMakeLists.txt
@@ -44,6 +44,13 @@ target_link_libraries(
velox_hash_join_list_result_benchmark velox_exec velox_exec_test_lib
velox_vector_test_lib ${FOLLY_BENCHMARK})

add_executable(velox_hash_join_prepare_join_table_benchmark
HashJoinPrepareJoinTableBenchmark.cpp)

target_link_libraries(
velox_hash_join_prepare_join_table_benchmark velox_exec velox_exec_test_lib
velox_vector_test_lib ${FOLLY_BENCHMARK})

if(${VELOX_ENABLE_PARQUET})
add_executable(velox_sort_benchmark RowContainerSortBenchmark.cpp)

velox/exec/benchmarks/HashJoinPrepareJoinTableBenchmark.cpp (diff not loaded)
