Skip to content

Commit

Permalink
Remove the shared string allocator use case (facebookincubator#11002)
Browse files Browse the repository at this point in the history
Summary:
Remove the existing use cases on shared string allocator which cause complexity inside the row container.
The shared string allocator is not thread-safe so it is better not to share with the other components. The
memory overhead within a shared string allocator is pretty small which is just one or two pages. The
shared string allocator is previously used for in-memory spilling operation which has been removed.

Pull Request resolved: facebookincubator#11002

bypass-github-export-checks

Reviewed By: amitkdutta, oerling

Differential Revision: D62609465

fbshipit-source-id: 44fafb9f943a2b1567e745abec21844df880cd85
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Sep 17, 2024
1 parent 794b8b6 commit 17c3d84
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 50 deletions.
10 changes: 4 additions & 6 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ void initializeAggregates(
int i = 0;
for (auto& aggregate : aggregates) {
auto& function = aggregate.function;
function->setAllocator(&rows.stringAllocator());
if (excludeToIntermediate && function->supportsToIntermediate()) {
continue;
}
function->setAllocator(&rows.stringAllocator());

const auto rowColumn = rows.columnAt(numKeys + i);
function->setOffsets(
Expand Down Expand Up @@ -1055,13 +1055,12 @@ bool GroupingSet::getOutputWithSpill(
false,
false,
false,
&pool_,
table_->rows()->stringAllocatorShared());
&pool_);

initializeAggregates(aggregates_, *mergeRows_, false);
}

VELOX_CHECK_EQ(table_->rows()->numRows(), 0);
table_->clear(/*freeTable=*/true);

VELOX_CHECK_NULL(merge_);
spiller_->finishSpill(spillPartitionSet_);
Expand Down Expand Up @@ -1274,8 +1273,7 @@ void GroupingSet::abandonPartialAggregation() {
false,
false,
false,
&pool_,
table_->rows()->stringAllocatorShared());
&pool_);
initializeAggregates(aggregates_, *intermediateRows_, true);
table_.reset();
}
Expand Down
6 changes: 2 additions & 4 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ HashTable<ignoreNullKeys>::HashTable(
bool isJoinBuild,
bool hasProbedFlag,
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool,
const std::shared_ptr<velox::HashStringAllocator>& stringArena)
memory::MemoryPool* pool)
: BaseHashTable(std::move(hashers)),
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
isJoinBuild_(isJoinBuild) {
Expand All @@ -74,8 +73,7 @@ HashTable<ignoreNullKeys>::HashTable(
isJoinBuild,
hasProbedFlag,
hashMode_ != HashMode::kHash,
pool,
stringArena);
pool);
nextOffset_ = rows_->nextOffset();
}

Expand Down
10 changes: 3 additions & 7 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,17 +457,14 @@ class HashTable : public BaseHashTable {
bool isJoinBuild,
bool hasProbedFlag,
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool,
const std::shared_ptr<velox::HashStringAllocator>& stringArena = nullptr);
memory::MemoryPool* pool);

~HashTable() override = default;

static std::unique_ptr<HashTable> createForAggregation(
std::vector<std::unique_ptr<VectorHasher>>&& hashers,
const std::vector<Accumulator>& accumulators,
memory::MemoryPool* pool,
const std::shared_ptr<velox::HashStringAllocator>& stringArena =
nullptr) {
memory::MemoryPool* pool) {
return std::make_unique<HashTable>(
std::move(hashers),
accumulators,
Expand All @@ -476,8 +473,7 @@ class HashTable : public BaseHashTable {
false, // isJoinBuild
false, // hasProbedFlag
0, // minTableSizeForParallelJoinBuild
pool,
stringArena);
pool);
}

