Skip to content

Commit

Permalink
feat: Add auto scale writer support (facebookincubator#11702)
Browse files Browse the repository at this point in the history
Summary:

This change adds local scale writer partition support to improve memory efficiency in case of a large number of partitions.
We add two customized local partition operators:
ScaleWriterLocalPartition for non-partitioned table write. It starts with single table writer thread and scale the writer processing
if the exchange queue has >50% memory buffering until scale to all the table writer threads;
ScaleWriterPartitioningLocalPartition for partitioned table writer. It starts with assigning a single table writer thread to each logical
table partition. Multiple physical table partitions could be mapped to a single logical partition based on the partition keys of the
written table. Similar, if the exchange queue has > 50% memory buffering, we leverage the skewed partition balancer by scaling
the busy logical table partition by assigning more table writer threads.

Meta internal shadow results show this could prevent query write OOM pattern, reduce the peak memory usage which benefits the
resource usage accounting which takes into account of accumulated memory usage,  it also reduces >2x of written files

The followup is to investigate the more reliable rebalance signal such as consumer/producer queuing delay in the exchange
queue. To complete this feature, we need a Prestissimo change to setup scale writer local partition based on arbitrary partitioning
scheme, and the coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 4, 2024
1 parent 28c319e commit 7148d31
Show file tree
Hide file tree
Showing 28 changed files with 2,271 additions and 375 deletions.
1 change: 0 additions & 1 deletion velox/common/base/SkewedPartitionBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class SkewedPartitionRebalancer {
/// processed bytes of a partition.
void addPartitionRowCount(uint32_t partition, uint32_t numRows) {
VELOX_CHECK_LT(partition, partitionCount_);
VELOX_CHECK_GT(numRows, 0);
partitionRowCount_[partition] += numRows;
}

Expand Down
2 changes: 1 addition & 1 deletion velox/common/base/tests/SkewedPartitionBalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ TEST_F(SkewedPartitionRebalancerTest, error) {
auto balancer = createBalancer(32, 4, 128, 256);
VELOX_ASSERT_THROW(balancer->addProcessedBytes(0), "");
VELOX_ASSERT_THROW(balancer->addPartitionRowCount(32, 4), "");
VELOX_ASSERT_THROW(balancer->addPartitionRowCount(0, 0), "");
balancer->addPartitionRowCount(0, 0);
VELOX_ASSERT_THROW(createBalancer(0, 4, 128, 256), "");
VELOX_ASSERT_THROW(createBalancer(0, 4, 0, 0), "");
}
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ uint32_t HiveConfig::maxPartitionsPerWriters(
const config::ConfigBase* session) const {
return session->get<uint32_t>(
kMaxPartitionsPerWritersSession,
config_->get<uint32_t>(kMaxPartitionsPerWriters, 100));
config_->get<uint32_t>(kMaxPartitionsPerWriters, 128));
}

bool HiveConfig::immutablePartitions() const {
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ TEST(HiveConfigTest, defaultConfig) {
hiveConfig.insertExistingPartitionsBehavior(emptySession.get()),
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kError);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 100);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 128);
ASSERT_EQ(hiveConfig.immutablePartitions(), false);
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
Expand Down Expand Up @@ -172,7 +172,7 @@ TEST(HiveConfigTest, overrideSession) {
hiveConfig.insertExistingPartitionsBehavior(session.get()),
facebook::velox::connector::hive::HiveConfig::
InsertExistingPartitionsBehavior::kOverwrite);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 100);
ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128);
ASSERT_EQ(hiveConfig.immutablePartitions(), false);
ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
Expand Down
5 changes: 5 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,11 +1998,15 @@ void LocalPartitionNode::addDetails(std::stringstream& stream) const {
if (type_ != Type::kGather) {
stream << " " << partitionFunctionSpec_->toString();
}
if (scaleWriter_) {
stream << " scaleWriter";
}
}

