Skip to content

Commit

Permalink
Add max_local_exchange_partition_count config property (facebookincub…
Browse files Browse the repository at this point in the history
…ator#11292)

Summary:
Pull Request resolved: facebookincubator#11292

Used to limit the number of partitions created by a local exchange.
Partitioning data too granularly can lead to poor performance.
This setting allows increasing the task concurrency for all pipelines except the ones that require a local partitioning.

Reviewed By: spershin

Differential Revision: D64557900

fbshipit-source-id: 6bd6a3dfad8e0a1e330a7ef357d7dc36fb794ef9
  • Loading branch information
arhimondr authored and facebook-github-bot committed Oct 22, 2024
1 parent 1fc46ad commit abc563c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
16 changes: 16 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ class QueryConfig {
static constexpr const char* kMaxLocalExchangeBufferSize =
"max_local_exchange_buffer_size";

/// Limits the number of partitions created by a local exchange.
/// Partitioning data too granularly can lead to poor performance.
/// This setting allows increasing the task concurrency for all
/// pipelines except the ones that require a local partitioning.
/// Affects the number of drivers for pipelines containing
/// LocalPartitionNode and cannot exceed the maximum number of
/// pipeline drivers configured for the task.
static constexpr const char* kMaxLocalExchangePartitionCount =
"max_local_exchange_partition_count";

/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly.
static constexpr const char* kMaxExchangeBufferSize =
Expand Down Expand Up @@ -490,6 +500,12 @@ class QueryConfig {
return get<uint64_t>(kMaxLocalExchangeBufferSize, kDefault);
}

uint32_t maxLocalExchangePartitionCount() const {
// defaults to unlimited
static constexpr uint32_t kDefault = std::numeric_limits<uint32_t>::max();
return get<uint32_t>(kMaxLocalExchangePartitionCount, kDefault);
}

uint64_t maxExchangeBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxExchangeBufferSize, kDefault);
Expand Down
7 changes: 7 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ Generic Configuration
- integer
- 32MB
- Used for backpressure to block local exchange producers when the local exchange buffer reaches or exceeds this size.
* - max_local_exchange_partition_count
- integer
- 2^32
- Limits the number of partitions created by a local exchange. Partitioning data too granularly can lead to poor performance.
This setting allows increasing the task concurrency for all pipelines except the ones that require a local partitioning.
Affects the number of drivers for pipelines containing LocalPartitionNode and cannot exceed the maximum number of
pipeline drivers configured for the task.
* - exchange.max_buffer_size
- integer
- 32MB
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,14 @@ uint32_t maxDrivers(
auto localExchange =
std::dynamic_pointer_cast<const core::LocalPartitionNode>(node)) {
// Local gather must run single-threaded.
if (localExchange->type() == core::LocalPartitionNode::Type::kGather) {
return 1;
switch (localExchange->type()) {
case core::LocalPartitionNode::Type::kGather:
return 1;
case core::LocalPartitionNode::Type::kRepartition:
count = std::min(queryConfig.maxLocalExchangePartitionCount(), count);
break;
default:
VELOX_UNREACHABLE("Unexpected local exchange type");
}
} else if (std::dynamic_pointer_cast<const core::LocalMergeNode>(node)) {
// Local merge must run single-threaded.
Expand Down
21 changes: 12 additions & 9 deletions velox/exec/tests/LocalPartitionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ class LocalPartitionTest : public HiveConnectorTestBase {
void verifyExchangeSourceOperatorStats(
const std::shared_ptr<exec::Task>& task,
int expectedPositions,
int expectedVectors) {
int expectedVectors,
int expectedDrivers) {
auto stats = task->taskStats().pipelineStats[0].operatorStats.front();
ASSERT_EQ(stats.inputPositions, expectedPositions);
ASSERT_EQ(stats.inputVectors, expectedVectors);
ASSERT_EQ(stats.numDrivers, expectedDrivers);
ASSERT_TRUE(stats.inputBytes > 0);

ASSERT_EQ(stats.outputPositions, stats.inputPositions);
Expand Down Expand Up @@ -112,7 +114,7 @@ TEST_F(LocalPartitionTest, gather) {
.planNode();

auto task = assertQuery(op, "SELECT 300, -71, 152");
verifyExchangeSourceOperatorStats(task, 300, 3);
verifyExchangeSourceOperatorStats(task, 300, 3, 1);

auto filePaths = writeToFiles(vectors);

Expand Down Expand Up @@ -144,7 +146,7 @@ TEST_F(LocalPartitionTest, gather) {
}

task = queryBuilder.assertResults("SELECT 300, -71, 152");
verifyExchangeSourceOperatorStats(task, 300, 3);
verifyExchangeSourceOperatorStats(task, 300, 3, 1);
}

TEST_F(LocalPartitionTest, partition) {
Expand Down Expand Up @@ -183,15 +185,16 @@ TEST_F(LocalPartitionTest, partition) {
createDuckDbTable(vectors);

AssertQueryBuilder queryBuilder(op, duckDbQueryRunner_);
queryBuilder.maxDrivers(2);
queryBuilder.maxDrivers(4);
queryBuilder.config(core::QueryConfig::kMaxLocalExchangePartitionCount, "2");
for (auto i = 0; i < filePaths.size(); ++i) {
queryBuilder.split(
scanNodeIds[i], makeHiveConnectorSplit(filePaths[i]->getPath()));
}

auto task =
queryBuilder.assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1");
verifyExchangeSourceOperatorStats(task, 300, 6);
verifyExchangeSourceOperatorStats(task, 300, 6, 2);
}

TEST_F(LocalPartitionTest, maxBufferSizeGather) {
Expand Down Expand Up @@ -225,7 +228,7 @@ TEST_F(LocalPartitionTest, maxBufferSizeGather) {
.config(core::QueryConfig::kMaxLocalExchangeBufferSize, "100")
.assertResults("SELECT 2100, -71, 228");

verifyExchangeSourceOperatorStats(task, 2100, 21);
verifyExchangeSourceOperatorStats(task, 2100, 21, 1);
}

TEST_F(LocalPartitionTest, maxBufferSizePartition) {
Expand Down Expand Up @@ -277,12 +280,12 @@ TEST_F(LocalPartitionTest, maxBufferSizePartition) {
// Set an artificially low buffer size limit to trigger blocking behavior.
auto task = makeQueryBuilder("100").assertResults(
"SELECT c0, count(1) FROM tmp GROUP BY 1");
verifyExchangeSourceOperatorStats(task, 2100, 42);
verifyExchangeSourceOperatorStats(task, 2100, 42, 2);

// Re-run with higher memory limit (enough to hold ~10 vectors at a time).
task = makeQueryBuilder("10240").assertResults(
"SELECT c0, count(1) FROM tmp GROUP BY 1");
verifyExchangeSourceOperatorStats(task, 2100, 42);
verifyExchangeSourceOperatorStats(task, 2100, 42, 2);
}

TEST_F(LocalPartitionTest, indicesBufferCapacity) {
Expand Down Expand Up @@ -483,7 +486,7 @@ TEST_F(LocalPartitionTest, earlyCompletion) {

auto task = assertQuery(plan, "VALUES (3), (4)");

verifyExchangeSourceOperatorStats(task, 100, 1);
verifyExchangeSourceOperatorStats(task, 100, 1, 1);

// Make sure there is only one reference to Task left, i.e. no Driver is
// blocked forever.
Expand Down

0 comments on commit abc563c

Please sign in to comment.