Skip to content

Commit

Permalink
Reserve memory for SortWindowBuild sort process (facebookincubator#11370
Browse files Browse the repository at this point in the history
)

Summary:
Reserve memory for `sortedRows_`, `partitionStartRows_`, and the buffer required by prefix sort to avoid OOM.

Pull Request resolved: facebookincubator#11370

Reviewed By: kagamiori

Differential Revision: D65119230

Pulled By: xiaoxmeng

fbshipit-source-id: df8d96df0f9f284ca790ea9d60ad40dae6e4b653
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Oct 29, 2024
1 parent e67f11b commit d471912
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 2 deletions.
45 changes: 44 additions & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ SortWindowBuild::SortWindowBuild(
pool_(pool),
prefixSortConfig_(prefixSortConfig),
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) {
sortedRows_(0, memory::StlAllocator<char*>(*pool)),
partitionStartRows_(0, memory::StlAllocator<char*>(*pool)) {
VELOX_CHECK_NOT_NULL(pool_);
allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size());
allKeyInfo_.insert(
Expand Down Expand Up @@ -139,6 +140,43 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) {
<< succinctBytes(data_->pool()->reservedBytes());
}

void SortWindowBuild::ensureSortFits() {
// Check if spilling is enabled or not.
if (spillConfig_ == nullptr) {
return;
}

// Test-only spill path.
if (testingTriggerSpill(pool_->name())) {
spill();
return;
}

if (spiller_ != nullptr) {
return;
}

// The memory for std::vector sorted rows, `partitionStartRows_` and prefix
// sort required buffer.
uint64_t sortBufferToReserve =
numRows_ * (sizeof(char*) + sizeof(vector_size_t)) +
PrefixSort::maxRequiredBytes(
pool_, data_.get(), compareFlags_, prefixSortConfig_);
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(sortBufferToReserve)) {
return;
}
}

LOG(WARNING) << fmt::format(
"Failed to reserve {} for sort window build from memory pool {}, usage: {}, reservation: {}",
succinctBytes(sortBufferToReserve),
pool_->name(),
succinctBytes(pool_->usedBytes()),
succinctBytes(pool_->reservedBytes()));
}

