Skip to content

Commit

Permalink
feat: Sort grouping key columns to maximize the prefix sort optimizat…
Browse files Browse the repository at this point in the history
…ion (facebookincubator#11720)

Summary:

Reorder the grouping keys of hash aggregation to maximize the prefix sort acceleration with limited max normalized key size.
It sorts the grouping keys based on the prefix sort encoded key size and putting the grouping key with minimal encoded size first.
This enables sort more columns using prefix sort for use case like hash aggregation which doesn't require a total order.

This PR also cleanup the prefix sort a bit.

Differential Revision: D66638085
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 3, 2024
1 parent db0c3e6 commit e9d14a4
Show file tree
Hide file tree
Showing 18 changed files with 571 additions and 65 deletions.
9 changes: 5 additions & 4 deletions velox/common/base/PrefixSortConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ namespace facebook::velox::common {
struct PrefixSortConfig {
PrefixSortConfig() = default;

PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold)
: maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {}
PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows)
: maxNormalizedKeyBytes(_maxNormalizedKeyBytes),
minNumRows(_minNumRows) {}

/// Max number of bytes can store normalized keys in prefix-sort buffer per
/// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes.
int64_t maxNormalizedKeySize{128};
uint32_t maxNormalizedKeyBytes{128};

/// PrefixSort will have performance regression when the dateset is too small.
int32_t threshold{130};
uint32_t minNumRows{128};
};
} // namespace facebook::velox::common
5 changes: 5 additions & 0 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ struct SpillConfig {
/// Checks if the given 'startBitOffset' has exceeded the max spill limit.
bool exceedSpillLevelLimit(uint8_t startBitOffset) const;

/// Returns true if prefix sort is enabled.
bool prefixSortEnabled() const {
return prefixSortConfig.has_value();
}

/// A callback function that returns the spill directory path. Implementations
/// can use it to ensure the path exists before returning.
GetSpillDirectoryPathCB getSpillDirPathCb;
Expand Down
4 changes: 2 additions & 2 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
queryConfig.aggregationSpillEnabled();
return (isFinal() || isSingle()) && !groupingKeys().empty() &&
preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
}

