Skip to content

Commit

Permalink
Use shrink pools by arbitrator for spill test in hash join unit test (#…
Browse files Browse the repository at this point in the history
…8932)

Summary:
- Add a flag in OperatorTestBase to configure the shared arbitrator for use in unit tests
- Switch to use new api to create query pool and add the memory reclaimer to
   work with shared arbitrator
- Hash build switches to using the test utility to trigger spilling by calling the shared
   arbitrator's shrink pools function
- Improve the shrink pool test utility to ensure the memory pool is always associated
   with its memory manager
- Turn on shared arbitrator in hash join unit test

Eventually, we will make shared arbitrator the default for unit testing.

Pull Request resolved: #8932

Reviewed By: tanjialiang

Differential Revision: D54432889

Pulled By: xiaoxmeng

fbshipit-source-id: 484ccf356a555be837bfa4fa64fb03e6d16c8cb8
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Mar 2, 2024
1 parent a6672eb commit f391c02
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 39 deletions.
14 changes: 14 additions & 0 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,4 +457,18 @@ MemoryArbitrationContext* memoryArbitrationContext() {
/// Reports whether the calling thread currently has a memory arbitration
/// context, i.e. whether memoryArbitrationContext() returns a non-null pointer.
bool underMemoryArbitration() {
  const auto* const activeContext = memoryArbitrationContext();
  return activeContext != nullptr;
}

void testingRunArbitration(uint64_t targetBytes, MemoryManager* manager) {
if (manager == nullptr) {
manager = memory::memoryManager();
}
manager->shrinkPools(targetBytes);
}

/// Runs arbitration on behalf of 'pool': puts 'pool' into arbitration state,
/// asks the memory manager that 'pool' is associated with to shrink its pools
/// by 'targetBytes', and then takes 'pool' out of arbitration state.
///
/// 'pool' must be non-null and must be a MemoryPoolImpl, since the manager is
/// obtained through a static_cast to MemoryPoolImpl.
void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes) {
  // Pairs enterArbitration() with leaveArbitration() so that the pool is taken
  // out of arbitration state even if shrinkPools() throws.
  // NOTE(review): assumes leaveArbitration() itself does not throw -- confirm.
  struct ArbitrationStateGuard {
    explicit ArbitrationStateGuard(MemoryPool* guardedPool)
        : guardedPool_(guardedPool) {
      guardedPool_->enterArbitration();
    }

    ~ArbitrationStateGuard() {
      guardedPool_->leaveArbitration();
    }

    ArbitrationStateGuard(const ArbitrationStateGuard&) = delete;
    ArbitrationStateGuard& operator=(const ArbitrationStateGuard&) = delete;

    MemoryPool* const guardedPool_;
  };

  ArbitrationStateGuard guard(pool);
  static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
      targetBytes);
}
} // namespace facebook::velox::memory
14 changes: 14 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,18 @@ MemoryArbitrationContext* memoryArbitrationContext();

/// Returns true if the running thread is under memory arbitration.
bool underMemoryArbitration();

class MemoryManager;

/// Triggers memory arbitration by shrinking memory pools from 'manager' via
/// its shrinkPools API. If 'manager' is not set, then it shrinks from the
/// process wide memory manager. If 'targetBytes' is zero, then reclaims all
/// the memory from 'manager' if possible.
void testingRunArbitration(
    uint64_t targetBytes = 0,
    MemoryManager* manager = nullptr);