void SortWindowBuild::setupSpiller() {
VELOX_CHECK_NULL(spiller_);

Expand Down Expand Up @@ -230,6 +268,8 @@ void SortWindowBuild::noMoreInput() {
return;
}

ensureSortFits();

if (spiller_ != nullptr) {
// Spill remaining data to avoid running out of memory while sort-merging
// spilled data.
Expand All @@ -249,6 +289,9 @@ void SortWindowBuild::noMoreInput() {
// the partition. This will order the rows for getOutput().
sortPartitions();
}

// Releases the unused memory reservation after processing input.
pool_->release();
}

void SortWindowBuild::loadNextPartitionFromSpill() {
Expand Down
9 changes: 8 additions & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class SortWindowBuild : public WindowBuild {
tsan_atomic<bool>* nonReclaimableSection,
folly::Synchronized<common::SpillStats>* spillStats);

// Calls release() on the memory pool when this window build is destroyed, so
// that any memory reservation that was made for sorting but is still unused
// is handed back.
~SortWindowBuild() {
pool_->release();
}

bool needsInput() override {
// No partitions are available yet, so can consume input rows.
return partitionStartRows_.size() == 0;
Expand All @@ -60,6 +64,8 @@ class SortWindowBuild : public WindowBuild {
private:
void ensureInputFits(const RowVectorPtr& input);

void ensureSortFits();

void setupSpiller();

// Main sorting function loop done after all input rows are received
Expand Down Expand Up @@ -109,7 +115,8 @@ class SortWindowBuild : public WindowBuild {
// This is a vector that gives the index of the start row
// (in sortedRows_) of each partition in the RowContainer data_.
// This auxiliary structure helps demarcate partitions.
// It uses memory::StlAllocator, so its storage is obtained through the
// allocator that the constructor builds from the memory pool it is given.
std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>
partitionStartRows_;

// Current partition being output. Used to construct WindowPartitions
// during resetPartition.
Expand Down
112 changes: 112 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/common/file/FileSystems.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/OperatorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand All @@ -37,6 +38,31 @@ class WindowTest : public OperatorTestBase {
window::prestosql::registerAllWindowFunctions();
filesystems::registerLocalFileSystem();
}

// Builds the common::SpillConfig used by tests that construct a
// SortWindowBuild directly. 'spillDir' is the directory reported by the
// config's spill-directory callback; the fixture's executor is passed as the
// executor. The remaining positional arguments are fixed test values whose
// meaning is defined by the common::SpillConfig constructor.
common::SpillConfig getSpillConfig(const std::string& spillDir) const {
  // The closure owns its own copy of the path, so the reference it returns
  // stays valid for as long as the callback itself is alive.
  auto spillDirGetter = [spillDir]() -> const std::string& {
    return spillDir;
  };
  // Callback taking a byte count that does nothing.
  auto noopCallback = [&](uint64_t) {};

  return common::SpillConfig(
      spillDirGetter,
      noopCallback,
      "0.0.0",
      0,
      0,
      1 << 20,
      executor_.get(),
      5,
      10,
      0,
      0,
      0,
      0,
      0,
      "none");
}

// CPU thread pool created with std::thread::hardware_concurrency() threads.
// getSpillConfig() passes its raw pointer to the SpillConfig it builds.
const std::shared_ptr<folly::Executor> executor_{
std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency())};

// Flag, initially false, whose address is handed to the SortWindowBuild under
// test as its 'nonReclaimableSection' constructor argument.
tsan_atomic<bool> nonReclaimableSection_{false};
};

TEST_F(WindowTest, spill) {
Expand Down Expand Up @@ -567,5 +593,91 @@ DEBUG_ONLY_TEST_F(WindowTest, frameColumnNullCheck) {
AssertQueryBuilder(makePlan(inputNoThrow)).copyResults(pool()));
}

// Verifies that SortWindowBuild::noMoreInput() attempts a memory reservation
// for the sort step when a spill config is supplied and does not attempt one
// when the spill config is null. The attempt is detected through the TestValue
// hook registered for MemoryPoolImpl::maybeReserve. The check is run for a
// plan with an integer sort key and for one with a string sort key.
DEBUG_ONLY_TEST_F(WindowTest, reserveMemorySort) {
// Combinations under test. {false, false} is not covered.
struct {
bool usePrefixSort;
bool spillEnabled;
} testSettings[] = {{false, true}, {true, false}, {true, true}};

const vector_size_t size = 1'000;
// Input whose sort key is an int32 column; used when 'usePrefixSort' is true.
auto prefixSortData = makeRowVector(
{"d", "p", "s"},
{
// Payload.
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
// Partition key.
makeFlatVector<int16_t>(size, [](auto row) { return row % 11; }),
// Sorting key.
makeFlatVector<int32_t>(size, [](auto row) { return row; }),
});
// SortWindowBuild is constructed from a WindowNode, so the generic plan node
// returned by the builder is downcast.
auto prefixSortPlan = std::dynamic_pointer_cast<const core::WindowNode>(
PlanBuilder()
.values(split(prefixSortData, 10))
.window({"row_number() over (partition by p order by s)"})
.planNode());

// Input whose sort key is a string column; used when 'usePrefixSort' is
// false.
const std::vector<std::string> fruits = {
"apple", "banana", "pear", "grapes", "mango", "grapefruit"};
auto nonPrefixSortData = makeRowVector(
{"d", "p", "s"},
{
// Payload.
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
// Partition key.
makeFlatVector<int16_t>(size, [](auto row) { return row % 11; }),
// Sorting key.
makeFlatVector<StringView>(
size,
[&fruits](auto row) {
return StringView(fruits[row % fruits.size()]);
}),
});
auto nonPrefixSortPlan = std::dynamic_pointer_cast<const core::WindowNode>(
PlanBuilder()
.values(split(nonPrefixSortData, 10))
.window({"row_number() over (partition by p order by s)"})
.planNode());

for (const auto [usePrefixSort, spillEnabled] : testSettings) {
SCOPED_TRACE(fmt::format(
"usePrefixSort: {}, spillEnabled: {}, ", usePrefixSort, spillEnabled));
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto spillConfig = getSpillConfig(spillDirectory->getPath());
folly::Synchronized<common::SpillStats> spillStats;
const auto plan = usePrefixSort ? prefixSortPlan : nonPrefixSortPlan;
velox::common::PrefixSortConfig prefixSortConfig =
velox::common::PrefixSortConfig{
std::numeric_limits<int32_t>::max(), 130};
// A null spill config is how "spill disabled" is expressed to the build.
auto sortWindowBuild = std::make_unique<SortWindowBuild>(
plan,
pool_.get(),
std::move(prefixSortConfig),
spillEnabled ? &spillConfig : nullptr,
&nonReclaimableSection_,
&spillStats);

// NOTE(review): an injection rate of 0 presumably keeps the test-only spill
// path from firing, so that the reservation path is the one exercised;
// confirm against TestScopedSpillInjection.
TestScopedSpillInjection scopedSpillInjection(0);
const auto data = usePrefixSort ? prefixSortData : nonPrefixSortData;
sortWindowBuild->addInput(data);

std::atomic_bool hasReserveMemory = false;
// Record any maybeReserve() call. The hook is installed after addInput(), so
// only calls made from noMoreInput() onward are observed.
SCOPED_TESTVALUE_SET(
"facebook::velox::common::memory::MemoryPoolImpl::maybeReserve",
std::function<void(memory::MemoryPoolImpl*)>(
([&](memory::MemoryPoolImpl* pool) {
hasReserveMemory.store(true);
})));

sortWindowBuild->noMoreInput();
if (spillEnabled) {
// With spilling configured, a reservation for the sort must be attempted.
ASSERT_TRUE(hasReserveMemory);
} else {
// Without a spill config, no reservation is expected.
ASSERT_FALSE(hasReserveMemory);
}
}
}

} // namespace
} // namespace facebook::velox::exec

0 comments on commit d471912

Please sign in to comment.