
Commit

feat: Make skewed partition balancer thread-safe and share among scale writer partitioners

Summary:
Make the skewed partition balancer thread-safe and share it with all the scale writer local partitioners.
The skewed partition balancer is created by the first scale writer local partitioner in its constructor.
This helps further reduce the number of unnecessarily written files and makes memory usage more efficient, as
all the scale writer local partitioners share a consistent view of the partition assignments.
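
A minimal sketch of the sharing scheme described above, in which the first scale writer local partitioner creates the thread-safe balancer and later partitioners reuse the same instance. The ScaleWriterExchangeState holder and its rebalancer() accessor are hypothetical names for illustration only; the SkewedPartitionRebalancer constructor arguments match the class changed in this commit.

#include <cstdint>
#include <memory>
#include <mutex>

#include "velox/common/base/SkewedPartitionBalancer.h"

using facebook::velox::common::SkewedPartitionRebalancer;

// Hypothetical state shared by all scale writer local partitioners of one
// local partition node. The first partitioner to call rebalancer() creates
// the thread-safe balancer; later partitioners get the same instance, so all
// of them observe a consistent partition-to-task assignment view.
class ScaleWriterExchangeState {
 public:
  std::shared_ptr<SkewedPartitionRebalancer> rebalancer(
      uint32_t partitionCount,
      uint32_t taskCount) {
    std::lock_guard<std::mutex> l(mutex_);
    if (rebalancer_ == nullptr) {
      rebalancer_ = std::make_shared<SkewedPartitionRebalancer>(
          partitionCount,
          taskCount,
          /*minProcessedBytesRebalanceThresholdPerPartition=*/128,
          /*minProcessedBytesRebalanceThreshold=*/256);
    }
    return rebalancer_;
  }

 private:
  std::mutex mutex_;
  std::shared_ptr<SkewedPartitionRebalancer> rebalancer_;
};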

Differential Revision: D66628614
xiaoxmeng authored and facebook-github-bot committed Dec 7, 2024
1 parent 21f1e21 commit f8465cd
Showing 8 changed files with 243 additions and 49 deletions.
81 changes: 63 additions & 18 deletions velox/common/base/SkewedPartitionBalancer.cpp
@@ -16,6 +16,10 @@

#include "velox/common/base/SkewedPartitionBalancer.h"

#include "velox/common/testutil/TestValue.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::common {
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
uint32_t partitionCount,
@@ -28,41 +32,75 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer(
minProcessedBytesRebalanceThresholdPerPartition),
minProcessedBytesRebalanceThreshold_(std::max(
minProcessedBytesRebalanceThreshold,
minProcessedBytesRebalanceThresholdPerPartition_)) {
minProcessedBytesRebalanceThresholdPerPartition_)),
partitionRowCount_(partitionCount_),
partitionAssignments_(partitionCount_) {
VELOX_CHECK_GT(partitionCount_, 0);
VELOX_CHECK_GT(taskCount_, 0);

partitionRowCount_.resize(partitionCount_, 0);
partitionBytes_.resize(partitionCount_, 0);
partitionBytesAtLastRebalance_.resize(partitionCount_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(partitionCount_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(taskCount_, 0);

partitionAssignments_.resize(partitionCount_);

// Assigns one task for each partition initially.
for (auto partition = 0; partition < partitionCount_; ++partition) {
const uint32_t taskId = partition % taskCount;
partitionAssignments_[partition].emplace_back(taskId);
const uint32_t taskId = partition % taskCount_;
partitionAssignments_[partition].addTaskId(taskId);
}
}

void SkewedPartitionRebalancer::PartitionAssignment::addTaskId(
uint32_t taskId) {
std::unique_lock guard{lock_};
taskIds_.push_back(taskId);
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::nextTaskId(
uint64_t index) const {
std::shared_lock guard{lock_};
const auto taskIndex = index % taskIds_.size();
return taskIds_[taskIndex];
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::size() const {
std::shared_lock guard{lock_};
return taskIds_.size();
}

const std::vector<uint32_t>
SkewedPartitionRebalancer::PartitionAssignment::taskIds() const {
std::shared_lock guard{lock_};
return taskIds_;
}

void SkewedPartitionRebalancer::rebalance() {
if (shouldRebalance()) {
rebalancePartitions();
const int64_t processedBytes = processedBytes_.load();
if (shouldRebalance(processedBytes)) {
rebalancePartitions(processedBytes);
}
}

bool SkewedPartitionRebalancer::shouldRebalance() const {
VELOX_CHECK_GE(processedBytes_, processedBytesAtLastRebalance_);
return (processedBytes_ - processedBytesAtLastRebalance_) >=
bool SkewedPartitionRebalancer::shouldRebalance(int64_t processedBytes) const {
return (processedBytes - processedBytesAtLastRebalance_) >=
minProcessedBytesRebalanceThreshold_;
}

void SkewedPartitionRebalancer::rebalancePartitions() {
VELOX_DCHECK(shouldRebalance());
void SkewedPartitionRebalancer::rebalancePartitions(int64_t processedBytes) {
if (rebalancing_.exchange(true)) {
return;
}

SCOPE_EXIT {
VELOX_CHECK(rebalancing_);
rebalancing_ = false;
};
++stats_.numBalanceTriggers;

TestValue::adjust(
"facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions",
this);

// Updates the processed bytes for each partition.
calculatePartitionProcessedBytes();

@@ -83,7 +121,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
taskMaxPartitions{taskCount_};
for (auto partition = 0; partition < partitionCount_; ++partition) {
auto& taskAssignments = partitionAssignments_[partition];
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
auto& taskQueue = taskMaxPartitions[taskId];
taskQueue.addOrUpdate(
partition, partitionBytesSinceLastRebalancePerTask_[partition]);
@@ -102,7 +140,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
}

rebalanceBasedOnTaskSkewness(maxTasks, minTasks, taskMaxPartitions);
processedBytesAtLastRebalance_ = processedBytes_;
processedBytesAtLastRebalance_.store(processedBytes);
}

void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
@@ -168,21 +206,21 @@ bool SkewedPartitionRebalancer::rebalancePartition(
IndexedPriorityQueue<uint32_t, true>& maxTasks,
IndexedPriorityQueue<uint32_t, false>& minTasks) {
auto& taskAssignments = partitionAssignments_[rebalancePartition];
for (auto taskId : taskAssignments) {
for (auto taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
return false;
}
}

taskAssignments.push_back(targetTaskId);
taskAssignments.addTaskId(targetTaskId);
VELOX_CHECK_GT(partitionAssignments_[rebalancePartition].size(), 1);

const auto newTaskCount = taskAssignments.size();
const auto oldTaskCount = newTaskCount - 1;
// Since a partition is rebalanced from max to min skewed tasks,
// decrease the priority of max taskBucket as well as increase the priority
// of min taskBucket.
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] +=
(partitionBytesSinceLastRebalancePerTask_[rebalancePartition] *
@@ -208,6 +246,13 @@ void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() {
for (auto partition = 0; partition < partitionCount_; ++partition) {
totalPartitionRowCount += partitionRowCount_[partition];
}
if (totalPartitionRowCount <= 0) {
LOG(ERROR) << "processedBytes " << processedBytes_
<< " processedBytesAtLastRebalance_ "
<< processedBytesAtLastRebalance_
<< " minProcessedBytesRebalanceThreshold_ "
<< minProcessedBytesRebalanceThreshold_;
}
VELOX_CHECK_GT(totalPartitionRowCount, 0);

for (auto partition = 0; partition < partitionCount_; ++partition) {
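The rebalancing_.exchange(true) guard in rebalancePartitions() above turns concurrent rebalance() calls into a single execution: the thread that wins the exchange does the work and every other caller returns immediately. A standalone sketch of that single-flight pattern, with the actual rebalancing logic stubbed out, could look as follows; it omits the SCOPE_EXIT and TestValue hooks used by the real implementation.

#include <atomic>
#include <cstdint>

// Sketch of the single-flight gate: whichever thread flips 'rebalancing_'
// from false to true performs the rebalance; concurrent callers bail out
// instead of blocking on a mutex.
class SingleFlightRebalancer {
 public:
  void maybeRebalance(int64_t processedBytes) {
    // exchange() returns the previous value; 'true' means another thread is
    // already rebalancing, so this caller gives up right away.
    if (rebalancing_.exchange(true)) {
      return;
    }
    doRebalance(processedBytes);
    // Reset the gate so a later call can trigger the next rebalance.
    rebalancing_.store(false);
  }

 private:
  void doRebalance(int64_t /*processedBytes*/) {
    // Placeholder: the real code recomputes per-partition bytes and moves
    // skewed partitions from overloaded tasks to underloaded ones.
  }

  std::atomic_bool rebalancing_{false};
};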
45 changes: 34 additions & 11 deletions velox/common/base/SkewedPartitionBalancer.h
@@ -27,7 +27,7 @@ class SkewedPartitionRebalancerTestHelper;
/// tasks to busy partition measured by processed data size. This is used by
/// local partition to scale table writers for now.
///
/// NOTE: this object is not thread-safe.
/// NOTE: this object is thread-safe.
class SkewedPartitionRebalancer {
public:
/// 'partitionCount' is the number of partitions to process. 'taskCount' is
@@ -46,14 +46,18 @@ class SkewedPartitionRebalancer {
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold);

~SkewedPartitionRebalancer() {
VELOX_CHECK(!rebalancing_);
}

/// Invoked to rebalance the partition assignments if applicable.
void rebalance();

/// Gets the assigned task id for a given 'partition'. 'index' is used to
/// choose one of multiple assigned tasks in a round-robin order.
uint32_t getTaskId(uint32_t partition, uint64_t index) const {
const auto& taskList = partitionAssignments_[partition];
return taskList[index % taskList.size()];
auto& taskList = partitionAssignments_[partition];
return taskList.nextTaskId(index);
}

/// Adds the processed partition row count. This is used to estimate the
@@ -64,7 +68,7 @@ class SkewedPartitionRebalancer {
}

/// Adds the total processed bytes from all the partitions.
void addProcessedBytes(long bytes) {
void addProcessedBytes(int64_t bytes) {
VELOX_CHECK_GT(bytes, 0);
processedBytes_ += bytes;
}
@@ -89,9 +93,9 @@ class SkewedPartitionRebalancer {
}

private:
bool shouldRebalance() const;
bool shouldRebalance(int64_t processedBytes) const;

void rebalancePartitions();
void rebalancePartitions(int64_t processedBytes);

// Calculates the partition processed data size based on the number of
// processed rows and the averaged row size.
@@ -138,13 +142,16 @@ class SkewedPartitionRebalancer {
const uint64_t minProcessedBytesRebalanceThreshold_;

// The accumulated number of rows processed by each partition.
std::vector<uint64_t> partitionRowCount_;
std::vector<std::atomic_uint64_t> partitionRowCount_;

// Indicates if the rebalancer is running or not.
std::atomic_bool rebalancing_{false};

// The accumulated number of bytes processed by all the partitions.
uint64_t processedBytes_{0};
std::atomic_int64_t processedBytes_{0};
// 'processedBytes_' at the last rebalance. It is used to calculate the
// processed bytes changes since the last rebalance.
uint64_t processedBytesAtLastRebalance_{0};
std::atomic_int64_t processedBytesAtLastRebalance_{0};
// The accumulated number of bytes processed by each partition.
std::vector<uint64_t> partitionBytes_;
// 'partitionBytes_' at the last rebalance. It is used to calculate the
@@ -157,8 +164,24 @@ class SkewedPartitionRebalancer {
// The estimated task processed bytes since the last rebalance.
std::vector<uint64_t> estimatedTaskBytesSinceLastRebalance_;

// The assigned task id list for each partition.
std::vector<std::vector<uint32_t>> partitionAssignments_;
// The task ids assigned to a partition.
class PartitionAssignment {
public:
PartitionAssignment() = default;

void addTaskId(uint32_t taskId);

uint32_t nextTaskId(uint64_t index) const;

const std::vector<uint32_t> taskIds() const;

uint32_t size() const;

private:
mutable folly::SharedMutex lock_;
std::vector<uint32_t> taskIds_;
};
std::vector<PartitionAssignment> partitionAssignments_;

Stats stats_;

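PartitionAssignment above protects the per-partition task list with a folly::SharedMutex so that many writer threads can pick a task concurrently while an occasional rebalance appends a new task. A simplified, self-contained sketch of the same reader/writer pattern using std::shared_mutex:

#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Simplified version of PartitionAssignment: readers take a shared lock to
// round-robin over the assigned task ids, writers take an exclusive lock to
// append a newly assigned task.
class TaskAssignment {
 public:
  void addTaskId(uint32_t taskId) {
    std::unique_lock<std::shared_mutex> guard{lock_};
    taskIds_.push_back(taskId);
  }

  uint32_t nextTaskId(uint64_t index) const {
    std::shared_lock<std::shared_mutex> guard{lock_};
    return taskIds_[index % taskIds_.size()];
  }

 private:
  mutable std::shared_mutex lock_;
  std::vector<uint32_t> taskIds_;
};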
89 changes: 85 additions & 4 deletions velox/common/base/tests/SkewedPartitionBalancerTest.cpp
@@ -19,11 +19,19 @@
#include <gtest/gtest.h>

#include "folly/Random.h"
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/testutil/TestValue.h"

using namespace facebook::velox::common::testutil;

namespace facebook::velox::common::test {
class SkewedPartitionRebalancerTestHelper {
public:
static void SetUpTestCase() {
TestValue::enable();
}

explicit SkewedPartitionRebalancerTestHelper(
SkewedPartitionRebalancer* balancer)
: balancer_(balancer) {
@@ -34,8 +42,8 @@ class SkewedPartitionRebalancerTestHelper {
uint32_t partition,
const std::set<uint32_t>& expectedAssignedTasks) const {
const std::set<uint32_t> assignedTasks(
balancer_->partitionAssignments_[partition].begin(),
balancer_->partitionAssignments_[partition].end());
balancer_->partitionAssignments_[partition].taskIds().begin(),
balancer_->partitionAssignments_[partition].taskIds().end());
ASSERT_EQ(assignedTasks, expectedAssignedTasks)
<< "\nExpected: " << folly::join(",", expectedAssignedTasks)
<< "\nActual: " << folly::join(",", assignedTasks);
@@ -59,7 +67,8 @@ class SkewedPartitionRebalancerTestHelper {
}

bool shouldRebalance() const {
return balancer_->shouldRebalance();
const int64_t processedBytes = balancer_->processedBytes_;
return balancer_->shouldRebalance(processedBytes);
}

private:
@@ -316,6 +325,35 @@ TEST_F(SkewedPartitionRebalancerTest, skewTasksCondition) {
}
}

DEBUG_ONLY_TEST_F(SkewedPartitionRebalancerTest, serializedRebalanceExecution) {
auto balancer = createBalancer(32, 4, 128, 256);
SkewedPartitionRebalancerTestHelper helper(balancer.get());
folly::EventCount rebalanceWait;
std::atomic_bool rebalanceWaitFlag{true};
SCOPED_TESTVALUE_SET(
"facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions",
std::function<void(SkewedPartitionRebalancer*)>(
[&](SkewedPartitionRebalancer*) {
rebalanceWait.await([&] { return !rebalanceWaitFlag.load(); });
}));

for (int partition = 0; partition < helper.taskCount(); ++partition) {
balancer->addProcessedBytes(1000);
balancer->addPartitionRowCount(partition, partition == 0 ? 20 : 1);
}

std::thread rebalanceThread([&]() { balancer->rebalance(); });

balancer->rebalance();

rebalanceWaitFlag = false;
rebalanceWait.notifyAll();

rebalanceThread.join();
ASSERT_EQ(balancer->stats().numBalanceTriggers, 1);
ASSERT_GT(balancer->stats().numScaledPartitions, 0);
}

TEST_F(SkewedPartitionRebalancerTest, error) {
auto balancer = createBalancer(32, 4, 128, 256);
VELOX_ASSERT_THROW(balancer->addProcessedBytes(0), "");
@@ -325,7 +363,7 @@ TEST_F(SkewedPartitionRebalancerTest, error) {
VELOX_ASSERT_THROW(createBalancer(0, 4, 0, 0), "");
}

TEST_F(SkewedPartitionRebalancerTest, fuzz) {
TEST_F(SkewedPartitionRebalancerTest, singleThreadFuzz) {
std::mt19937 rng{100};
for (int taskCount = 1; taskCount <= 10; ++taskCount) {
const uint64_t rebalanceThreshold = folly::Random::rand32(128, rng);
Expand Down Expand Up @@ -355,4 +393,47 @@ TEST_F(SkewedPartitionRebalancerTest, fuzz) {
}
}
}

TEST_F(SkewedPartitionRebalancerTest, concurrentFuzz) {
for (int numProducers = 1; numProducers <= 10; ++numProducers) {
std::mt19937 rng{100};
const uint64_t rebalanceThreshold = folly::Random::rand32(128, rng);
const uint64_t perPartitionRebalanceThreshold =
folly::Random::rand32(rebalanceThreshold / 2, rng);
for (int taskCount = 1; taskCount <= 10; ++taskCount) {
auto balancer = createBalancer(
32, taskCount, perPartitionRebalanceThreshold, rebalanceThreshold);
SkewedPartitionRebalancerTestHelper helper(balancer.get());
std::vector<std::thread> threads;
for (int producer = 0; producer < numProducers; ++producer) {
threads.emplace_back([&]() {
std::mt19937 localRng{200};
for (int iteration = 0; iteration < 1'000; ++iteration) {
SCOPED_TRACE(fmt::format(
"taskCount {}, iteration {}", taskCount, iteration));
const uint64_t processedBytes =
1 + folly::Random::rand32(512, localRng);
balancer->addProcessedBytes(processedBytes);
const auto numPartitions = folly::Random::rand32(32, localRng);
for (auto i = 0; i < numPartitions; ++i) {
const auto partition = folly::Random::rand32(32, localRng);
const auto numRows = 1 + folly::Random::rand32(32, localRng);
balancer->addPartitionRowCount(partition, numRows);
}
balancer->rebalance();
for (int round = 0; round < 10; ++round) {
for (int partition = 0; partition < helper.partitionCount();
++partition) {
ASSERT_LT(balancer->getTaskId(partition, round), taskCount);
}
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
}
}
}
} // namespace facebook::velox::common::test