Skip to content

Commit

Permalink
Merge branch 'facebookincubator:main' into move_arrow_lib
Browse files Browse the repository at this point in the history
  • Loading branch information
wypb authored Dec 10, 2024
2 parents b628815 + d7e7cf7 commit c19fa13
Show file tree
Hide file tree
Showing 25 changed files with 480 additions and 202 deletions.
1 change: 1 addition & 0 deletions .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ jobs:
./velox_window_fuzzer_test \
--seed ${RANDOM} \
--duration_sec $DURATION \
--batch_size=50 \
--minloglevel=0 \
--stderrthreshold=2 \
--log_dir=/tmp/window_fuzzer_repro/logs \
Expand Down
126 changes: 82 additions & 44 deletions velox/common/base/SkewedPartitionBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,96 @@

#include "velox/common/base/SkewedPartitionBalancer.h"

#include "velox/common/testutil/TestValue.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::common {
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
uint32_t partitionCount,
uint32_t taskCount,
uint32_t numPartitions,
uint32_t numTasks,
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold)
: partitionCount_(partitionCount),
taskCount_(taskCount),
: numPartitions_(numPartitions),
numTasks_(numTasks),
minProcessedBytesRebalanceThresholdPerPartition_(
minProcessedBytesRebalanceThresholdPerPartition),
minProcessedBytesRebalanceThreshold_(std::max(
minProcessedBytesRebalanceThreshold,
minProcessedBytesRebalanceThresholdPerPartition_)) {
VELOX_CHECK_GT(partitionCount_, 0);
VELOX_CHECK_GT(taskCount_, 0);
minProcessedBytesRebalanceThresholdPerPartition_)),
partitionRowCount_(numPartitions_),
partitionAssignments_(numPartitions_) {
VELOX_CHECK_GT(numPartitions_, 0);
VELOX_CHECK_GT(numTasks_, 0);

partitionRowCount_.resize(partitionCount_, 0);
partitionBytes_.resize(partitionCount_, 0);
partitionBytesAtLastRebalance_.resize(partitionCount_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(partitionCount_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(taskCount_, 0);

partitionAssignments_.resize(partitionCount_);
partitionBytes_.resize(numPartitions_, 0);
partitionBytesAtLastRebalance_.resize(numPartitions_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(numPartitions_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(numTasks_, 0);

// Assigns one task to each partition initially.
for (auto partition = 0; partition < partitionCount_; ++partition) {
const uint32_t taskId = partition % taskCount;
partitionAssignments_[partition].emplace_back(taskId);
for (auto partition = 0; partition < numPartitions_; ++partition) {
const uint32_t taskId = partition % numTasks_;
partitionAssignments_[partition].addTaskId(taskId);
}
}

// Appends 'taskId' to the task ids assigned to this partition. 'lock_' is held
// exclusively for the duration of the append.
void SkewedPartitionRebalancer::PartitionAssignment::addTaskId(
    uint32_t taskId) {
  std::unique_lock exclusiveGuard(lock_);
  taskIds_.emplace_back(taskId);
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::nextTaskId(
uint64_t index) const {
std::shared_lock guard{lock_};
const auto taskIndex = index % taskIds_.size();
return taskIds_[taskIndex];
}

// Returns the number of task ids currently assigned to this partition, read
// under a shared hold of 'lock_'.
uint32_t SkewedPartitionRebalancer::PartitionAssignment::size() const {
  std::shared_lock sharedGuard(lock_);
  const auto numAssignedTasks = static_cast<uint32_t>(taskIds_.size());
  return numAssignedTasks;
}

// Returns a reference to the task ids assigned to this partition.
//
// NOTE(review): 'guard' is a local, so the shared lock is released when this
// function returns; the caller then reads the returned vector without holding
// 'lock_'. Presumably callers must not run concurrently with addTaskId() --
// confirm against the call sites.
const std::vector<uint32_t>&
SkewedPartitionRebalancer::PartitionAssignment::taskIds() const {
  std::shared_lock guard{lock_};
  return taskIds_;
}

void SkewedPartitionRebalancer::rebalance() {
if (shouldRebalance()) {
rebalancePartitions();
const int64_t processedBytes = processedBytes_.load();
if (shouldRebalance(processedBytes)) {
rebalancePartitions(processedBytes);
}
}

bool SkewedPartitionRebalancer::shouldRebalance() const {
VELOX_CHECK_GE(processedBytes_, processedBytesAtLastRebalance_);
return (processedBytes_ - processedBytesAtLastRebalance_) >=
bool SkewedPartitionRebalancer::shouldRebalance(int64_t processedBytes) const {
return (processedBytes - processedBytesAtLastRebalance_) >=
minProcessedBytesRebalanceThreshold_;
}

void SkewedPartitionRebalancer::rebalancePartitions() {
VELOX_DCHECK(shouldRebalance());
++stats_.numBalanceTriggers;
void SkewedPartitionRebalancer::rebalancePartitions(int64_t processedBytes) {
if (rebalancing_.exchange(true)) {
return;
}

SCOPE_EXIT {
VELOX_CHECK(rebalancing_);
rebalancing_ = false;
};
++numBalanceTriggers_;

TestValue::adjust(
"facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions",
this);

// Updates the processed bytes for each partition.
calculatePartitionProcessedBytes();

// Updates 'partitionBytesSinceLastRebalancePerTask_'.
for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
const auto totalAssignedTasks = partitionAssignments_[partition].size();
const auto partitionBytes = partitionBytes_[partition];
partitionBytesSinceLastRebalancePerTask_[partition] =
Expand All @@ -80,10 +118,10 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
// max processed bytes since last rebalance at the top of the queue for
// rebalance later.
std::vector<IndexedPriorityQueue<uint32_t, /*MaxQueue=*/true>>
taskMaxPartitions{taskCount_};
for (auto partition = 0; partition < partitionCount_; ++partition) {
taskMaxPartitions{numTasks_};
for (auto partition = 0; partition < numPartitions_; ++partition) {
auto& taskAssignments = partitionAssignments_[partition];
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
auto& taskQueue = taskMaxPartitions[taskId];
taskQueue.addOrUpdate(
partition, partitionBytesSinceLastRebalancePerTask_[partition]);
Expand All @@ -94,15 +132,15 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
// last rebalance.
IndexedPriorityQueue<uint32_t, /*MaxQueue=*/true> maxTasks;
IndexedPriorityQueue<uint32_t, /*MaxQueue=*/false> minTasks;
for (auto taskId = 0; taskId < taskCount_; ++taskId) {
for (auto taskId = 0; taskId < numTasks_; ++taskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] =
calculateTaskDataSizeSinceLastRebalance(taskMaxPartitions[taskId]);
maxTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
}

rebalanceBasedOnTaskSkewness(maxTasks, minTasks, taskMaxPartitions);
processedBytesAtLastRebalance_ = processedBytes_;
processedBytesAtLastRebalance_.store(processedBytes);
}

void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
Expand Down Expand Up @@ -159,7 +197,7 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
}
}

stats_.numScaledPartitions += scaledPartitions.size();
numScaledPartitions_ += scaledPartitions.size();
}

bool SkewedPartitionRebalancer::rebalancePartition(
Expand All @@ -168,49 +206,49 @@ bool SkewedPartitionRebalancer::rebalancePartition(
IndexedPriorityQueue<uint32_t, true>& maxTasks,
IndexedPriorityQueue<uint32_t, false>& minTasks) {
auto& taskAssignments = partitionAssignments_[rebalancePartition];
for (auto taskId : taskAssignments) {
for (auto taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
return false;
}
}

taskAssignments.push_back(targetTaskId);
taskAssignments.addTaskId(targetTaskId);
VELOX_CHECK_GT(partitionAssignments_[rebalancePartition].size(), 1);

const auto newTaskCount = taskAssignments.size();
const auto oldTaskCount = newTaskCount - 1;
const auto newNumTasks = taskAssignments.size();
const auto oldNumTasks = newNumTasks - 1;
// Since a partition is rebalanced from max to min skewed tasks,
// decrease the priority of max taskBucket as well as increase the priority
// of min taskBucket.
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] +=
(partitionBytesSinceLastRebalancePerTask_[rebalancePartition] *
oldTaskCount) /
newTaskCount;
oldNumTasks) /
newNumTasks;
} else {
estimatedTaskBytesSinceLastRebalance_[taskId] -=
partitionBytesSinceLastRebalancePerTask_[rebalancePartition] /
newTaskCount;
newNumTasks;
}

maxTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]);
}

LOG(INFO) << "Rebalanced partition " << rebalancePartition << " to task "
<< targetTaskId << " with taskCount " << newTaskCount;
VLOG(3) << "Rebalanced partition " << rebalancePartition << " to task "
<< targetTaskId << " with num of assigned tasks " << newNumTasks;
return true;
}

void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() {
uint64_t totalPartitionRowCount{0};
for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
totalPartitionRowCount += partitionRowCount_[partition];
}
VELOX_CHECK_GT(totalPartitionRowCount, 0);

for (auto partition = 0; partition < partitionCount_; ++partition) {
for (auto partition = 0; partition < numPartitions_; ++partition) {
// Since we estimate 'partitionBytes_' based on 'partitionRowCount_' and
// total 'processedBytes_'. It is possible that the estimated
// 'partitionBytes_' is slightly less than it was estimated at the last
Expand Down
74 changes: 54 additions & 20 deletions velox/common/base/SkewedPartitionBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ class SkewedPartitionRebalancerTestHelper;
/// This class is used to auto-scale partition processing by assigning more
/// tasks to busy partition measured by processed data size. This is used by
/// local partition to scale table writers for now.
///
/// NOTE: this object is not thread-safe.
class SkewedPartitionRebalancer {
public:
/// 'partitionCount' is the number of partitions to process. 'taskCount' is
/// 'numPartitions' is the number of partitions to process. 'numTasks' is
/// the number of tasks for execution.
/// 'minProcessedBytesRebalanceThresholdPerPartition' is the processed bytes
/// threshold to trigger task scaling for a single partition.
Expand All @@ -41,34 +39,46 @@ class SkewedPartitionRebalancer {
/// measured in the total number of processed data size from all its serving
/// partitions.
SkewedPartitionRebalancer(
uint32_t partitionCount,
uint32_t taskCount,
uint32_t numPartitions,
uint32_t numTasks,
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold);

/// Checks on destruction that 'rebalancing_' is false, i.e. that no rebalance
/// is in progress when the object is destroyed.
~SkewedPartitionRebalancer() {
  VELOX_CHECK(!rebalancing_);
}

/// Invoked to rebalance the partition assignments if applicable.
void rebalance();

/// Gets the assigned task id for a given 'partition'. 'index' is used to
/// choose one of multiple assigned tasks in a round-robin order.
uint32_t getTaskId(uint32_t partition, uint64_t index) const {
const auto& taskList = partitionAssignments_[partition];
return taskList[index % taskList.size()];
auto& taskList = partitionAssignments_[partition];
return taskList.nextTaskId(index);
}

/// Adds the processed partition row count. This is used to estimate the
/// processed bytes of a partition.
void addPartitionRowCount(uint32_t partition, uint32_t numRows) {
VELOX_CHECK_LT(partition, partitionCount_);
VELOX_CHECK_LT(partition, numPartitions_);
partitionRowCount_[partition] += numRows;
}

/// Adds the total processed bytes from all the partitions.
void addProcessedBytes(long bytes) {
void addProcessedBytes(int64_t bytes) {
VELOX_CHECK_GT(bytes, 0);
processedBytes_ += bytes;
}

/// Returns the number of partitions this rebalancer was constructed with.
uint32_t numPartitions() const {
  return numPartitions_;
}

/// Returns the number of tasks this rebalancer was constructed with.
uint32_t numTasks() const {
  return numTasks_;
}

/// The rebalancer internal stats.
struct Stats {
/// The number of times that triggers rebalance.
Expand All @@ -85,13 +95,15 @@ class SkewedPartitionRebalancer {
};

Stats stats() const {
return stats_;
return Stats{
.numBalanceTriggers = numBalanceTriggers_.load(),
.numScaledPartitions = numScaledPartitions_.load()};
}

private:
bool shouldRebalance() const;
bool shouldRebalance(int64_t processedBytes) const;

void rebalancePartitions();
void rebalancePartitions(int64_t processedBytes);

// Calculates the partition processed data size based on the number of
// processed rows and the averaged row size.
Expand Down Expand Up @@ -132,19 +144,22 @@ class SkewedPartitionRebalancer {

static constexpr double kTaskSkewnessThreshod_{0.7};

const uint32_t partitionCount_;
const uint32_t taskCount_;
const uint32_t numPartitions_;
const uint32_t numTasks_;
const uint64_t minProcessedBytesRebalanceThresholdPerPartition_;
const uint64_t minProcessedBytesRebalanceThreshold_;

// The accumulated number of rows processed by each partition.
std::vector<uint64_t> partitionRowCount_;
std::vector<std::atomic_uint64_t> partitionRowCount_;

// Indicates if the rebalancer is running or not.
std::atomic_bool rebalancing_{false};

// The accumulated number of bytes processed by all the partitions.
uint64_t processedBytes_{0};
std::atomic_int64_t processedBytes_{0};
// 'processedBytes_' at the last rebalance. It is used to calculate the
// processed bytes changes since the last rebalance.
uint64_t processedBytesAtLastRebalance_{0};
std::atomic_int64_t processedBytesAtLastRebalance_{0};
// The accumulated number of bytes processed by each partition.
std::vector<uint64_t> partitionBytes_;
// 'partitionBytes_' at the last rebalance. It is used to calculate the
Expand All @@ -157,10 +172,29 @@ class SkewedPartitionRebalancer {
// The estimated task processed bytes since the last rebalance.
std::vector<uint64_t> estimatedTaskBytesSinceLastRebalance_;

// The assigned task id list for each partition.
std::vector<std::vector<uint32_t>> partitionAssignments_;
// The assigned task ids for a partition. Every accessor acquires 'lock_':
// addTaskId() exclusively, the const accessors in shared mode.
class PartitionAssignment {
 public:
  PartitionAssignment() = default;

  // Appends 'taskId' to the assigned task ids.
  void addTaskId(uint32_t taskId);

  // Returns the assigned task id at position 'index' modulo the number of
  // assigned task ids.
  uint32_t nextTaskId(uint64_t index) const;

  // Returns a reference to the assigned task ids.
  const std::vector<uint32_t>& taskIds() const;

  // Returns the number of assigned task ids.
  uint32_t size() const;

 private:
  // Guards 'taskIds_'. Declared mutable so the const accessors can lock it.
  mutable folly::SharedMutex lock_;
  std::vector<uint32_t> taskIds_;
};
std::vector<PartitionAssignment> partitionAssignments_;

Stats stats_;
// The number of times a rebalance has been triggered.
std::atomic_uint64_t numBalanceTriggers_{0};
// The accumulated number of scaled partitions across all rebalances.
std::atomic_uint32_t numScaledPartitions_{0};

friend class test::SkewedPartitionRebalancerTestHelper;
};
Expand Down
Loading

0 comments on commit c19fa13

Please sign in to comment.