Skip to content

Commit

Permalink
Enable arbitrator in OperatorTestBase by default (facebookincubator#9141
Browse files Browse the repository at this point in the history
)

Summary:
Enable arbitrator by default for all test suites that extend from OperatorTestBase.

Pull Request resolved: facebookincubator#9141

Reviewed By: xiaoxmeng

Differential Revision: D55044372

Pulled By: tanjialiang

fbshipit-source-id: 6b2f1b752dd91ba7f976935696536bd02924b405
  • Loading branch information
tanjialiang authored and Joe-Abraham committed Jun 7, 2024
1 parent d815682 commit 0a6a018
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 85 deletions.
8 changes: 4 additions & 4 deletions velox/common/memory/tests/MemoryCapExceededTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ TEST_P(MemoryCapExceededTest, singleDriver) {
.orderBy({"c0"}, false)
.planNode();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
queryCtx->testingOverrideMemoryPool(
memory::memoryManager()->addRootPool(queryCtx->queryId(), kMaxBytes));
queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool(
queryCtx->queryId(), kMaxBytes, exec::MemoryReclaimer::create()));
CursorParameters params;
params.planNode = plan;
params.queryCtx = queryCtx;
Expand Down Expand Up @@ -154,8 +154,8 @@ TEST_P(MemoryCapExceededTest, multipleDrivers) {
.singleAggregation({"c0"}, {"sum(c1)"})
.planNode();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
queryCtx->testingOverrideMemoryPool(
memory::memoryManager()->addRootPool(queryCtx->queryId(), kMaxBytes));
queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool(
queryCtx->queryId(), kMaxBytes, exec::MemoryReclaimer::create()));

const int32_t numDrivers = 10;
CursorParameters params;
Expand Down
39 changes: 19 additions & 20 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,8 @@ TEST_F(HiveDataSinkTest, abort) {
}