folly::dynamic LocalPartitionNode::serialize() const {
auto obj = PlanNode::serialize();
obj["type"] = typeName(type_);
obj["scaleWriter"] = scaleWriter_;
obj["partitionFunctionSpec"] = partitionFunctionSpec_->serialize();
return obj;
}
Expand All @@ -2014,6 +2018,7 @@ PlanNodePtr LocalPartitionNode::create(
return std::make_shared<LocalPartitionNode>(
deserializePlanNodeId(obj),
typeFromName(obj["type"].asString()),
obj["scaleWriter"].asBool(),
ISerializable::deserialize<PartitionFunctionSpec>(
obj["partitionFunctionSpec"]),
deserializeSources(obj, context));
Expand Down
29 changes: 27 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ class TableWriteNode : public PlanNode {
hasPartitioningScheme_(hasPartitioningScheme),
outputType_(std::move(outputType)),
commitStrategy_(commitStrategy) {
VELOX_USER_CHECK_EQ(columns->size(), columnNames.size());
for (const auto& column : columns->names()) {
VELOX_USER_CHECK_EQ(columns_->size(), columnNames_.size());
for (const auto& column : columns_->names()) {
VELOX_USER_CHECK(
source->outputType()->containsChild(column),
"Column {} not found in TableWriter input: {}",
Expand Down Expand Up @@ -1184,13 +1184,31 @@ class LocalPartitionNode : public PlanNode {

static Type typeFromName(const std::string& name);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
LocalPartitionNode(
const PlanNodeId& id,
Type type,
PartitionFunctionSpecPtr partitionFunctionSpec,
std::vector<PlanNodePtr> sources)
: LocalPartitionNode(
id,
std::move(type),
/*scaleWriter=*/false,
std::move(partitionFunctionSpec),
std::move(sources)) {}
#endif

/// If 'scaleWriter' is true, the local partition is used to scale the table
/// writer prcessing.
LocalPartitionNode(
const PlanNodeId& id,
Type type,
bool scaleWriter,
PartitionFunctionSpecPtr partitionFunctionSpec,
std::vector<PlanNodePtr> sources)
: PlanNode(id),
type_{type},
scaleWriter_(scaleWriter),
sources_{std::move(sources)},
partitionFunctionSpec_{std::move(partitionFunctionSpec)} {
VELOX_USER_CHECK_GT(
Expand All @@ -1215,10 +1233,16 @@ class LocalPartitionNode : public PlanNode {
return std::make_shared<LocalPartitionNode>(
id,
Type::kGather,
/*scaleWriter=*/false,
std::make_shared<GatherPartitionFunctionSpec>(),
std::move(sources));
}

/// Returns true if this is for table writer scaling.
bool scaleWriter() const {
return scaleWriter_;
}

Type type() const {
return type_;
}
Expand Down Expand Up @@ -1247,6 +1271,7 @@ class LocalPartitionNode : public PlanNode {
void addDetails(std::stringstream& stream) const override;

const Type type_;
const bool scaleWriter_;
const std::vector<PlanNodePtr> sources_;
const PartitionFunctionSpecPtr partitionFunctionSpec_;
};
Expand Down
43 changes: 43 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,31 @@ class QueryConfig {
static constexpr const char* kSelectiveNimbleReaderEnabled =
"selective_nimble_reader_enabled";

/// The max ratio of a query used memory to its max capacity, and the scale
/// writer exchange stops scaling writer processing if the query's current
/// memory usage exceeds this ratio. The value is in the range of (0, 1].
static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio =
"scaled_writer_rebalance_max_memory_usage_ratio";

/// The max number of logical table partitions that can be assigned to a
/// single table writer thread. The logical table partition is used by local
/// exchange writer for writer scaling, and multiple physical table
/// partitions can be mapped to the same logical table partition based on the
/// hash value of calculated partitioned ids.
static constexpr const char* kScaleWriterMaxPartitionsPerWriter =
"scaled_writer_max_partitions_per_writer";

/// Minimum amount of data processed by a logical table partition to trigger
/// writer scaling if it is detected as overloaded by scale wrirer exchange.
static constexpr const char*
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold =
"scaled_writer_min_partition_processed_bytes_rebalance_threshold";

/// Minimum amount of data processed by all the logical table partitions to
/// trigger skewed partition rebalancing by scale writer exchange.
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
"scaled_writer_min_processed_bytes_rebalance_threshold";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
Expand Down Expand Up @@ -819,6 +844,24 @@ class QueryConfig {
return get<int32_t>(kPrefixSortMinRows, 130);
}

double scaleWriterRebalanceMaxMemoryUsageRatio() const {
return get<double>(kScaleWriterRebalanceMaxMemoryUsageRatio, 0.7);
}

uint32_t scaleWriterMaxPartitionsPerWriter() const {
return get<uint32_t>(kScaleWriterMaxPartitionsPerWriter, 128);
}

uint64_t scaleWriterMinPartitionProcessedBytesRebalanceThreshold() const {
return get<uint64_t>(
kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, 128 << 20);
}

uint64_t scaleWriterMinProcessedBytesRebalanceThreshold() const {
return get<uint64_t>(
kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
27 changes: 26 additions & 1 deletion velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,32 @@ Table Writer
* - task_partitioned_writer_count
- integer
- task_writer_count
- The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
- The number of parallel table writer threads per task for partitioned
table writes. If not set, use 'task_writer_count' as default.
* - scaled_writer_rebalance_max_memory_usage_ratio
- double
- 0.7
- The max ratio of a query used memory to its max capacity, and the scale
- writer exchange stops scaling writer processing if the query's current
- memory usage exceeds this ratio. The value is in the range of (0, 1].
* - scaled_writer_max_partitions_per_writer
- integer
- 128
- The max number of logical table partitions that can be assigned to a
- single table writer thread. The logical table partition is used by local
- exchange writer for writer scaling, and multiple physical table
- partitions can be mapped to the same logical table partition based on the
- hash value of calculated partitioned ids.
- integer
- 128MB
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- Minimum amount of data processed by a logical table partition to trigger
- writer scaling if it is detected as overloaded by scale wrirer exchange.
* - scaled_writer_min_processed_bytes_rebalance_threshold
- Minimum amount of data processed by all the logical table partitions to
- trigger skewed partition rebalancing by scale writer exchange.
- integer
- 256MB

Hive Connector
--------------
Expand Down
1 change: 1 addition & 0 deletions velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ Storage
* - storage_network_throttled_count
- Count
- The number of times that storage IOs get throttled in a storage cluster because of network.

Spilling
--------

Expand Down
11 changes: 11 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ These stats are reported only by TableWriter operator
* - earlyFlushedRawBytes
- bytes
- Number of bytes pre-maturely flushed from file writers because of memory reclaiming.
* - rebalanceTriggers
-
- The number of times that we triggers the rebalance of table partitions
for a non-bucketed partition table.
* - scaledPartitions
-
- The number of times that we scale a partition processing for a
non-bucketed partition table.
* - scaledWriters
-
- The number of times that we scale writers for a non-partitioned table.

Spilling
--------
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ velox_add_library(
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
ScaleWriterLocalPartition.cpp
SortBuffer.cpp
SortedAggregations.cpp
SortWindowBuild.cpp
Expand Down
Loading

0 comments on commit 7148d31

Please sign in to comment.