feat: Sort grouping key columns to maximize the prefix sort optimization (#11720)

Summary:
Pull Request resolved: #11720

Reorder the grouping keys of hash aggregation to maximize prefix sort acceleration under a limited max normalized key size.
It sorts the grouping keys by their prefix sort encoded key size, putting the grouping key with the smallest encoded size first.
This allows more columns to be sorted with prefix sort in use cases such as hash aggregation, which does not require a total order.

This PR also cleans up the prefix sort code a bit.

Reviewed By: Yuhta, tanjialiang, oerling

Differential Revision: D66638085
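
The reordering described in the summary can be sketched as follows. This is a minimal illustration, not the Velox implementation: `KeyColumn`, `orderByEncodedSize` and `numKeysThatFit` are hypothetical names, the encoded sizes are supplied by the caller instead of being derived from column types, and the sketch assumes that only a leading run of keys can be packed into the normalized key.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a grouping key: its input column and the number
// of bytes its prefix-sort encoded form is assumed to take.
struct KeyColumn {
  uint32_t channel;
  uint32_t encodedBytes;
};

// Puts the keys with the smallest encoded size first. stable_sort keeps the
// original relative order of keys that have the same encoded size.
inline void orderByEncodedSize(std::vector<KeyColumn>& keys) {
  std::stable_sort(
      keys.begin(), keys.end(), [](const KeyColumn& a, const KeyColumn& b) {
        return a.encodedBytes < b.encodedBytes;
      });
}

// Counts how many leading keys fit within 'maxNormalizedKeyBytes'. Assumption
// of this sketch: counting stops at the first key that does not fit.
inline uint32_t numKeysThatFit(
    const std::vector<KeyColumn>& keys,
    uint32_t maxNormalizedKeyBytes) {
  uint32_t usedBytes = 0;
  uint32_t numKeys = 0;
  for (const auto& key : keys) {
    if (usedBytes + key.encodedBytes > maxNormalizedKeyBytes) {
      break;
    }
    usedBytes += key.encodedBytes;
    ++numKeys;
  }
  return numKeys;
}
```

With assumed sizes of 100, 9, 9 and 5 bytes and a 32-byte budget, no key fits in the original order because the first key alone exceeds the budget, while three keys fit after reordering.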
xiaoxmeng authored and facebook-github-bot committed Dec 6, 2024
1 parent 939c102 commit a8e4fdc
Showing 18 changed files with 632 additions and 71 deletions.
16 changes: 9 additions & 7 deletions velox/common/base/PrefixSortConfig.h
@@ -24,14 +24,16 @@ namespace facebook::velox::common {
 struct PrefixSortConfig {
   PrefixSortConfig() = default;

-  PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold)
-      : maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {}
+  PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows)
+      : maxNormalizedKeyBytes(_maxNormalizedKeyBytes),
+        minNumRows(_minNumRows) {}

-  /// Max number of bytes can store normalized keys in prefix-sort buffer per
-  /// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes.
-  int64_t maxNormalizedKeySize{128};
+  /// Maximum bytes that can be used to store normalized keys in prefix-sort
+  /// buffer per entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes.
+  uint32_t maxNormalizedKeyBytes{128};

-  /// PrefixSort will have performance regression when the dateset is too small.
-  int32_t threshold{130};
+  /// Minimum number of rows to apply prefix sort. Prefix sort does not perform
+  /// well with small datasets.
+  uint32_t minNumRows{128};
 };
 } // namespace facebook::velox::common
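
A hedged sketch of how the two renamed fields might gate prefix sort. The struct below is a local copy of the fields in the hunk above so that the example compiles on its own. `shouldUsePrefixSort` is a hypothetical helper, and the exact comparison it uses (`>=` on the row count, `> 0` on the byte budget, following the "Use 0 to disable prefix-sort" note in configs.rst) is an assumption, not something this diff shows.

```cpp
#include <cstdint>

// Local copy of the fields from the diff, for illustration only.
struct PrefixSortConfig {
  PrefixSortConfig() = default;

  PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows)
      : maxNormalizedKeyBytes(_maxNormalizedKeyBytes),
        minNumRows(_minNumRows) {}

  uint32_t maxNormalizedKeyBytes{128};
  uint32_t minNumRows{128};
};

// Hypothetical helper: prefix sort is considered only when a non-zero key
// budget is configured and the input has at least 'minNumRows' rows.
inline bool shouldUsePrefixSort(
    const PrefixSortConfig& config,
    uint64_t numRows) {
  return config.maxNormalizedKeyBytes > 0 && numRows >= config.minNumRows;
}
```

Using unsigned types for both fields, as the diff does, removes the need to reason about negative budgets or negative row thresholds in a check like this one.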
5 changes: 5 additions & 0 deletions velox/common/base/SpillConfig.h
@@ -75,6 +75,11 @@ struct SpillConfig {
   /// Checks if the given 'startBitOffset' has exceeded the max spill limit.
   bool exceedSpillLevelLimit(uint8_t startBitOffset) const;

+  /// Returns true if prefix sort is enabled.
+  bool prefixSortEnabled() const {
+    return prefixSortConfig.has_value();
+  }
+
   /// A callback function that returns the spill directory path. Implementations
   /// can use it to ensure the path exists before returning.
   GetSpillDirectoryPathCB getSpillDirPathCb;
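
The `prefixSortEnabled()` accessor above treats the presence of a prefix sort config as the on/off switch. A minimal sketch of that pattern, assuming `prefixSortConfig` is a `std::optional` (the diff shows only the `has_value()` call); `SortConfigSketch` and `SpillConfigSketch` are hypothetical stand-ins, not the Velox types.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical stand-in for the prefix sort settings.
struct SortConfigSketch {
  uint32_t maxNormalizedKeyBytes{128};
  uint32_t minNumRows{128};
};

// Hypothetical stand-in for a spill config that optionally carries prefix
// sort settings. An empty optional means prefix sort is disabled.
struct SpillConfigSketch {
  std::optional<SortConfigSketch> prefixSortConfig;

  bool prefixSortEnabled() const {
    return prefixSortConfig.has_value();
  }
};
```

Callers can then branch on `prefixSortEnabled()` without inspecting the optional directly.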
4 changes: 2 additions & 2 deletions velox/core/PlanNode.cpp
@@ -237,8 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
   }
   // TODO: add spilling for pre-grouped aggregation later:
   // https://github.com/facebookincubator/velox/issues/3264
-  return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
-      queryConfig.aggregationSpillEnabled();
+  return (isFinal() || isSingle()) && !groupingKeys().empty() &&
+      preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
 }

 void AggregationNode::addDetails(std::stringstream& stream) const {
8 changes: 4 additions & 4 deletions velox/core/QueryConfig.h
@@ -836,12 +836,12 @@ class QueryConfig {
     return get<uint32_t>(kDriverCpuTimeSliceLimitMs, 0);
   }

-  int64_t prefixSortNormalizedKeyMaxBytes() const {
-    return get<int64_t>(kPrefixSortNormalizedKeyMaxBytes, 128);
+  uint32_t prefixSortNormalizedKeyMaxBytes() const {
+    return get<uint32_t>(kPrefixSortNormalizedKeyMaxBytes, 128);
   }

-  int32_t prefixSortMinRows() const {
-    return get<int32_t>(kPrefixSortMinRows, 130);
+  uint32_t prefixSortMinRows() const {
+    return get<uint32_t>(kPrefixSortMinRows, 128);
   }

   double scaleWriterRebalanceMaxMemoryUsageRatio() const {
2 changes: 1 addition & 1 deletion velox/docs/configs.rst
@@ -142,7 +142,7 @@ Generic Configuration
      - Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort.
    * - prefixsort_min_rows
      - integer
-     - 130
+     - 128
      - Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking.

 .. _expression-evaluation-conf:
15 changes: 15 additions & 0 deletions velox/docs/monitoring/stats.rst
@@ -190,3 +190,18 @@ These stats are reported by shuffle operators.
      - Indicates the vector serde kind used by an operator for shuffle with 1
        for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange,
        MergeExchange and PartitionedOutput operators for now.
+
+PrefixSort
+----------
+These stats are reported by prefix sort.
+
+.. list-table::
+   :widths: 50 25 50
+   :header-rows: 1
+
+   * - Stats
+     - Unit
+     - Description
+   * - numPrefixSortKeys
+     -
+     - The number of columns sorted using prefix sort.
47 changes: 33 additions & 14 deletions velox/exec/GroupingSet.cpp
@@ -44,6 +44,7 @@ GroupingSet::GroupingSet(
     const RowTypePtr& inputType,
     std::vector<std::unique_ptr<VectorHasher>>&& hashers,
     std::vector<column_index_t>&& preGroupedKeys,
+    std::vector<column_index_t>&& groupingKeyOutputProjections,
     std::vector<AggregateInfo>&& aggregates,
     bool ignoreNullKeys,
     bool isPartial,
@@ -55,6 +56,7 @@ GroupingSet::GroupingSet(
     OperatorCtx* operatorCtx,
     folly::Synchronized<common::SpillStats>* spillStats)
     : preGroupedKeyChannels_(std::move(preGroupedKeys)),
+      groupingKeyOutputProjections_(std::move(groupingKeyOutputProjections)),
       hashers_(std::move(hashers)),
       isGlobal_(hashers_.empty()),
       isPartial_(isPartial),
@@ -74,9 +76,21 @@ GroupingSet::GroupingSet(
       spillStats_(spillStats) {
   VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
   VELOX_CHECK(pool_.trackUsage());
+
   for (auto& hasher : hashers_) {
     keyChannels_.push_back(hasher->channel());
   }
+
+  if (groupingKeyOutputProjections_.empty()) {
+    groupingKeyOutputProjections_.resize(keyChannels_.size());
+    std::iota(
+        groupingKeyOutputProjections_.begin(),
+        groupingKeyOutputProjections_.end(),
+        0);
+  } else {
+    VELOX_CHECK_EQ(groupingKeyOutputProjections_.size(), keyChannels_.size());
+  }
+
   std::unordered_map<column_index_t, int> channelUseCount;
   for (const auto& aggregate : aggregates_) {
     for (auto channel : aggregate.inputs) {
@@ -124,17 +138,18 @@ std::unique_ptr<GroupingSet> GroupingSet::createForMarkDistinct(
   return std::make_unique<GroupingSet>(
       inputType,
       std::move(hashers),
-      /*preGroupedKeys*/ std::vector<column_index_t>{},
-      /*aggregates*/ std::vector<AggregateInfo>{},
-      /*ignoreNullKeys*/ false,
-      /*isPartial*/ false,
-      /*isRawInput*/ false,
-      /*globalGroupingSets*/ std::vector<vector_size_t>{},
-      /*groupIdColumn*/ std::nullopt,
-      /*spillConfig*/ nullptr,
+      /*preGroupedKeys=*/std::vector<column_index_t>{},
+      /*groupingKeyOutputProjections=*/std::vector<column_index_t>{},
+      /*aggregates=*/std::vector<AggregateInfo>{},
+      /*ignoreNullKeys=*/false,
+      /*isPartial=*/false,
+      /*isRawInput=*/false,
+      /*globalGroupingSets=*/std::vector<vector_size_t>{},
+      /*groupIdColumn=*/std::nullopt,
+      /*spillConfig=*/nullptr,
       nonReclaimableSection,
       operatorCtx,
-      /*spillStats_*/ nullptr);
+      /*spillStats=*/nullptr);
 };

 namespace {
@@ -302,7 +317,6 @@ void GroupingSet::addRemainingInput() {
 }

 namespace {
-
 void initializeAggregates(
     const std::vector<AggregateInfo>& aggregates,
     RowContainer& rows,
@@ -758,10 +772,14 @@ void GroupingSet::extractGroups(
     return;
   }
   RowContainer& rows = *table_->rows();
-  auto totalKeys = rows.keyTypes().size();
+  const auto totalKeys = rows.keyTypes().size();
   for (int32_t i = 0; i < totalKeys; ++i) {
-    auto keyVector = result->childAt(i);
-    rows.extractColumn(groups.data(), groups.size(), i, keyVector);
+    auto& keyVector = result->childAt(i);
+    rows.extractColumn(
+        groups.data(),
+        groups.size(),
+        groupingKeyOutputProjections_[i],
+        keyVector);
   }
   for (int32_t i = 0; i < aggregates_.size(); ++i) {
     if (!aggregates_[i].sortingKeys.empty()) {
@@ -1330,7 +1348,8 @@ void GroupingSet::toIntermediate(
   }

   for (auto i = 0; i < keyChannels_.size(); ++i) {
-    result->childAt(i) = input->childAt(keyChannels_[i]);
+    const auto inputKeyChannel = keyChannels_[groupingKeyOutputProjections_[i]];
+    result->childAt(i) = input->childAt(inputKeyChannel);
   }
   for (auto i = 0; i < aggregates_.size(); ++i) {
     auto& function = aggregates_[i].function;
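
The `extractGroups` and `toIntermediate` changes above both read key columns through `groupingKeyOutputProjections_`, where the vector index is the output channel and the value is the column index in the table. The sketch below mirrors that lookup and the identity default from the constructor. It is an illustration under assumptions: columns are modelled as plain strings, and `resolveProjections` and `extractKeys` are hypothetical helpers that do not exist in Velox.

```cpp
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <stdexcept>
#include <string>
#include <vector>

using column_index_t = uint32_t;

// Mirrors the constructor logic: an empty projection list means the output
// order equals the stored order, otherwise the sizes must match.
inline std::vector<column_index_t> resolveProjections(
    std::vector<column_index_t> projections,
    size_t numKeys) {
  if (projections.empty()) {
    projections.resize(numKeys);
    std::iota(projections.begin(), projections.end(), 0);
  } else if (projections.size() != numKeys) {
    throw std::invalid_argument("projection count must match key count");
  }
  return projections;
}

// Output channel i receives the stored column at index projections[i].
inline std::vector<std::string> extractKeys(
    const std::vector<std::string>& storedColumns,
    const std::vector<column_index_t>& projections) {
  std::vector<std::string> output;
  output.reserve(projections.size());
  for (const auto storedIndex : projections) {
    output.push_back(storedColumns.at(storedIndex));
  }
  return output;
}
```

For example, if the table stores keys in the order c, a, b and the projections are {1, 2, 0}, the output columns come back as a, b, c, so the reordering stays invisible to downstream operators.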
8 changes: 7 additions & 1 deletion velox/exec/GroupingSet.h
@@ -32,6 +32,7 @@ class GroupingSet {
       const RowTypePtr& inputType,
       std::vector<std::unique_ptr<VectorHasher>>&& hashers,
       std::vector<column_index_t>&& preGroupedKeys,
+      std::vector<column_index_t>&& groupingKeyOutputProjections,
       std::vector<AggregateInfo>&& aggregates,
       bool ignoreNullKeys,
       bool isPartial,
@@ -272,9 +273,14 @@ class GroupingSet {

   std::vector<column_index_t> keyChannels_;

-  /// A subset of grouping keys on which the input is clustered.
+  // A subset of grouping keys on which the input is clustered.
   const std::vector<column_index_t> preGroupedKeyChannels_;

+  // Provides the column projections for extracting the grouping keys from
+  // 'table_' for output. The vector index is the output channel and the value
+  // is the corresponding column index stored in 'table_'.
+  std::vector<column_index_t> groupingKeyOutputProjections_;
+
   std::vector<std::unique_ptr<VectorHasher>> hashers_;
   const bool isGlobal_;
   const bool isPartial_;
66 changes: 61 additions & 5 deletions velox/exec/HashAggregation.cpp
@@ -14,7 +14,9 @@
  * limitations under the License.
  */
 #include "velox/exec/HashAggregation.h"
+
 #include <optional>
+#include "velox/exec/PrefixSort.h"
 #include "velox/exec/Task.h"
 #include "velox/expression/Expr.h"

@@ -54,9 +56,13 @@ void HashAggregation::initialize() {
   VELOX_CHECK(pool()->trackUsage());

   const auto& inputType = aggregationNode_->sources()[0]->outputType();
-  auto hashers =
-      createVectorHashers(inputType, aggregationNode_->groupingKeys());
-  auto numHashers = hashers.size();
+  std::vector<column_index_t> groupingKeyInputChannels;
+  std::vector<column_index_t> groupingKeyOutputChannels;
+  setupGroupingKeyChannelProjections(
+      groupingKeyInputChannels, groupingKeyOutputChannels);
+
+  auto hashers = createVectorHashers(inputType, groupingKeyInputChannels);
+  const auto numHashers = hashers.size();

   std::vector<column_index_t> preGroupedChannels;
   preGroupedChannels.reserve(aggregationNode_->preGroupedKeys().size());
@@ -82,7 +88,8 @@ void HashAggregation::initialize() {
   }

   for (auto i = 0; i < hashers.size(); ++i) {
-    identityProjections_.emplace_back(hashers[i]->channel(), i);
+    identityProjections_.emplace_back(
+        hashers[groupingKeyOutputChannels[i]]->channel(), i);
   }

   std::optional<column_index_t> groupIdChannel;
@@ -96,6 +103,7 @@ void HashAggregation::initialize() {
       inputType,
       std::move(hashers),
       std::move(preGroupedChannels),
+      std::move(groupingKeyOutputChannels),
       std::move(aggregateInfos),
       aggregationNode_->ignoreNullKeys(),
       isPartialOutput_,
@@ -110,6 +118,54 @@ void HashAggregation::initialize() {
   aggregationNode_.reset();
 }

+void HashAggregation::setupGroupingKeyChannelProjections(
+    std::vector<column_index_t>& groupingKeyInputChannels,
+    std::vector<column_index_t>& groupingKeyOutputChannels) const {
+  VELOX_CHECK(groupingKeyInputChannels.empty());
+  VELOX_CHECK(groupingKeyOutputChannels.empty());
+
+  const auto& inputType = aggregationNode_->sources()[0]->outputType();
+  const auto& groupingKeys = aggregationNode_->groupingKeys();
+  // The map from the grouping key output channel to the input channel.
+  //
+  // NOTE: grouping key output order is specified as 'groupingKeys' in
+  // 'aggregationNode_'.
+  std::vector<IdentityProjection> groupingKeyProjections;
+  groupingKeyProjections.reserve(groupingKeys.size());
+  for (auto i = 0; i < groupingKeys.size(); ++i) {
+    groupingKeyProjections.emplace_back(
+        exprToChannel(groupingKeys[i].get(), inputType), i);
+  }
+
+  const bool reorderGroupingKeys =
+      canSpill() && spillConfig()->prefixSortEnabled();
+  // If prefix sort is enabled, we need to sort the grouping key's layout in the
+  // grouping set to maximize the prefix sort acceleration if spill is
+  // triggered. The reorder stores the grouping key with smaller prefix sort
+  // encoded size first.
+  if (reorderGroupingKeys) {
+    PrefixSortLayout::optimizeSortKeysOrder(inputType, groupingKeyProjections);
+  }
+
+  groupingKeyInputChannels.reserve(groupingKeys.size());
+  for (auto i = 0; i < groupingKeys.size(); ++i) {
+    groupingKeyInputChannels.push_back(groupingKeyProjections[i].inputChannel);
+  }
+
+  groupingKeyOutputChannels.resize(groupingKeys.size());
+  if (!reorderGroupingKeys) {
+    // If there is no reorder, then grouping key output channels are the same as
+    // the column index order in the grouping set.
+    std::iota(
+        groupingKeyOutputChannels.begin(), groupingKeyOutputChannels.end(), 0);
+    return;
+  }
+
+  for (auto i = 0; i < groupingKeys.size(); ++i) {
+    groupingKeyOutputChannels[groupingKeyProjections[i].outputChannel] = i;
+  }
+}
+
 bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
   VELOX_CHECK(isPartialOutput_ && !isGlobal_);
   return numInputRows_ > abandonPartialAggregationMinRows_ &&
@@ -328,7 +384,7 @@ RowVectorPtr HashAggregation::getDistinctOutput() {
   auto& lookup = groupingSet_->hashLookup();
   const auto size = lookup.newGroups.size();
   BufferPtr indices = allocateIndices(size, operatorCtx_->pool());
-  auto indicesPtr = indices->asMutable<vector_size_t>();
+  auto* indicesPtr = indices->asMutable<vector_size_t>();
   std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr);
   newDistincts_ = false;
   auto output = fillOutput(size, indices);
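
`setupGroupingKeyChannelProjections` above builds a permutation (the stored key order) and its inverse (where each output channel lives in the stored order). A standalone sketch of that bookkeeping follows. It is not the Velox code: `Projection`, `KeyLayout` and `buildKeyLayout` are hypothetical names, and the per-key encoded sizes are passed in directly as a stand-in for what `PrefixSortLayout::optimizeSortKeysOrder` derives from the input type.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

using column_index_t = uint32_t;

// Hypothetical mirror of IdentityProjection with mutable members.
struct Projection {
  column_index_t inputChannel;
  column_index_t outputChannel;
};

struct KeyLayout {
  // Index: key position in the grouping set. Value: input channel.
  std::vector<column_index_t> inputChannels;
  // Index: output channel. Value: key position in the grouping set.
  std::vector<column_index_t> outputChannels;
};

// 'keyInputChannels[i]' is the input channel of the key at output channel i.
// 'encodedBytes[i]' is the assumed encoded size of that key. Both vectors are
// expected to have the same length.
inline KeyLayout buildKeyLayout(
    const std::vector<column_index_t>& keyInputChannels,
    const std::vector<uint32_t>& encodedBytes) {
  std::vector<Projection> projections;
  projections.reserve(keyInputChannels.size());
  for (column_index_t i = 0; i < keyInputChannels.size(); ++i) {
    projections.push_back({keyInputChannels[i], i});
  }
  // Smallest encoded size first. Ties keep their original order.
  std::stable_sort(
      projections.begin(),
      projections.end(),
      [&](const Projection& a, const Projection& b) {
        return encodedBytes[a.outputChannel] < encodedBytes[b.outputChannel];
      });

  KeyLayout layout;
  layout.inputChannels.reserve(projections.size());
  layout.outputChannels.resize(projections.size());
  for (column_index_t i = 0; i < projections.size(); ++i) {
    layout.inputChannels.push_back(projections[i].inputChannel);
    // Inverse permutation: output channel -> stored position.
    layout.outputChannels[projections[i].outputChannel] = i;
  }
  return layout;
}
```

The invariant worth checking is the round trip `inputChannels[outputChannels[i]] == keyInputChannels[i]` for every output channel `i`, which is what keeps the operator output order unchanged after the reorder.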
10 changes: 10 additions & 0 deletions velox/exec/HashAggregation.h
@@ -72,6 +72,16 @@ class HashAggregation : public Operator {

   RowVectorPtr getDistinctOutput();

+  // Sets up the projections for accessing grouping keys stored in grouping
+  // set.
+  // For 'groupingKeyInputChannels', the index is the key column index from
+  // the grouping set, and the value is the key column channel from the input.
+  // For 'groupingKeyOutputChannels', the index is the key column channel from
+  // the output, and the value is the key column index from the grouping set.
+  void setupGroupingKeyChannelProjections(
+      std::vector<column_index_t>& groupingKeyInputChannels,
+      std::vector<column_index_t>& groupingKeyOutputChannels) const;
+
   void updateEstimatedOutputRowSize();

   std::shared_ptr<const core::AggregationNode> aggregationNode_;
8 changes: 4 additions & 4 deletions velox/exec/Operator.h
@@ -27,16 +27,16 @@

 namespace facebook::velox::exec {

-// Represents a column that is copied from input to output, possibly
-// with cardinality change, i.e. values removed or duplicated.
+/// Represents a column that is copied from input to output, possibly
+/// with cardinality change, i.e. values removed or duplicated.
 struct IdentityProjection {
   IdentityProjection(
       column_index_t _inputChannel,
       column_index_t _outputChannel)
       : inputChannel(_inputChannel), outputChannel(_outputChannel) {}

-  const column_index_t inputChannel;
-  const column_index_t outputChannel;
+  column_index_t inputChannel;
+  column_index_t outputChannel;
 };

 struct MemoryStats {
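
The diff does not say why `const` was dropped from the `IdentityProjection` members. A plausible reading, offered here as an inference, is that a vector of projections now has to be reordered in place, and a type with `const` data members is not assignable, so it cannot be sorted. The sketch below demonstrates that language rule with two hypothetical structs; neither is the Velox type.

```cpp
#include <algorithm>
#include <cstdint>
#include <type_traits>
#include <vector>

using column_index_t = uint32_t;

// Shape of the struct before the change: const members.
struct ConstProjection {
  ConstProjection(column_index_t in, column_index_t out)
      : inputChannel(in), outputChannel(out) {}

  const column_index_t inputChannel;
  const column_index_t outputChannel;
};

// Shape of the struct after the change: mutable members.
struct MutableProjection {
  MutableProjection(column_index_t in, column_index_t out)
      : inputChannel(in), outputChannel(out) {}

  column_index_t inputChannel;
  column_index_t outputChannel;
};

// Const members delete the implicit assignment operators, which sorting
// algorithms rely on to move elements around.
static_assert(
    !std::is_move_assignable_v<ConstProjection>,
    "const members make the type non-assignable");
static_assert(
    std::is_move_assignable_v<MutableProjection>,
    "mutable members keep the type assignable");

// Compiles only for the assignable variant.
inline void sortByInputChannel(std::vector<MutableProjection>& projections) {
  std::sort(
      projections.begin(),
      projections.end(),
      [](const MutableProjection& a, const MutableProjection& b) {
        return a.inputChannel < b.inputChannel;
      });
}
```

Calling `std::sort` on a `std::vector<ConstProjection>` would fail to compile for the same reason the first `static_assert` holds.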