diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index dd428a41ec7b8..9283e37828d52 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -25,6 +25,7 @@ SpillConfig::SpillConfig( uint64_t _maxFileSize, uint64_t _writeBufferSize, uint64_t _readBufferSize, + bool _mergePrefixComparatorEnabled, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -44,6 +45,7 @@ SpillConfig::SpillConfig( : _maxFileSize), writeBufferSize(_writeBufferSize), readBufferSize(_readBufferSize), + mergePrefixComparatorEnabled(_mergePrefixComparatorEnabled), executor(_executor), minSpillableReservationPct(_minSpillableReservationPct), spillableReservationGrowthPct(_spillableReservationGrowthPct), diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index aeca392f38781..4180a06edc0aa 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -53,6 +53,7 @@ struct SpillConfig { uint64_t _maxFileSize, uint64_t _writeBufferSize, uint64_t _readBufferSize, + bool _mergePrefixComparatorEnabled, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -105,6 +106,12 @@ struct SpillConfig { /// which doubles the buffer used to read from each spill file. uint64_t readBufferSize; + /// Enable the prefix comparator for the spill merge ordered reader. The more + /// the number of sort keys, the faster the prefix comparator. But it requires + /// the memory to build normalized prefix keys, which might have potential + /// risk of running out of server memory. + bool mergePrefixComparatorEnabled; + /// Executor for spilling. If nullptr spilling writes on the Driver's thread. folly::Executor* executor; // Not owned. diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 9949486a48626..237abf5dea67f 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -39,6 +39,7 @@ TEST_P(SpillConfigTest, spillLevel) { 0, 0, 0, + true, nullptr, 0, 0, @@ -125,6 +126,7 @@ TEST_P(SpillConfigTest, spillLevelLimit) { 0, 0, 0, + true, nullptr, 0, 0, @@ -172,6 +174,7 @@ TEST_P(SpillConfigTest, spillableReservationPercentages) { 0, 0, 0, + true, nullptr, testData.minPct, testData.growthPct, diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index b2ea57af281b7..70ab90a1b7a9a 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -97,6 +97,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { 0, 0, /*readBufferSize=*/1 << 20, + true, spillExecutor_.get(), 10, 20, diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 7bceb06d117bb..0aea2c2b44f8a 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -239,6 +239,13 @@ class QueryConfig { static constexpr const char* kSpillPrefixSortEnabled = "spill_prefixsort_enabled"; + /// Enable the prefix comparator for the spill merge ordered reader. The more + /// the number of sort keys, the faster the prefix comparator. But it requires + /// the memory to build normalized prefix keys, which might have potential + /// risk of running out of server memory. 
+ static constexpr const char* kSpillMergePrefixComparatorEnabled = + "spill_merge_prefix_comparator_enabled"; + /// Specifies spill write buffer size in bytes. The spiller tries to buffer /// serialized spill data up to the specified size before write to storage /// underneath for io efficiency. If it is set to zero, then spill write @@ -689,6 +696,10 @@ class QueryConfig { return get(kSpillPrefixSortEnabled, false); } + bool spillMergePrefixComparatorEnabled() const { + return get(kSpillMergePrefixComparatorEnabled, true); + } + uint64_t spillWriteBufferSize() const { // The default write buffer size set to 1MB. return get(kSpillWriteBufferSize, 1L << 20); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index f25b89a5518e5..da46bfc409732 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -354,6 +354,11 @@ Spilling - false - Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than std::sort but requires the memory to build normalized prefix keys, which might have potential risk of running out of server memory. + * - spill_merge_prefix_comparator_enabled + - bool + - true + - Enable the prefix comparator for the spill merge ordered reader. The more the number of sort keys, the faster the prefix comparator. + But it requires the memory to build normalized prefix keys, which might have potential risk of running out of server memory. * - spiller_start_partition_bit - integer - 29 diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 2b34f2f81ea74..4bfc2a838da22 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -253,6 +253,7 @@ class E2EWriterTest : public testing::Test { 0, 0, 0, + true, nullptr, minSpillableReservationPct, spillableReservationGrowthPct, diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e5c4656d65217..ec4b6fee4c371 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -132,6 +132,7 @@ std::optional DriverCtx::makeSpillConfig( queryConfig.maxSpillFileSize(), queryConfig.spillWriteBufferSize(), queryConfig.spillReadBufferSize(), + queryConfig.spillMergePrefixComparatorEnabled(), task->queryCtx()->spillExecutor(), queryConfig.minSpillableReservationPct(), queryConfig.spillableReservationGrowthPct(), diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index b0ba52cd98285..fa5dca5a63fec 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -1112,7 +1112,10 @@ bool GroupingSet::prepareNextSpillPartitionOutput() { VELOX_CHECK_NE(outputSpillPartition_, it->first.partitionNumber()); outputSpillPartition_ = it->first.partitionNumber(); merge_ = it->second->createOrderedReader( - spillConfig_->readBufferSize, &pool_, spillStats_); + spillConfig_->readBufferSize, + spillConfig_->mergePrefixComparatorEnabled, + &pool_, + spillStats_); spillPartitionSet_.erase(it); return true; } diff --git a/velox/exec/IdentityProjection.h b/velox/exec/IdentityProjection.h new file mode 100644 index 0000000000000..6065bac406c0b --- /dev/null +++ b/velox/exec/IdentityProjection.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::exec { + +/// Represents a column that is copied from input to output, possibly +/// with cardinality change, i.e. values removed or duplicated. +struct IdentityProjection { + IdentityProjection( + column_index_t _inputChannel, + column_index_t _outputChannel) + : inputChannel(_inputChannel), outputChannel(_outputChannel) {} + + column_index_t inputChannel; + column_index_t outputChannel; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index de06d78da65a3..1c147bdc40336 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -20,6 +20,7 @@ #include "velox/common/time/CpuWallTimer.h" #include "velox/core/PlanNode.h" #include "velox/exec/Driver.h" +#include "velox/exec/IdentityProjection.h" #include "velox/exec/JoinBridge.h" #include "velox/exec/OperatorTraceWriter.h" #include "velox/exec/Spiller.h" @@ -27,18 +28,6 @@ namespace facebook::velox::exec { -/// Represents a column that is copied from input to output, possibly -/// with cardinality change, i.e. values removed or duplicated. -struct IdentityProjection { - IdentityProjection( - column_index_t _inputChannel, - column_index_t _outputChannel) - : inputChannel(_inputChannel), outputChannel(_outputChannel) {} - - column_index_t inputChannel; - column_index_t outputChannel; -}; - struct MemoryStats { uint64_t userMemoryReservation{0}; uint64_t revocableMemoryReservation{0}; diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index b4dfb1f6e0d12..c9ff704765993 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -26,6 +26,63 @@ namespace { // to bitswap32. 
static constexpr int32_t kAlignment = 8; +template +FOLLY_ALWAYS_INLINE void encodeColumn( + const PrefixSortLayout& prefixSortLayout, + const DecodedVector& decoded, + vector_size_t row, + column_index_t col, + char* prefixBuffer) { + std::optional value; + if (!decoded.mayHaveNulls()) { + value = decoded.valueAt(row); + } else { + if (decoded.isNullAt(row)) { + value = std::nullopt; + } else { + value = decoded.valueAt(row); + } + } + prefixSortLayout.encoders[col].encode( + value, prefixBuffer + prefixSortLayout.prefixOffsets[col]); +} + +FOLLY_ALWAYS_INLINE void extractColumnToPrefix( + TypeKind typeKind, + const PrefixSortLayout& prefixSortLayout, + const DecodedVector& decoded, + vector_size_t row, + column_index_t col, + char* prefixBuffer) { + switch (typeKind) { + case TypeKind::SMALLINT: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::INTEGER: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::BIGINT: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::REAL: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::DOUBLE: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::TIMESTAMP: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + case TypeKind::HUGEINT: + return encodeColumn( + prefixSortLayout, decoded, row, col, prefixBuffer); + default: + VELOX_UNSUPPORTED( + "prefix-sort does not support type kind: {}", + mapTypeKindToName(typeKind)); + } +} + template FOLLY_ALWAYS_INLINE void encodeRowColumn( const PrefixSortLayout& prefixSortLayout, @@ -206,6 +263,36 @@ void PrefixSortLayout::optimizeSortKeysOrder( }); } +void VectorPrefixEncoder::encode( + const PrefixSortLayout& sortLayout, + const std::vector& keyTypes, + const std::vector& decoded, + vector_size_t numRows, + char* prefixBuffer) { + VELOX_CHECK_EQ(decoded.size(), keyTypes.size()); + VELOX_CHECK_EQ(decoded.size(), sortLayout.numNormalizedKeys); + for (auto i = 0; i < numRows; ++i) { + char* bufferForRow = prefixBuffer + i * sortLayout.normalizedBufferSize; + for (auto j = 0; j < keyTypes.size(); ++j) { + extractColumnToPrefix( + keyTypes[j]->kind(), sortLayout, decoded[j], i, j, bufferForRow); + } + simd::memset( + bufferForRow + sortLayout.normalizedBufferSize - + sortLayout.numPaddingBytes, + 0, + sortLayout.numPaddingBytes); + + // When comparing in std::memcmp, each byte is compared. If it is changed to + // compare every 8 bytes, the number of comparisons will be reduced and the + // performance will be improved. + // Use uint64_t compare to implement the above-mentioned comparison of every + // 8 bytes, assuming the system is little-endian, need to reverse bytes for + // every 8 bytes. 
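+      // For example, the encoded bytes {0x01, 0x02, ..., 0x08} are loaded on a
+      // little-endian machine as the word 0x0807060504030201, i.e. the byte that
+      // std::memcmp would inspect first lands in the least significant position;
+      // reversing each 8-byte word makes a single uint64_t comparison equivalent
+      // to std::memcmp over those 8 bytes.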
+ bitsSwapByWord((uint64_t*)bufferForRow, sortLayout.normalizedBufferSize); + } +} + FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys( char* left, char* right) { diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 7ac34b4305519..cbd581b803f96 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -16,7 +16,7 @@ #pragma once #include "velox/common/base/PrefixSortConfig.h" -#include "velox/exec/Operator.h" +#include "velox/exec/IdentityProjection.h" #include "velox/exec/RowContainer.h" #include "velox/exec/prefixsort/PrefixSortAlgorithm.h" #include "velox/exec/prefixsort/PrefixSortEncoder.h" @@ -32,37 +32,37 @@ namespace facebook::velox::exec { struct PrefixSortLayout { /// Number of bytes to store a prefix, it equals to: /// normalizedKeySize_ + 8 (non-normalized-ptr) + 8(row address). - const uint64_t entrySize; + uint64_t entrySize; /// If a sort key supports normalization and can be added to the prefix /// sort buffer, it is called a normalized key. - const uint32_t normalizedBufferSize; + uint32_t normalizedBufferSize; - const uint32_t numNormalizedKeys; + uint32_t numNormalizedKeys; /// The num of sort keys include normalized and non-normalized. - const uint32_t numKeys; + uint32_t numKeys; /// CompareFlags of all sort keys. - const std::vector compareFlags; + std::vector compareFlags; /// Whether the sort keys contains normalized key. /// It equals to 'numNormalizedKeys != 0', a little faster. - const bool hasNormalizedKeys; + bool hasNormalizedKeys; /// Whether the sort keys contains non-normalized key. - const bool hasNonNormalizedKey; + bool hasNonNormalizedKey; /// Offsets of normalized keys, used to find write locations when /// extracting columns - const std::vector prefixOffsets; + std::vector prefixOffsets; /// The encoders for normalized keys. - const std::vector encoders; + std::vector encoders; /// The number of padding bytes to align each prefix encoded row size to 8 /// for fast long compare. 
- const int32_t numPaddingBytes; + int32_t numPaddingBytes; static PrefixSortLayout makeSortLayout( const std::vector& types, @@ -81,6 +81,16 @@ struct PrefixSortLayout { std::vector& keyColumnProjections); }; +class VectorPrefixEncoder { + public: + static void encode( + const PrefixSortLayout& sortLayout, + const std::vector& keyTypes, + const std::vector& decoded, + vector_size_t numRows, + char* prefixBuffer); +}; + class PrefixSort { public: PrefixSort( diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 3782dff698169..9769c29f93c03 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -19,7 +19,6 @@ #include "velox/common/memory/MemoryAllocator.h" #include "velox/core/PlanNode.h" #include "velox/exec/ContainerRowSerde.h" -#include "velox/exec/Spill.h" #include "velox/vector/FlatVector.h" #include "velox/vector/VectorTypeUtils.h" diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index fe12fbecf1c14..ca36802e3949b 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -474,7 +474,10 @@ void SortBuffer::prepareOutputWithSpill() { VELOX_CHECK_EQ(spillPartitionSet_.size(), 1); spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader( - spillConfig_->readBufferSize, pool(), spillStats_); + spillConfig_->readBufferSize, + spillConfig_->mergePrefixComparatorEnabled, + pool(), + spillStats_); spillPartitionSet_.clear(); } diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index be08fb7109b6c..5f7b4a91dcdcf 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -280,7 +280,10 @@ void SortWindowBuild::noMoreInput() { spiller_->finishSpill(spillPartitionSet); VELOX_CHECK_EQ(spillPartitionSet.size(), 1); merge_ = spillPartitionSet.begin()->second->createOrderedReader( - spillConfig_->readBufferSize, pool_, spillStats_); + spillConfig_->readBufferSize, + spillConfig_->mergePrefixComparatorEnabled, + pool_, + spillStats_); } else { // At this point we have seen all the input rows. The operator is // being prepared to output rows now. diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index e0583f18b85ce..21a00ec9c88e5 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -24,6 +24,9 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { namespace { + +static constexpr int32_t kAlignment = 8; + // Returns the CompareFlags vector whose size is equal to numSortKeys. Fill in // with default CompareFlags() if 'compareFlags' is empty. 
const std::vector getCompareFlagsOrDefault( @@ -35,36 +38,57 @@ const std::vector getCompareFlagsOrDefault( } return std::vector(numSortKeys); } + +FOLLY_ALWAYS_INLINE int +compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) { + while (bytes != 0) { + if (*left == *right) { + ++left; + ++right; + bytes -= kAlignment; + continue; + } + if (*left > *right) { + return 1; + } else { + return -1; + } + } + return 0; +} } // namespace void SpillMergeStream::pop() { VELOX_CHECK(!closed_); if (++index_ >= size_) { - setNextBatch(); + nextBatch(); } } int32_t SpillMergeStream::compare(const MergeStream& other) const { VELOX_CHECK(!closed_); auto& otherStream = static_cast(other); - auto& children = rowVector_->children(); - auto& otherChildren = otherStream.current().children(); - int32_t key = 0; - if (sortCompareFlags().empty()) { - do { - auto result = children[key] - ->compare( - otherChildren[key].get(), - index_, - otherStream.index_, - CompareFlags()) - .value(); - if (result != 0) { - return result; - } - } while (++key < numSortKeys()); + if (prefixComparatorEnabled_) { + VELOX_DCHECK(otherStream.prefixComparatorEnabled_); + VELOX_DCHECK_EQ( + sortLayout_.normalizedBufferSize, + otherStream.sortLayout_.normalizedBufferSize); + if (!sortLayout_.hasNonNormalizedKey) { + return compareAllNormalizedKeys( + rawPrefixBuffer_ + index_ * sortLayout_.normalizedBufferSize, + otherStream.rawPrefixBuffer_ + + otherStream.index_ * sortLayout_.normalizedBufferSize); + } else { + return comparePartNormalizedKeys( + rawPrefixBuffer_ + index_ * sortLayout_.normalizedBufferSize, + otherStream.rawPrefixBuffer_ + + otherStream.index_ * sortLayout_.normalizedBufferSize, + otherStream); + } } else { - do { + auto& children = rowVector_->children(); + auto& otherChildren = otherStream.current().children(); + for (auto key = 0; key < numSortKeys(); ++key) { auto result = children[key] ->compare( otherChildren[key].get(), @@ -75,8 +99,9 @@ int32_t SpillMergeStream::compare(const MergeStream& other) const { if (result != 0) { return result; } - } while (++key < numSortKeys()); + } } + return 0; } @@ -84,12 +109,47 @@ void SpillMergeStream::close() { VELOX_CHECK(!closed_); closed_ = true; rowVector_.reset(); + prefixBuffer_.reset(); decoded_.clear(); rows_.resize(0); index_ = 0; size_ = 0; } +FOLLY_ALWAYS_INLINE int SpillMergeStream::compareAllNormalizedKeys( + char* left, + char* right) const { + return compareByWord( + (uint64_t*)left, (uint64_t*)right, sortLayout_.normalizedBufferSize); +} + +int SpillMergeStream::comparePartNormalizedKeys( + char* left, + char* right, + const SpillMergeStream& otherStream) const { + int result = compareAllNormalizedKeys(left, right); + if (result != 0) { + return result; + } + + auto& children = rowVector_->children(); + auto& otherChildren = otherStream.current().children(); + // If prefixes are equal, compare the remaining sort keys with BaseVector. 
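+  // Keys [0, numNormalizedKeys) have already been decided by the prefix
+  // comparison above, so only the trailing non-normalizable keys need the
+  // slower element-wise vector compare.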
+ for (auto i = sortLayout_.numNormalizedKeys; i < sortLayout_.numKeys; ++i) { + auto result = children[i] + ->compare( + otherChildren[i].get(), + index_, + otherStream.index_, + sortCompareFlags()[i]) + .value(); + if (result != 0) { + return result; + } + } + return result; +} + SpillState::SpillState( const common::GetSpillDirectoryPathCB& getSpillDirPathCb, const common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, @@ -287,13 +347,16 @@ SpillPartition::createUnorderedReader( std::unique_ptr> SpillPartition::createOrderedReader( uint64_t bufferSize, + bool prefixComparatorEnabled, memory::MemoryPool* pool, folly::Synchronized* spillStats) { std::vector> streams; streams.reserve(files_.size()); for (auto& fileInfo : files_) { streams.push_back(FileSpillMergeStream::create( - SpillReadFile::create(fileInfo, bufferSize, pool, spillStats))); + SpillReadFile::create(fileInfo, bufferSize, pool, spillStats), + prefixComparatorEnabled, + pool)); } files_.clear(); // Check if the partition is empty or not. @@ -303,11 +366,66 @@ SpillPartition::createOrderedReader( return std::make_unique>(std::move(streams)); } +FileSpillMergeStream::FileSpillMergeStream( + std::unique_ptr spillFile, + bool prefixComparatorEnabled, + memory::MemoryPool* pool) + : spillFile_(std::move(spillFile)), pool_(pool) { + VELOX_CHECK_NOT_NULL(spillFile_); + VELOX_CHECK_EQ(sortCompareFlags().size(), numSortKeys()); + const auto sortTypes = std::vector( + spillFile_->type()->children().begin(), + spillFile_->type()->children().begin() + numSortKeys()); + sortLayout_ = + PrefixSortLayout::makeSortLayout(sortTypes, sortCompareFlags(), 128); + if (sortLayout_.numNormalizedKeys < 2) { + prefixComparatorEnabled_ = false; + } else { + prefixComparatorEnabled_ = prefixComparatorEnabled; + } +} + uint32_t FileSpillMergeStream::id() const { VELOX_CHECK(!closed_); return spillFile_->id(); } +void FileSpillMergeStream::buildPrefixBuffer() { + // Skip when there is no data to compare (e.g. after close) or the prefix + // comparator is disabled. + if (size_ == 0 || prefixComparatorEnabled_ == false) { + return; + } + + ensureRows(); + if (decoded_.empty()) { + decoded_.resize(sortLayout_.numNormalizedKeys); + } + for (auto i = 0; i < decoded_.size(); ++i) { + decoded_[i].decode(*rowVector_->childAt(i), rows_); + } + // Build the prefix buffer even for small batches: spilled RowVectors are + // usually large, and keeping every stream in the same prefix-encoded state + // keeps the cross-stream comparison logic simple.
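+  // 'normalizedBufferSize' is padded by makeSortLayout() to a multiple of 8
+  // bytes, so the word-wise comparator never reads past the end of a row's
+  // prefix in the buffer sized below.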
+ if (prefixBuffer_ && + prefixBuffer_->size() < size_ * sortLayout_.normalizedBufferSize) { + AlignedBuffer::reallocate( + &prefixBuffer_, size_ * sortLayout_.normalizedBufferSize); + rawPrefixBuffer_ = prefixBuffer_->asMutable(); + } else { + prefixBuffer_ = AlignedBuffer::allocate( + size_ * sortLayout_.normalizedBufferSize, pool_); + rawPrefixBuffer_ = prefixBuffer_->asMutable(); + } + + const auto normalizedTypes = std::vector( + spillFile_->type()->children().begin(), + spillFile_->type()->children().begin() + sortLayout_.numNormalizedKeys); + + VectorPrefixEncoder::encode( + sortLayout_, normalizedTypes, decoded_, size_, rawPrefixBuffer_); +} + void FileSpillMergeStream::nextBatch() { VELOX_CHECK(!closed_); index_ = 0; @@ -317,6 +435,7 @@ void FileSpillMergeStream::nextBatch() { return; } size_ = rowVector_->size(); + buildPrefixBuffer(); } void FileSpillMergeStream::close() { diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index a2266aa7315a3..9cae677e018eb 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -24,6 +24,7 @@ #include "velox/common/compression/Compression.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" +#include "velox/exec/PrefixSort.h" #include "velox/exec/SpillFile.h" #include "velox/exec/TreeOfLosers.h" #include "velox/exec/UnorderedStreamReader.h" @@ -86,15 +87,11 @@ class SpillMergeStream : public MergeStream { virtual void close(); + virtual void buildPrefixBuffer() {}; + // loads the next 'rowVector' and sets 'decoded_' if this is initialized. void setNextBatch() { nextBatch(); - if (!decoded_.empty()) { - ensureRows(); - for (auto i = 0; i < decoded_.size(); ++i) { - decoded_[i].decode(*rowVector_->childAt(i), rows_); - } - } } void ensureDecodedValid(int32_t index) { @@ -133,15 +130,32 @@ class SpillMergeStream : public MergeStream { // Covers all rows inn 'rowVector_' Set if 'decoded_' is non-empty. SelectivityVector rows_; + + bool prefixComparatorEnabled_{false}; + PrefixSortLayout sortLayout_; + BufferPtr prefixBuffer_; + char* rawPrefixBuffer_; + + private: + FOLLY_ALWAYS_INLINE int compareAllNormalizedKeys(char* left, char* right) + const; + + int comparePartNormalizedKeys( + char* left, + char* right, + const SpillMergeStream& other) const; }; // A source of spilled RowVectors coming from a file. 
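+// When the prefix comparator is enabled, each batch loaded from the spill
+// file also materializes a per-row buffer of normalized key prefixes via
+// buildPrefixBuffer().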
class FileSpillMergeStream : public SpillMergeStream { public: static std::unique_ptr create( - std::unique_ptr spillFile) { - auto spillStream = std::unique_ptr( - new FileSpillMergeStream(std::move(spillFile))); + std::unique_ptr spillFile, + bool prefixComparatorEnabled, + memory::MemoryPool* pool) { + auto spillStream = + std::unique_ptr(new FileSpillMergeStream( + std::move(spillFile), prefixComparatorEnabled, pool)); static_cast(spillStream.get())->nextBatch(); return spillStream; } @@ -149,10 +163,10 @@ class FileSpillMergeStream : public SpillMergeStream { uint32_t id() const override; private: - explicit FileSpillMergeStream(std::unique_ptr spillFile) - : spillFile_(std::move(spillFile)) { - VELOX_CHECK_NOT_NULL(spillFile_); - } + FileSpillMergeStream( + std::unique_ptr spillFile, + bool prefixComparatorEnabled, + memory::MemoryPool* pool); int32_t numSortKeys() const override { VELOX_CHECK(!closed_); @@ -164,11 +178,14 @@ class FileSpillMergeStream : public SpillMergeStream { return spillFile_->sortCompareFlags(); } + void buildPrefixBuffer() override; + void nextBatch() override; void close() override; std::unique_ptr spillFile_; + memory::MemoryPool* const pool_; }; /// A source of spilled RowVectors coming from a file. The spill data might not @@ -319,6 +336,7 @@ class SpillPartition { /// stats when reading data from spilled files. std::unique_ptr> createOrderedReader( uint64_t bufferSize, + bool prefixComparatorEnabled, memory::MemoryPool* pool, folly::Synchronized* spillStats); diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 6b5e4f464b925..db65213c61226 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -279,6 +279,20 @@ std::unique_ptr SpillReadFile::create( stats)); } +namespace { +// Returns the CompareFlags vector whose size is equal to numSortKeys. Fill in +// with default CompareFlags() if 'compareFlags' is empty. 
+const std::vector getCompareFlagsOrDefault( + const std::vector& compareFlags, + int32_t numSortKeys) { + VELOX_DCHECK(compareFlags.empty() || compareFlags.size() == numSortKeys); + if (compareFlags.size() == numSortKeys) { + return compareFlags; + } + return std::vector(numSortKeys); +} +} // namespace + SpillReadFile::SpillReadFile( uint32_t id, const std::string& path, @@ -295,7 +309,8 @@ SpillReadFile::SpillReadFile( size_(size), type_(type), numSortKeys_(numSortKeys), - sortCompareFlags_(sortCompareFlags), + sortCompareFlags_( + getCompareFlagsOrDefault(sortCompareFlags, numSortKeys)), compressionKind_(compressionKind), readOptions_{ kDefaultUseLosslessTimestamp, diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index ecf91d3d78659..a6e7ad4b2386b 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -239,6 +239,10 @@ class SpillReadFile { return path_; } + const RowTypePtr& type() const { + return type_; + } + private: SpillReadFile( uint32_t id, diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 91a1352a205b9..196e66e541192 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -19,6 +19,7 @@ #include "velox/common/compression/Compression.h" #include "velox/exec/HashBitRange.h" #include "velox/exec/RowContainer.h" +#include "velox/exec/Spill.h" namespace facebook::velox::exec { diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index f9766007c3ae1..5664a662d1ee8 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -294,7 +294,10 @@ void TopNRowNumber::noMoreInput() { spiller_->finishSpill(spillPartitionSet); VELOX_CHECK_EQ(spillPartitionSet.size(), 1); merge_ = spillPartitionSet.begin()->second->createOrderedReader( - spillConfig_->readBufferSize, pool(), &spillStats_); + spillConfig_->readBufferSize, + spillConfig_->mergePrefixComparatorEnabled, + pool(), + &spillStats_); } else { outputRows_.resize(outputBatchSize_); } diff --git a/velox/exec/prefixsort/PrefixSortEncoder.h b/velox/exec/prefixsort/PrefixSortEncoder.h index c4d356ec53d46..153936a3c25ea 100644 --- a/velox/exec/prefixsort/PrefixSortEncoder.h +++ b/velox/exec/prefixsort/PrefixSortEncoder.h @@ -27,7 +27,7 @@ namespace facebook::velox::exec::prefixsort { class PrefixSortEncoder { public: PrefixSortEncoder(bool ascending, bool nullsFirst) - : ascending_(ascending), nullsFirst_(nullsFirst){}; + : ascending_(ascending), nullsFirst_(nullsFirst) {}; /// Encode native primitive types(such as uint64_t, int64_t, uint32_t, /// int32_t, float, double, Timestamp). 
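The encode-then-compare technique added above can be summarized with a short self-contained sketch. This is an illustration only, not Velox code: the real implementation lives in prefixsort::PrefixSortEncoder, VectorPrefixEncoder::encode and compareByWord, and additionally handles nulls, descending keys, more types, and per-row padding. Like the patch, the sketch assumes a little-endian host; it encodes two ascending, non-null BIGINT keys per row into byte-comparable prefixes, reverses each 8-byte word, and checks that a word-wise uint64_t comparison agrees with std::memcmp:

#include <array>
#include <cassert>
#include <cstdint>
#include <cstring>

// Encode a signed 64-bit key so that unsigned byte-wise (memcmp) order equals
// signed numeric order: flip the sign bit and store the bytes big-endian.
void encodeAscendingInt64(int64_t value, uint8_t* out) {
  const uint64_t bits = static_cast<uint64_t>(value) ^ (1ull << 63);
  for (int i = 0; i < 8; ++i) {
    out[i] = static_cast<uint8_t>(bits >> (56 - 8 * i));
  }
}

// Portable byte reversal of one 64-bit word.
uint64_t byteSwap64(uint64_t v) {
  uint64_t out = 0;
  for (int i = 0; i < 8; ++i) {
    out = (out << 8) | ((v >> (8 * i)) & 0xff);
  }
  return out;
}

// Word-wise comparison over byte-swapped prefixes; mirrors compareByWord.
int compareWords(const uint64_t* left, const uint64_t* right, int32_t bytes) {
  for (; bytes > 0; bytes -= 8, ++left, ++right) {
    if (*left != *right) {
      return *left > *right ? 1 : -1;
    }
  }
  return 0;
}

int main() {
  // Two rows with two BIGINT sort keys each -> 16-byte normalized prefixes.
  // uint64_t storage guarantees the 8-byte alignment the word compare needs.
  std::array<uint64_t, 2> a{};
  std::array<uint64_t, 2> b{};
  auto* aBytes = reinterpret_cast<uint8_t*>(a.data());
  auto* bBytes = reinterpret_cast<uint8_t*>(b.data());
  encodeAscendingInt64(-3, aBytes);
  encodeAscendingInt64(100, aBytes + 8);
  encodeAscendingInt64(-3, bBytes);
  encodeAscendingInt64(7, bBytes + 8);

  // Right after encoding, byte-wise order is correct: (-3, 100) > (-3, 7).
  assert(std::memcmp(aBytes, bBytes, 16) > 0);

  // Reverse each 8-byte word (the bitsSwapByWord step) so that comparing the
  // words as native uint64_t values gives the same answer as memcmp.
  for (auto& word : a) {
    word = byteSwap64(word);
  }
  for (auto& word : b) {
    word = byteSwap64(word);
  }
  assert(compareWords(a.data(), b.data(), 16) > 0);
  return 0;
}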
diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 9bad8aeb035cc..26d851c9af894 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -371,5 +371,123 @@ TEST_F(PrefixSortTest, optimizeSortKeysOrder) { } } } + +class VectorPrefixEncoderTest : public exec::test::OperatorTestBase { + protected: + static constexpr CompareFlags kAsc{ + true, + true, + false, + CompareFlags::NullHandlingMode::kNullAsValue}; +}; + +TEST_F(VectorPrefixEncoderTest, encode) { + const std::vector testData = { + makeNullableFlatVector({7979, std::nullopt, -3, 2, 1}), + makeFlatVector({5, 4, 3, 2, 1}), + makeFlatVector({5, 4, 3, 2, 1}), + makeFlatVector( + {5, + HugeInt::parse("1234567"), + HugeInt::parse("12345678901234567890"), + HugeInt::parse("12345679"), + HugeInt::parse("-12345678901234567890")}), + makeFlatVector({5.5, 4.4, 3.3, 2.2, 1.1}), + makeFlatVector({5.5, 4.4, 3.3, 2.2, 1.1}), + makeFlatVector( + {Timestamp(5, 5), + Timestamp(4, 4), + Timestamp(3, 3), + Timestamp(2, 2), + Timestamp(1, 1)})}; + const std::vector> expectedValues = { + {108086391056891935, + 3098476543630901248, + 0, + 0, + 108086391056891903, + 18230571291595767808ull, + 108086391056891904, + 144115188075855872, + 108086391056891904, + 72057594037927936}, + {108086524922298368, + 0, + 108086391056891904, + 108086391056891904, + 108086391006560256}, + {116859394334916608, + 0, + 108086391056891904, + 108086391056891904, + 108086391056891904}, + {108086391056891904, + 31, + 3098476543630901248, + 0, + 0, + 0, + 81523983842910625, + 11601272640106397696ull, + 72057594037927936, + 81523983842910625, + 11646767826930344353ull, + 11601272640106397696ull, + 81523983842910625, + 11646767826930344353ull, + 11601272640106397696ull}, + {108086524922298368, + 0, + 108086391056891904, + 108086391056891904, + 144115188059078656}, + {108086391056891935, + 3098476543630901248, + 0, + 0, + 144115188075855871, + 18374686479671623680ull, + 108086391056891904, + 144115188075855872, + 108086391056891904, + 72057594037927936}, + {108086391056891935, + 3098476543630901248, + 0, + 0, + 0, + 0, + 108086391056891904, + 117552780861874593, + 11601272640106397696ull, + 81523983842910625, + 11646767826930344353ull, + 11601272640106397696ull, + 81523983842910625, + 11646767826930344353ull, + 11601272640106397696ull}}; + const auto numRows = 5; + std::vector decoded; + decoded.resize(testData.size()); + for (auto i = 0; i < testData.size(); ++i) { + decoded[i].decode(*testData[i]); + const auto data = makeRowVector({testData[i]}); + std::vector compareFlags = {kAsc}; + const auto rowType = asRowType(data->type()); + const std::vector keyTypes = rowType->children(); + auto sortLayout = + PrefixSortLayout::makeSortLayout(keyTypes, compareFlags, 17); + char rawPrefixBuffer_[sortLayout.normalizedBufferSize * numRows]; + VectorPrefixEncoder::encode( + sortLayout, keyTypes, decoded, numRows, rawPrefixBuffer_); + auto encodedValue = reinterpret_cast(rawPrefixBuffer_); + + for (auto j = 0; + j < sortLayout.normalizedBufferSize * numRows / sizeof(uint64_t); + ++j) { + ASSERT_EQ(encodedValue[j], expectedValues[i][j]); + } + } +} } // namespace } // namespace facebook::velox::exec::prefixsort::test diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index f021ce5c5c247..db26b51823e8a 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -74,6 +74,7 @@ class SortBufferTest : public OperatorTestBase, 0, 0, 1 << 20, + true, 
executor_.get(), 5, 10, @@ -392,6 +393,7 @@ TEST_P(SortBufferTest, batchOutput) { 1000, 0, 1 << 20, + true, executor_.get(), 5, 10, @@ -489,6 +491,7 @@ TEST_P(SortBufferTest, spill) { 1000, 0, 1 << 20, + true, executor_.get(), 100, spillableReservationGrowthPct, diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index cedf31058f4df..db67c5c3c3745 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -421,8 +421,8 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_EQ(state_->numFinishedFiles(partition), 0); auto spillPartition = SpillPartition(SpillPartitionId{0, partition}, std::move(spillFiles)); - auto merge = - spillPartition.createOrderedReader(1 << 20, pool(), &spillStats_); + auto merge = spillPartition.createOrderedReader( + 1 << 20, true, pool(), &spillStats_); int numReadBatches = 0; // We expect all the rows in dense increasing order. for (auto i = 0; i < numBatches * numRowsPerBatch; ++i) { @@ -576,10 +576,10 @@ TEST_P(SpillTest, spillTimestamp) { SpillPartition spillPartition(SpillPartitionId{0, 0}, state.finish(0)); auto merge = - spillPartition.createOrderedReader(1 << 20, pool(), &spillStats_); + spillPartition.createOrderedReader(1 << 20, true, pool(), &spillStats_); ASSERT_TRUE(merge != nullptr); ASSERT_TRUE( - spillPartition.createOrderedReader(1 << 20, pool(), &spillStats_) == + spillPartition.createOrderedReader(1 << 20, true, pool(), &spillStats_) == nullptr); for (auto i = 0; i < timeValues.size(); ++i) { auto* stream = merge->next(); @@ -937,6 +937,82 @@ TEST(SpillTest, scopedSpillInjectionRegex) { } } +TEST_P(SpillTest, multipleSortKeys) { + auto tempDirectory = exec::test::TempDirectoryPath::create(); + std::vector emptyCompareFlags; + const std::string spillPath = tempDirectory->getPath() + "/test"; + std::vector timeValues = {Timestamp{0, 0}, Timestamp{12, 0}}; + std::vector intValues = {1, 2}; + const std::optional prefixSortConfig = + enablePrefixSort_ + ? std::optional(common::PrefixSortConfig()) + : std::nullopt; + struct { + bool mergePrefixComparatorEnabled; + int numSortKey; + + std::string debugString() const { + return fmt::format( + "mergePrefixComparatorEnabled: {}, numSortKey: {}", + mergePrefixComparatorEnabled, + numSortKey); + } + + } testSettings[] = {{false, 1}, {true, 2}, {true, 1}, {false, 2}, {true, 3}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + SpillState state( + [&]() -> const std::string& { return tempDirectory->getPath(); }, + updateSpilledBytesCb_, + "test", + 1, + testData.numSortKey, + emptyCompareFlags, + 1024, + 0, + compressionKind_, + prefixSortConfig, + pool(), + &spillStats_); + int partitionIndex = 0; + state.setPartitionSpilled(partitionIndex); + ASSERT_TRUE(state.isPartitionSpilled(partitionIndex)); + ASSERT_FALSE( + state.testingNonEmptySpilledPartitionSet().contains(partitionIndex)); + std::vector vectors = { + makeFlatVector(timeValues), + makeFlatVector(intValues)}; + // The last sort key is not supported in prefix comparator. 
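+    // (ARRAY is not a normalizable prefix type, so the three-key case exercises
+    // the comparePartNormalizedKeys() fallback path.)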
+ if (testData.numSortKey == 3) { + vectors.push_back(makeArrayVector({{1, 2, 3}, {3, 4, 5}})); + } + state.appendToPartition(partitionIndex, makeRowVector({vectors})); + state.finishFile(partitionIndex); + EXPECT_TRUE( + state.testingNonEmptySpilledPartitionSet().contains(partitionIndex)); + + SpillPartition spillPartition(SpillPartitionId{0, 0}, state.finish(0)); + auto merge = spillPartition.createOrderedReader( + 1 << 20, testData.mergePrefixComparatorEnabled, pool(), &spillStats_); + ASSERT_TRUE(merge != nullptr); + + for (auto i = 0; i < timeValues.size(); ++i) { + auto* stream = merge->next(); + ASSERT_NE(stream, nullptr); + ASSERT_EQ( + timeValues[i], + stream->decoded(0).valueAt(stream->currentIndex())); + + ASSERT_EQ( + intValues[i], + stream->decoded(1).valueAt(stream->currentIndex())); + stream->pop(); + } + ASSERT_EQ(nullptr, merge->next()); + } +} + VELOX_INSTANTIATE_TEST_SUITE_P( SpillTestSuite, SpillTest, diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 83d6d7983e864..2f869be0fdc69 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -58,6 +58,7 @@ struct TestParam { int poolSize; common::CompressionKind compressionKind; bool enablePrefixSort; + bool mergePrefixComparatorEnabled; core::JoinType joinType; TestParam( @@ -65,20 +66,23 @@ struct TestParam { int _poolSize, common::CompressionKind _compressionKind, bool _enablePrefixSort, + bool _mergePrefixComparatorEnabled, core::JoinType _joinType) : type(_type), poolSize(_poolSize), compressionKind(_compressionKind), enablePrefixSort(_enablePrefixSort), + mergePrefixComparatorEnabled(_mergePrefixComparatorEnabled), joinType(_joinType) {} std::string toString() const { return fmt::format( - "{}|{}|{}|{}", + "{}|{}|{}|{}|{}", Spiller::typeName(type), poolSize, compressionKindToString(compressionKind), std::to_string(enablePrefixSort), + std::to_string(mergePrefixComparatorEnabled), joinTypeName(joinType)); } }; @@ -98,6 +102,7 @@ struct TestParamsBuilder { poolSize, compressionKind, poolSize % 2, + poolSize % 3, core::JoinType::kRight); if (type == Spiller::Type::kHashJoinBuild) { params.emplace_back( @@ -105,6 +110,7 @@ struct TestParamsBuilder { poolSize, compressionKind, poolSize % 2, + poolSize % 3, core::JoinType::kLeft); } } @@ -658,11 +664,17 @@ class SpillerTest : public exec::test::RowContainerTestBase { // We make a merge reader that merges the spill files and the rows that // are still in the RowContainer. auto merge = spillPartition->createOrderedReader( - spillConfig_.readBufferSize, pool(), &spillStats_); + spillConfig_.readBufferSize, + spillConfig_.mergePrefixComparatorEnabled, + pool(), + &spillStats_); ASSERT_TRUE(merge != nullptr); ASSERT_TRUE( spillPartition->createOrderedReader( - spillConfig_.readBufferSize, pool(), &spillStats_) == nullptr); + spillConfig_.readBufferSize, + spillConfig_.mergePrefixComparatorEnabled, + pool(), + &spillStats_) == nullptr); // We read the spilled data back and check that it matches the sorted // order of the partition. 
@@ -1473,7 +1485,10 @@ TEST_P(AggregationOutputOnly, basic) { const int expectedNumSpilledRows = numRows - numListedRows; auto merge = spillPartition->createOrderedReader( - spillConfig_.readBufferSize, pool(), &spillStats_); + spillConfig_.readBufferSize, + spillConfig_.mergePrefixComparatorEnabled, + pool(), + &spillStats_); if (expectedNumSpilledRows == 0) { ASSERT_TRUE(merge == nullptr); } else { @@ -1584,7 +1599,10 @@ TEST_P(OrderByOutputOnly, basic) { const int expectedNumSpilledRows = numListedRows; auto merge = spillPartition->createOrderedReader( - spillConfig_.readBufferSize, pool(), &spillStats_); + spillConfig_.readBufferSize, + spillConfig_.mergePrefixComparatorEnabled, + pool(), + &spillStats_); if (expectedNumSpilledRows == 0) { ASSERT_TRUE(merge == nullptr); } else { diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index a725d359207d8..4c00ccf8a4958 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -52,6 +52,7 @@ class WindowTest : public OperatorTestBase { 0, 0, 1 << 20, + true, executor_.get(), 5, 10,
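For reference, a small usage sketch of the new query config knob (illustrative: the map-based QueryConfig constructor is assumed from typical Velox test usage; production code reaches the flag through DriverCtx::makeSpillConfig, as in the Driver.cpp hunk above):

#include <string>
#include <unordered_map>
#include "velox/core/QueryConfig.h"

using facebook::velox::core::QueryConfig;

int main() {
  // With no override the comparator is on by default.
  std::unordered_map<std::string, std::string> noOverrides;
  QueryConfig defaults(std::move(noOverrides));
  const bool onByDefault = defaults.spillMergePrefixComparatorEnabled(); // true

  // A session can opt out by setting the config to "false".
  std::unordered_map<std::string, std::string> overrides{
      {QueryConfig::kSpillMergePrefixComparatorEnabled, "false"}};
  QueryConfig config(std::move(overrides));
  const bool enabled = config.spillMergePrefixComparatorEnabled(); // false

  return (onByDefault && !enabled) ? 0 : 1;
}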