/// Triggers memory arbitration on behalf of 'pool': puts 'pool' into
/// arbitration state, invokes the shrinkPools API of the memory manager that
/// 'pool' is associated with, and then takes 'pool' out of arbitration state.
/// 'pool' must be non-null and must be a MemoryPoolImpl, since the
/// implementation obtains the manager through a static_cast to MemoryPoolImpl.
/// If 'targetBytes' is zero, then reclaims all the memory from the manager if
/// possible.
void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes = 0);
} // namespace facebook::velox::memory
4 changes: 4 additions & 0 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,10 @@ class MemoryPoolImpl : public MemoryPool {

void testingSetCapacity(int64_t bytes);

/// Returns the memory manager pointer stored in 'manager_'. Intended for test
/// use, per the 'testing' prefix.
MemoryManager* testingManager() const {
return manager_;
}

/// Returns the memory allocator pointer stored in 'allocator_'. Intended for
/// test use, per the 'testing' prefix.
MemoryAllocator* testingAllocator() const {
return allocator_;
}
Expand Down
6 changes: 4 additions & 2 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ class QueryCtx {

/// Creates the root memory pool for this query if 'pool_' has not been set
/// yet; otherwise does nothing.
///
/// The pool is added to the process wide memory manager under the name
/// produced by generatePoolName(queryId), passing memory::kMaxMemory and a
/// newly created memory::MemoryReclaimer.
void initPool(const std::string& queryId) {
  if (pool_ != nullptr) {
    return;
  }
  // Create the pool exactly once through the current memory manager API. A
  // pool created via the deprecated default memory manager would only be
  // overwritten (and discarded) by this assignment.
  pool_ = memory::memoryManager()->addRootPool(
      QueryCtx::generatePoolName(queryId),
      memory::kMaxMemory,
      memory::MemoryReclaimer::create());
}

Expand Down
15 changes: 7 additions & 8 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ bool HashBuild::ensureInputFits(RowVectorPtr& input) {
bool HashBuild::reserveMemory(const RowVectorPtr& input) {
VELOX_CHECK(spillEnabled());

Operator::ReclaimableSectionGuard guard(this);
numSpillRows_ = 0;
numSpillBytes_ = 0;

Expand All @@ -468,9 +469,10 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
if (numRows != 0) {
// Test-only spill path.
if (testingTriggerSpill()) {
numSpillRows_ = std::max<int64_t>(1, numRows / 10);
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
memory::testingRunArbitration(pool());
// NOTE: the memory arbitration should have triggered spilling on this
// hash build operator, so we return true to indicate that there is enough
// memory.
return true;
}

// We check usage from the parent pool to take peers' allocations into
Expand Down Expand Up @@ -522,11 +524,8 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
}
if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "velox/exec/HashBuild.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/TableScan.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
Expand Down Expand Up @@ -734,6 +733,11 @@ class HashJoinBuilder {

class HashJoinTest : public HiveConnectorTestBase {
protected:
// Turns on the velox_testing_enable_arbitration flag and then runs the base
// class test-case setup. The flag is set first so that the base setup observes
// it as enabled.
// NOTE(review): presumably the base setup reaches
// OperatorTestBase::SetUpTestCase(), which reads this flag -- confirm.
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
HiveConnectorTestBase::SetUpTestCase();
}

HashJoinTest() : HashJoinTest(TestParam(1)) {}

explicit HashJoinTest(const TestParam& param)
Expand Down
17 changes: 0 additions & 17 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,4 @@ QueryTestResult runWriteTask(
}
return result;
}

// Shrinks the pools of a memory manager by 'targetBytes'. Falls back to the
// manager returned by memory::memoryManager() when 'manager' is null. When
// 'pool' is non-null, the shrink is bracketed by enterArbitration() and
// leaveArbitration() calls on 'pool'.
void testingRunArbitration(
    memory::MemoryPool* pool,
    uint64_t targetBytes,
    memory::MemoryManager* manager) {
  memory::MemoryManager* const effectiveManager =
      (manager != nullptr) ? manager : memory::memoryManager();

  if (pool == nullptr) {
    effectiveManager->shrinkPools(targetBytes);
    return;
  }

  pool->enterArbitration();
  effectiveManager->shrinkPools(targetBytes);
  pool->leaveArbitration();
}

} // namespace facebook::velox::exec::test
11 changes: 0 additions & 11 deletions velox/exec/tests/utils/ArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,4 @@ QueryTestResult runWriteTask(
const std::string& kHiveConnectorId,
bool enableSpilling,
const RowVectorPtr& expectedResult = nullptr);

/// Triggers memory arbitration by invoking the shrinkPools API of 'manager'.
/// If 'manager' is not set, then it shrinks from the process wide memory
/// manager. If 'pool' is provided, the function puts 'pool' in arbitration
/// state before calling shrinkPools and takes it out of arbitration state
/// afterwards, to ease test use. If 'targetBytes' is zero, then reclaims all
/// the memory from 'manager' if possible.
void testingRunArbitration(
memory::MemoryPool* pool = nullptr,
uint64_t targetBytes = 0,
memory::MemoryManager* manager = nullptr);
} // namespace facebook::velox::exec::test
10 changes: 10 additions & 0 deletions velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool);
// Flag read by OperatorTestBase::SetUpTestCase(). When true, the test memory
// manager options additionally set an arbitrator capacity, the "SHARED"
// arbitrator kind, usage leak checking and an arbitration state check
// callback. Defaults to false.
DEFINE_bool(
velox_testing_enable_arbitration,
false,
"Enable to turn on arbitration for tests by default");

using namespace facebook::velox::common::testutil;

Expand All @@ -58,6 +62,12 @@ void OperatorTestBase::SetUpTestCase() {
exec::SharedArbitrator::registerFactory();
memory::MemoryManagerOptions options;
options.allocatorCapacity = 8L << 30;
if (FLAGS_velox_testing_enable_arbitration) {
options.arbitratorCapacity = 6L << 30;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
}
memory::MemoryManager::testingSetInstance(options);
asyncDataCache_ = cache::AsyncDataCache::create(memoryManager()->allocator());
cache::AsyncDataCache::setInstance(asyncDataCache_.get());
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/OperatorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "velox/vector/tests/utils/VectorMaker.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

DECLARE_bool(velox_testing_enable_arbitration);

namespace facebook::velox::exec::test {
class OperatorTestBase : public testing::Test,
public velox::test::VectorTestBase {
Expand Down
1 change: 1 addition & 0 deletions velox/expression/tests/FuzzerRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ int FuzzerRunner::run(
void FuzzerRunner::runFromGtest(
size_t seed,
const std::unordered_set<std::string>& skipFunctions) {
memory::MemoryManager::testingSetInstance({});
auto signatures = facebook::velox::getFunctionSignatures();
ExpressionFuzzerVerifier(
signatures, seed, getExpressionFuzzerVerifierOptions(skipFunctions))
Expand Down

0 comments on commit f391c02

Please sign in to comment.