static std::unique_ptr<HashTable> createForJoin(
Expand Down
14 changes: 4 additions & 10 deletions velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,12 @@ RowContainer::RowContainer(
bool isJoinBuild,
bool hasProbedFlag,
bool hasNormalizedKeys,
memory::MemoryPool* pool,
std::shared_ptr<HashStringAllocator> stringAllocator)
memory::MemoryPool* pool)
: keyTypes_(keyTypes),
nullableKeys_(nullableKeys),
isJoinBuild_(isJoinBuild),
hasNormalizedKeys_(hasNormalizedKeys),
stringAllocator_(
stringAllocator ? stringAllocator
: std::make_shared<HashStringAllocator>(pool)),
stringAllocator_(std::make_unique<HashStringAllocator>(pool)),
accumulators_(accumulators),
rows_(pool) {
// Compute the layout of the payload row. The row has keys, null flags,
Expand Down Expand Up @@ -936,8 +933,7 @@ void RowContainer::hash(
}

void RowContainer::clear() {
const bool sharedStringAllocator = !stringAllocator_.unique();
if (sharedStringAllocator || usesExternalMemory_) {
if (usesExternalMemory_) {
constexpr int32_t kBatch = 1000;
std::vector<char*> rows(kBatch);
RowContainerIterator iter;
Expand All @@ -948,9 +944,7 @@ void RowContainer::clear() {
hasDuplicateRows_ = false;

rows_.clear();
if (!sharedStringAllocator) {
stringAllocator_->clear();
}
stringAllocator_->clear();
numRows_ = 0;
numRowsWithNormalizedKey_ = 0;
normalizedKeySize_ = originalNormalizedKeySize_;
Expand Down
12 changes: 2 additions & 10 deletions velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,6 @@ class RowContainer {
/// into one word for faster comparison. The bulk allocation is done
/// from 'allocator'. ContainerRowSerde is used for serializing complex
/// type values into the container.
/// 'stringAllocator' allows sharing the variable length data arena with
/// another RowContainer. This is needed for spilling where the same
/// aggregates are used for reading one container and merging into another.
RowContainer(
const std::vector<TypePtr>& keyTypes,
bool nullableKeys,
Expand All @@ -239,8 +236,7 @@ class RowContainer {
bool isJoinBuild,
bool hasProbedFlag,
bool hasNormalizedKey,
memory::MemoryPool* pool,
std::shared_ptr<HashStringAllocator> stringAllocator = nullptr);
memory::MemoryPool* pool);

/// Allocates a new row and initializes possible aggregates to null.
char* newRow();
Expand Down Expand Up @@ -313,10 +309,6 @@ class RowContainer {
return *stringAllocator_;
}

const std::shared_ptr<HashStringAllocator>& stringAllocatorShared() {
return stringAllocator_;
}

/// Returns the number of used rows in 'this'. This is the number of rows a
/// RowContainerIterator would access.
int64_t numRows() const {
Expand Down Expand Up @@ -1330,7 +1322,7 @@ class RowContainer {
const bool isJoinBuild_;
// True if normalized keys are enabled in initial state.
const bool hasNormalizedKeys_;
const std::shared_ptr<HashStringAllocator> stringAllocator_;
const std::unique_ptr<HashStringAllocator> stringAllocator_;

std::vector<bool> columnHasNulls_;

Expand Down
28 changes: 15 additions & 13 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,21 +252,23 @@ void SortBuffer::ensureOutputFits() {
return;
}

if (estimatedOutputRowSize_.has_value()) {
const uint64_t outputBufferSizeToReserve =
estimatedOutputRowSize_.value() * 1.2;
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(outputBufferSizeToReserve)) {
return;
}
if (!estimatedOutputRowSize_.has_value()) {
return;
}

const uint64_t outputBufferSizeToReserve =
estimatedOutputRowSize_.value() * 1.2;
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(outputBufferSizeToReserve)) {
return;
}
LOG(WARNING) << "Failed to reserve "
<< succinctBytes(outputBufferSizeToReserve)
<< " for memory pool " << pool_->name()
<< ", usage: " << succinctBytes(pool_->usedBytes())
<< ", reservation: " << succinctBytes(pool_->reservedBytes());
}
LOG(WARNING) << "Failed to reserve "
<< succinctBytes(outputBufferSizeToReserve)
<< " for memory pool " << pool_->name()
<< ", usage: " << succinctBytes(pool_->usedBytes())
<< ", reservation: " << succinctBytes(pool_->reservedBytes());
}

void SortBuffer::updateEstimatedOutputRowSize() {
Expand Down

0 comments on commit 17c3d84

Please sign in to comment.