Skip to content

Commit

Permalink
Extract HashTable::prepareForJoinProbe method (#7926)
Browse files Browse the repository at this point in the history
Summary:
Rename HashTable::prepareForProbe to prepareForGroupProbe and add prepareForJoinProbe method.

Pull Request resolved: #7926

Reviewed By: xiaoxmeng

Differential Revision: D51968271

Pulled By: mbasmanova

fbshipit-source-id: f7a29e92331b610ceccd97fb352b4ee3aad20f1e
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Dec 8, 2023
1 parent 7249cd9 commit a504134
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 48 deletions.
2 changes: 1 addition & 1 deletion velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ void GroupingSet::addInputForActiveRows(
TestValue::adjust(
"facebook::velox::exec::GroupingSet::addInputForActiveRows", this);

table_->prepareForProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
table_->groupProbe(*lookup_);
masks_.addInput(input, activeRows_);

Expand Down
28 changes: 5 additions & 23 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ void HashProbe::addInput(RowVectorPtr input) {
}
input_ = std::move(input);

if (input_->size() > 0) {
const auto numInput = input_->size();

if (numInput > 0) {
noInput_ = false;
}

Expand Down Expand Up @@ -577,28 +579,9 @@ void HashProbe::addInput(RowVectorPtr input) {
if (!hasDecoded) {
decodeAndDetectNonNullKeys();
}

activeRows_ = nonNullInputRows_;
lookup_->hashes.resize(input_->size());
auto mode = table_->hashMode();
auto& buildHashers = table_->hashers();
for (auto i = 0; i < keyChannels_.size(); ++i) {
if (mode != BaseHashTable::HashMode::kHash) {
auto key = input_->childAt(keyChannels_[i]);
buildHashers[i]->lookupValueIds(
*key, activeRows_, scratchMemory_, lookup_->hashes);
} else {
hashers_[i]->hash(activeRows_, i > 0, lookup_->hashes);
}
}
lookup_->rows.clear();
if (activeRows_.isAllSelected()) {
lookup_->rows.resize(activeRows_.size());
std::iota(lookup_->rows.begin(), lookup_->rows.end(), 0);
} else {
activeRows_.applyToSelected(
[&](auto row) { lookup_->rows.push_back(row); });
}

table_->prepareForJoinProbe(*lookup_.get(), input_, activeRows_, false);

passingInputRowsInitialized_ = false;
if (isLeftJoin(joinType_) || isFullJoin(joinType_) || isAntiJoin(joinType_) ||
Expand All @@ -607,7 +590,6 @@ void HashProbe::addInput(RowVectorPtr input) {
// including rows without a match in the output. Also, make sure to
// initialize all 'hits' to nullptr as HashTable::joinProbe will only
// process activeRows_.
auto numInput = input_->size();
auto& hits = lookup_->hits;
hits.resize(numInput);
std::fill(hits.data(), hits.data() + numInput, nullptr);
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,6 @@ class HashProbe : public Operator {
// side. Used by right semi project join.
bool probeSideHasNullKeys_{false};

VectorHasher::ScratchMemory scratchMemory_;

// Rows in 'filterInput_' to apply 'filter_' to.
SelectivityVector filterInputRows_;

Expand Down
56 changes: 49 additions & 7 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1947,7 +1947,20 @@ void HashTable<ignoreNullKeys>::checkConsistency() const {
template class HashTable<true>;
template class HashTable<false>;

void BaseHashTable::prepareForProbe(
namespace {
void populateLookupRows(
const SelectivityVector& rows,
raw_vector<vector_size_t>& lookupRows) {
if (rows.isAllSelected()) {
std::iota(lookupRows.begin(), lookupRows.end(), 0);
} else {
lookupRows.clear();
rows.applyToSelected([&](auto row) { lookupRows.push_back(row); });
}
}
} // namespace

void BaseHashTable::prepareForGroupProbe(
HashLookup& lookup,
const RowVectorPtr& input,
SelectivityVector& rows,
Expand Down Expand Up @@ -1984,17 +1997,46 @@ void BaseHashTable::prepareForProbe(
decideHashMode(input->size());
// Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
// deselectRowsWithNulls.
prepareForProbe(lookup, input, rows, false);
prepareForGroupProbe(lookup, input, rows, false);
return;
}
}

if (rows.isAllSelected()) {
std::iota(lookup.rows.begin(), lookup.rows.end(), 0);
} else {
lookup.rows.clear();
rows.applyToSelected([&](auto row) { lookup.rows.push_back(row); });
populateLookupRows(rows, lookup.rows);
}

/// Prepares 'lookup' for a subsequent joinProbe call: optionally decodes the
/// join keys and drops rows with null keys, resets 'lookup' for rows.end()
/// rows, fills 'lookup.hashes' with either value IDs or hashes depending on
/// the table's hash mode, and finally lists the selected rows in
/// 'lookup.rows'.
///
/// @param lookup Probe state; its 'hashers', 'scratchMemory', 'hashes' and
/// 'rows' fields are used here.
/// @param input Probe-side input batch holding the key columns.
/// @param rows Rows of 'input' to probe. May be modified (rows with null keys
/// are deselected when 'decodeAndRemoveNulls' is true).
/// @param decodeAndRemoveNulls If true, decode key columns and deselect rows
/// with a null in any key; if false, the caller is expected to have done so.
void BaseHashTable::prepareForJoinProbe(
    HashLookup& lookup,
    const RowVectorPtr& input,
    SelectivityVector& rows,
    bool decodeAndRemoveNulls) {
  auto& probeHashers = lookup.hashers;

  if (decodeAndRemoveNulls) {
    // Decode every key column for the currently selected rows.
    for (auto& probeHasher : probeHashers) {
      auto keyVector = input->childAt(probeHasher->channel())->loadedVector();
      probeHasher->decode(*keyVector, rows);
    }

    // A row is dropped as soon as any one of its keys is null.
    deselectRowsWithNulls(probeHashers, rows);
  }

  lookup.reset(rows.end());

  // The hash mode is sampled once, before any key is processed.
  const bool useValueIds = hashMode() != BaseHashTable::HashMode::kHash;
  const auto numKeys = probeHashers.size();
  for (size_t keyIndex = 0; keyIndex < numKeys; ++keyIndex) {
    auto& probeHasher = probeHashers[keyIndex];
    if (!useValueIds) {
      // Plain hashing: the first key initializes the hash (mix == false),
      // later keys are passed mix == true.
      const bool mixWithPrevious = keyIndex > 0;
      probeHasher->hash(rows, mixWithPrevious, lookup.hashes);
      continue;
    }

    // Value-ID modes: look the key up with the table's own hasher, using the
    // probe hasher only to locate the key column in 'input'.
    auto& keyVector = input->childAt(probeHasher->channel());
    hashers_[keyIndex]->lookupValueIds(
        *keyVector, rows, lookup.scratchMemory, lookup.hashes);
  }

  populateLookupRows(rows, lookup.rows);
}

} // namespace facebook::velox::exec
57 changes: 46 additions & 11 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace facebook::velox::exec {

using PartitionBoundIndexType = int64_t;

/// Contains input and output parameters for groupProbe and joinProbe APIs.
struct HashLookup {
explicit HashLookup(const std::vector<std::unique_ptr<VectorHasher>>& h)
: hashers(h) {}
Expand All @@ -36,17 +37,36 @@ struct HashLookup {
newGroups.clear();
}

// One entry per aggregation or join key
/// One entry per group-by or join key.
const std::vector<std::unique_ptr<VectorHasher>>& hashers;

/// Scratch memory used to call VectorHasher::lookupValueIds.
VectorHasher::ScratchMemory scratchMemory;

/// Input to groupProbe and joinProbe APIs.

/// Set of row numbers of rows to probe.
raw_vector<vector_size_t> rows;
// Hash number for all input rows.

/// Hashes or value IDs for rows in 'rows'. Not aligned with 'rows'. Index is
/// the row number.
raw_vector<uint64_t> hashes;
// If using valueIds, list of concatenated valueIds. 1:1 with 'hashes'.
raw_vector<uint64_t> normalizedKeys;
// Hit for each row of input corresponding group row or join row.

/// Results of groupProbe and joinProbe APIs.

/// Contains one entry for each row in 'rows'. Index is the row number.
/// For groupProbe, a pointer to an existing or new row with matching grouping
/// keys. For joinProbe, a pointer to the first row with matching keys or null
/// if no match.
raw_vector<char*> hits;
// Indices of newly inserted rows (not found during probe).

/// For groupProbe, row numbers for which a new entry was inserted (didn't
/// exist before the groupProbe). Empty for joinProbe.
std::vector<vector_size_t> newGroups;

/// If using valueIds, list of concatenated valueIds. 1:1 with 'hashes'.
/// Populated by groupProbe and joinProbe.
raw_vector<uint64_t> normalizedKeys;
};

struct HashTableStats {
Expand Down Expand Up @@ -124,7 +144,12 @@ class BaseHashTable {

virtual HashStringAllocator* stringAllocator() = 0;

void prepareForProbe(
/// Populates 'hashes' and 'rows' fields in 'lookup' in preparation for
/// 'groupProbe' call. Rehashes the table if necessary. Uses lookup.hashers to
/// decode grouping keys from 'input'. If 'ignoreNullKeys' is true, updates
/// 'rows' to remove entries with null grouping keys. After this call, 'rows'
/// may have no entries selected.
void prepareForGroupProbe(
HashLookup& lookup,
const RowVectorPtr& input,
SelectivityVector& rows,
Expand All @@ -139,6 +164,20 @@ class BaseHashTable {
/// join probe. Use listJoinResults to iterate over the results.
virtual void joinProbe(HashLookup& lookup) = 0;

/// Populates 'hashes' and 'rows' fields in 'lookup' in preparation for
/// 'joinProbe' call. If hash mode is not kHash, populates 'hashes' with
/// value IDs. Rows which do not have value IDs are removed from 'rows'
/// (these rows cannot possibly match). If 'decodeAndRemoveNulls' is true,
/// uses lookup.hashers to decode join keys from 'input' and updates 'rows'
/// to remove entries with null join keys. Otherwise, assumes the caller
/// has done that already. After this call, 'rows' may have no entries
/// selected.
void prepareForJoinProbe(
HashLookup& lookup,
const RowVectorPtr& input,
SelectivityVector& rows,
bool decodeAndRemoveNulls);

/// Fills 'hits' with consecutive hash join results. The corresponding element
/// of 'inputRows' is set to the corresponding row number in probe keys.
/// Returns the number of hits produced. If this is less than hits.size() then
Expand Down Expand Up @@ -256,10 +295,6 @@ class BaseHashTable {
return rows_.get();
}

std::unique_ptr<RowContainer> moveRows() {
return std::move(rows_);
}

// Static functions for processing internals. Public because used in
// structs that define probe and insert algorithms.

Expand Down
6 changes: 3 additions & 3 deletions velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void RowNumber::addInput(RowVectorPtr input) {
}

SelectivityVector rows(numInput);
table_->prepareForProbe(*lookup_, input, rows, false);
table_->prepareForGroupProbe(*lookup_, input, rows, false);
table_->groupProbe(*lookup_);

// Initialize new partitions with zeros.
Expand All @@ -93,7 +93,7 @@ void RowNumber::addInput(RowVectorPtr input) {
void RowNumber::addSpillInput() {
const auto numInput = input_->size();
SelectivityVector rows(numInput);
table_->prepareForProbe(*lookup_, input_, rows, false);
table_->prepareForGroupProbe(*lookup_, input_, rows, false);
table_->groupProbe(*lookup_);

// Initialize new partitions with zeros.
Expand Down Expand Up @@ -157,7 +157,7 @@ void RowNumber::restoreNextSpillPartition() {

const auto numInput = input->size();
SelectivityVector rows(numInput);
table_->prepareForProbe(*lookup_, input, rows, false);
table_->prepareForGroupProbe(*lookup_, input, rows, false);
table_->groupProbe(*lookup_);

auto* counts = data->children().back()->as<FlatVector<int64_t>>();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
ensureInputFits(input);

SelectivityVector rows(numInput);
table_->prepareForProbe(*lookup_, input, rows, false);
table_->prepareForGroupProbe(*lookup_, input, rows, false);
table_->groupProbe(*lookup_);

// Initialize new partitions.
Expand Down

0 comments on commit a504134

Please sign in to comment.