From e9d14a492ea361c2306499ebe779c866de0fdd4e Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 2 Dec 2024 22:37:23 -0800 Subject: [PATCH] feat: Sort grouping key columns to maximize the prefix sort optimization (#11720) Summary: Reorder the grouping keys of hash aggregation to maximize the prefix sort acceleration with limited max normalized key size. It sorts the grouping keys based on the prefix sort encoded key size and putting the grouping key with minimal encoded size first. This enables sort more columns using prefix sort for use case like hash aggregation which doesn't require a total order. This PR also cleanup the prefix sort a bit. Differential Revision: D66638085 --- velox/common/base/PrefixSortConfig.h | 9 +- velox/common/base/SpillConfig.h | 5 + velox/core/PlanNode.cpp | 4 +- velox/core/QueryConfig.h | 8 +- velox/docs/configs.rst | 2 +- velox/docs/monitoring/stats.rst | 15 ++ velox/exec/GroupingSet.cpp | 33 +++-- velox/exec/GroupingSet.h | 7 +- velox/exec/HashAggregation.cpp | 57 +++++++- velox/exec/HashAggregation.h | 10 ++ velox/exec/Operator.h | 8 +- velox/exec/PrefixSort.cpp | 47 ++++++- velox/exec/PrefixSort.h | 20 ++- velox/exec/VectorHasher.cpp | 19 ++- velox/exec/VectorHasher.h | 4 + velox/exec/tests/AggregationTest.cpp | 197 +++++++++++++++++++++++++++ velox/exec/tests/PrefixSortTest.cpp | 46 +++++++ velox/exec/tests/SortBufferTest.cpp | 145 ++++++++++++++++---- 18 files changed, 571 insertions(+), 65 deletions(-) diff --git a/velox/common/base/PrefixSortConfig.h b/velox/common/base/PrefixSortConfig.h index 27048174d8572..fd167b1386e89 100644 --- a/velox/common/base/PrefixSortConfig.h +++ b/velox/common/base/PrefixSortConfig.h @@ -24,14 +24,15 @@ namespace facebook::velox::common { struct PrefixSortConfig { PrefixSortConfig() = default; - PrefixSortConfig(int64_t _maxNormalizedKeySize, int32_t _threshold) - : maxNormalizedKeySize(_maxNormalizedKeySize), threshold(_threshold) {} + PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows) + : maxNormalizedKeyBytes(_maxNormalizedKeyBytes), + minNumRows(_minNumRows) {} /// Max number of bytes can store normalized keys in prefix-sort buffer per /// entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes. - int64_t maxNormalizedKeySize{128}; + uint32_t maxNormalizedKeyBytes{128}; /// PrefixSort will have performance regression when the dateset is too small. - int32_t threshold{130}; + uint32_t minNumRows{128}; }; } // namespace facebook::velox::common diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index 09790dff2d7d8..aeca392f38781 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -75,6 +75,11 @@ struct SpillConfig { /// Checks if the given 'startBitOffset' has exceeded the max spill limit. bool exceedSpillLevelLimit(uint8_t startBitOffset) const; + /// Returns true if prefix sort is enabled. + bool prefixSortEnabled() const { + return prefixSortConfig.has_value(); + } + /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning. GetSpillDirectoryPathCB getSpillDirPathCb; diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index cdb724f1ef0f0..68382ffeb45a5 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + return (isFinal() || isSingle()) && !groupingKeys().empty() && + preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 105062de81308..76d348c4b39ee 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -811,12 +811,12 @@ class QueryConfig { return get(kDriverCpuTimeSliceLimitMs, 0); } - int64_t prefixSortNormalizedKeyMaxBytes() const { - return get(kPrefixSortNormalizedKeyMaxBytes, 128); + uint32_t prefixSortNormalizedKeyMaxBytes() const { + return get(kPrefixSortNormalizedKeyMaxBytes, 128); } - int32_t prefixSortMinRows() const { - return get(kPrefixSortMinRows, 130); + uint32_t prefixSortMinRows() const { + return get(kPrefixSortMinRows, 128); } template diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 3ce8e224f870e..e59c3915200cb 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -142,7 +142,7 @@ Generic Configuration - Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort. * - prefixsort_min_rows - integer - - 130 + - 128 - Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking. .. _expression-evaluation-conf: diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index 4958587e7bcfd..6881e7543af34 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -179,3 +179,18 @@ These stats are reported by shuffle operators. - Indicates the vector serde kind used by an operator for shuffle with 1 for Presto, 2 for CompactRow, 3 for UnsafeRow. It is reported by Exchange, MergeExchange and PartitionedOutput operators for now. + +PrefixSort +---------- +These stats are reported by shuffle operators. + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numPrefixSortKeys + - + - The number of columns sorted using prefix sort. diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 4613947f1f040..25092312cdc4c 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -44,6 +44,7 @@ GroupingSet::GroupingSet( const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -55,6 +56,7 @@ GroupingSet::GroupingSet( OperatorCtx* operatorCtx, folly::Synchronized* spillStats) : preGroupedKeyChannels_(std::move(preGroupedKeys)), + groupingKeyOutputProjections_(std::move(groupingKeyOutputProjections)), hashers_(std::move(hashers)), isGlobal_(hashers_.empty()), isPartial_(isPartial), @@ -74,9 +76,11 @@ GroupingSet::GroupingSet( spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); + for (auto& hasher : hashers_) { keyChannels_.push_back(hasher->channel()); } + std::unordered_map channelUseCount; for (const auto& aggregate : aggregates_) { for (auto channel : aggregate.inputs) { @@ -124,17 +128,18 @@ std::unique_ptr GroupingSet::createForMarkDistinct( return std::make_unique( inputType, std::move(hashers), - /*preGroupedKeys*/ std::vector{}, - /*aggregates*/ std::vector{}, - /*ignoreNullKeys*/ false, - /*isPartial*/ false, - /*isRawInput*/ false, - /*globalGroupingSets*/ std::vector{}, - /*groupIdColumn*/ std::nullopt, - /*spillConfig*/ nullptr, + /*preGroupedKeys=*/std::vector{}, + /*groupingKeyOutputProjections=*/std::vector{}, + /*aggregates=*/std::vector{}, + /*ignoreNullKeys=*/false, + /*isPartial=*/false, + /*isRawInput=*/false, + /*globalGroupingSets=*/std::vector{}, + /*groupIdColumn=*/std::nullopt, + /*spillConfig=*/nullptr, nonReclaimableSection, operatorCtx, - /*spillStats_*/ nullptr); + /*spillStats=*/nullptr); }; namespace { @@ -302,7 +307,6 @@ void GroupingSet::addRemainingInput() { } namespace { - void initializeAggregates( const std::vector& aggregates, RowContainer& rows, @@ -758,10 +762,13 @@ void GroupingSet::extractGroups( return; } RowContainer& rows = *table_->rows(); - auto totalKeys = rows.keyTypes().size(); + const auto totalKeys = rows.keyTypes().size(); for (int32_t i = 0; i < totalKeys; ++i) { - auto keyVector = result->childAt(i); - rows.extractColumn(groups.data(), groups.size(), i, keyVector); + const auto keyChannel = groupingKeyOutputProjections_.empty() + ? i + : groupingKeyOutputProjections_[i]; + auto& keyVector = result->childAt(i); + rows.extractColumn(groups.data(), groups.size(), keyChannel, keyVector); } for (int32_t i = 0; i < aggregates_.size(); ++i) { if (!aggregates_[i].sortingKeys.empty()) { diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index a80d223c8520b..120d787a677da 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -32,6 +32,7 @@ class GroupingSet { const RowTypePtr& inputType, std::vector>&& hashers, std::vector&& preGroupedKeys, + std::vector&& groupingKeyOutputProjections, std::vector&& aggregates, bool ignoreNullKeys, bool isPartial, @@ -272,8 +273,12 @@ class GroupingSet { std::vector keyChannels_; - /// A subset of grouping keys on which the input is clustered. + // A subset of grouping keys on which the input is clustered. const std::vector preGroupedKeyChannels_; + // If not empty, it provides the projections for extracting the grouping keys + // from 'table_' for output. The vector index is the output channel and the + // value is the corresponding column index stored in 'table_'. + const std::vector groupingKeyOutputProjections_; std::vector> hashers_; const bool isGlobal_; diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index ef072d059d2c4..5ce8ae07db22b 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -14,7 +14,9 @@ * limitations under the License. */ #include "velox/exec/HashAggregation.h" + #include +#include "velox/exec/PrefixSort.h" #include "velox/exec/Task.h" #include "velox/expression/Expr.h" @@ -54,9 +56,13 @@ void HashAggregation::initialize() { VELOX_CHECK(pool()->trackUsage()); const auto& inputType = aggregationNode_->sources()[0]->outputType(); - auto hashers = - createVectorHashers(inputType, aggregationNode_->groupingKeys()); - auto numHashers = hashers.size(); + std::vector groupingKeyInputChannels; + std::vector groupingKeyOutputChannels; + setupGroupingKeyChannelProjections( + groupingKeyInputChannels, groupingKeyOutputChannels); + + auto hashers = createVectorHashers(inputType, groupingKeyInputChannels); + const auto numHashers = hashers.size(); std::vector preGroupedChannels; preGroupedChannels.reserve(aggregationNode_->preGroupedKeys().size()); @@ -96,6 +102,7 @@ void HashAggregation::initialize() { inputType, std::move(hashers), std::move(preGroupedChannels), + std::move(groupingKeyOutputChannels), std::move(aggregateInfos), aggregationNode_->ignoreNullKeys(), isPartialOutput_, @@ -110,6 +117,50 @@ void HashAggregation::initialize() { aggregationNode_.reset(); } +void HashAggregation::setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const { + const auto& inputType = aggregationNode_->sources()[0]->outputType(); + const auto& groupingKeys = aggregationNode_->groupingKeys(); + // The map from the grouping key output channel to the input channel. + // + // NOTE: grouping key output order is specified as 'groupingKeys' in + // 'aggregationNode_'. + std::vector groupingKeyProjections; + groupingKeyProjections.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyProjections.emplace_back( + exprToChannel(groupingKeys[i].get(), inputType), i); + } + + const bool reorderGroupingKeys = + canSpill() && spillConfig()->prefixSortEnabled(); + // If prefix sort is enabled, we need to sort the grouping key's layout in the + // grouping set to maximize the prefix sort acceleration if spill is + // triggered. The reorder stores the grouping key with smaller prefix sort + // encoded size first. + if (reorderGroupingKeys) { + PrefixSortLayout::optimizeSortKeysOrder(inputType, groupingKeyProjections); + } + + groupingKeyInputChannels.reserve(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyInputChannels.push_back(groupingKeyProjections[i].inputChannel); + } + + if (!reorderGroupingKeys) { + // If there is no reorder, then grouping key output channels are the same as + // the grouping set. + groupingKeyOutputChannels.clear(); + return; + } + + groupingKeyOutputChannels.resize(groupingKeys.size()); + for (auto i = 0; i < groupingKeys.size(); ++i) { + groupingKeyOutputChannels[groupingKeyProjections[i].outputChannel] = i; + } +} + bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { VELOX_CHECK(isPartialOutput_ && !isGlobal_); return numInputRows_ > abandonPartialAggregationMinRows_ && diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index 1bf28c43428ad..48e0982f22f92 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -72,6 +72,16 @@ class HashAggregation : public Operator { RowVectorPtr getDistinctOutput(); + // Setup the projections for accessing grouping keys stored in the grouping + // set. 'groupingKeyInputChannels' provides the map from the key column index + // in the grouping set (as vector index) to key column channel in the input + // (as vector value). 'outputChannelProjections' provides the map from the key + // column channel in the output (as vector index) to the key column index in + // the grouping set (as vector value). + void setupGroupingKeyChannelProjections( + std::vector& groupingKeyInputChannels, + std::vector& groupingKeyOutputChannels) const; + void updateEstimatedOutputRowSize(); std::shared_ptr aggregationNode_; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 25d355802d25a..0e99390ad73b3 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -27,16 +27,16 @@ namespace facebook::velox::exec { -// Represents a column that is copied from input to output, possibly -// with cardinality change, i.e. values removed or duplicated. +/// Represents a column that is copied from input to output, possibly +/// with cardinality change, i.e. values removed or duplicated. struct IdentityProjection { IdentityProjection( column_index_t _inputChannel, column_index_t _outputChannel) : inputChannel(_inputChannel), outputChannel(_outputChannel) {} - const column_index_t inputChannel; - const column_index_t outputChannel; + column_index_t inputChannel; + column_index_t outputChannel; }; struct MemoryStats { diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index 8488194c72e8e..b4dfb1f6e0d12 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -126,6 +126,7 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) { } // namespace +// static. PrefixSortLayout PrefixSortLayout::makeSortLayout( const std::vector& types, const std::vector& compareFlags, @@ -169,6 +170,42 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout( numPaddingBytes}; } +// static. +void PrefixSortLayout::optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections) { + std::vector> encodedKeySizes( + rowType->size(), std::nullopt); + for (const auto& projection : keyColumnProjections) { + encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize( + rowType->childAt(projection.inputChannel)->kind()); + } + + std::sort( + keyColumnProjections.begin(), + keyColumnProjections.end(), + [&](const IdentityProjection& lhs, const IdentityProjection& rhs) { + const auto& lhsEncodedSize = encodedKeySizes[lhs.inputChannel]; + const auto& rhsEncodedSize = encodedKeySizes[rhs.inputChannel]; + if (lhsEncodedSize.has_value() && !rhsEncodedSize.has_value()) { + return true; + } + if (!lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + return false; + } + if (lhsEncodedSize.has_value() && rhsEncodedSize.has_value()) { + if (lhsEncodedSize.value() < rhsEncodedSize.value()) { + return true; + } + if (lhsEncodedSize.value() > rhsEncodedSize.value()) { + return false; + } + } + // Tie breaks with the original key column order. + return lhs.outputChannel < rhs.outputChannel; + }); +} + FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys( char* left, char* right) { @@ -236,12 +273,12 @@ uint32_t PrefixSort::maxRequiredBytes( const std::vector& compareFlags, const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { return 0; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); if (!sortLayout.hasNormalizedKeys) { return 0; } @@ -303,6 +340,12 @@ void PrefixSort::sortInternal( PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable()); auto* prefixBufferStart = prefixBuffer; auto* prefixBufferEnd = prefixBuffer + numRows * entrySize; + if (sortLayout_.numNormalizedKeys > 0) { + addThreadLocalRuntimeStat( + PrefixSort::kNumPrefixSortKeys, + RuntimeCounter( + sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone)); + } if (sortLayout_.hasNonNormalizedKey) { sortRunner.quickSort( prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) { diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 2a8235c6e7018..7ac34b4305519 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/base/PrefixSortConfig.h" +#include "velox/exec/Operator.h" #include "velox/exec/RowContainer.h" #include "velox/exec/prefixsort/PrefixSortAlgorithm.h" #include "velox/exec/prefixsort/PrefixSortEncoder.h" @@ -67,6 +68,17 @@ struct PrefixSortLayout { const std::vector& types, const std::vector& compareFlags, uint32_t maxNormalizedKeySize); + + /// Optimizes the order of sort key columns to maximize the number of prefix + /// sort keys for acceleration. This only applies for use case which doesn't + /// need a total order such as spill sort for hash aggregation. + /// 'keyColumnProjections' provides the mapping from the orginal key column + /// order to its channel in 'rowType'. The function reoders + /// 'keyColumnProjections' based on the prefix sort encoded size of each key + /// column type with smaller size first. + static void optimizeSortKeysOrder( + const RowTypePtr& rowType, + std::vector& keyColumnProjections); }; class PrefixSort { @@ -105,14 +117,14 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool, std::vector>& rows) { - if (rowContainer->numRows() < config.threshold) { + if (rowContainer->numRows() < config.minNumRows) { stdSort(rows, rowContainer, compareFlags); return; } VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size()); const auto sortLayout = PrefixSortLayout::makeSortLayout( - rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize); + rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes); // All keys can not normalize, skip the binary string compare opt. // Putting this outside sort-internal helps with stdSort. if (!sortLayout.hasNormalizedKeys) { @@ -133,6 +145,10 @@ class PrefixSort { const velox::common::PrefixSortConfig& config, memory::MemoryPool* pool); + /// The runtime stats name collected for prefix sort. + /// The number of prefix sort keys. + static inline const std::string kNumPrefixSortKeys{"numPrefixSortKeys"}; + private: /// Fallback to stdSort when prefix sort conditions such as config and memory /// are not satisfied. stdSort provides >2X performance win than std::sort for diff --git a/velox/exec/VectorHasher.cpp b/velox/exec/VectorHasher.cpp index a98e3d9b3d466..dc4c921fb0351 100644 --- a/velox/exec/VectorHasher.cpp +++ b/velox/exec/VectorHasher.cpp @@ -896,14 +896,25 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys) { const auto numKeys = keys.size(); - - std::vector> hashers; - hashers.reserve(numKeys); + std::vector keyChannels; + keyChannels.reserve(numKeys); for (const auto& key : keys) { const auto channel = exprToChannel(key.get(), rowType); - hashers.push_back(VectorHasher::create(key->type(), channel)); + keyChannels.push_back(channel); } + return createVectorHashers(rowType, keyChannels); +} +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels) { + const auto numKeys = keyChannels.size(); + std::vector> hashers; + hashers.reserve(numKeys); + for (const auto& keyChannel : keyChannels) { + hashers.push_back( + VectorHasher::create(rowType->childAt(keyChannel), keyChannel)); + } return hashers; } diff --git a/velox/exec/VectorHasher.h b/velox/exec/VectorHasher.h index 425fe267bd8a7..5ef4899673e57 100644 --- a/velox/exec/VectorHasher.h +++ b/velox/exec/VectorHasher.h @@ -709,6 +709,10 @@ std::vector> createVectorHashers( const RowTypePtr& rowType, const std::vector& keys); +std::vector> createVectorHashers( + const RowTypePtr& rowType, + const std::vector& keyChannels); + } // namespace facebook::velox::exec #include "velox/exec/VectorHasher-inl.h" diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 1e9dd7147ef0f..f0e27cd03018d 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -28,7 +28,9 @@ #include "velox/exec/Aggregate.h" #include "velox/exec/GroupingSet.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/PrefixSort.h" #include "velox/exec/Values.h" +#include "velox/exec/prefixsort/PrefixSortEncoder.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -1957,6 +1959,201 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1"); } +TEST_F(AggregationTest, spillPrefixSortOptimization) { + const RowTypePtr rowType{ + ROW({"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}, + {BIGINT(), + SMALLINT(), + VARCHAR(), + TINYINT(), + INTEGER(), + BIGINT(), + REAL(), + DOUBLE(), + VARCHAR()})}; + auto vectors = makeVectors(rowType, 1024, 2); + int64_t groupingKeyValue{0}; + for (auto& vector : vectors) { + auto groupingVector = BaseVector::create( + vector->childAt(0)->type(), vector->childAt(0)->size(), pool_.get()); + auto* flatGroupingKeyVector = groupingVector->asFlatVector(); + for (auto i = 0; i < flatGroupingKeyVector->size(); ++i) { + flatGroupingKeyVector->set(i, groupingKeyValue++); + } + vector->childAt(0) = groupingVector; + } + + createDuckDbTable(vectors); + struct { + bool prefixSortSpillEnabled; + uint32_t maxNormalizedKeyBytes; + uint32_t minNumRows; + uint32_t expectedNumPrefixSortKeys; + + std::string debugString() const { + return fmt::format( + "prefixSortSpillEnabled {}, maxNormalizedKeyBytes {}, minNumRows {}, expectedNumPrefixSortKeys {}", + prefixSortSpillEnabled, + maxNormalizedKeyBytes, + minNumRows, + expectedNumPrefixSortKeys); + } + } testSettings[] = { + {true, 0, 0, 0}, + {false, 0, 0, 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 1}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value(), + 0, + 0}, + {true, 1'000'000, 0, 3}, + {false, 1'000'000, 0, 0}, + {true, 1'000'000, 1'000'000, 0}, + {false, 1'000'000, 1'000'000, 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 1'000'000, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT).value(), + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 2}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value() - + 1, + 0, + 0}, + {true, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 3}, + {false, + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::SMALLINT).value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::BIGINT) + .value() + + prefixsort::PrefixSortEncoder::encodedSize(TypeKind::INTEGER) + .value(), + 0, + 0}}; + + for (const auto& testData : testSettings) { + auto spillDirectory = exec::test::TempDirectoryPath::create(); + + core::PlanNodeId aggrNodeId; + + auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) { + SCOPED_TRACE(sql); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->getPath()) + .config(QueryConfig::kSpillEnabled, true) + .config(QueryConfig::kAggregationSpillEnabled, true) + .config( + QueryConfig::kSpillPrefixSortEnabled, + testData.prefixSortSpillEnabled) + .config( + QueryConfig::kPrefixSortMinRows, + std::to_string(testData.minNumRows)) + .config( + QueryConfig::kPrefixSortNormalizedKeyMaxBytes, + std::to_string(testData.maxNormalizedKeyBytes)) + .plan(plan) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& stats = taskStats.at(aggrNodeId); + checkSpillStats(stats, true); + if (testData.expectedNumPrefixSortKeys > 0) { + ASSERT_GE( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).sum, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).max, + testData.expectedNumPrefixSortKeys); + ASSERT_EQ( + stats.customStats.at(PrefixSort::kNumPrefixSortKeys).min, + testData.expectedNumPrefixSortKeys); + } else { + ASSERT_EQ(stats.customStats.count(PrefixSort::kNumPrefixSortKeys), 0); + } + OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); + }; + + auto plan = PlanBuilder() + .values(vectors) + .singleAggregation( + {"c8", "c2", "c1", "c0", "c4", "c3"}, + {"max(c8)", + "max(c2)", + "sum(c1)", + "sum(c0)", + "min(c4)", + "max(c3)"}, + {}) + .capturePlanNodeId(aggrNodeId) + .planNode(); + testPlan( + plan, + "SELECT c8, c2, c1, c0, c4, c3, max(c8), max(c2), sum(c1), sum(c0), min(c4), max(c3) FROM tmp GROUP BY 1, 2, 3, 4, 5, 6"); + } +} + TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { std::vector vectors; int64_t val = 0; diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 248d1473bd0da..9bad8aeb035cc 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -325,5 +325,51 @@ TEST_F(PrefixSortTest, checkMaxNormalizedKeySizeForMultipleKeys) { ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[0], 0); ASSERT_EQ(sortLayoutTwoKeys.prefixOffsets[1], 9); } + +TEST_F(PrefixSortTest, optimizeSortKeysOrder) { + struct { + RowTypePtr inputType; + std::vector keyChannels; + std::vector expectedSortedKeyChannels; + + std::string debugString() const { + return fmt::format( + "inputType {}, keyChannels {}, expectedSortedKeyChannels {}", + inputType->toString(), + fmt::join(keyChannels, ":"), + fmt::join(expectedSortedKeyChannels, ":")); + } + } testSettings[] = { + {ROW({BIGINT(), BIGINT()}), {0, 1}, {0, 1}}, + {ROW({BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), BIGINT(), BIGINT()}), {1, 0}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), BIGINT()}), {0, 1}, {1, 0}}, + {ROW({BIGINT(), SMALLINT(), VARCHAR()}), {0, 1, 2}, {1, 0, 2}}, + {ROW({TINYINT(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {2, 1, 0, 4, 5, 3}, + {4, 1, 2, 0, 5, 3}}, + {ROW({INTEGER(), BIGINT(), VARCHAR(), TINYINT(), INTEGER(), VARCHAR()}), + {5, 4, 3, 2, 1, 0}, + {4, 0, 1, 5, 3, 2}}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + std::vector projections; + for (auto i = 0; i < testData.keyChannels.size(); ++i) { + projections.emplace_back(testData.keyChannels[i], i); + } + PrefixSortLayout::optimizeSortKeysOrder(testData.inputType, projections); + std::unordered_set outputChannelSet; + for (auto i = 0; i < projections.size(); ++i) { + ASSERT_EQ( + projections[i].inputChannel, testData.expectedSortedKeyChannels[i]); + ASSERT_EQ( + testData.keyChannels[projections[i].outputChannel], + projections[i].inputChannel); + } + } +} } // namespace } // namespace facebook::velox::exec::prefixsort::test diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 8ebb04f9b0fb3..f021ce5c5c247 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -31,6 +31,23 @@ using namespace facebook::velox; using namespace facebook::velox::memory; namespace facebook::velox::functions::test { +namespace { +// Class to write runtime stats in the tests to the stats container. +class TestRuntimeStatWriter : public BaseRuntimeStatWriter { + public: + explicit TestRuntimeStatWriter( + std::unordered_map& stats) + : stats_{stats} {} + + void addRuntimeStat(const std::string& name, const RuntimeCounter& value) + override { + addOperatorRuntimeStats(name, value, stats_); + } + + private: + std::unordered_map& stats_; +}; +} // namespace class SortBufferTest : public OperatorTestBase, public testing::WithParamInterface { @@ -39,6 +56,8 @@ class SortBufferTest : public OperatorTestBase, OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); rng_.seed(123); + statWriter_ = std::make_unique(stats_); + setThreadLocalRunTimeStatWriter(statWriter_.get()); } void TearDown() override { @@ -69,7 +88,9 @@ class SortBufferTest : public OperatorTestBase, const bool enableSpillPrefixSort_{GetParam()}; const velox::common::PrefixSortConfig prefixSortConfig_ = - velox::common::PrefixSortConfig{std::numeric_limits::max(), 130}; + velox::common::PrefixSortConfig{ + std::numeric_limits::max(), + GetParam() ? 8 : std::numeric_limits::max()}; const std::optional spillPrefixSortConfig_ = enableSpillPrefixSort_ ? std::optional(prefixSortConfig_) @@ -101,9 +122,32 @@ class SortBufferTest : public OperatorTestBase, tsan_atomic nonReclaimableSection_{false}; folly::Random::DefaultGenerator rng_; + std::unordered_map stats_; + std::unique_ptr statWriter_; }; TEST_P(SortBufferTest, singleKey) { + const RowVectorPtr data = makeRowVector( + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}), // sorted column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "great", + "today"})}); + struct { std::vector sortCompareFlags; std::vector expectedResult; @@ -124,12 +168,12 @@ TEST_P(SortBufferTest, singleKey) { true, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Ascending - {1, 2, 3, 4, 5}}, + {3, 4, 7, 8, 10, 13, 14, 15, 16, 17}}, {{{true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}, // Descending - {5, 4, 3, 2, 1}}}; + {17, 16, 15, 14, 13, 10, 8, 7, 4, 3}}}; // Specifies the sort columns ["c1"]. sortColumnIndices_ = {1}; @@ -143,25 +187,30 @@ TEST_P(SortBufferTest, singleKey) { &nonReclaimableSection_, prefixSortConfig_); - RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), - makeFlatVector( - {"hello", "world", "today", "is", "great"})}); - sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); + ASSERT_EQ(output->size(), 10); int resultIndex = 0; for (int expectedValue : testData.expectedResult) { ASSERT_EQ( output->childAt(1)->asFlatVector()->valueAt(resultIndex++), expectedValue); } + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } } @@ -175,23 +224,54 @@ TEST_P(SortBufferTest, multipleKeys) { prefixSortConfig_); RowVectorPtr data = makeRowVector( - {makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({5, 4, 3, 2, 1}), // sorted-2 column - makeFlatVector({1, 2, 3, 4, 5}), - makeFlatVector({1.1, 2.2, 3.3, 4.4, 5.5}), - makeFlatVector({1.1, 2.2, 2.2, 5.5, 5.5}), // sorted-1 column + {makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {15, 12, 9, 8, 7, 6, 5, 4, 3, 1}), // sorted-2 column + makeFlatVector({1, 2, 3, 4, 5, 6, 8, 10, 12, 15}), + makeFlatVector( + {1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1}), + makeFlatVector( + {1.1, 2.2, 2.2, 5.5, 5.5, 7.7, 8.1, 8, 8.1, 8.1, 10.0}), // sorted-1 + // column makeFlatVector( - {"hello", "world", "today", "is", "great"})}); + {"hello", + "world", + "today", + "is", + "great", + "hello", + "world", + "is", + "sort", + "sorted"})}); sortBuffer->addInput(data); sortBuffer->noMoreInput(); auto output = sortBuffer->getOutput(10000); - ASSERT_EQ(output->size(), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 5); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 3); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 4); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 1); - ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 2); + ASSERT_EQ(output->size(), 10); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(0), 15); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(1), 9); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(2), 12); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(3), 7); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(4), 8); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(5), 6); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(6), 4); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(7), 1); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(8), 3); + ASSERT_EQ(output->childAt(1)->asFlatVector()->valueAt(9), 5); + if (GetParam()) { + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } } // TODO: enable it later with test utility to compare the sorted result. @@ -267,6 +347,7 @@ TEST_P(SortBufferTest, DISABLED_randomData) { inputVectors.push_back(input); } sortBuffer->noMoreInput(); + stats_.clear(); // todo: have a utility function buildExpectedSortResult and verify the // sorting result for random data. } @@ -469,6 +550,20 @@ TEST_P(SortBufferTest, spill) { memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); } } + if (GetParam()) { + ASSERT_GE( + stats_.at(PrefixSort::kNumPrefixSortKeys).sum, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).max, + sortColumnIndices_.size()); + ASSERT_EQ( + stats_.at(PrefixSort::kNumPrefixSortKeys).min, + sortColumnIndices_.size()); + } else { + ASSERT_EQ(stats_.count(PrefixSort::kNumPrefixSortKeys), 0); + } + stats_.clear(); } }