void AggregationNode::addDetails(std::stringstream& stream) const {
Expand Down
8 changes: 4 additions & 4 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -811,12 +811,12 @@ class QueryConfig {
return get<uint32_t>(kDriverCpuTimeSliceLimitMs, 0);
}

int64_t prefixSortNormalizedKeyMaxBytes() const {
return get<int64_t>(kPrefixSortNormalizedKeyMaxBytes, 128);
uint32_t prefixSortNormalizedKeyMaxBytes() const {
return get<uint32_t>(kPrefixSortNormalizedKeyMaxBytes, 128);
}

int32_t prefixSortMinRows() const {
return get<int32_t>(kPrefixSortMinRows, 130);
uint32_t prefixSortMinRows() const {
return get<uint32_t>(kPrefixSortMinRows, 128);
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Generic Configuration
- Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort.
* - prefixsort_min_rows
- integer
- 130
- 128
- Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking.

.. _expression-evaluation-conf:
Expand Down
15 changes: 15 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,18 @@ These stats are reported by shuffle operators.
- Indicates the vector serde kind used by an operator for shuffle with 1
for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange,
MergeExchange and PartitionedOutput operators for now.

PrefixSort
----------
These stats are reported by shuffle operators.

.. list-table::
:widths: 50 25 50
:header-rows: 1

* - Stats
- Unit
- Description
* - numPrefixSortKeys
-
- The number of columns sorted using prefix sort.
33 changes: 20 additions & 13 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ GroupingSet::GroupingSet(
const RowTypePtr& inputType,
std::vector<std::unique_ptr<VectorHasher>>&& hashers,
std::vector<column_index_t>&& preGroupedKeys,
std::vector<column_index_t>&& groupingKeyOutputProjections,
std::vector<AggregateInfo>&& aggregates,
bool ignoreNullKeys,
bool isPartial,
Expand All @@ -55,6 +56,7 @@ GroupingSet::GroupingSet(
OperatorCtx* operatorCtx,
folly::Synchronized<common::SpillStats>* spillStats)
: preGroupedKeyChannels_(std::move(preGroupedKeys)),
groupingKeyOutputProjections_(std::move(groupingKeyOutputProjections)),
hashers_(std::move(hashers)),
isGlobal_(hashers_.empty()),
isPartial_(isPartial),
Expand All @@ -74,9 +76,11 @@ GroupingSet::GroupingSet(
spillStats_(spillStats) {
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
VELOX_CHECK(pool_.trackUsage());

for (auto& hasher : hashers_) {
keyChannels_.push_back(hasher->channel());
}

std::unordered_map<column_index_t, int> channelUseCount;
for (const auto& aggregate : aggregates_) {
for (auto channel : aggregate.inputs) {
Expand Down Expand Up @@ -124,17 +128,18 @@ std::unique_ptr<GroupingSet> GroupingSet::createForMarkDistinct(
return std::make_unique<GroupingSet>(
inputType,
std::move(hashers),
/*preGroupedKeys*/ std::vector<column_index_t>{},
/*aggregates*/ std::vector<AggregateInfo>{},
/*ignoreNullKeys*/ false,
/*isPartial*/ false,
/*isRawInput*/ false,
/*globalGroupingSets*/ std::vector<vector_size_t>{},
/*groupIdColumn*/ std::nullopt,
/*spillConfig*/ nullptr,
/*preGroupedKeys=*/std::vector<column_index_t>{},
/*groupingKeyOutputProjections=*/std::vector<column_index_t>{},
/*aggregates=*/std::vector<AggregateInfo>{},
/*ignoreNullKeys=*/false,
/*isPartial=*/false,
/*isRawInput=*/false,
/*globalGroupingSets=*/std::vector<vector_size_t>{},
/*groupIdColumn=*/std::nullopt,
/*spillConfig=*/nullptr,
nonReclaimableSection,
operatorCtx,
/*spillStats_*/ nullptr);
/*spillStats=*/nullptr);
};

namespace {
Expand Down Expand Up @@ -302,7 +307,6 @@ void GroupingSet::addRemainingInput() {
}

namespace {

void initializeAggregates(
const std::vector<AggregateInfo>& aggregates,
RowContainer& rows,
Expand Down Expand Up @@ -758,10 +762,13 @@ void GroupingSet::extractGroups(
return;
}
RowContainer& rows = *table_->rows();
auto totalKeys = rows.keyTypes().size();
const auto totalKeys = rows.keyTypes().size();
for (int32_t i = 0; i < totalKeys; ++i) {
auto keyVector = result->childAt(i);
rows.extractColumn(groups.data(), groups.size(), i, keyVector);
const auto keyChannel = groupingKeyOutputProjections_.empty()
? i
: groupingKeyOutputProjections_[i];
auto& keyVector = result->childAt(i);
rows.extractColumn(groups.data(), groups.size(), keyChannel, keyVector);
}
for (int32_t i = 0; i < aggregates_.size(); ++i) {
if (!aggregates_[i].sortingKeys.empty()) {
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GroupingSet {
const RowTypePtr& inputType,
std::vector<std::unique_ptr<VectorHasher>>&& hashers,
std::vector<column_index_t>&& preGroupedKeys,
std::vector<column_index_t>&& groupingKeyOutputProjections,
std::vector<AggregateInfo>&& aggregates,
bool ignoreNullKeys,
bool isPartial,
Expand Down Expand Up @@ -272,8 +273,12 @@ class GroupingSet {

std::vector<column_index_t> keyChannels_;

/// A subset of grouping keys on which the input is clustered.
// A subset of grouping keys on which the input is clustered.
const std::vector<column_index_t> preGroupedKeyChannels_;
// If not empty, it provides the projections for extracting the grouping keys
// from 'table_' for output. The vector index is the output channel and the
// value is the corresponding column index stored in 'table_'.
const std::vector<column_index_t> groupingKeyOutputProjections_;

std::vector<std::unique_ptr<VectorHasher>> hashers_;
const bool isGlobal_;
Expand Down
57 changes: 54 additions & 3 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/
#include "velox/exec/HashAggregation.h"

#include <optional>
#include "velox/exec/PrefixSort.h"
#include "velox/exec/Task.h"
#include "velox/expression/Expr.h"

Expand Down Expand Up @@ -54,9 +56,13 @@ void HashAggregation::initialize() {
VELOX_CHECK(pool()->trackUsage());

const auto& inputType = aggregationNode_->sources()[0]->outputType();
auto hashers =
createVectorHashers(inputType, aggregationNode_->groupingKeys());
auto numHashers = hashers.size();
std::vector<column_index_t> groupingKeyInputChannels;
std::vector<column_index_t> groupingKeyOutputChannels;
setupGroupingKeyChannelProjections(
groupingKeyInputChannels, groupingKeyOutputChannels);

auto hashers = createVectorHashers(inputType, groupingKeyInputChannels);
const auto numHashers = hashers.size();

std::vector<column_index_t> preGroupedChannels;
preGroupedChannels.reserve(aggregationNode_->preGroupedKeys().size());
Expand Down Expand Up @@ -96,6 +102,7 @@ void HashAggregation::initialize() {
inputType,
std::move(hashers),
std::move(preGroupedChannels),
std::move(groupingKeyOutputChannels),
std::move(aggregateInfos),
aggregationNode_->ignoreNullKeys(),
isPartialOutput_,
Expand All @@ -110,6 +117,50 @@ void HashAggregation::initialize() {
aggregationNode_.reset();
}

void HashAggregation::setupGroupingKeyChannelProjections(
std::vector<column_index_t>& groupingKeyInputChannels,
std::vector<column_index_t>& groupingKeyOutputChannels) const {
const auto& inputType = aggregationNode_->sources()[0]->outputType();
const auto& groupingKeys = aggregationNode_->groupingKeys();
// The map from the grouping key output channel to the input channel.
//
// NOTE: grouping key output order is specified as 'groupingKeys' in
// 'aggregationNode_'.
std::vector<IdentityProjection> groupingKeyProjections;
groupingKeyProjections.reserve(groupingKeys.size());
for (auto i = 0; i < groupingKeys.size(); ++i) {
groupingKeyProjections.emplace_back(
exprToChannel(groupingKeys[i].get(), inputType), i);
}

const bool reorderGroupingKeys =
canSpill() && spillConfig()->prefixSortEnabled();
// If prefix sort is enabled, we need to sort the grouping key's layout in the
// grouping set to maximize the prefix sort acceleration if spill is
// triggered. The reorder stores the grouping key with smaller prefix sort
// encoded size first.
if (reorderGroupingKeys) {
PrefixSortLayout::optimizeSortKeysOrder(inputType, groupingKeyProjections);
}

groupingKeyInputChannels.reserve(groupingKeys.size());
for (auto i = 0; i < groupingKeys.size(); ++i) {
groupingKeyInputChannels.push_back(groupingKeyProjections[i].inputChannel);
}

if (!reorderGroupingKeys) {
// If there is no reorder, then grouping key output channels are the same as
// the grouping set.
groupingKeyOutputChannels.clear();
return;
}

groupingKeyOutputChannels.resize(groupingKeys.size());
for (auto i = 0; i < groupingKeys.size(); ++i) {
groupingKeyOutputChannels[groupingKeyProjections[i].outputChannel] = i;
}
}

bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
VELOX_CHECK(isPartialOutput_ && !isGlobal_);
return numInputRows_ > abandonPartialAggregationMinRows_ &&
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ class HashAggregation : public Operator {

RowVectorPtr getDistinctOutput();

// Setup the projections for accessing grouping keys stored in the grouping
// set. 'groupingKeyInputChannels' provides the map from the key column index
// in the grouping set (as vector index) to key column channel in the input
// (as vector value). 'outputChannelProjections' provides the map from the key
// column channel in the output (as vector index) to the key column index in
// the grouping set (as vector value).
void setupGroupingKeyChannelProjections(
std::vector<column_index_t>& groupingKeyInputChannels,
std::vector<column_index_t>& groupingKeyOutputChannels) const;

void updateEstimatedOutputRowSize();

std::shared_ptr<const core::AggregationNode> aggregationNode_;
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@

namespace facebook::velox::exec {

// Represents a column that is copied from input to output, possibly
// with cardinality change, i.e. values removed or duplicated.
/// Represents a column that is copied from input to output, possibly
/// with cardinality change, i.e. values removed or duplicated.
struct IdentityProjection {
IdentityProjection(
column_index_t _inputChannel,
column_index_t _outputChannel)
: inputChannel(_inputChannel), outputChannel(_outputChannel) {}

const column_index_t inputChannel;
const column_index_t outputChannel;
column_index_t inputChannel;
column_index_t outputChannel;
};

struct MemoryStats {
Expand Down
47 changes: 45 additions & 2 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) {

} // namespace

// static.
PrefixSortLayout PrefixSortLayout::makeSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
Expand Down Expand Up @@ -169,6 +170,42 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout(
numPaddingBytes};
}

// static.
void PrefixSortLayout::optimizeSortKeysOrder(
const RowTypePtr& rowType,
std::vector<IdentityProjection>& keyColumnProjections) {
std::vector<std::optional<uint32_t>> encodedKeySizes(
rowType->size(), std::nullopt);
for (const auto& projection : keyColumnProjections) {
encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize(
rowType->childAt(projection.inputChannel)->kind());
}

std::sort(
keyColumnProjections.begin(),
keyColumnProjections.end(),
[&](const IdentityProjection& lhs, const IdentityProjection& rhs) {
const auto& lhsEncodedSize = encodedKeySizes[lhs.inputChannel];
const auto& rhsEncodedSize = encodedKeySizes[rhs.inputChannel];
if (lhsEncodedSize.has_value() && !rhsEncodedSize.has_value()) {
return true;
}
if (!lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) {
return false;
}
if (lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) {
if (lhsEncodedSize.value() < rhsEncodedSize.value()) {
return true;
}
if (lhsEncodedSize.value() > rhsEncodedSize.value()) {
return false;
}
}
// Tie breaks with the original key column order.
return lhs.outputChannel < rhs.outputChannel;
});
}

FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys(
char* left,
char* right) {
Expand Down Expand Up @@ -236,12 +273,12 @@ uint32_t PrefixSort::maxRequiredBytes(
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config,
memory::MemoryPool* pool) {
if (rowContainer->numRows() < config.threshold) {
if (rowContainer->numRows() < config.minNumRows) {
return 0;
}
VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize);
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes);
if (!sortLayout.hasNormalizedKeys) {
return 0;
}
Expand Down Expand Up @@ -303,6 +340,12 @@ void PrefixSort::sortInternal(
PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable<char>());
auto* prefixBufferStart = prefixBuffer;
auto* prefixBufferEnd = prefixBuffer + numRows * entrySize;
if (sortLayout_.numNormalizedKeys > 0) {
addThreadLocalRuntimeStat(
PrefixSort::kNumPrefixSortKeys,
RuntimeCounter(
sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone));
}
if (sortLayout_.hasNonNormalizedKey) {
sortRunner.quickSort(
prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) {
Expand Down
Loading

0 comments on commit e9d14a4

Please sign in to comment.