From 17c3d8480ffee051252dcb7b61ce1fafcb0ec6ba Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 16 Sep 2024 17:18:11 -0700 Subject: [PATCH] Remove the shared string allocator use case (#11002) Summary: Remove the existing use cases on shared string allocator which cause complexity inside the row container. The shared string allocator is not thread-safe so it is better not to share with the other components. The memory overhead within a shared string allocator is pretty small which is just one or two pages. The shared string allocator is previously used for in-memory spilling operation which has been removed. Pull Request resolved: https://github.com/facebookincubator/velox/pull/11002 bypass-github-export-checks Reviewed By: amitkdutta, oerling Differential Revision: D62609465 fbshipit-source-id: 44fafb9f943a2b1567e745abec21844df880cd85 --- velox/exec/GroupingSet.cpp | 10 ++++------ velox/exec/HashTable.cpp | 6 ++---- velox/exec/HashTable.h | 10 +++------- velox/exec/RowContainer.cpp | 14 ++++---------- velox/exec/RowContainer.h | 12 ++---------- velox/exec/SortBuffer.cpp | 28 +++++++++++++++------------- 6 files changed, 30 insertions(+), 50 deletions(-) diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 5e478bef8094..aa4b6b732d83 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -311,10 +311,10 @@ void initializeAggregates( int i = 0; for (auto& aggregate : aggregates) { auto& function = aggregate.function; + function->setAllocator(&rows.stringAllocator()); if (excludeToIntermediate && function->supportsToIntermediate()) { continue; } - function->setAllocator(&rows.stringAllocator()); const auto rowColumn = rows.columnAt(numKeys + i); function->setOffsets( @@ -1055,13 +1055,12 @@ bool GroupingSet::getOutputWithSpill( false, false, false, - &pool_, - table_->rows()->stringAllocatorShared()); + &pool_); initializeAggregates(aggregates_, *mergeRows_, false); } - VELOX_CHECK_EQ(table_->rows()->numRows(), 0); + table_->clear(/*freeTable=*/true); VELOX_CHECK_NULL(merge_); spiller_->finishSpill(spillPartitionSet_); @@ -1274,8 +1273,7 @@ void GroupingSet::abandonPartialAggregation() { false, false, false, - &pool_, - table_->rows()->stringAllocatorShared()); + &pool_); initializeAggregates(aggregates_, *intermediateRows_, true); table_.reset(); } diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 78aa468fc419..6abede7ccafc 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -52,8 +52,7 @@ HashTable::HashTable( bool isJoinBuild, bool hasProbedFlag, uint32_t minTableSizeForParallelJoinBuild, - memory::MemoryPool* pool, - const std::shared_ptr& stringArena) + memory::MemoryPool* pool) : BaseHashTable(std::move(hashers)), minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild), isJoinBuild_(isJoinBuild) { @@ -74,8 +73,7 @@ HashTable::HashTable( isJoinBuild, hasProbedFlag, hashMode_ != HashMode::kHash, - pool, - stringArena); + pool); nextOffset_ = rows_->nextOffset(); } diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 549733178d90..12f0b05d07b1 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -457,17 +457,14 @@ class HashTable : public BaseHashTable { bool isJoinBuild, bool hasProbedFlag, uint32_t minTableSizeForParallelJoinBuild, - memory::MemoryPool* pool, - const std::shared_ptr& stringArena = nullptr); + memory::MemoryPool* pool); ~HashTable() override = default; static std::unique_ptr createForAggregation( std::vector>&& hashers, const std::vector& accumulators, - memory::MemoryPool* pool, - const std::shared_ptr& stringArena = - nullptr) { + memory::MemoryPool* pool) { return std::make_unique( std::move(hashers), accumulators, @@ -476,8 +473,7 @@ class HashTable : public BaseHashTable { false, // isJoinBuild false, // hasProbedFlag 0, // minTableSizeForParallelJoinBuild - pool, - stringArena); + pool); } static std::unique_ptr createForJoin( diff --git a/velox/exec/RowContainer.cpp b/velox/exec/RowContainer.cpp index b83050982b49..a158c31cdbee 100644 --- a/velox/exec/RowContainer.cpp +++ b/velox/exec/RowContainer.cpp @@ -139,15 +139,12 @@ RowContainer::RowContainer( bool isJoinBuild, bool hasProbedFlag, bool hasNormalizedKeys, - memory::MemoryPool* pool, - std::shared_ptr stringAllocator) + memory::MemoryPool* pool) : keyTypes_(keyTypes), nullableKeys_(nullableKeys), isJoinBuild_(isJoinBuild), hasNormalizedKeys_(hasNormalizedKeys), - stringAllocator_( - stringAllocator ? stringAllocator - : std::make_shared(pool)), + stringAllocator_(std::make_unique(pool)), accumulators_(accumulators), rows_(pool) { // Compute the layout of the payload row. The row has keys, null flags, @@ -936,8 +933,7 @@ void RowContainer::hash( } void RowContainer::clear() { - const bool sharedStringAllocator = !stringAllocator_.unique(); - if (sharedStringAllocator || usesExternalMemory_) { + if (usesExternalMemory_) { constexpr int32_t kBatch = 1000; std::vector rows(kBatch); RowContainerIterator iter; @@ -948,9 +944,7 @@ void RowContainer::clear() { hasDuplicateRows_ = false; rows_.clear(); - if (!sharedStringAllocator) { - stringAllocator_->clear(); - } + stringAllocator_->clear(); numRows_ = 0; numRowsWithNormalizedKey_ = 0; normalizedKeySize_ = originalNormalizedKeySize_; diff --git a/velox/exec/RowContainer.h b/velox/exec/RowContainer.h index 230e01b96f21..9e28fb527e45 100644 --- a/velox/exec/RowContainer.h +++ b/velox/exec/RowContainer.h @@ -227,9 +227,6 @@ class RowContainer { /// into one word for faster comparison. The bulk allocation is done /// from 'allocator'. ContainerRowSerde is used for serializing complex /// type values into the container. - /// 'stringAllocator' allows sharing the variable length data arena with - /// another RowContainer. This is needed for spilling where the same - /// aggregates are used for reading one container and merging into another. RowContainer( const std::vector& keyTypes, bool nullableKeys, @@ -239,8 +236,7 @@ class RowContainer { bool isJoinBuild, bool hasProbedFlag, bool hasNormalizedKey, - memory::MemoryPool* pool, - std::shared_ptr stringAllocator = nullptr); + memory::MemoryPool* pool); /// Allocates a new row and initializes possible aggregates to null. char* newRow(); @@ -313,10 +309,6 @@ class RowContainer { return *stringAllocator_; } - const std::shared_ptr& stringAllocatorShared() { - return stringAllocator_; - } - /// Returns the number of used rows in 'this'. This is the number of rows a /// RowContainerIterator would access. int64_t numRows() const { @@ -1330,7 +1322,7 @@ class RowContainer { const bool isJoinBuild_; // True if normalized keys are enabled in initial state. const bool hasNormalizedKeys_; - const std::shared_ptr stringAllocator_; + const std::unique_ptr stringAllocator_; std::vector columnHasNulls_; diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index c338f5d084a5..2def45fb9cfb 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -252,21 +252,23 @@ void SortBuffer::ensureOutputFits() { return; } - if (estimatedOutputRowSize_.has_value()) { - const uint64_t outputBufferSizeToReserve = - estimatedOutputRowSize_.value() * 1.2; - { - memory::ReclaimableSectionGuard guard(nonReclaimableSection_); - if (pool_->maybeReserve(outputBufferSizeToReserve)) { - return; - } + if (!estimatedOutputRowSize_.has_value()) { + return; + } + + const uint64_t outputBufferSizeToReserve = + estimatedOutputRowSize_.value() * 1.2; + { + memory::ReclaimableSectionGuard guard(nonReclaimableSection_); + if (pool_->maybeReserve(outputBufferSizeToReserve)) { + return; } - LOG(WARNING) << "Failed to reserve " - << succinctBytes(outputBufferSizeToReserve) - << " for memory pool " << pool_->name() - << ", usage: " << succinctBytes(pool_->usedBytes()) - << ", reservation: " << succinctBytes(pool_->reservedBytes()); } + LOG(WARNING) << "Failed to reserve " + << succinctBytes(outputBufferSizeToReserve) + << " for memory pool " << pool_->name() + << ", usage: " << succinctBytes(pool_->usedBytes()) + << ", reservation: " << succinctBytes(pool_->reservedBytes()); } void SortBuffer::updateEstimatedOutputRowSize() {