TEST_F(HiveDataSinkTest, memoryReclaim) {
const int numBatches = 20;
auto vectors = createVectors(500, 20);
const int numBatches = 200;
auto vectors = createVectors(500, 200);

struct {
dwio::common::FileFormat format;
Expand All @@ -587,7 +587,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) {
expectedWriterReclaimed);
}
} testSettings[] = {
//{dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true},
// {dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true},
{dwio::common::FileFormat::DWRF, true, true, 1, true, true},
{dwio::common::FileFormat::DWRF, true, false, 1 << 30, false, false},
{dwio::common::FileFormat::DWRF, true, false, 1, false, false},
Expand Down Expand Up @@ -670,30 +670,29 @@ TEST_F(HiveDataSinkTest, memoryReclaim) {
for (int i = 0; i < numBatches; ++i) {
dataSink->appendData(vectors[i]);
}
memory::MemoryReclaimer::Stats stats;
memory::MemoryArbitrator::Stats oldStats =
memory::memoryManager()->arbitrator()->stats();
uint64_t reclaimableBytes{0};
if (testData.expectedWriterReclaimed) {
reclaimableBytes = root_->reclaimableBytes().value();
ASSERT_GT(reclaimableBytes, 0);
ASSERT_GT(root_->reclaim(256L << 20, 0, stats), 0);
ASSERT_GT(stats.reclaimExecTimeUs, 0);
ASSERT_GT(stats.reclaimedBytes, 0);
memory::testingRunArbitration();
memory::MemoryArbitrator::Stats curStats =
memory::memoryManager()->arbitrator()->stats();
ASSERT_GT(curStats.reclaimTimeUs - oldStats.reclaimTimeUs, 0);
ASSERT_GT(curStats.numReclaimedBytes - oldStats.numReclaimedBytes, 0);
// We expect dwrf writer set numNonReclaimableAttempts counter.
ASSERT_LE(stats.numNonReclaimableAttempts, 1);
ASSERT_LE(
curStats.numNonReclaimableAttempts -
oldStats.numNonReclaimableAttempts,
1);
} else {
ASSERT_FALSE(root_->reclaimableBytes().has_value());
ASSERT_EQ(root_->reclaim(256L << 20, 0, stats), 0);
ASSERT_EQ(stats.reclaimExecTimeUs, 0);
ASSERT_EQ(stats.reclaimedBytes, 0);
if (testData.expectedWriterReclaimEnabled) {
if (testData.sortWriter) {
ASSERT_GE(stats.numNonReclaimableAttempts, 1);
} else {
ASSERT_EQ(stats.numNonReclaimableAttempts, 1);
}
} else {
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
}
memory::testingRunArbitration();
memory::MemoryArbitrator::Stats curStats =
memory::memoryManager()->arbitrator()->stats();
ASSERT_EQ(curStats.reclaimTimeUs - oldStats.reclaimTimeUs, 0);
ASSERT_EQ(curStats.numReclaimedBytes - oldStats.numReclaimedBytes, 0);
}
const auto partitions = dataSink->close();
if (testData.sortWriter && testData.expectedWriterReclaimed) {
Expand Down
6 changes: 0 additions & 6 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,10 @@ void checkSpillStats(PlanNodeStats& stats, bool expectedSpill) {
class AggregationTest : public OperatorTestBase {
protected:
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
OperatorTestBase::SetUpTestCase();
TestValue::enable();
}

static void TearDownTestCase() {
FLAGS_velox_testing_enable_arbitration = false;
OperatorTestBase::TearDownTestCase();
}

void SetUp() override {
OperatorTestBase::SetUp();
filesystems::registerLocalFileSystem();
Expand Down
1 change: 0 additions & 1 deletion velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class GroupedExecutionTest : public virtual HiveConnectorTestBase {
}

static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
HiveConnectorTestBase::SetUpTestCase();
}

Expand Down
10 changes: 0 additions & 10 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,16 +749,6 @@ class HashJoinTest : public HiveConnectorTestBase {
explicit HashJoinTest(const TestParam& param)
: numDrivers_(param.numDrivers) {}

static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
OperatorTestBase::SetUpTestCase();
}

static void TearDownTestCase() {
FLAGS_velox_testing_enable_arbitration = false;
OperatorTestBase::TearDownTestCase();
}

void SetUp() override {
HiveConnectorTestBase::SetUp();

Expand Down
10 changes: 0 additions & 10 deletions velox/exec/tests/RowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ namespace facebook::velox::exec::test {

class RowNumberTest : public OperatorTestBase {
protected:
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
OperatorTestBase::SetUpTestCase();
}

static void TearDownTestCase() {
FLAGS_velox_testing_enable_arbitration = false;
OperatorTestBase::TearDownTestCase();
}

RowNumberTest() {
filesystems::registerLocalFileSystem();
rowType_ = ROW(
Expand Down
10 changes: 0 additions & 10 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,6 @@ class TestBadMemoryTranslator : public exec::Operator::PlanNodeTranslator {
} // namespace
class TaskTest : public HiveConnectorTestBase {
protected:
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
OperatorTestBase::SetUpTestCase();
}

static void TearDownTestCase() {
FLAGS_velox_testing_enable_arbitration = false;
OperatorTestBase::TearDownTestCase();
}

static std::pair<std::shared_ptr<exec::Task>, std::vector<RowVectorPtr>>
executeSingleThreaded(
core::PlanFragment plan,
Expand Down
14 changes: 4 additions & 10 deletions velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool);
DEFINE_bool(
velox_testing_enable_arbitration,
false,
"Enable to turn on arbitration for tests by default");

using namespace facebook::velox::common::testutil;
using namespace facebook::velox::memory;
Expand All @@ -61,12 +57,10 @@ void OperatorTestBase::SetUpTestCase() {
memory::SharedArbitrator::registerFactory();
MemoryManagerOptions options;
options.allocatorCapacity = 8L << 30;
if (FLAGS_velox_testing_enable_arbitration) {
options.arbitratorCapacity = 6L << 30;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
}
options.arbitratorCapacity = 6L << 30;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
memory::MemoryManager::testingSetInstance(options);
asyncDataCache_ =
cache::AsyncDataCache::create(memory::memoryManager()->allocator());
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/tests/utils/OperatorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "velox/vector/tests/utils/VectorMaker.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

DECLARE_bool(velox_testing_enable_arbitration);

namespace facebook::velox::exec::test {
class OperatorTestBase : public testing::Test,
public velox::test::VectorTestBase {
Expand Down
10 changes: 0 additions & 10 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ void AggregationTestBase::TearDown() {
OperatorTestBase::TearDown();
}

void AggregationTestBase::SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
OperatorTestBase::SetUpTestCase();
}

void AggregationTestBase::TearDownTestCase() {
FLAGS_velox_testing_enable_arbitration = false;
OperatorTestBase::TearDownTestCase();
}

void AggregationTestBase::testAggregations(
const std::vector<RowVectorPtr>& data,
const std::vector<std::string>& groupingKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class AggregationTestBase : public exec::test::OperatorTestBase {
protected:
void SetUp() override;
void TearDown() override;
static void SetUpTestCase();
static void TearDownTestCase();

std::vector<RowVectorPtr>
makeVectors(const RowTypePtr& rowType, vector_size_t size, int numVectors);
Expand Down

0 comments on commit 0a6a018

Please sign in to comment.