From ec2f8e8593df083086855aff64817ef656b43137 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Fri, 6 Dec 2024 00:13:26 -0800 Subject: [PATCH] feat: Sort grouping key columns to maximize the prefix sort optimization (#11720) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11720 Reorder the grouping keys of hash aggregation to maximize the prefix sort acceleration with limited max normalized key size. It sorts the grouping keys based on the prefix sort encoded key size and putting the grouping key with minimal encoded size first. This enables sort more columns using prefix sort for use case like hash aggregation which doesn't require a total order. This PR also cleanup the prefix sort a bit. Reviewed By: Yuhta, tanjialiang, oerling Differential Revision: D66638085 --- velox/common/base/PrefixSortConfig.h | 16 +- velox/common/base/SpillConfig.h | 5 + velox/core/PlanNode.cpp | 4 +- velox/core/QueryConfig.h | 8 +- velox/docs/configs.rst | 2 +- velox/docs/monitoring/stats.rst | 15 ++ velox/exec/GroupingSet.cpp | 47 ++++-- velox/exec/GroupingSet.h | 8 +- velox/exec/HashAggregation.cpp | 66 +++++++- velox/exec/HashAggregation.h | 10 ++ velox/exec/Operator.h | 8 +- velox/exec/PrefixSort.cpp | 47 +++++- velox/exec/PrefixSort.h | 20 ++- velox/exec/VectorHasher.cpp | 19 ++- velox/exec/VectorHasher.h | 4 + velox/exec/tests/AggregationTest.cpp | 233 +++++++++++++++++++++++++++ velox/exec/tests/PrefixSortTest.cpp | 46 ++++++ velox/exec/tests/SortBufferTest.cpp | 145 ++++++++++++++--- 18 files changed, 632 insertions(+), 71 deletions(-) diff --git a/velox/common/base/PrefixSortConfig.h b/velox/common/base/PrefixSortConfig.h index 27048174d8572..46ac493915546 100644 --- a/velox/common/base/PrefixSortConfig.h +++ b/velox/common/base/PrefixSortConfig.h @@ -24,14 +24,16 @@ namespace facebook::velox::common { struct PrefixSortConfig { PrefixSortConfig() = default; - PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold) - : maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {} + PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows) + : maxNormalizedKeyBytes(_maxNormalizedKeyBytes), + minNumRows(_minNumRows) {} - /// Max number of bytes can store normalized keys in prefix-sort buffer per - /// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes. - int64_t maxNormalizedKeySize{128}; + /// Maximum bytes that can be used to store normalized keys in prefix-sort + /// buffer per entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes. + uint32_t maxNormalizedKeyBytes{128}; - /// PrefixSort will have performance regression when the dateset is too small. - int32_t threshold{130}; + /// Minimum number of rows to apply prefix sort. Prefix sort does not perform + /// with small datasets. + uint32_t minNumRows{128}; }; } // namespace facebook::velox::common diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index 09790dff2d7d8..aeca392f38781 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -75,6 +75,11 @@ struct SpillConfig { /// Checks if the given 'startBitOffset' has exceeded the max spill limit. bool exceedSpillLevelLimit(uint8_t startBitOffset) const; + /// Returns true if prefix sort is enabled. + bool prefixSortEnabled() const { + return prefixSortConfig.has_value(); + } + /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning. GetSpillDirectoryPathCB getSpillDirPathCb; diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 292a050e4fb2c..ac706fe974049 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + return (isFinal() || isSingle()) && !groupingKeys().empty() && + preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 8bdcd72ebc67f..7bceb06d117bb 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -836,12 +836,12 @@ class QueryConfig { return get(kDriverCpuTimeSliceLimitMs, 0); } - int64_t prefixSortNormalizedKeyMaxBytes() const { - return get(kPrefixSortNormalizedKeyMaxBytes, 128); + uint32_t prefixSortNormalizedKeyMaxBytes() const { + return get(kPrefixSortNormalizedKeyMaxBytes, 128); } - int32_t prefixSortMinRows() const { - return get(kPrefixSortMinRows, 130); + uint32_t prefixSortMinRows() const { + return get(kPrefixSortMinRows, 128); } double scaleWriterRebalanceMaxMemoryUsageRatio() const { diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index e6ce8423b28b0..f25b89a5518e5 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -142,7 +142,7 @@ Generic Configuration - Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort. * - prefixsort_min_rows - integer - - 130 + - 128 - Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking. .. _expression-evaluation-conf: diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index f77d2ba824ca0..53538d8549749 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -190,3 +190,18 @@ These stats are reported by shuffle operators. - Indicates the vector serde kind used by an operator for shuffle with 1 for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange, MergeExchange and PartitionedOutput operators for now. + +PrefixSort +---------- +These stats are reported by prefix sort. + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numPrefixSortKeys + - + - The number of columns sorted using prefix sort. diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 31d1d3e2ad0da..b0ba52cd98285 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -44,6 +44,7 @@ GroupingSet::GroupingSet( const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -55,6 +56,7 @@ GroupingSet::GroupingSet( OperatorCtx* operatorCtx, folly::Synchronized* spillStats) : preGroupedKeyChannels_(std::move(preGroupedKeys)), + groupingKeyOutputProjections_(std::move(groupingKeyOutputProjections)), hashers_(std::move(hashers)), isGlobal_(hashers_.empty()), isPartial_(isPartial), @@ -74,9 +76,21 @@ GroupingSet::GroupingSet( spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); + for (auto& hasher : hashers_) { keyChannels_.push_back(hasher->channel()); } + + if (groupingKeyOutputProjections_.empty()) { + groupingKeyOutputProjections_.resize(keyChannels_.size()); + std::iota( + groupingKeyOutputProjections_.begin(), + groupingKeyOutputProjections_.end(), + 0); + } else { + VELOX_CHECK_EQ(groupingKeyOutputProjections_.size(), keyChannels_.size()); + } + std::unordered_map channelUseCount; for (const auto& aggregate : aggregates_) { for (auto channel : aggregate.inputs) { @@ -124,17 +138,18 @@ std::unique_ptr GroupingSet::createForMarkDistinct( return std::make_unique( inputType, std::move(hashers), - /*preGroupedKeys*/ std::vector{}, - /*aggregates*/ std::vector{}, - /*ignoreNullKeys*/ false, - /*isPartial*/ false, - /*isRawInput*/ false, - /*globalGroupingSets*/ std::vector{}, - /*groupIdColumn*/ std::nullopt, - /*spillConfig*/ nullptr, + /*preGroupedKeys=*/std::vector{}, + /*groupingKeyOutputProjections=*/std::vector{}, + /*aggregates=*/std::vector{}, + /*ignoreNullKeys=*/false, + /*isPartial=*/false, + /*isRawInput=*/false, + /*globalGroupingSets=*/std::vector{}, + /*groupIdColumn=*/std::nullopt, + /*spillConfig=*/nullptr, nonReclaimableSection, operatorCtx, - /*spillStats_*/ nullptr); + /*spillStats=*/nullptr); }; namespace { @@ -302,7 +317,6 @@ void GroupingSet::addRemainingInput() { } namespace { - void initializeAggregates( const std::vector& aggregates, RowContainer& rows, @@ -758,10 +772,14 @@ void GroupingSet::extractGroups( return; } RowContainer& rows = *table_->rows(); - auto totalKeys = rows.keyTypes().size(); + const auto totalKeys = rows.keyTypes().size(); for (int32_t i = 0; i < totalKeys; ++i) { - auto keyVector = result->childAt(i); - rows.extractColumn(groups.data(), groups.size(), i, keyVector); + auto& keyVector = result->childAt(i); + rows.extractColumn( + groups.data(), + groups.size(), + groupingKeyOutputProjections_[i], + keyVector); } for (int32_t i = 0; i < aggregates_.size(); ++i) { if (!aggregates_[i].sortingKeys.empty()) { @@ -1330,7 +1348,8 @@ void GroupingSet::toIntermediate( } for (auto i = 0; i < keyChannels_.size(); ++i) { - result->childAt(i) = input->childAt(keyChannels_[i]); + const auto inputKeyChannel = keyChannels_[groupingKeyOutputProjections_[i]]; + result->childAt(i) = input->childAt(inputKeyChannel); } for (auto i = 0; i < aggregates_.size(); ++i) { auto& function = aggregates_[i].function; diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index a80d223c8520b..fda8f4eea02f4 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -32,6 +32,7 @@ class GroupingSet { const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -272,9 +273,14 @@ class GroupingSet { std::vector keyChannels_; - /// A subset of grouping keys on which the input is clustered. + // A subset of grouping keys on which the input is clustered. const std::vector preGroupedKeyChannels_; + // Provides the column projections for extracting the grouping keys from + // 'table_' for output. The vector index is the output channel and the value + // is the corresponding column index stored in 'table_'. + std::vector groupingKeyOutputProjections_; + std::vector> hashers_; const bool isGlobal_; const bool isPartial_; diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index ef072d059d2c4..14e87f336ec24 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ #include "velox/exec/HashAggregation.h" + #include +#include "velox/exec/PrefixSort.h" #include "velox/exec/Task.h" #include "velox/expression/Expr.h" @@ -54,9 +56,13 @@ void HashAggregation::initialize() { VELOX_CHECK(pool()->trackUsage()); const auto& inputType = aggregationNode_->sources()[0]->outputType(); - auto hashers = - createVectorHashers(inputType, aggregationNode_->groupingKeys()); - auto numHashers = hashers.size(); + std::vector groupingKeyInputChannels; + std::vector groupingKeyOutputChannels; + setupGroupingKeyChannelProjections( + groupingKeyInputChannels, groupingKeyOutputChannels); + + auto hashers = createVectorHashers(inputType, groupingKeyInputChannels); + const auto numHashers = hashers.size(); std::vector preGroupedChannels; preGroupedChannels.reserve(aggregationNode_->preGroupedKeys().size()); @@ -82,7 +88,8 @@ void HashAggregation::initialize() { } for (auto i = 0; i < hashers.size(); ++i) { - identityProjections_.emplace_back(hashers[i]->channel(), i); + identityProjections_.emplace_back( + hashers[groupingKeyOutputChannels[i]]->channel(), i); } std::optional groupIdChannel; @@ -96,6 +103,7 @@ void HashAggregation::initialize() { inputType, std::move(hashers), std::move(preGroupedChannels), + std::move(groupingKeyOutputChannels), std::move(aggregateInfos), aggregationNode_->ignoreNullKeys(), isPartialOutput_, @@ -110,6 +118,54 @@ void HashAggregation::initialize() { aggregationNode_.reset(); } +void HashAggregation::setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const { + VELOX_CHECK(groupingKeyInputChannels.empty()); + VELOX_CHECK(groupingKeyOutputChannels.empty()); + + const auto& inputType = aggregationNode_->sources()[0]->outputType(); + const auto& groupingKeys = aggregationNode_->groupingKeys(); + // The map from the grouping key output channel to the input channel. + // + // NOTE: grouping key output order is specified as 'groupingKeys' in + // 'aggregationNode_'. + std::vector groupingKeyProjections; + groupingKeyProjections.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyProjections.emplace_back( + exprToChannel(groupingKeys[i].get(), inputType), i); + } + + const bool reorderGroupingKeys = + canSpill() && spillConfig()->prefixSortEnabled(); + // If prefix sort is enabled, we need to sort the grouping key's layout in the + // grouping set to maximize the prefix sort acceleration if spill is + // triggered. The reorder stores the grouping key with smaller prefix sort + // encoded size first. + if (reorderGroupingKeys) { + PrefixSortLayout::optimizeSortKeysOrder(inputType, groupingKeyProjections); + } + + groupingKeyInputChannels.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyInputChannels.push_back(groupingKeyProjections[i].inputChannel); + } + + groupingKeyOutputChannels.resize(groupingKeys.size()); + if (!reorderGroupingKeys) { + // If there is no reorder, then grouping key output channels are the same as + // the column index order int he grouping set. + std::iota( + groupingKeyOutputChannels.begin(), groupingKeyOutputChannels.end(), 0); + return; + } + + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyOutputChannels[groupingKeyProjections[i].outputChannel] = i; + } +} + bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { VELOX_CHECK(isPartialOutput_ && !isGlobal_); return numInputRows_ > abandonPartialAggregationMinRows_ && @@ -328,7 +384,7 @@ RowVectorPtr HashAggregation::getDistinctOutput() { auto& lookup = groupingSet_->hashLookup(); const auto size = lookup.newGroups.size(); BufferPtr indices = allocateIndices(size, operatorCtx_->pool()); - auto indicesPtr = indices->asMutable(); + auto* indicesPtr = indices->asMutable(); std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr); newDistincts_ = false; auto output = fillOutput(size, indices); diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index 1bf28c43428ad..5cd44f77cb4b2 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -72,6 +72,16 @@ class HashAggregation : public Operator { RowVectorPtr getDistinctOutput(); + // Setups the projections for accessing grouping keys stored in grouping + // set. + // For 'groupingKeyInputChannels', the index is the key column index from + // the grouping set, and the value is the key column channel from the input. + // For 'outputChannelProjections', the index is the key column channel from + // the output, and the value is the key column index from the grouping set. + void setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const; + void updateEstimatedOutputRowSize(); std::shared_ptr aggregationNode_; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index db48b26df11a6..de06d78da65a3 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -27,16 +27,16 @@ namespace facebook::velox::exec { -// Represents a column that is copied from input to output, possibly -// with cardinality change, i.e. values removed or duplicated. +/// Represents a column that is copied from input to output, possibly +/// with cardinality change, i.e. values removed or duplicated. struct IdentityProjection { IdentityProjection( column_index_t _inputChannel, column_index_t _outputChannel) : inputChannel(_inputChannel), outputChannel(_outputChannel) {} - const column_index_t inputChannel; - const column_index_t outputChannel; + column_index_t inputChannel; + column_index_t outputChannel; }; struct MemoryStats { diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index 8488194c72e8e..b4dfb1f6e0d12 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -126,6 +126,7 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) { } // namespace +// static. PrefixSortLayout PrefixSortLayout::makeSortLayout( const std::vector& types, const std::vector& compareFlags, @@ -169,6 +170,42 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout( numPaddingBytes}; } +// static. +void PrefixSortLayout::optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections) { + std::vector> encodedKeySizes( + rowType->size(), std::nullopt); + for (const auto& projection : keyColumnProjections) { + encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize( + rowType->childAt(projection.inputChannel)->kind()); + } + + std::sort( + keyColumnProjections.begin(), + keyColumnProjections.end(), + [&](const IdentityProjection& lhs, const IdentityProjection& rhs) { + const auto& lhsEncodedSize = encodedKeySizes[lhs.inputChannel]; + const auto& rhsEncodedSize = encodedKeySizes[rhs.inputChannel]; + if (lhsEncodedSize.has_value() && !rhsEncodedSize.has_value()) { + return true; + } + if (!lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + return false; + } + if (lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + if (lhsEncodedSize.value() < rhsEncodedSize.value()) { + return true; + } + if (lhsEncodedSize.value() > rhsEncodedSize.value()) { + return false; + } + } + // Tie breaks with the original key column order. + return lhs.outputChannel < rhs.outputChannel; + }); +} + FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys( char* left, char* right) { @@ -236,12 +273,12 @@ uint32_t PrefixSort::maxRequiredBytes( const std::vector& compareFlags, const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { return 0; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); if (!sortLayout.hasNormalizedKeys) { return 0; } @@ -303,6 +340,12 @@ void PrefixSort::sortInternal( PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable()); auto* prefixBufferStart = prefixBuffer; auto* prefixBufferEnd = prefixBuffer + numRows * entrySize; + if (sortLayout_.numNormalizedKeys > 0) { + addThreadLocalRuntimeStat( + PrefixSort::kNumPrefixSortKeys, + RuntimeCounter( + sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone)); + } if (sortLayout_.hasNonNormalizedKey) { sortRunner.quickSort( prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) { diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 2a8235c6e7018..7ac34b4305519 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/base/PrefixSortConfig.h" +#include "velox/exec/Operator.h" #include "velox/exec/RowContainer.h" #include "velox/exec/prefixsort/PrefixSortAlgorithm.h" #include "velox/exec/prefixsort/PrefixSortEncoder.h" @@ -67,6 +68,17 @@ struct PrefixSortLayout { const std::vector& types, const std::vector& compareFlags, uint32_t maxNormalizedKeySize); + + /// Optimizes the order of sort key columns to maximize the number of prefix + /// sort keys for acceleration. This only applies for use case which doesn't + /// need a total order such as spill sort for hash aggregation. + /// 'keyColumnProjections' provides the mapping from the orginal key column + /// order to its channel in 'rowType'. The function reoders + /// 'keyColumnProjections' based on the prefix sort encoded size of each key + /// column type with smaller size first. + static void optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections); }; class PrefixSort { @@ -105,14 +117,14 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool, std::vector>& rows) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { stdSort(rows, rowContainer, compareFlags); return; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); // All keys can not normalize, skip the binary string compare opt. // Putting this outside sort-internal helps with stdSort. if (!sortLayout.hasNormalizedKeys) { @@ -133,6 +145,10 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool); + /// The runtime stats name collected for prefix sort. + /// The number of prefix sort keys. + static inline const std::string kNumPrefixSortKeys{"numPrefixSortKeys"}; + private: /// Fallback to stdSort when prefix sort conditions such as config and memory /// are not satisfied. stdSort provides >2X performance win than std::sort for diff --git a/velox/exec/VectorHasher.cpp b/velox/exec/VectorHasher.cpp index a98e3d9b3d466..dc4c921fb0351 100644 --- a/velox/exec/VectorHasher.cpp +++ b/velox/exec/VectorHasher.cpp @@ -896,14 +896,25 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys) { const auto numKeys = keys.size(); - - std::vector> hashers; - hashers.reserve(numKeys); + std::vector keyChannels; + keyChannels.reserve(numKeys); for (const auto& key : keys) { const auto channel = exprToChannel(key.get(), rowType); - hashers.push_back(VectorHasher::create(key->type(), channel)); + keyChannels.push_back(channel); } + return createVectorHashers(rowType, keyChannels); +} +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels) { + const auto numKeys = keyChannels.size(); + std::vector> hashers; + hashers.reserve(numKeys); + for (const auto& keyChannel : keyChannels) { + hashers.push_back( + VectorHasher::create(rowType->childAt(keyChannel), keyChannel)); + } return hashers; } diff --git a/velox/exec/VectorHasher.h b/velox/exec/VectorHasher.h index 425fe267bd8a7..5ef4899673e57 100644 --- a/velox/exec/VectorHasher.h +++ b/velox/exec/VectorHasher.h @@ -709,6 +709,10 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys); +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels); + } // namespace facebook::velox::exec #include "velox/exec/VectorHasher-inl.h" diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 37e304ceaa785..dc4724ace32b3 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -28,7 +28,9 @@ #include "velox/exec/Aggregate.h" #include "velox/exec/GroupingSet.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/PrefixSort.h" #include "velox/exec/Values.h" +#include "velox/exec/prefixsort/PrefixSortEncoder.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/OperatorTestBase.h" @@ -960,6 +962,42 @@ TEST_F(AggregationTest, partialDistinctWithAbandon) { .assertResults("SELECT distinct c0, sum(c0) FROM tmp group by c0"); } +TEST_F(AggregationTest, distinctWithGroupingKeysReordered) { + rowType_ = ROW( + {"c0", "c1", "c2", "c3"}, {BIGINT(), INTEGER(), VARCHAR(), VARCHAR()}); + + const int vectorSize = 2'000; + VectorFuzzer::Options options; + options.vectorSize = vectorSize; + options.stringVariableLength = false; + options.stringLength = 128; + VectorFuzzer fuzzer(options, pool()); + const int numVectors{5}; + std::vector vectors; + for (int i = 0; i < numVectors; ++i) { + vectors.push_back(fuzzer.fuzzRow(rowType_)); + } + + createDuckDbTable(vectors); + + // Distinct aggregation with grouping key with larger prefix encoded size + // first. + auto spillDirectory = exec::test::TempDirectoryPath::create(); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .config(QueryConfig::kAbandonPartialAggregationMinRows, 100) + .config(QueryConfig::kAbandonPartialAggregationMinPct, 50) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config(QueryConfig::kSpillPrefixSortEnabled, true) + .config("max_drivers_per_task", 1) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c2", "c0"}, {}) + .planNode()) + .assertResults("SELECT distinct c2, c0 FROM tmp"); +} + TEST_F(AggregationTest, largeValueRangeArray) { // We have keys that map to integer range. The keys are // a little under max array hash table size apart. This wastes 16MB of @@ -1957,6 +1995,201 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1"); } +TEST_F(AggregationTest, spillPrefixSortOptimization) { + const RowTypePtr rowType{ + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {BIGINT(), + SMALLINT(), + VARCHAR(), + TINYINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + VARCHAR()})}; + auto vectors = makeVectors(rowType, 1024, 2); + int64_t groupingKeyValue{0}; + for (auto& vector : vectors) { + auto groupingVector = BaseVector::create( + vector->childAt(0)->type(), vector->childAt(0)->size(), pool_.get()); + auto* flatGroupingKeyVector = groupingVector->asFlatVector(); + for (auto i = 0; i < flatGroupingKeyVector->size(); ++i) { + flatGroupingKeyVector->set(i, groupingKeyValue++); + } + vector->childAt(0) = groupingVector; + } + + createDuckDbTable(vectors); + struct { + bool prefixSortSpillEnabled; + uint32_t maxNormalizedKeyBytes; + uint32_t minNumRows; + uint32_t expectedNumPrefixSortKeys; + + std::string debugString() const { + return fmt::format( + "prefixSortSpillEnabled {}, maxNormalizedKeyBytes {}, minNumRows {}, expectedNumPrefixSortKeys {}", + prefixSortSpillEnabled, + maxNormalizedKeyBytes, + minNumRows, + expectedNumPrefixSortKeys); + } + } testSettings[] = { + {true, 0, 0, 0}, + {false, 0, 0, 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 1}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 0}, + {true, 1'000'000, 0, 3}, + {false, 1'000'000, 0, 0}, + {true, 1'000'000, 1'000'000, 0}, + {false, 1'000'000, 1'000'000, 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 3}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}}; + + for (const auto& testData : testSettings) { + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + core::PlanNodeId aggrNodeId; + + auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) { + SCOPED_TRACE(sql); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config( + QueryConfig::kSpillPrefixSortEnabled, + testData.prefixSortSpillEnabled) + .config( + QueryConfig::kPrefixSortMinRows, + std::to_string(testData.minNumRows)) + .config( + QueryConfig::kPrefixSortNormalizedKeyMaxBytes, + std::to_string(testData.maxNormalizedKeyBytes)) + .plan(plan) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(aggrNodeId); + checkSpillStats(stats, true); + if (testData.expectedNumPrefixSortKeys > 0) { + ASSERT_GE( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).sum, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).max, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).min, + testData.expectedNumPrefixSortKeys); + } else { + ASSERT_EQ(stats.customStats.count(PrefixSort::kNumPrefixSortKeys), 0); + } + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + }; + + auto plan = PlanBuilder() + .values(vectors) + .singleAggregation( + {"c8", "c2", "c1", "c0", "c4", "c3"}, + {"max(c8)", + "max(c2)", + "sum(c1)", + "sum(c0)", + "min(c4)", + "max(c3)"}, + {}) + .capturePlanNodeId(aggrNodeId) + .planNode(); + testPlan( + plan, + "SELECT c8, c2, c1, c0, c4, c3, max(c8), max(c2), sum(c1), sum(c0), min(c4), max(c3) FROM tmp GROUP BY 1, 2, 3, 4, 5, 6"); + } +} + TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { std::vector vectors; int64_t val = 0; diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 248d1473bd0da..9bad8aeb035cc 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -325,5 +325,51 @@ TEST_F(PrefixSortTest, checkMaxNormalizedKeySizeForMultipleKeys) { ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[0], 0); ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[1], 9); } + +TEST_F(PrefixSortTest, optimizeSortKeysOrder) { + struct { + RowTypePtr inputType; + std::vector keyChannels; + std::vector expectedSortedKeyChannels; + + std::string debugString() const { + return fmt::format( + "inputType {}, keyChannels {}, expectedSortedKeyChannels {}", + inputType->toString(), + fmt::join(keyChannels, ":"), + fmt::join(expectedSortedKeyChannels, ":")); + } + } testSettings[] = { + {ROW({BIGINT(), BIGINT()}), {0, 1}, {0, 1}}, + {ROW({BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), BIGINT()}), {0, 1}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), VARCHAR()}), {0, 1, 2}, {1, 0, 2}}, + {ROW({TINYINT(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {2, 1, 0, 4, 5, 3}, + {4, 1, 2, 0, 5, 3}}, + {ROW({INTEGER(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {5, 4, 3, 2, 1, 0}, + {4, 0, 1, 5, 3, 2}}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + std::vector projections; + for (auto i = 0; i < testData.keyChannels.size(); ++i) { + projections.emplace_back(testData.keyChannels[i], i); + } + PrefixSortLayout::optimizeSortKeysOrder(testData.inputType, projections); + std::unordered_set outputChannelSet; + for (auto i = 0; i < projections.size(); ++i) { + ASSERT_EQ( + projections[i].inputChannel, testData.expectedSortedKeyChannels[i]); + ASSERT_EQ( + testData.keyChannels[projections[i].outputChannel], + projections[i].inputChannel); + } + } +} } // namespace } // namespace facebook::velox::exec::prefixsort::test diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 8ebb04f9b0fb3..f021ce5c5c247 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -31,6 +31,23 @@ using namespace facebook::velox; using namespace facebook::velox::memory; namespace facebook::velox::functions::test { +namespace { +// Class to write runtime stats in the tests to the stats container. +class TestRuntimeStatWriter : public BaseRuntimeStatWriter { + public: + explicit TestRuntimeStatWriter( + std::unordered_map& stats) + : stats_{stats} {} + + void addRuntimeStat(const std::string& name, const RuntimeCounter& value) + override { + addOperatorRuntimeStats(name, value, stats_); + } + + private: + std::unordered_map& stats_; +}; +} // namespace class SortBufferTest : public OperatorTestBase, public testing::WithParamInterface { @@ -39,6 +56,8 @@ class SortBufferTest : public OperatorTestBase, OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); rng_.seed(123); + statWriter_ = std::make_unique(stats_); + setThreadLocalRunTimeStatWriter(statWriter_.get()); } void TearDown() override { @@ -69,7 +88,9 @@ class SortBufferTest : public OperatorTestBase, const bool enableSpillPrefixSort_{GetParam()}; const velox::common::PrefixSortConfig prefixSortConfig_ = - velox::common::PrefixSortConfig{std::numeric_limits::max(), 130}; + velox::common::PrefixSortConfig{ + std::numeric_limits::max(), + GetParam() ? 8 : std::numeric_limits::max()}; const std::optional spillPrefixSortConfig_ = enableSpillPrefixSort_ ? std::optional(prefixSortConfig_) @@ -101,9 +122,32 @@ class SortBufferTest : public OperatorTestBase, tsan_atomic nonReclaimableSection_{false}; folly::Random::DefaultGenerator rng_; + std::unordered_map stats_; + std::unique_ptr statWriter_; }; TEST_P(SortBufferTest, singleKey) { + const RowVectorPtr data = makeRowVector( + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}), // sorted column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "great", + "today"})}); + struct { std::vector sortCompareFlags; std::vector expectedResult; @@ -124,12 +168,12 @@ TEST_P(SortBufferTest, singleKey) { true, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Ascending - {1, 2, 3, 4, 5}}, + {3, 4, 7, 8, 10, 13, 14, 15, 16, 17}}, {{{true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Descending - {5, 4, 3, 2, 1}}}; + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}}}; // Specifies the sort columns ["c1"]. sortColumnIndices_ = {1}; @@ -143,25 +187,30 @@ TEST_P(SortBufferTest, singleKey) { &nonReclaimableSection_, prefixSortConfig_); - RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), - makeFlatVector( - {"hello", "world", "today", "is", "great"})}); - sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); + ASSERT_EQ(output->size(), 10); int resultIndex = 0; for (int expectedValue : testData.expectedResult) { ASSERT_EQ( output->childAt(1)->asFlatVector()->valueAt(resultIndex++), expectedValue); } + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } } @@ -175,23 +224,54 @@ TEST_P(SortBufferTest, multipleKeys) { prefixSortConfig_); RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted-2 column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), // sorted-1 column + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {15, 12, 9, 8, 7, 6, 5, 4, 3, 1}), // sorted-2 column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 7.7, 8.1, 8, 8.1, 8.1, 10.0}), // sorted-1 + // column makeFlatVector( - {"hello", "world", "today", "is", "great"})}); + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "sort", + "sorted"})}); sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 3); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 4); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 1); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 2); + ASSERT_EQ(output->size(), 10); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 15); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 9); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 12); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 7); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 8); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(5), 6); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(6), 4); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(7), 1); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(8), 3); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(9), 5); + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } } // TODO: enable it later with test utility to compare the sorted result. @@ -267,6 +347,7 @@ TEST_P(SortBufferTest, DISABLED_randomData) { inputVectors.push_back(input); } sortBuffer->noMoreInput(); + stats_.clear(); // todo: have a utility function buildExpectedSortResult and verify the // sorting result for random data. } @@ -469,6 +550,20 @@ TEST_P(SortBufferTest, spill) { memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); } } + if (GetParam()) { + ASSERT_GE( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } }