Skip to content

Commit

Permalink
Reduce indices buffer memory usage in LocalPartition (#7939)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #7939

In the current code, when we partition N rows with M columns into K partitions, we allocate a buffer of N indices for each partitioned output's M columns, so the total memory for these indices is $`O(NMK)`$. This change reduces that to $`O(NM)`$ by sizing the indices buffer of each partition to the number of rows in that partition only.

Reviewed By: xiaoxmeng

Differential Revision: D51984975

fbshipit-source-id: 2c17cdba0e7bbe09a41d296030a29482f7a68b2c
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 8, 2023
1 parent 93fe491 commit 4f95700
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
20 changes: 11 additions & 9 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,12 @@ LocalPartition::LocalPartition(

namespace {
std::vector<BufferPtr> allocateIndexBuffers(
int numBuffers,
vector_size_t size,
const std::vector<vector_size_t>& sizes,
memory::MemoryPool* pool) {
std::vector<BufferPtr> indexBuffers;
indexBuffers.reserve(numBuffers);
for (auto i = 0; i < numBuffers; i++) {
indexBuffers.emplace_back(allocateIndices(size, pool));
indexBuffers.reserve(sizes.size());
for (auto size : sizes) {
indexBuffers.push_back(allocateIndices(size, pool));
}
return indexBuffers;
}
Expand Down Expand Up @@ -330,11 +329,15 @@ void LocalPartition::addInput(RowVectorPtr input) {
return;
}

auto numInput = input->size();
auto indexBuffers = allocateIndexBuffers(numPartitions_, numInput, pool());
const auto numInput = input->size();
std::vector<vector_size_t> maxIndex(numPartitions_, 0);
for (auto i = 0; i < numInput; ++i) {
++maxIndex[partitions_[i]];
}
auto indexBuffers = allocateIndexBuffers(maxIndex, pool());
auto rawIndices = getRawIndices(indexBuffers);

std::vector<vector_size_t> maxIndex(numPartitions_, 0);
std::fill(maxIndex.begin(), maxIndex.end(), 0);
for (auto i = 0; i < numInput; ++i) {
auto partition = partitions_[i];
rawIndices[partition][maxIndex[partition]] = i;
Expand All @@ -347,7 +350,6 @@ void LocalPartition::addInput(RowVectorPtr input) {
// Do not enqueue empty partitions.
continue;
}
indexBuffers[i]->setSize(partitionSize * sizeof(vector_size_t));
auto partitionData =
wrapChildren(input, partitionSize, std::move(indexBuffers[i]));

Expand Down
49 changes: 49 additions & 0 deletions velox/exec/tests/LocalPartitionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,55 @@ TEST_F(LocalPartitionTest, maxBufferSizePartition) {
verifyExchangeSourceOperatorStats(task, 2100, 42);
}

// Checks that the dictionary indices buffers attached to LocalPartition output
// stay proportional to the number of rows actually emitted: the summed
// capacity of the wrapping buffers across all output batches must not exceed
// 1.5 * totalRows * sizeof(vector_size_t).
TEST_F(LocalPartitionTest, indicesBufferCapacity) {
// Input: 21 row vectors of 100 rows each, with a single int32 column whose
// value at 'row' in vector 'i' is -71 + i * 10 + row.
std::vector<RowVectorPtr> vectors;
for (auto i = 0; i < 21; i++) {
vectors.emplace_back(makeRowVector({makeFlatVector<int32_t>(
100, [i](auto row) { return -71 + i * 10 + row; })}));
}
// Presumably writes one file per input vector; the split loop below iterates
// over filePaths.size() entries -- TODO confirm against writeToFiles.
auto filePaths = writeToFiles(vectors);
auto rowType = asRowType(vectors[0]->type());
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
std::vector<core::PlanNodeId> scanNodeIds;
// Builds a table scan node and records its plan node id so that splits can
// be routed to it later.
auto scanNode = [&]() {
auto node = PlanBuilder(planNodeIdGenerator).tableScan(rowType).planNode();
scanNodeIds.push_back(node->id());
return node;
};
CursorParameters params;
// Plan: three table scans feeding a single local partition keyed on "c0".
params.planNode = PlanBuilder(planNodeIdGenerator)
.localPartition(
{"c0"},
{
scanNode(),
scanNode(),
scanNode(),
})
.planNode();
// NOTE(review): copyResult = false presumably lets the cursor hand back the
// batches as produced, keeping their dictionary wrapping intact for the
// encoding check below -- confirm against TaskCursor.
params.copyResult = false;
params.maxDrivers = 2;
TaskCursor cursor(params);
// Distribute the files round-robin across the three scan nodes.
// NOTE(review): noMoreSplits(id) is called on every iteration, i.e. also for
// ids that receive further splits in later iterations -- confirm this is
// intended and accepted by Task.
for (auto i = 0; i < filePaths.size(); ++i) {
auto id = scanNodeIds[i % 3];
cursor.task()->addSplit(
id, Split(makeHiveConnectorSplit(filePaths[i]->path)));
cursor.task()->noMoreSplits(id);
}
// Accumulate the row count and the capacity of the wrapping (indices) buffer
// over all output batches.
int numRows = 0;
int capacity = 0;
while (cursor.moveNext()) {
auto* batch = cursor.current()->as<RowVector>();
ASSERT_EQ(batch->childrenSize(), 1);
auto& column = batch->childAt(0);
// Each output column is expected to be dictionary-encoded, so wrapInfo()
// below refers to its indices buffer.
ASSERT_EQ(column->encoding(), VectorEncoding::Simple::DICTIONARY);
numRows += batch->size();
capacity += column->wrapInfo()->capacity();
}
// 21 input vectors * 100 rows each.
ASSERT_EQ(numRows, 2100);
// MemoryPool::preferredSize is capped at 1.5 times the requested size.
ASSERT_LE(capacity, 1.5 * numRows * sizeof(vector_size_t));
}

TEST_F(LocalPartitionTest, blockingOnLocalExchangeQueue) {
auto localExchangeBufferSize = "1024";
auto baseVector = vectorMaker_.flatVector<int64_t>(
Expand Down

0 comments on commit 4f95700

Please sign in to comment.