From b7197a0876bb08f604b174c28efd8eddc2ff7647 Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Wed, 4 Dec 2024 12:47:59 -0800
Subject: [PATCH] feat: Add auto scale writer support (#11702)

Summary:
This change adds local scale writer partition support to improve memory
efficiency when writing a large number of partitions. We add two customized
local partition operators:

ScaleWriterLocalPartition for non-partitioned table writes. It starts with a
single table writer thread and, whenever the exchange queue buffers more than
50% of its memory limit, scales up the writer processing until all the table
writer threads are in use.

ScaleWriterPartitioningLocalPartition for partitioned table writes. It starts
by assigning a single table writer thread to each logical table partition.
Multiple physical table partitions can be mapped to a single logical partition
based on the partition keys of the written table. Similarly, if the exchange
queue buffers more than 50% of its memory limit, we leverage the skewed
partition balancer to scale up the busy logical table partitions by assigning
more table writer threads to them.

Meta-internal shadow results show that this can prevent the query write OOM
pattern and reduce peak memory usage, which benefits resource usage accounting
that takes accumulated memory usage into account. It also reduces the number
of written files by more than 2x.

A followup is to investigate more reliable rebalance signals, such as the
consumer/producer queuing delay in the exchange queue.

To complete this feature, we need a Prestissimo change to set up the scale
writer local partition based on an arbitrary partitioning scheme, and the
coordinator needs to configure the query plan accordingly.

Reviewed By: arhimondr, oerling, zation99

Differential Revision: D66380785
---
 velox/common/base/SkewedPartitionBalancer.h   |   1 -
 .../tests/SkewedPartitionBalancerTest.cpp     |   2 +-
 velox/connectors/hive/HiveConfig.cpp          |   2 +-
 .../connectors/hive/tests/HiveConfigTest.cpp  |   4 +-
 velox/core/PlanNode.cpp                       |   5 +
 velox/core/PlanNode.h                         |  29 +-
 velox/core/QueryConfig.h                      |  43 +
 velox/docs/configs.rst                        |  27 +-
 velox/docs/monitoring/metrics.rst             |   1 +
 velox/docs/monitoring/stats.rst               |  11 +
 velox/exec/CMakeLists.txt                     |   1 +
 velox/exec/LocalPartition.cpp                 |  95 +-
 velox/exec/LocalPartition.h                   |  37 +-
 velox/exec/LocalPlanner.cpp                   |  25 +
 velox/exec/ScaleWriterLocalPartition.cpp      | 326 ++++++
 velox/exec/ScaleWriterLocalPartition.h        | 126 +++
 velox/exec/TableWriter.cpp                    |   6 +-
 velox/exec/Task.cpp                           |  16 +
 velox/exec/Task.h                             |   5 +
 velox/exec/tests/CMakeLists.txt               |   1 +
 velox/exec/tests/PlanNodeSerdeTest.cpp        |  14 +
 .../tests/ScaleWriterLocalPartitionTest.cpp   | 972 ++++++++++++++++++
 velox/exec/tests/TableWriteTest.cpp           | 812 +++++++++------
 velox/exec/tests/utils/AssertQueryBuilder.cpp |  14 +-
 velox/exec/tests/utils/AssertQueryBuilder.h   |   3 +
 velox/exec/tests/utils/Cursor.h               |  32 +-
 velox/exec/tests/utils/PlanBuilder.cpp        |  37 +-
 velox/exec/tests/utils/PlanBuilder.h          |   8 +
 28 files changed, 2280 insertions(+), 375 deletions(-)
 create mode 100644 velox/exec/ScaleWriterLocalPartition.cpp
 create mode 100644 velox/exec/ScaleWriterLocalPartition.h
 create mode 100644 velox/exec/tests/ScaleWriterLocalPartitionTest.cpp

diff --git a/velox/common/base/SkewedPartitionBalancer.h b/velox/common/base/SkewedPartitionBalancer.h
index 592d50c8711e6..6c3c00a7cc68e 100644
--- a/velox/common/base/SkewedPartitionBalancer.h
+++ b/velox/common/base/SkewedPartitionBalancer.h
@@ -60,7 +60,6 @@ class SkewedPartitionRebalancer {
   /// processed bytes of a partition.
void addPartitionRowCount(uint32_t partition, uint32_t numRows) { VELOX_CHECK_LT(partition, partitionCount_); - VELOX_CHECK_GT(numRows, 0); partitionRowCount_[partition] += numRows; } diff --git a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp index 99281f2db522f..2b50892631397 100644 --- a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp +++ b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp @@ -320,7 +320,7 @@ TEST_F(SkewedPartitionRebalancerTest, error) { auto balancer = createBalancer(32, 4, 128, 256); VELOX_ASSERT_THROW(balancer->addProcessedBytes(0), ""); VELOX_ASSERT_THROW(balancer->addPartitionRowCount(32, 4), ""); - VELOX_ASSERT_THROW(balancer->addPartitionRowCount(0, 0), ""); + balancer->addPartitionRowCount(0, 0); VELOX_ASSERT_THROW(createBalancer(0, 4, 128, 256), ""); VELOX_ASSERT_THROW(createBalancer(0, 4, 0, 0), ""); } diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index e46ac2bf2f3b8..ba30a0c19e758 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -64,7 +64,7 @@ uint32_t HiveConfig::maxPartitionsPerWriters( const config::ConfigBase* session) const { return session->get( kMaxPartitionsPerWritersSession, - config_->get(kMaxPartitionsPerWriters, 100)); + config_->get(kMaxPartitionsPerWriters, 128)); } bool HiveConfig::immutablePartitions() const { diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index 4151ac89693c6..5bc2932e911cf 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -31,7 +31,7 @@ TEST(HiveConfigTest, defaultConfig) { hiveConfig.insertExistingPartitionsBehavior(emptySession.get()), facebook::velox::connector::hive::HiveConfig:: InsertExistingPartitionsBehavior::kError); - ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 100); + ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 128); ASSERT_EQ(hiveConfig.immutablePartitions(), false); ASSERT_EQ(hiveConfig.gcsEndpoint(), ""); ASSERT_EQ(hiveConfig.gcsCredentialsPath(), ""); @@ -172,7 +172,7 @@ TEST(HiveConfigTest, overrideSession) { hiveConfig.insertExistingPartitionsBehavior(session.get()), facebook::velox::connector::hive::HiveConfig:: InsertExistingPartitionsBehavior::kOverwrite); - ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 100); + ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128); ASSERT_EQ(hiveConfig.immutablePartitions(), false); ASSERT_EQ(hiveConfig.gcsEndpoint(), ""); ASSERT_EQ(hiveConfig.gcsCredentialsPath(), ""); diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index cdb724f1ef0f0..16546128e65af 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1998,11 +1998,15 @@ void LocalPartitionNode::addDetails(std::stringstream& stream) const { if (type_ != Type::kGather) { stream << " " << partitionFunctionSpec_->toString(); } + if (scaleWriter_) { + stream << " scaleWriter"; + } } folly::dynamic LocalPartitionNode::serialize() const { auto obj = PlanNode::serialize(); obj["type"] = typeName(type_); + obj["scaleWriter"] = scaleWriter_; obj["partitionFunctionSpec"] = partitionFunctionSpec_->serialize(); return obj; } @@ -2014,6 +2018,7 @@ PlanNodePtr LocalPartitionNode::create( return std::make_shared( deserializePlanNodeId(obj), typeFromName(obj["type"].asString()), + obj["scaleWriter"].asBool(), ISerializable::deserialize( 
obj["partitionFunctionSpec"]), deserializeSources(obj, context)); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 6f910aab1a425..ecb84d81c769a 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -735,8 +735,8 @@ class TableWriteNode : public PlanNode { hasPartitioningScheme_(hasPartitioningScheme), outputType_(std::move(outputType)), commitStrategy_(commitStrategy) { - VELOX_USER_CHECK_EQ(columns->size(), columnNames.size()); - for (const auto& column : columns->names()) { + VELOX_USER_CHECK_EQ(columns_->size(), columnNames_.size()); + for (const auto& column : columns_->names()) { VELOX_USER_CHECK( source->outputType()->containsChild(column), "Column {} not found in TableWriter input: {}", @@ -1184,13 +1184,31 @@ class LocalPartitionNode : public PlanNode { static Type typeFromName(const std::string& name); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY LocalPartitionNode( const PlanNodeId& id, Type type, PartitionFunctionSpecPtr partitionFunctionSpec, std::vector sources) + : LocalPartitionNode( + id, + std::move(type), + /*scaleWriter=*/false, + std::move(partitionFunctionSpec), + std::move(sources)) {} +#endif + + /// If 'scaleWriter' is true, the local partition is used to scale the table + /// writer prcessing. + LocalPartitionNode( + const PlanNodeId& id, + Type type, + bool scaleWriter, + PartitionFunctionSpecPtr partitionFunctionSpec, + std::vector sources) : PlanNode(id), type_{type}, + scaleWriter_(scaleWriter), sources_{std::move(sources)}, partitionFunctionSpec_{std::move(partitionFunctionSpec)} { VELOX_USER_CHECK_GT( @@ -1215,10 +1233,16 @@ class LocalPartitionNode : public PlanNode { return std::make_shared( id, Type::kGather, + /*scaleWriter=*/false, std::make_shared(), std::move(sources)); } + /// Returns true if this is for table writer scaling. + bool scaleWriter() const { + return scaleWriter_; + } + Type type() const { return type_; } @@ -1247,6 +1271,7 @@ class LocalPartitionNode : public PlanNode { void addDetails(std::stringstream& stream) const override; const Type type_; + const bool scaleWriter_; const std::vector sources_; const PartitionFunctionSpecPtr partitionFunctionSpec_; }; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 105062de81308..8bdcd72ebc67f 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -437,6 +437,31 @@ class QueryConfig { static constexpr const char* kSelectiveNimbleReaderEnabled = "selective_nimble_reader_enabled"; + /// The max ratio of a query used memory to its max capacity, and the scale + /// writer exchange stops scaling writer processing if the query's current + /// memory usage exceeds this ratio. The value is in the range of (0, 1]. + static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio = + "scaled_writer_rebalance_max_memory_usage_ratio"; + + /// The max number of logical table partitions that can be assigned to a + /// single table writer thread. The logical table partition is used by local + /// exchange writer for writer scaling, and multiple physical table + /// partitions can be mapped to the same logical table partition based on the + /// hash value of calculated partitioned ids. + static constexpr const char* kScaleWriterMaxPartitionsPerWriter = + "scaled_writer_max_partitions_per_writer"; + + /// Minimum amount of data processed by a logical table partition to trigger + /// writer scaling if it is detected as overloaded by scale wrirer exchange. 
+  static constexpr const char*
+      kScaleWriterMinPartitionProcessedBytesRebalanceThreshold =
+          "scaled_writer_min_partition_processed_bytes_rebalance_threshold";
+
+  /// Minimum amount of data processed by all the logical table partitions to
+  /// trigger skewed partition rebalancing by scale writer exchange.
+  static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
+      "scaled_writer_min_processed_bytes_rebalance_threshold";
+
   bool selectiveNimbleReaderEnabled() const {
     return get<bool>(kSelectiveNimbleReaderEnabled, false);
   }
@@ -819,6 +844,24 @@ class QueryConfig {
     return get<int32_t>(kPrefixSortMinRows, 130);
   }
 
+  double scaleWriterRebalanceMaxMemoryUsageRatio() const {
+    return get<double>(kScaleWriterRebalanceMaxMemoryUsageRatio, 0.7);
+  }
+
+  uint32_t scaleWriterMaxPartitionsPerWriter() const {
+    return get<uint32_t>(kScaleWriterMaxPartitionsPerWriter, 128);
+  }
+
+  uint64_t scaleWriterMinPartitionProcessedBytesRebalanceThreshold() const {
+    return get<uint64_t>(
+        kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, 128 << 20);
+  }
+
+  uint64_t scaleWriterMinProcessedBytesRebalanceThreshold() const {
+    return get<uint64_t>(
+        kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
+  }
+
   template <typename T>
   T get(const std::string& key, const T& defaultValue) const {
     return config_->get<T>(key, defaultValue);
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 761ab60436007..e6ce8423b28b0 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -400,7 +400,32 @@ Table Writer
    * - task_partitioned_writer_count
      - integer
      - task_writer_count
-     - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
+     - The number of parallel table writer threads per task for partitioned
+       table writes. If not set, use 'task_writer_count' as default.
+   * - scaled_writer_rebalance_max_memory_usage_ratio
+     - double
+     - 0.7
+     - The max ratio of a query's used memory to its max capacity. The scale
+       writer exchange stops scaling writer processing if the query's current
+       memory usage exceeds this ratio. The value is in the range of (0, 1].
+   * - scaled_writer_max_partitions_per_writer
+     - integer
+     - 128
+     - The max number of logical table partitions that can be assigned to a
+       single table writer thread. Logical table partitions are used by the
+       local exchange writer for writer scaling, and multiple physical table
+       partitions can be mapped to the same logical table partition based on
+       the hash value of the calculated partition ids.
+   * - scaled_writer_min_partition_processed_bytes_rebalance_threshold
+     - integer
+     - 128MB
+     - Minimum amount of data processed by a logical table partition to
+       trigger writer scaling if it is detected as overloaded by scale writer
+       exchange.
+   * - scaled_writer_min_processed_bytes_rebalance_threshold
+     - integer
+     - 256MB
+     - Minimum amount of data processed by all the logical table partitions
+       to trigger skewed partition rebalancing by scale writer exchange.
 
 Hive Connector
 --------------
diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst
index 6fea60a7f03f4..870b77f2e0ae4 100644
--- a/velox/docs/monitoring/metrics.rst
+++ b/velox/docs/monitoring/metrics.rst
@@ -438,6 +438,7 @@ Storage
    * - storage_network_throttled_count
      - Count
      - The number of times that storage IOs get throttled in a storage cluster because of network.
+
 Spilling
 --------

diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst
index 4958587e7bcfd..f77d2ba824ca0 100644
--- a/velox/docs/monitoring/stats.rst
+++ b/velox/docs/monitoring/stats.rst
@@ -104,6 +104,17 @@ These stats are reported only by TableWriter operator
    * - earlyFlushedRawBytes
      - bytes
      - Number of bytes prematurely flushed from file writers because of memory reclaiming.
+   * - rebalanceTriggers
+     -
+     - The number of times that we trigger the rebalance of table partitions
+       for a non-bucketed partitioned table.
+   * - scaledPartitions
+     -
+     - The number of times that we scale the processing of a partition for a
+       non-bucketed partitioned table.
+   * - scaledWriters
+     -
+     - The number of times that we scale writers for a non-partitioned table.
 
 Spilling
 --------
diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt
index 73ca666358956..25882fd44f146 100644
--- a/velox/exec/CMakeLists.txt
+++ b/velox/exec/CMakeLists.txt
@@ -74,6 +74,7 @@ velox_add_library(
   RowsStreamingWindowBuild.cpp
   RowContainer.cpp
   RowNumber.cpp
+  ScaleWriterLocalPartition.cpp
   SortBuffer.cpp
   SortedAggregations.cpp
   SortWindowBuild.cpp
diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp
index 2ebb469caf64d..dce6dff0ad38e 100644
--- a/velox/exec/LocalPartition.cpp
+++ b/velox/exec/LocalPartition.cpp
@@ -166,6 +166,11 @@ bool LocalExchangeQueue::isFinished() {
   return queue_.withWLock([&](auto& queue) { return isFinishedLocked(queue); });
 }
 
+bool LocalExchangeQueue::testingProducersDone() const {
+  return queue_.withRLock(
+      [&](auto& queue) { return noMoreProducers_ && pendingProducers_ == 0; });
+}
+
 void LocalExchangeQueue::close() {
   std::vector<ContinuePromise> consumerPromises;
   std::vector<ContinuePromise> memoryPromises;
@@ -256,54 +261,48 @@ LocalPartition::LocalPartition(
   for (auto& queue : queues_) {
     queue->addProducer();
   }
-}
-
-namespace {
-std::vector<BufferPtr> allocateIndexBuffers(
-    const std::vector<vector_size_t>& sizes,
-    memory::MemoryPool* pool) {
-  std::vector<BufferPtr> indexBuffers;
-  indexBuffers.reserve(sizes.size());
-  for (auto size : sizes) {
-    indexBuffers.push_back(allocateIndices(size, pool));
+  if (numPartitions_ > 0) {
+    indexBuffers_.resize(numPartitions_);
+    rawIndices_.resize(numPartitions_);
   }
-  return indexBuffers;
 }
 
-std::vector<vector_size_t*> getRawIndices(
-    const std::vector<BufferPtr>& indexBuffers) {
-  std::vector<vector_size_t*> rawIndices;
-  rawIndices.reserve(indexBuffers.size());
-  for (auto& buffer : indexBuffers) {
-    rawIndices.emplace_back(buffer->asMutable<vector_size_t>());
+void LocalPartition::allocateIndexBuffers(
+    const std::vector<vector_size_t>& sizes) {
+  VELOX_CHECK_EQ(indexBuffers_.size(), sizes.size());
+  VELOX_CHECK_EQ(rawIndices_.size(), sizes.size());
+
+  for (auto i = 0; i < sizes.size(); ++i) {
+    const auto indicesBufferBytes = sizes[i] * sizeof(vector_size_t);
+    if ((indexBuffers_[i] == nullptr) ||
+        (indexBuffers_[i]->capacity() < indicesBufferBytes) ||
+        !indexBuffers_[i]->unique()) {
+      indexBuffers_[i] = allocateIndices(sizes[i], pool());
+    } else {
+      indexBuffers_[i]->setSize(indicesBufferBytes);
+    }
+    rawIndices_[i] = indexBuffers_[i]->asMutable<vector_size_t>();
   }
-  return rawIndices;
 }
 
-RowVectorPtr
-wrapChildren(const RowVectorPtr& input, vector_size_t size, BufferPtr indices) {
-  std::vector<VectorPtr> wrappedChildren;
-  wrappedChildren.reserve(input->type()->size());
-  for (auto i = 0; i < input->type()->size(); i++) {
-    wrappedChildren.emplace_back(BaseVector::wrapInDictionary(
-        BufferPtr(nullptr), indices, size, input->childAt(i)));
+RowVectorPtr LocalPartition::wrapChildren(
+    const
RowVectorPtr& input,
+    vector_size_t size,
+    BufferPtr indices) {
+  VELOX_CHECK_EQ(childVectors_.size(), input->type()->size());
+
+  for (auto i = 0; i < input->type()->size(); ++i) {
+    childVectors_[i] = BaseVector::wrapInDictionary(
+        BufferPtr(nullptr), indices, size, input->childAt(i));
   }
   return std::make_shared<RowVector>(
-      input->pool(), input->type(), BufferPtr(nullptr), size, wrappedChildren);
+      input->pool(), input->type(), BufferPtr(nullptr), size, childVectors_);
 }
-} // namespace
 
 void LocalPartition::addInput(RowVectorPtr input) {
-  {
-    auto lockedStats = stats_.wlock();
-    lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
-  }
-
-  // Lazy vectors must be loaded or processed.
-  for (auto& child : input->children()) {
-    child->loadedVector();
-  }
+  prepareForInput(input);
 
   if (numPartitions_ == 1) {
     ContinueFuture future;
@@ -334,13 +333,12 @@ void LocalPartition::addInput(RowVectorPtr input) {
   for (auto i = 0; i < numInput; ++i) {
     ++maxIndex[partitions_[i]];
   }
-  auto indexBuffers = allocateIndexBuffers(maxIndex, pool());
-  auto rawIndices = getRawIndices(indexBuffers);
+  allocateIndexBuffers(maxIndex);
 
   std::fill(maxIndex.begin(), maxIndex.end(), 0);
   for (auto i = 0; i < numInput; ++i) {
     auto partition = partitions_[i];
-    rawIndices[partition][maxIndex[partition]] = i;
+    rawIndices_[partition][maxIndex[partition]] = i;
     ++maxIndex[partition];
   }
 
@@ -351,8 +349,7 @@ void LocalPartition::addInput(RowVectorPtr input) {
       // Do not enqueue empty partitions.
       continue;
     }
-    auto partitionData =
-        wrapChildren(input, partitionSize, std::move(indexBuffers[i]));
+    auto partitionData = wrapChildren(input, partitionSize, indexBuffers_[i]);
     ContinueFuture future;
     auto reason = queues_[i]->enqueue(
         partitionData, totalSize * partitionSize / numInput, &future);
@@ -363,6 +360,22 @@ void LocalPartition::addInput(RowVectorPtr input) {
   }
 }
 
+void LocalPartition::prepareForInput(RowVectorPtr& input) {
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
+  }
+
+  // Lazy vectors must be loaded or processed to ensure that late
+  // materialization happens in order.
+  for (auto& child : input->children()) {
+    child->loadedVector();
+  }
+  if (childVectors_.empty()) {
+    childVectors_.resize(input->type()->size());
+  }
+}
+
 BlockingReason LocalPartition::isBlocked(ContinueFuture* future) {
   if (!futures_.empty()) {
     auto blockingReason = blockingReasons_.front();
diff --git a/velox/exec/LocalPartition.h b/velox/exec/LocalPartition.h
index 75d4098a1915c..a6f4d6bbf5976 100644
--- a/velox/exec/LocalPartition.h
+++ b/velox/exec/LocalPartition.h
@@ -36,10 +36,20 @@ class LocalExchangeMemoryManager {
   /// caller to fulfill.
   std::vector<ContinuePromise> decreaseMemoryUsage(int64_t removed);
 
+  /// Returns the maximum buffer size in bytes.
+  int64_t maxBufferBytes() const {
+    return maxBufferSize_;
+  }
+
+  /// Returns the current buffer size in bytes.
+  int64_t bufferedBytes() const {
+    return bufferedBytes_;
+  }
+
  private:
   const int64_t maxBufferSize_;
   std::mutex mutex_;
-  int64_t bufferedBytes_{0};
+  tsan_atomic<int64_t> bufferedBytes_{0};
   std::vector<ContinuePromise> promises_;
 };
 
@@ -89,13 +99,17 @@ class LocalExchangeQueue {
   /// called before all the data has been processed. No-op otherwise.
   void close();
 
+  /// Returns true if all producers have sent the no-more-data signal.
+ bool testingProducersDone() const; + private: using Queue = std::queue>; bool isFinishedLocked(const Queue& queue) const; - std::shared_ptr memoryManager_; + const std::shared_ptr memoryManager_; const int partition_; + folly::Synchronized queue_; // Satisfied when data becomes available or all producers report that they // finished producing, e.g. queue_ is not empty or noMoreProducers_ is true @@ -162,8 +176,8 @@ class LocalPartition : public Operator { return nullptr; } - // Always true but the caller will check isBlocked before adding input, hence - // the blocked state does not accumulate input. + /// Always true but the caller will check isBlocked before adding input, hence + /// the blocked state does not accumulate input. bool needsInput() const override { return true; } @@ -174,7 +188,16 @@ class LocalPartition : public Operator { bool isFinished() override; - private: + protected: + void prepareForInput(RowVectorPtr& input); + + void allocateIndexBuffers(const std::vector& sizes); + + RowVectorPtr wrapChildren( + const RowVectorPtr& input, + vector_size_t size, + BufferPtr indices); + const std::vector> queues_; const size_t numPartitions_; std::unique_ptr partitionFunction_; @@ -184,6 +207,10 @@ class LocalPartition : public Operator { /// Reusable memory for hash calculation. std::vector partitions_; + /// Reusable buffers for input partitioning. + std::vector indexBuffers_; + std::vector rawIndices_; + std::vector childVectors_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index aec3104be3a2c..3645e737b83e8 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -35,7 +35,9 @@ #include "velox/exec/OperatorTraceScan.h" #include "velox/exec/OrderBy.h" #include "velox/exec/PartitionedOutput.h" +#include "velox/exec/RoundRobinPartitionFunction.h" #include "velox/exec/RowNumber.h" +#include "velox/exec/ScaleWriterLocalPartition.h" #include "velox/exec/StreamingAggregation.h" #include "velox/exec/TableScan.h" #include "velox/exec/TableWriteMerge.h" @@ -69,6 +71,23 @@ bool mustStartNewPipeline( return sourceId != 0; } +// Creates the customized local partition operator for table writer scaling. +std::unique_ptr createScaleWriterLocalPartition( + const std::shared_ptr& localPartitionNode, + int32_t operatorId, + DriverCtx* ctx) { + if (dynamic_cast( + &localPartitionNode->partitionFunctionSpec())) { + return std::make_unique( + operatorId, ctx, localPartitionNode); + } + + VELOX_CHECK_NOT_NULL(dynamic_cast( + &localPartitionNode->partitionFunctionSpec())); + return std::make_unique( + operatorId, ctx, localPartitionNode); +} + OperatorSupplier makeConsumerSupplier(ConsumerSupplier consumerSupplier) { if (consumerSupplier) { return [consumerSupplier = std::move(consumerSupplier)]( @@ -98,6 +117,12 @@ OperatorSupplier makeConsumerSupplier( if (auto localPartitionNode = std::dynamic_pointer_cast(planNode)) { + if (localPartitionNode->scaleWriter()) { + return [localPartitionNode](int32_t operatorId, DriverCtx* ctx) { + return createScaleWriterLocalPartition( + localPartitionNode, operatorId, ctx); + }; + } return [localPartitionNode](int32_t operatorId, DriverCtx* ctx) { return std::make_unique( operatorId, ctx, localPartitionNode); diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp new file mode 100644 index 0000000000000..c4995d0975281 --- /dev/null +++ b/velox/exec/ScaleWriterLocalPartition.cpp @@ -0,0 +1,326 @@ +/* + * Copyright (c) Facebook, Inc. 
and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/ScaleWriterLocalPartition.h" + +#include "velox/exec/HashPartitionFunction.h" +#include "velox/exec/RoundRobinPartitionFunction.h" +#include "velox/exec/Task.h" + +namespace facebook::velox::exec { +ScaleWriterPartitioningLocalPartition::ScaleWriterPartitioningLocalPartition( + int32_t operatorId, + DriverCtx* ctx, + const std::shared_ptr& planNode) + : LocalPartition(operatorId, ctx, planNode), + maxQueryMemoryUsageRatio_( + ctx->queryConfig().scaleWriterRebalanceMaxMemoryUsageRatio()), + maxTablePartitionsPerWriter_( + ctx->queryConfig().scaleWriterMaxPartitionsPerWriter()), + numTablePartitions_(maxTablePartitionsPerWriter_ * numPartitions_), + queryPool_(pool()->root()), + tablePartitionRebalancer_(std::make_unique( + numTablePartitions_, + numPartitions_, + ctx->queryConfig() + .scaleWriterMinPartitionProcessedBytesRebalanceThreshold(), + ctx->queryConfig() + .scaleWriterMinProcessedBytesRebalanceThreshold())) { + VELOX_CHECK_GT(maxTablePartitionsPerWriter_, 0); + + writerAssignmentCounts_.resize(numPartitions_, 0); + tablePartitionRowCounts_.resize(numTablePartitions_, 0); + tablePartitionWriterIds_.resize(numTablePartitions_, -1); + tablePartitionWriterIndexes_.resize(numTablePartitions_, 0); + writerAssignmmentIndicesBuffers_.resize(numPartitions_); + rawWriterAssignmmentIndicesBuffers_.resize(numPartitions_); + + // Reset the hash partition function with the number of logical table + // partitions instead of the number of table writers. + // 'tablePartitionRebalancer_' is responsible for maintaining the mapping + // from logical table partition id to the assigned table writer ids. + partitionFunction_ = numPartitions_ == 1 + ? 
nullptr
+      : planNode->partitionFunctionSpec().create(
+            numTablePartitions_,
+            /*localExchange=*/true);
+  if (partitionFunction_ != nullptr) {
+    VELOX_CHECK_NOT_NULL(
+        dynamic_cast<HashPartitionFunction*>(partitionFunction_.get()));
+  }
+}
+
+void ScaleWriterPartitioningLocalPartition::initialize() {
+  LocalPartition::initialize();
+  VELOX_CHECK_NULL(memoryManager_);
+  memoryManager_ =
+      operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+          operatorCtx_->driverCtx()->splitGroupId, planNodeId());
+}
+
+void ScaleWriterPartitioningLocalPartition::prepareForWriterAssignments(
+    vector_size_t numInput) {
+  const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
+  for (auto writerId = 0; writerId < numPartitions_; ++writerId) {
+    if (writerAssignmmentIndicesBuffers_[writerId] == nullptr ||
+        !writerAssignmmentIndicesBuffers_[writerId]->unique() ||
+        writerAssignmmentIndicesBuffers_[writerId]->size() <
+            maxIndicesBufferBytes) {
+      writerAssignmmentIndicesBuffers_[writerId] =
+          allocateIndices(numInput, pool());
+      rawWriterAssignmmentIndicesBuffers_[writerId] =
+          writerAssignmmentIndicesBuffers_[writerId]
+              ->asMutable<vector_size_t>();
+    }
+  }
+  std::fill(writerAssignmentCounts_.begin(), writerAssignmentCounts_.end(), 0);
+  // Reset the value of partition writer id assignments for the new input.
+  std::fill(
+      tablePartitionWriterIds_.begin(), tablePartitionWriterIds_.end(), -1);
+}
+
+void ScaleWriterPartitioningLocalPartition::addInput(RowVectorPtr input) {
+  prepareForInput(input);
+
+  if (numPartitions_ == 1) {
+    ContinueFuture future;
+    auto blockingReason =
+        queues_[0]->enqueue(input, input->retainedSize(), &future);
+    if (blockingReason != BlockingReason::kNotBlocked) {
+      blockingReasons_.push_back(blockingReason);
+      futures_.push_back(std::move(future));
+    }
+    return;
+  }
+
+  // Scale up writers when the current buffer memory utilization is more than
+  // 50% of the maximum. This also means that we won't scale local writers if
+  // the writing speed can keep up with incoming data. In other words, buffer
+  // utilization is below 50%.
+  //
+  // TODO: investigate using the consumer/producer queue time ratio as an
+  // additional signal to trigger rebalance to avoid unnecessary rebalancing
+  // when the worker is overloaded, which might cause a lot of queuing on both
+  // producer and consumer sides. The buffered memory ratio is not a reliable
+  // signal in that case.
+  if ((memoryManager_->bufferedBytes() >
+       memoryManager_->maxBufferBytes() * 0.5) &&
+      // Do not scale up if total memory used is greater than
+      // 'maxQueryMemoryUsageRatio_' of max query memory capacity. We have to
+      // be conservative here, otherwise scaling of writers will happen first
+      // before we hit the query memory capacity limit, and then we won't be
+      // able to do anything to prevent query OOM.
+      queryPool_->reservedBytes() <
+          queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_) {
+    tablePartitionRebalancer_->rebalance();
+  }
+
+  const auto singlePartition =
+      partitionFunction_->partition(*input, partitions_);
+
+  const auto numInput = input->size();
+  const int64_t totalInputBytes = input->retainedSize();
+  // Reset the value of partition row count for the new input.
+  std::fill(
+      tablePartitionRowCounts_.begin(), tablePartitionRowCounts_.end(), 0);
+
+  // Assign each row to a writer by looking at the logical table partition
+  // assignments maintained by 'tablePartitionRebalancer_'.
+  //
+  // Get partition id which limits to 'tablePartitionCount_'.
If there are + // more physical table partitions than the logical 'tablePartitionCount_', + // then it is possible that multiple physical table partitions will get + // assigned the same logical partition id. Thus, multiple table partitions + // will be scaled together since we track the written bytes of a logical table + // partition. + if (singlePartition.has_value()) { + const auto partitionId = singlePartition.value(); + tablePartitionRowCounts_[partitionId] = numInput; + + VELOX_CHECK_EQ(tablePartitionWriterIds_[partitionId], -1); + const auto writerId = getNextWriterId(partitionId); + ContinueFuture future; + auto blockingReason = + queues_[writerId]->enqueue(input, totalInputBytes, &future); + if (blockingReason != BlockingReason::kNotBlocked) { + blockingReasons_.push_back(blockingReason); + futures_.push_back(std::move(future)); + } + } else { + prepareForWriterAssignments(numInput); + + for (auto row = 0; row < numInput; ++row) { + const auto partitionId = partitions_[row]; + ++tablePartitionRowCounts_[partitionId]; + + // Get writer id for this partition by looking at the scaling state. + auto writerId = tablePartitionWriterIds_[partitionId]; + if (writerId == -1) { + writerId = getNextWriterId(partitionId); + tablePartitionWriterIds_[partitionId] = writerId; + } + rawWriterAssignmmentIndicesBuffers_[writerId] + [writerAssignmentCounts_[writerId]++] = + row; + } + + for (auto i = 0; i < numPartitions_; ++i) { + const auto writerRowCount = writerAssignmentCounts_[i]; + if (writerRowCount == 0) { + continue; + } + + auto writerInput = wrapChildren( + input, + writerRowCount, + std::move(writerAssignmmentIndicesBuffers_[i])); + ContinueFuture future; + auto reason = queues_[i]->enqueue( + writerInput, totalInputBytes * writerRowCount / numInput, &future); + if (reason != BlockingReason::kNotBlocked) { + blockingReasons_.push_back(reason); + futures_.push_back(std::move(future)); + } + } + } + + // Only update the scaling state if the memory used is below the + // 'maxQueryMemoryUsageRatio_' limit. Otherwise, if we keep updating the + // scaling state and the memory used is fluctuating around the limit, then we + // could do massive scaling in a single rebalancing cycle which could cause + // query OOM. 
+  if (queryPool_->reservedBytes() <
+      queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_) {
+    for (auto tablePartition = 0; tablePartition < numTablePartitions_;
+         ++tablePartition) {
+      tablePartitionRebalancer_->addPartitionRowCount(
+          tablePartition, tablePartitionRowCounts_[tablePartition]);
+    }
+    tablePartitionRebalancer_->addProcessedBytes(totalInputBytes);
+  }
+}
+
+uint32_t ScaleWriterPartitioningLocalPartition::getNextWriterId(
+    uint32_t partitionId) {
+  return tablePartitionRebalancer_->getTaskId(
+      partitionId, tablePartitionWriterIndexes_[partitionId]++);
+}
+
+void ScaleWriterPartitioningLocalPartition::close() {
+  LocalPartition::close();
+
+  const auto scaleStats = tablePartitionRebalancer_->stats();
+  auto lockedStats = stats_.wlock();
+  if (scaleStats.numScaledPartitions != 0) {
+    lockedStats->addRuntimeStat(
+        kScaledPartitions, RuntimeCounter(scaleStats.numScaledPartitions));
+  }
+  if (scaleStats.numBalanceTriggers != 0) {
+    lockedStats->addRuntimeStat(
+        kRebalanceTriggers, RuntimeCounter(scaleStats.numBalanceTriggers));
+  }
+}
+
+ScaleWriterLocalPartition::ScaleWriterLocalPartition(
+    int32_t operatorId,
+    DriverCtx* ctx,
+    const std::shared_ptr<const core::LocalPartitionNode>& planNode)
+    : LocalPartition(operatorId, ctx, planNode),
+      maxQueryMemoryUsageRatio_(
+          ctx->queryConfig().scaleWriterRebalanceMaxMemoryUsageRatio()),
+      queryPool_(pool()->root()),
+      minDataProcessedBytes_(
+          ctx->queryConfig()
+              .scaleWriterMinPartitionProcessedBytesRebalanceThreshold()) {
+  if (partitionFunction_ != nullptr) {
+    VELOX_CHECK_NOT_NULL(
+        dynamic_cast<RoundRobinPartitionFunction*>(partitionFunction_.get()));
+  }
+}
+
+void ScaleWriterLocalPartition::initialize() {
+  LocalPartition::initialize();
+  VELOX_CHECK_NULL(memoryManager_);
+  memoryManager_ =
+      operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+          operatorCtx_->driverCtx()->splitGroupId, planNodeId());
+}
+
+void ScaleWriterLocalPartition::addInput(RowVectorPtr input) {
+  prepareForInput(input);
+
+  const int64_t totalInputBytes = input->retainedSize();
+  processedDataBytes_ += totalInputBytes;
+
+  uint32_t writerId = 0;
+  if (numPartitions_ > 1) {
+    writerId = getNextWriterId();
+  }
+  VELOX_CHECK_LT(writerId, numPartitions_);
+
+  ContinueFuture future;
+  auto blockingReason =
+      queues_[writerId]->enqueue(input, input->retainedSize(), &future);
+  if (blockingReason != BlockingReason::kNotBlocked) {
+    blockingReasons_.push_back(blockingReason);
+    futures_.push_back(std::move(future));
+  }
+}
+
+uint32_t ScaleWriterLocalPartition::getNextWriterId() {
+  VELOX_CHECK_LE(numWriters_, numPartitions_);
+  VELOX_CHECK_GE(processedDataBytes_, processedBytesAtLastScale_);
+
+  // Scale up writers when the current buffer memory utilization is more than
+  // 50% of the maximum. This also means that we won't scale local writers if
+  // the writing speed can keep up with incoming data. In other words, buffer
+  // utilization is below 50%.
+  //
+  // TODO: investigate using the consumer/producer queue time ratio as an
+  // additional signal to trigger rebalance to avoid unnecessary rebalancing
+  // when the worker is overloaded, which might cause a lot of queuing on both
+  // producer and consumer sides. The buffered memory ratio is not a reliable
+  // signal in that case.
+  if ((numWriters_ < numPartitions_) &&
+      (memoryManager_->bufferedBytes() >=
+       memoryManager_->maxBufferBytes() / 2) &&
+      // Do not scale up if total memory used is greater than
+      // 'maxQueryMemoryUsageRatio_' of max query memory capacity.
We have to be + // conservative here otherwise scaling of writers will happen first + // before we hit the query memory capacity limit, and then we won't be + // able to do anything to prevent query OOM. + (processedDataBytes_ - processedBytesAtLastScale_ >= + numWriters_ * minDataProcessedBytes_) && + (queryPool_->reservedBytes() < + queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_)) { + ++numWriters_; + processedBytesAtLastScale_ = processedDataBytes_; + LOG(INFO) << "Scaled task writer count to: " << numWriters_ + << " with max of " << numPartitions_; + } + return (nextWriterIndex_++) % numWriters_; +} + +void ScaleWriterLocalPartition::close() { + LocalPartition::close(); + + if (numWriters_ == 1) { + return; + } + stats_.wlock()->addRuntimeStat( + kScaledWriters, RuntimeCounter(numWriters_ - 1)); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/ScaleWriterLocalPartition.h b/velox/exec/ScaleWriterLocalPartition.h new file mode 100644 index 0000000000000..03b16517c9f16 --- /dev/null +++ b/velox/exec/ScaleWriterLocalPartition.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/base/SkewedPartitionBalancer.h" +#include "velox/exec/LocalPartition.h" + +namespace facebook::velox::exec { + +using namespace facebook::velox::common; + +/// Customized local partition for writer scaling for (non-bucketed) partitioned +/// table write. +class ScaleWriterPartitioningLocalPartition : public LocalPartition { + public: + ScaleWriterPartitioningLocalPartition( + int32_t operatorId, + DriverCtx* ctx, + const std::shared_ptr& planNode); + + std::string toString() const override { + return fmt::format( + "ScaleWriterPartitioningLocalPartition({})", numPartitions_); + } + + void initialize() override; + + void addInput(RowVectorPtr input) override; + + void close() override; + + /// The name of the runtime stats of writer scaling. + /// The number of times that we triggers the rebalance of table partitions. + static inline const std::string kRebalanceTriggers{"rebalanceTriggers"}; + /// The number of times that we scale a partition processing. + static inline const std::string kScaledPartitions{"scaledPartitions"}; + + private: + void prepareForWriterAssignments(vector_size_t numInput); + + uint32_t getNextWriterId(uint32_t partitionId); + + // The max query memory usage ratio before we stop writer scaling. + const double maxQueryMemoryUsageRatio_; + // The max number of logical table partitions that can be assigned to a single + // table writer thread. Multiple physical table partitions can be mapped to + // one logical table partition. + const uint32_t maxTablePartitionsPerWriter_; + // The total number of logical table partitions that can be served by all the + // table writer threads. + const uint32_t numTablePartitions_; + + memory::MemoryPool* const queryPool_; + + // The skewed partition balancer for writer scaling. 
+ const std::unique_ptr tablePartitionRebalancer_; + + std::shared_ptr memoryManager_; + + // Reusable memory for writer assignment processing. + std::vector tablePartitionRowCounts_; + std::vector tablePartitionWriterIds_; + std::vector tablePartitionWriterIndexes_; + + // Reusable memory for writer assignment processing. + std::vector writerAssignmentCounts_; + std::vector writerAssignmmentIndicesBuffers_; + std::vector rawWriterAssignmmentIndicesBuffers_; +}; + +/// Customized local partition for writer scaling for un-partitioned table +/// write. +class ScaleWriterLocalPartition : public LocalPartition { + public: + ScaleWriterLocalPartition( + int32_t operatorId, + DriverCtx* ctx, + const std::shared_ptr& planNode); + + void addInput(RowVectorPtr input) override; + + void initialize() override; + + void close() override; + + /// The name of the runtime stats of writer scaling. + /// The number of scaled writers. + static inline const std::string kScaledWriters{"scaledWriters"}; + + private: + // Gets the writer id to process the next input in a round-robin manner. + uint32_t getNextWriterId(); + + // The max query memory usage ratio before we stop writer scaling. + const double maxQueryMemoryUsageRatio_; + memory::MemoryPool* const queryPool_; + // The minimal amount of processed data bytes before we trigger next writer + // scaling. + const uint64_t minDataProcessedBytes_; + + std::shared_ptr memoryManager_; + + // The number of assigned writers. + uint32_t numWriters_{1}; + // The monotonically increasing writer index to find the next writer id in a + // round-robin manner. + uint32_t nextWriterIndex_{0}; + // The total processed data bytes from all writers. + uint64_t processedDataBytes_{0}; + // The total processed data bytes at the last writer scaling. 
+ uint64_t processedBytesAtLastScale_{0}; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index df3ca49d7934b..604a2ec004728 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -272,8 +272,10 @@ void TableWriter::updateStats(const connector::DataSink::Stats& stats) { VELOX_CHECK(stats.spillStats.empty()); return; } - lockedStats->addRuntimeStat( - "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles)); + if (stats.numWrittenFiles != 0) { + lockedStats->addRuntimeStat( + "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles)); + } lockedStats->addRuntimeStat( "writeIOTime", RuntimeCounter( diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 7b10948b53a08..e0ab89a46fcd8 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2587,6 +2587,22 @@ Task::getLocalExchangeQueues( return it->second.queues; } +const std::shared_ptr& +Task::getLocalExchangeMemoryManager( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId) { + auto& splitGroupState = splitGroupStates_[splitGroupId]; + + auto it = splitGroupState.localExchanges.find(planNodeId); + VELOX_CHECK( + it != splitGroupState.localExchanges.end(), + "Incorrect local exchange ID {} for group {}, task {}", + planNodeId, + splitGroupId, + taskId()); + return it->second.memoryManager; +} + void Task::setError(const std::exception_ptr& exception) { TestValue::adjust("facebook::velox::exec::Task::setError", this); { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index d205b15178aff..ddd146a63fae1 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -467,6 +467,11 @@ class Task : public std::enable_shared_from_this { uint32_t splitGroupId, const core::PlanNodeId& planNodeId); + const std::shared_ptr& + getLocalExchangeMemoryManager( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId); + void setError(const std::exception_ptr& exception); void setError(const std::string& message); diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index ccd1d9caa81dd..4bc601ade56ed 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -73,6 +73,7 @@ add_executable( RoundRobinPartitionFunctionTest.cpp RowContainerTest.cpp RowNumberTest.cpp + ScaleWriterLocalPartitionTest.cpp SortBufferTest.cpp SpillerTest.cpp SpillTest.cpp diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index 03159c2e276f5..f815cb0360d5e 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -228,6 +228,20 @@ TEST_F(PlanNodeSerdeTest, localPartition) { testSerde(plan); } +TEST_F(PlanNodeSerdeTest, scaleWriterlocalPartition) { + auto plan = PlanBuilder() + .values({data_}) + .scaleWriterlocalPartition(std::vector{"c0"}) + .planNode(); + testSerde(plan); + + plan = PlanBuilder() + .values({data_}) + .scaleWriterlocalPartitionRoundRobin() + .planNode(); + testSerde(plan); +} + TEST_F(PlanNodeSerdeTest, limit) { auto plan = PlanBuilder().values({data_}).limit(0, 10, true).planNode(); testSerde(plan); diff --git a/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp b/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp new file mode 100644 index 0000000000000..b0fbe45f470ee --- /dev/null +++ b/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp @@ -0,0 +1,972 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/ScaleWriterLocalPartition.h" + +#include "velox/core/PlanNode.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/QueryAssertions.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; + +namespace { +using BlockingCallback = std::function; +using FinishCallback = std::function; + +// The object used to coordinate the behavior of the consumer and producer of +// the scale writer exchange for test purpose. +class TestExchangeController { + public: + TestExchangeController( + uint32_t numProducers, + uint32_t numConsumers, + uint64_t holdBufferBytes, + std::optional producerTargetBufferMemoryRatio, + std::optional consumerTargetBufferMemoryRatio, + std::optional producerMaxDelayUs, + std::optional consumerMaxDelayUs, + const std::vector& inputVectors, + bool keepConsumerInput = true) + : numProducers_(numProducers), + numConsumers_(numConsumers), + holdBufferBytes_(holdBufferBytes), + producerTargetBufferMemoryRatio_(producerTargetBufferMemoryRatio), + consumerTargetBufferMemoryRatio_(consumerTargetBufferMemoryRatio), + producerMaxDelayUs_(producerMaxDelayUs), + consumerMaxDelayUs_(consumerMaxDelayUs), + keepConsumerInput_(keepConsumerInput), + producerInputVectors_(inputVectors), + consumerInputs_(numConsumers_) {} + + ~TestExchangeController() { + clear(); + } + + // The number of drivers of the producer pipeline. + uint32_t numProducers() const { + return numProducers_; + } + + // The number of drivers of the consumer pipeline. + uint32_t numConsumers() const { + return numConsumers_; + } + + // Specifies the buffer to hold to test the effect of query memory usage + // ratio for writer scaling control. 
+ uint64_t holdBufferBytes() const { + return holdBufferBytes_; + } + + void maybeHoldBuffer(memory::MemoryPool* pool) { + std::lock_guard l(mutex_); + if (holdBufferBytes_ == 0) { + return; + } + holdPool_ = pool; + holdBuffer_ = holdPool_->allocate(holdBufferBytes_); + } + + std::optional producerTargetBufferMemoryRatio() const { + return producerTargetBufferMemoryRatio_; + } + + std::optional consumerTargetBufferMemoryRatio() const { + return consumerTargetBufferMemoryRatio_; + } + + std::optional producerMaxDelayUs() const { + return producerMaxDelayUs_; + } + + std::optional consumerMaxDelayUs() const { + return consumerMaxDelayUs_; + } + + RowVectorPtr getInput() { + std::lock_guard l(mutex_); + if (nextInput_ >= producerInputVectors_.size()) { + return nullptr; + } + return producerInputVectors_[nextInput_++]; + } + + core::PlanNodeId exchangeNodeId() const { + std::lock_guard l(mutex_); + return exchnangeNodeId_; + } + + void setExchangeNodeId(const core::PlanNodeId& nodeId) { + std::lock_guard l(mutex_); + VELOX_CHECK(exchnangeNodeId_.empty()); + exchnangeNodeId_ = nodeId; + } + + void addConsumerInput(uint32_t consumerId, const RowVectorPtr& input) { + if (!keepConsumerInput_) { + return; + } + consumerInputs_[consumerId].push_back(input); + } + + const std::vector> consumerInputs() const { + std::lock_guard l(mutex_); + return consumerInputs_; + } + + void clear() { + std::lock_guard l(mutex_); + if (holdBuffer_ != nullptr) { + holdPool_->free(holdBuffer_, holdBufferBytes_); + holdBuffer_ = nullptr; + } + for (auto& input : consumerInputs_) { + input.clear(); + } + consumerInputs_.clear(); + } + + private: + const uint32_t numProducers_; + const uint32_t numConsumers_; + const uint64_t holdBufferBytes_; + const std::optional producerTargetBufferMemoryRatio_; + const std::optional consumerTargetBufferMemoryRatio_; + const std::optional producerMaxDelayUs_; + const std::optional consumerMaxDelayUs_; + const bool keepConsumerInput_; + + mutable std::mutex mutex_; + memory::MemoryPool* holdPool_{nullptr}; + void* holdBuffer_{nullptr}; + core::PlanNodeId exchnangeNodeId_; + uint32_t nextInput_{0}; + std::vector producerInputVectors_; + std::vector> consumerInputs_; +}; + +class FakeSourceNode : public core::PlanNode { + public: + FakeSourceNode(const core::PlanNodeId& id, const RowTypePtr& ouputType) + : PlanNode(id), ouputType_{ouputType} {} + + const RowTypePtr& outputType() const override { + return ouputType_; + } + + const std::vector>& sources() const override { + static const std::vector kEmptySources; + return kEmptySources; + } + + std::string_view name() const override { + return "FakeSourceNode"; + } + + private: + void addDetails(std::stringstream& /* stream */) const override {} + + const RowTypePtr ouputType_; +}; + +class FakeSourceOperator : public SourceOperator { + public: + FakeSourceOperator( + DriverCtx* ctx, + int32_t id, + const std::shared_ptr& node, + const std::shared_ptr& testController) + : SourceOperator( + ctx, + node->outputType(), + id, + node->id(), + "FakeSourceOperator"), + testController_(testController) { + VELOX_CHECK_NOT_NULL(testController_); + rng_.seed(id); + } + + RowVectorPtr getOutput() override { + VELOX_CHECK(!finished_); + auto output = testController_->getInput(); + if (output == nullptr) { + finished_ = true; + return nullptr; + } + waitForOutput(); + return output; + } + + bool isFinished() override { + return finished_; + } + + BlockingReason isBlocked(ContinueFuture* /*unused*/) override { + return BlockingReason::kNotBlocked; + } 
+ + private: + void initialize() override { + Operator::initialize(); + + if (operatorCtx_->driverCtx()->driverId != 0) { + return; + } + + testController_->maybeHoldBuffer(pool()); + } + + void waitForOutput() { + if (FOLLY_UNLIKELY(memoryManager_ == nullptr)) { + memoryManager_ = + operatorCtx_->driver()->task()->getLocalExchangeMemoryManager( + operatorCtx_->driverCtx()->splitGroupId, + testController_->exchangeNodeId()); + } + + if (testController_->producerTargetBufferMemoryRatio().has_value()) { + while (memoryManager_->bufferedBytes() > + (memoryManager_->maxBufferBytes() * + testController_->producerTargetBufferMemoryRatio().value())) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); // NOLINT + } + } + if (testController_->producerMaxDelayUs().has_value()) { + const auto delayUs = folly::Random::rand32(rng_) % + testController_->producerMaxDelayUs().value(); + std::this_thread::sleep_for(std::chrono::microseconds(delayUs)); // NOLINT + } + } + + const std::shared_ptr testController_; + folly::Random::DefaultGenerator rng_; + + std::shared_ptr memoryManager_; + bool finished_{false}; +}; + +class FakeSourceNodeFactory : public Operator::PlanNodeTranslator { + public: + explicit FakeSourceNodeFactory( + const std::shared_ptr& testController) + : testController_(testController) {} + + std::unique_ptr toOperator( + DriverCtx* ctx, + int32_t id, + const core::PlanNodePtr& node) override { + auto fakeSourceNode = std::dynamic_pointer_cast(node); + if (fakeSourceNode == nullptr) { + return nullptr; + } + return std::make_unique( + ctx, + id, + std::dynamic_pointer_cast(node), + testController_); + } + + std::optional maxDrivers(const core::PlanNodePtr& node) override { + auto fakeSourceNode = std::dynamic_pointer_cast(node); + if (fakeSourceNode == nullptr) { + return std::nullopt; + } + return testController_->numProducers(); + } + + private: + const std::shared_ptr testController_; +}; + +class FakeWriteNode : public core::PlanNode { + public: + FakeWriteNode(const core::PlanNodeId& id, const core::PlanNodePtr& input) + : PlanNode(id), sources_{input} {} + + const RowTypePtr& outputType() const override { + return sources_[0]->outputType(); + } + + const std::vector>& sources() const override { + return sources_; + } + + std::string_view name() const override { + return "FakeWriteNode"; + } + + private: + void addDetails(std::stringstream& /* stream */) const override {} + std::vector sources_; +}; + +class FakeWriteOperator : public Operator { + public: + FakeWriteOperator( + DriverCtx* ctx, + int32_t id, + const std::shared_ptr& node, + const std::shared_ptr& testController) + : Operator(ctx, node->outputType(), id, node->id(), "FakeWriteOperator"), + testController_(testController) { + VELOX_CHECK_NOT_NULL(testController_); + rng_.seed(id); + } + + void initialize() override { + Operator::initialize(); + VELOX_CHECK(exchangeQueues_.empty()); + exchangeQueues_ = operatorCtx_->driver()->task()->getLocalExchangeQueues( + operatorCtx_->driverCtx()->splitGroupId, + testController_->exchangeNodeId()); + } + + bool needsInput() const override { + return !noMoreInput_ && !input_; + } + + void addInput(RowVectorPtr input) override { + waitForConsume(); + testController_->addConsumerInput( + operatorCtx_->driverCtx()->driverId, input); + input_ = std::move(input); + } + + RowVectorPtr getOutput() override { + return std::move(input_); + } + + bool isFinished() override { + return noMoreInput_ && input_ == nullptr; + } + + BlockingReason isBlocked(ContinueFuture* /*unused*/) 
override { + return BlockingReason::kNotBlocked; + } + + private: + // Returns true if the producers of all the exchange queues are done. + bool exchangeQueueClosed() const { + for (const auto& queue : exchangeQueues_) { + if (!queue->testingProducersDone()) { + return false; + } + } + return true; + } + + void waitForConsume() { + if (FOLLY_UNLIKELY(memoryManager_ == nullptr)) { + memoryManager_ = + operatorCtx_->driver()->task()->getLocalExchangeMemoryManager( + operatorCtx_->driverCtx()->splitGroupId, + testController_->exchangeNodeId()); + } + if (testController_->consumerTargetBufferMemoryRatio().has_value()) { + while (!exchangeQueueClosed() && + memoryManager_->bufferedBytes() < + (memoryManager_->maxBufferBytes() * + testController_->consumerTargetBufferMemoryRatio().value())) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); // NOLINT + } + } + if (testController_->consumerMaxDelayUs().has_value()) { + const auto delayUs = folly::Random::rand32(rng_) % + testController_->consumerMaxDelayUs().value(); + std::this_thread::sleep_for(std::chrono::microseconds(delayUs)); // NOLINT + } + } + + const std::shared_ptr testController_; + folly::Random::DefaultGenerator rng_; + + std::shared_ptr memoryManager_; + std::vector> exchangeQueues_; +}; + +class FakeWriteNodeFactory : public Operator::PlanNodeTranslator { + public: + explicit FakeWriteNodeFactory( + const std::shared_ptr& testController) + : testController_(testController) {} + + std::unique_ptr toOperator( + DriverCtx* ctx, + int32_t id, + const core::PlanNodePtr& node) override { + auto fakeWriteNode = std::dynamic_pointer_cast(node); + if (fakeWriteNode == nullptr) { + return nullptr; + } + return std::make_unique( + ctx, + id, + std::dynamic_pointer_cast(node), + testController_); + } + + std::optional maxDrivers(const core::PlanNodePtr& node) override { + auto fakeWriteNode = std::dynamic_pointer_cast(node); + if (fakeWriteNode == nullptr) { + return std::nullopt; + } + return testController_->numConsumers(); + } + + private: + const std::shared_ptr testController_; +}; +} // namespace + +class ScaleWriterLocalPartitionTest : public HiveConnectorTestBase { + protected: + void SetUp() override { + HiveConnectorTestBase::SetUp(); + + rng_.seed(123); + rowType_ = ROW({"c0", "c1", "c3"}, {BIGINT(), INTEGER(), BIGINT()}); + } + + void TearDown() override { + Operator::unregisterAllOperators(); + HiveConnectorTestBase::TearDown(); + } + + std::vector makeVectors( + uint32_t numVectors, + uint32_t vectorSize, + const std::vector& partitionKeyValues = {}) { + VectorFuzzer::Options options; + options.vectorSize = vectorSize; + options.allowLazyVector = false; + // NOTE: order by used to sort data doesn't handle null rows. 
+ options.nullRatio = 0.0; + VectorFuzzer fuzzer(options, pool_.get()); + + std::vector vectors; + for (auto i = 0; i < numVectors; ++i) { + vectors.push_back(fuzzer.fuzzRow(rowType_)); + } + if (partitionKeyValues.empty()) { + return vectors; + } + for (auto i = 0; i < numVectors; ++i) { + auto partitionVector = BaseVector::create( + vectors[i]->childAt(partitionChannel_)->type(), + vectors[i]->childAt(partitionChannel_)->size(), + pool_.get()); + auto* partitionVectorFlat = partitionVector->asFlatVector(); + for (auto j = 0; j < partitionVector->size(); ++j) { + partitionVectorFlat->set( + j, + partitionKeyValues + [folly::Random::rand32(rng_) % partitionKeyValues.size()]); + } + vectors[i]->childAt(partitionChannel_) = partitionVector; + } + return vectors; + } + + std::string partitionColumnName() const { + return rowType_->nameOf(partitionChannel_); + } + + RowVectorPtr sortData(const std::vector& inputVectors) { + std::vector orderByKeys; + orderByKeys.reserve(rowType_->size()); + for (const auto& name : rowType_->names()) { + orderByKeys.push_back(fmt::format("{} ASC NULLS FIRST", name)); + } + AssertQueryBuilder queryBuilder(PlanBuilder() + .values(inputVectors) + .orderBy(orderByKeys, false) + .planNode()); + return queryBuilder.copyResults(pool_.get()); + } + + void verifyResults( + const std::vector& actual, + const std::vector& expected) { + const auto actualSorted = sortData(actual); + const auto expectedSorted = sortData(expected); + exec::test::assertEqualResults({actualSorted}, {expectedSorted}); + } + + // Returns the partition keys set of the input 'vectors'. + std::set partitionKeys(const std::vector& vectors) { + DecodedVector partitionColumnDecoder; + std::set keys; + for (const auto& vector : vectors) { + partitionColumnDecoder.decode(*vector->childAt(partitionChannel_)); + for (auto i = 0; i < partitionColumnDecoder.size(); ++i) { + keys.insert(partitionColumnDecoder.valueAt(i)); + } + } + return keys; + } + + // Verifies the partition keys of the consumer inputs from 'controller' are + // disjoint. 
+  void verifyDisjointPartitionKeys(TestExchangeController* controller) {
+    std::set allPartitionKeys;
+    for (const auto& consumerInput : controller->consumerInputs()) {
+      std::set consumerPartitionKeys = partitionKeys(consumerInput);
+      std::set diffPartitionKeys;
+      std::set_difference(
+          consumerPartitionKeys.begin(),
+          consumerPartitionKeys.end(),
+          allPartitionKeys.begin(),
+          allPartitionKeys.end(),
+          std::inserter(diffPartitionKeys, diffPartitionKeys.end()));
+      ASSERT_EQ(diffPartitionKeys.size(), consumerPartitionKeys.size())
+          << "diffPartitionKeys: " << folly::join(",", diffPartitionKeys)
+          << " consumerPartitionKeys: "
+          << folly::join(",", consumerPartitionKeys)
+          << ", allPartitionKeys: " << folly::join(",", allPartitionKeys);
+      allPartitionKeys = diffPartitionKeys;
+    }
+  }
+
+  const column_index_t partitionChannel_{1};
+  folly::Random::DefaultGenerator rng_;
+  RowTypePtr rowType_;
+};
+
+TEST_F(ScaleWriterLocalPartitionTest, unpartitionBasic) {
+  const std::vector inputVectors = makeVectors(32, 1024);
+  const uint64_t queryCapacity = 256 << 20;
+  const uint32_t maxDrivers = 32;
+  const uint32_t maxExchangeBufferSize = 2 << 20;
+
+  struct {
+    uint32_t numProducers;
+    uint32_t numConsumers;
+    uint64_t rebalanceProcessBytesThreshold;
+    double scaleWriterRebalanceMaxMemoryUsageRatio;
+    uint64_t holdBufferBytes;
+    double producerBufferedMemoryRatio;
+    double consumerBufferedMemoryRatio;
+    bool expectedRebalance;
+
+    std::string debugString() const {
+      return fmt::format(
+          "numProducers {}, numConsumers {}, rebalanceProcessBytesThreshold {}, scaleWriterRebalanceMaxMemoryUsageRatio {}, holdBufferBytes {}, producerBufferedMemoryRatio {}, consumerBufferedMemoryRatio {}, expectedRebalance {}",
+          numProducers,
+          numConsumers,
+          succinctBytes(rebalanceProcessBytesThreshold),
+          scaleWriterRebalanceMaxMemoryUsageRatio,
+          succinctBytes(holdBufferBytes),
+          producerBufferedMemoryRatio,
+          consumerBufferedMemoryRatio,
+          expectedRebalance);
+    }
+  } testSettings[] = {
+      {1, 1, 0, 1.0, 0, 0.8, 0.6, false},
+      {4, 1, 0, 1.0, 0, 0.8, 0.6, false},
+      {1, 4, 1ULL << 30, 1.0, 0, 0.8, 0.6, false},
+      {4, 4, 1ULL << 30, 1.0, 0, 0.8, 0.6, false},
+      {1, 4, 0, 1.0, 0, 0.3, 0.2, false},
+      {4, 4, 0, 1.0, 0, 0.3, 0.2, false},
+      {1, 4, 0, 0.1, queryCapacity / 2, 0.8, 0.6, false},
+      {4, 4, 0, 0.1, queryCapacity / 2, 0.8, 0.6, false},
+      {1, 4, 0, 1.0, 0, 0.8, 0.6, true},
+      {4, 4, 0, 1.0, 0, 0.8, 0.6, true}};
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+    ASSERT_LE(testData.numProducers, maxDrivers);
+    ASSERT_LE(testData.numConsumers, maxDrivers);
+
+    Operator::unregisterAllOperators();
+
+    auto testController = std::make_shared(
+        testData.numProducers,
+        testData.numConsumers,
+        testData.holdBufferBytes,
+        testData.producerBufferedMemoryRatio,
+        testData.consumerBufferedMemoryRatio,
+        std::nullopt,
+        std::nullopt,
+        inputVectors);
+    Operator::registerOperator(
+        std::make_unique(testController));
+    Operator::registerOperator(
+        std::make_unique(testController));
+
+    auto planNodeIdGenerator = std::make_shared();
+    core::PlanNodeId exchangeNodeId;
+    auto plan = PlanBuilder(planNodeIdGenerator)
+                    .addNode([&](const core::PlanNodeId& id,
+                                 const core::PlanNodePtr& input) {
+                      return std::make_shared(id, rowType_);
+                    })
+                    .scaleWriterlocalPartitionRoundRobin()
+                    .capturePlanNodeId(exchangeNodeId)
+                    .addNode([](const core::PlanNodeId& id,
+                                const core::PlanNodePtr& input) {
+                      return std::make_shared(id, input);
+                    })
+                    .planNode();
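+    // Lets the test controller know which plan node owns the local exchange
+    // so the fake consumer can look up the exchange memory manager it
+    // throttles against.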
+    testController->setExchangeNodeId(exchangeNodeId);
+
+    AssertQueryBuilder queryBuilder(plan);
+    std::shared_ptr task;
+    const auto result =
+        // The consumer and producer override the max driver counts of their
+        // associated pipelines, so we set the query's max drivers to make
+        // sure it is larger than the customized driver counts of both the
+        // consumer and the producer.
+        queryBuilder.maxDrivers(maxDrivers)
+            .maxQueryCapacity(queryCapacity)
+            .config(
+                core::QueryConfig::kMaxLocalExchangeBufferSize,
+                std::to_string(maxExchangeBufferSize))
+            .config(
+                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
+                std::to_string(
+                    testData.scaleWriterRebalanceMaxMemoryUsageRatio))
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+                std::to_string(testData.rebalanceProcessBytesThreshold))
+            .copyResults(pool_.get(), task);
+    uint32_t nonEmptyConsumers{0};
+    for (const auto& consumerInput : testController->consumerInputs()) {
+      if (!consumerInput.empty()) {
+        ++nonEmptyConsumers;
+      }
+    }
+    auto planStats = toPlanStats(task->taskStats());
+    if (testData.expectedRebalance) {
+      ASSERT_GT(
+          planStats.at(exchangeNodeId)
+              .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
+              .sum,
+          0);
+      ASSERT_LE(
+          planStats.at(exchangeNodeId)
+              .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
+              .sum,
+          planStats.at(exchangeNodeId)
+                  .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
+                  .count *
+              (testData.numConsumers - 1));
+      ASSERT_GT(nonEmptyConsumers, 1);
+    } else {
+      ASSERT_EQ(
+          planStats.at(exchangeNodeId)
+              .customStats.count(ScaleWriterLocalPartition::kScaledWriters),
+          0);
+      ASSERT_EQ(nonEmptyConsumers, 1);
+    }
+
+    testController->clear();
+    task.reset();
+
+    verifyResults(inputVectors, {result});
+    waitForAllTasksToBeDeleted();
+  }
+}
+
+TEST_F(ScaleWriterLocalPartitionTest, unpartitionFuzzer) {
+  const std::vector inputVectors = makeVectors(256, 512);
+  const uint64_t queryCapacity = 256 << 20;
+  const uint32_t maxDrivers = 32;
+  const uint32_t maxExchangeBufferSize = 2 << 20;
+
+  for (bool fastConsumer : {false, true}) {
+    SCOPED_TRACE(fmt::format("fastConsumer: {}", fastConsumer));
+    Operator::unregisterAllOperators();
+
+    auto testController = std::make_shared(
+        fastConsumer ? 1 : 4,
+        4,
+        0,
+        std::nullopt,
+        std::nullopt,
+        fastConsumer ? 4 : 64,
+        fastConsumer ? 64 : 4,
+        inputVectors,
+        /*keepConsumerInput=*/false);
+    Operator::registerOperator(
+        std::make_unique(testController));
+    Operator::registerOperator(
+        std::make_unique(testController));
+
+    auto planNodeIdGenerator = std::make_shared();
+    core::PlanNodeId exchangeNodeId;
+    auto plan = PlanBuilder(planNodeIdGenerator)
+                    .addNode([&](const core::PlanNodeId& id,
+                                 const core::PlanNodePtr& input) {
+                      return std::make_shared(id, rowType_);
+                    })
+                    .scaleWriterlocalPartitionRoundRobin()
+                    .capturePlanNodeId(exchangeNodeId)
+                    .addNode([](const core::PlanNodeId& id,
+                                const core::PlanNodePtr& input) {
+                      return std::make_shared(id, input);
+                    })
+                    .planNode();
+    testController->setExchangeNodeId(exchangeNodeId);
+
+    AssertQueryBuilder queryBuilder(plan);
+    std::shared_ptr task;
+    const auto result =
+        // The consumer and producer override the max driver counts of their
+        // associated pipelines, so we set the query's max drivers to make
+        // sure it is larger than the customized driver counts of both the
+        // consumer and the producer.
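+        // A tiny rebalance threshold keeps the scaling logic exercised
+        // throughout the fuzzed run, regardless of the randomized delays.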
+        queryBuilder.maxDrivers(maxDrivers)
+            .maxQueryCapacity(queryCapacity)
+            .config(
+                core::QueryConfig::kMaxLocalExchangeBufferSize,
+                std::to_string(maxExchangeBufferSize))
+            .config(
+                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
+                "1.0")
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+                "256")
+            .copyResults(pool_.get());
+    verifyResults(inputVectors, {result});
+    waitForAllTasksToBeDeleted();
+  }
+}
+
+TEST_F(ScaleWriterLocalPartitionTest, partitionBasic) {
+  const uint64_t queryCapacity = 256 << 20;
+  const uint32_t maxDrivers = 32;
+  const uint32_t maxExchangeBufferSize = 2 << 20;
+
+  struct {
+    uint32_t numProducers;
+    uint32_t numConsumers;
+    uint32_t numPartitionsPerWriter;
+    uint64_t rebalanceProcessBytesThreshold;
+    double scaleWriterRebalanceMaxMemoryUsageRatio;
+    uint64_t holdBufferBytes;
+    std::vector partitionKeys;
+    double producerBufferedMemoryRatio;
+    double consumerBufferedMemoryRatio;
+    bool expectedRebalance;
+
+    std::string debugString() const {
+      return fmt::format(
+          "numProducers {}, numConsumers {}, numPartitionsPerWriter {}, rebalanceProcessBytesThreshold {}, scaleWriterRebalanceMaxMemoryUsageRatio {}, holdBufferBytes {}, partitionKeys {}, producerBufferedMemoryRatio {}, consumerBufferedMemoryRatio {}, expectedRebalance {}",
+          numProducers,
+          numConsumers,
+          numPartitionsPerWriter,
+          succinctBytes(rebalanceProcessBytesThreshold),
+          scaleWriterRebalanceMaxMemoryUsageRatio,
+          succinctBytes(holdBufferBytes),
+          folly::join(":", partitionKeys),
+          producerBufferedMemoryRatio,
+          consumerBufferedMemoryRatio,
+          expectedRebalance);
+    }
+  } testSettings[] = {
+      {1, 1, 4, 0, 1.0, 0, {1, 2}, 0.8, 0.6, false},
+      {4, 1, 4, 0, 1.0, 0, {1, 2}, 0.8, 0.6, false},
+      {1, 4, 4, 1ULL << 30, 1.0, 0, {1, 2}, 0.8, 0.6, false},
+      {4, 4, 4, 1ULL << 30, 1.0, 0, {1, 2}, 0.8, 0.6, false},
+      {1, 4, 4, 0, 1.0, 0, {1, 2}, 0.3, 0.2, false},
+      {4, 4, 4, 0, 1.0, 0, {1, 2}, 0.3, 0.2, false},
+      {1, 4, 4, 0, 0.1, queryCapacity / 2, {1, 2}, 0.8, 0.6, false},
+      {4, 4, 4, 0, 0.1, queryCapacity / 2, {1, 2}, 0.8, 0.6, false},
+      {1, 32, 128, 0, 1.0, 0, {1, 2, 3, 4, 5, 6, 7, 8}, 0.8, 0.6, true},
+      {4, 32, 128, 0, 1.0, 0, {1, 2, 3, 4, 5, 6, 7, 8}, 0.8, 0.6, true},
+  };
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+    ASSERT_LE(testData.numProducers, maxDrivers);
+    ASSERT_LE(testData.numConsumers, maxDrivers);
+
+    Operator::unregisterAllOperators();
+
+    const std::vector inputVectors =
+        makeVectors(32, 2048, testData.partitionKeys);
+
+    auto testController = std::make_shared(
+        testData.numProducers,
+        testData.numConsumers,
+        testData.holdBufferBytes,
+        testData.producerBufferedMemoryRatio,
+        testData.consumerBufferedMemoryRatio,
+        std::nullopt,
+        std::nullopt,
+        inputVectors);
+    Operator::registerOperator(
+        std::make_unique(testController));
+    Operator::registerOperator(
+        std::make_unique(testController));
+
+    auto planNodeIdGenerator = std::make_shared();
+    core::PlanNodeId exchangeNodeId;
+    auto plan = PlanBuilder(planNodeIdGenerator)
+                    .addNode([&](const core::PlanNodeId& id,
+                                 const core::PlanNodePtr& input) {
+                      return std::make_shared(id, rowType_);
+                    })
+                    .scaleWriterlocalPartition({partitionColumnName()})
+                    .capturePlanNodeId(exchangeNodeId)
+                    .addNode([](const core::PlanNodeId& id,
+                                const core::PlanNodePtr& input) {
+                      return std::make_shared(id, input);
+                    })
+                    .planNode();
+    testController->setExchangeNodeId(exchangeNodeId);
+
+    AssertQueryBuilder queryBuilder(plan);
+    std::shared_ptr task;
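+    // Runs the query and keeps the Task alive so the exchange node's custom
+    // stats can be inspected after the query finishes.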
+    const auto result =
+        // The consumer and producer override the max driver counts of their
+        // associated pipelines, so we set the query's max drivers to make
+        // sure it is larger than the customized driver counts of both the
+        // consumer and the producer.
+        queryBuilder.maxDrivers(maxDrivers)
+            .maxQueryCapacity(queryCapacity)
+            .config(
+                core::QueryConfig::kMaxLocalExchangeBufferSize,
+                std::to_string(maxExchangeBufferSize))
+            .config(
+                core::QueryConfig::kScaleWriterMaxPartitionsPerWriter,
+                std::to_string(testData.numPartitionsPerWriter))
+            .config(
+                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
+                std::to_string(
+                    testData.scaleWriterRebalanceMaxMemoryUsageRatio))
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinProcessedBytesRebalanceThreshold,
+                std::to_string(testData.rebalanceProcessBytesThreshold))
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+                "0")
+            .copyResults(pool_.get(), task);
+    auto planStats = toPlanStats(task->taskStats());
+    if (testData.expectedRebalance) {
+      ASSERT_GT(
+          planStats.at(exchangeNodeId)
+              .customStats
+              .at(ScaleWriterPartitioningLocalPartition::kScaledPartitions)
+              .sum,
+          0);
+      ASSERT_GT(
+          planStats.at(exchangeNodeId)
+              .customStats
+              .at(ScaleWriterPartitioningLocalPartition::kRebalanceTriggers)
+              .sum,
+          0);
+    } else {
+      ASSERT_EQ(
+          planStats.at(exchangeNodeId)
+              .customStats.count(
+                  ScaleWriterPartitioningLocalPartition::kScaledPartitions),
+          0);
+      ASSERT_EQ(
+          planStats.at(exchangeNodeId)
+              .customStats.count(
+                  ScaleWriterPartitioningLocalPartition::kRebalanceTriggers),
+          0);
+      verifyDisjointPartitionKeys(testController.get());
+    }
+    testController->clear();
+    task.reset();
+
+    verifyResults(inputVectors, {result});
+    waitForAllTasksToBeDeleted();
+  }
+}
+
+TEST_F(ScaleWriterLocalPartitionTest, partitionFuzzer) {
+  const std::vector inputVectors =
+      makeVectors(1024, 256, {1, 2, 3, 4, 5, 6, 7, 8});
+  const uint64_t queryCapacity = 256 << 20;
+  const uint32_t maxDrivers = 32;
+  const uint32_t maxExchangeBufferSize = 2 << 20;
+
+  for (bool fastConsumer : {false, true}) {
+    SCOPED_TRACE(fmt::format("fastConsumer: {}", fastConsumer));
+    Operator::unregisterAllOperators();
+
+    auto testController = std::make_shared(
+        fastConsumer ? 1 : 4,
+        4,
+        0,
+        std::nullopt,
+        std::nullopt,
+        fastConsumer ? 4 : 64,
+        fastConsumer ? 64 : 4,
+        inputVectors,
+        /*keepConsumerInput=*/false);
+    Operator::registerOperator(
+        std::make_unique(testController));
+    Operator::registerOperator(
+        std::make_unique(testController));
+
+    auto planNodeIdGenerator = std::make_shared();
+    core::PlanNodeId exchangeNodeId;
+    auto plan = PlanBuilder(planNodeIdGenerator)
+                    .addNode([&](const core::PlanNodeId& id,
+                                 const core::PlanNodePtr& input) {
+                      return std::make_shared(id, rowType_);
+                    })
+                    .scaleWriterlocalPartition({partitionColumnName()})
+                    .capturePlanNodeId(exchangeNodeId)
+                    .addNode([](const core::PlanNodeId& id,
+                                const core::PlanNodePtr& input) {
+                      return std::make_shared(id, input);
+                    })
+                    .planNode();
+    testController->setExchangeNodeId(exchangeNodeId);
+
+    AssertQueryBuilder queryBuilder(plan);
+    std::shared_ptr task;
+    const auto result =
+        // The consumer and producer override the max driver counts of their
+        // associated pipelines, so we set the query's max drivers to make
+        // sure it is larger than the customized driver counts of both the
+        // consumer and the producer.
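+        // Small processed-bytes thresholds force frequent partition scaling
+        // decisions under the randomized producer and consumer pacing.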
+        queryBuilder.maxDrivers(maxDrivers)
+            .maxQueryCapacity(queryCapacity)
+            .config(
+                core::QueryConfig::kMaxLocalExchangeBufferSize,
+                std::to_string(maxExchangeBufferSize))
+            .config(
+                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
+                "1.0")
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinProcessedBytesRebalanceThreshold,
+                "256")
+            .config(
+                core::QueryConfig::
+                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+                "32")
+            .copyResults(pool_.get());
+    verifyResults(inputVectors, {result});
+    waitForAllTasksToBeDeleted();
+  }
+}
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp
index bd16261975f66..b34d33245f858 100644
--- a/velox/exec/tests/TableWriteTest.cpp
+++ b/velox/exec/tests/TableWriteTest.cpp
@@ -27,7 +27,6 @@
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
 #include "velox/exec/tests/utils/TempDirectoryPath.h"
-#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
 #include "velox/vector/fuzzer/VectorFuzzer.h"
 #include
@@ -156,8 +155,10 @@ struct TestParam {
       HiveBucketProperty::Kind bucketKind,
       bool bucketSort,
       bool multiDrivers,
-      CompressionKind compressionKind) {
-    value = static_cast(compressionKind) << 48 |
+      CompressionKind compressionKind,
+      bool scaleWriter) {
+    value = (scaleWriter ? 1ULL << 56 : 0) |
+        static_cast(compressionKind) << 48 |
         static_cast(!!multiDrivers) << 40 |
         static_cast(fileFormat) << 32 | static_cast(testMode) << 24 |
@@ -195,16 +196,21 @@ struct TestParam {
     return (value & ((1L << 8) - 1)) != 0;
   }

+  bool scaleWriter() const {
+    return (value >> 56) != 0;
+  }
+
   std::string toString() const {
     return fmt::format(
-        "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}]",
+        "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}] scaleWriter[{}]",
         dwio::common::toString((fileFormat())),
         testModeString(testMode()),
         commitStrategyToString(commitStrategy()),
         HiveBucketProperty::kindString(bucketKind()),
         bucketSort(),
         multiDrivers(),
-        compressionKindToString(compressionKind()));
+        compressionKindToString(compressionKind()),
+        scaleWriter());
   }
 };

@@ -219,7 +225,8 @@ class TableWriteTest : public HiveConnectorTestBase {
         numPartitionedTableWriterCount_(
             testParam_.multiDrivers() ? kNumPartitionedTableWriterCount : 1),
         commitStrategy_(testParam_.commitStrategy()),
-        compressionKind_(testParam_.compressionKind()) {
+        compressionKind_(testParam_.compressionKind()),
+        scaleWriter_(testParam_.scaleWriter()) {
     LOG(INFO) << testParam_.toString();

     auto rowType =
@@ -281,6 +288,14 @@
         .config(
             QueryConfig::kTaskPartitionedWriterCount,
             std::to_string(numPartitionedTableWriterCount_))
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .splits(splits)
         .assertResults(duckDbSql);
   }
@@ -297,6 +312,14 @@
             std::to_string(numPartitionedTableWriterCount_))
         .config(core::QueryConfig::kSpillEnabled, "true")
         .config(QueryConfig::kWriterSpillEnabled, "true")
+        // Scale writer settings to trigger partition rebalancing.
+ .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .splits(splits) .assertResults(duckDbSql); } @@ -319,6 +342,14 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") + // Scale writer settings to trigger partition rebalancing. + .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .assertResults(duckDbSql); } @@ -335,6 +366,14 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") + // Scale writer settings to trigger partition rebalancing. + .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .assertResults(duckDbSql); } @@ -352,6 +391,14 @@ class TableWriteTest : public HiveConnectorTestBase { .config( QueryConfig::kTaskPartitionedWriterCount, std::to_string(numPartitionedTableWriterCount_)) + // Scale writer settings to trigger partition rebalancing. + .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .copyResults(pool()); } @@ -368,6 +415,14 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") + // Scale writer settings to trigger partition rebalancing. 
+ .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .copyResults(pool()); } @@ -594,97 +649,213 @@ class TableWriteTest : public HiveConnectorTestBase { bool aggregateResult = true, std::shared_ptr aggregationNode = nullptr) { if (numTableWriters == 1) { - auto insertPlan = inputPlan - .addNode(addTableWriter( - inputRowType, - tableRowType->names(), - aggregationNode, - createInsertTableHandle( - tableRowType, - outputTableType, - outputDirectoryPath, - partitionedBy, - bucketProperty, - compressionKind), - false, - outputCommitStrategy)) - .capturePlanNodeId(tableWriteNodeId_); - if (aggregateResult) { - insertPlan.project({TableWriteTraits::rowCountColumnName()}) - .singleAggregation( - {}, - {fmt::format( - "sum({})", TableWriteTraits::rowCountColumnName())}); - } - return insertPlan.planNode(); + return createInsertPlanWithSingleWriter( + inputPlan, + inputRowType, + tableRowType, + outputDirectoryPath, + partitionedBy, + bucketProperty, + compressionKind, + outputTableType, + outputCommitStrategy, + aggregateResult, + aggregationNode); } else if (bucketProperty_ == nullptr) { - auto insertPlan = inputPlan.localPartitionRoundRobin() - .addNode(addTableWriter( - inputRowType, - tableRowType->names(), - nullptr, - createInsertTableHandle( - tableRowType, - outputTableType, - outputDirectoryPath, - partitionedBy, - bucketProperty, - compressionKind), - false, - outputCommitStrategy)) - .capturePlanNodeId(tableWriteNodeId_) - .localPartition(std::vector{}) - .tableWriteMerge(); - if (aggregateResult) { - insertPlan.project({TableWriteTraits::rowCountColumnName()}) - .singleAggregation( - {}, - {fmt::format( - "sum({})", TableWriteTraits::rowCountColumnName())}); - } - return insertPlan.planNode(); + return createInsertPlanWithForNonBucketedTable( + inputPlan, + inputRowType, + tableRowType, + outputDirectoryPath, + partitionedBy, + compressionKind, + outputTableType, + outputCommitStrategy, + aggregateResult, + aggregationNode); } else { - // Since we might do column rename, so generate bucket property based on - // the data type from 'inputPlan'. 
-    std::vector bucketColumns;
-    bucketColumns.reserve(bucketProperty->bucketedBy().size());
-    for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) {
-      bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx(
-          bucketProperty->bucketedBy()[i])]);
+      return createInsertPlanForBucketTable(
+          inputPlan,
+          inputRowType,
+          tableRowType,
+          outputDirectoryPath,
+          partitionedBy,
+          bucketProperty,
+          compressionKind,
+          outputTableType,
+          outputCommitStrategy,
+          aggregateResult,
+          aggregationNode);
+    }
+  }
+
+  PlanNodePtr createInsertPlanWithSingleWriter(
+      PlanBuilder& inputPlan,
+      const RowTypePtr& inputRowType,
+      const RowTypePtr& tableRowType,
+      const std::string& outputDirectoryPath,
+      const std::vector& partitionedBy,
+      std::shared_ptr bucketProperty,
+      const std::optional compressionKind,
+      const connector::hive::LocationHandle::TableType& outputTableType,
+      const CommitStrategy& outputCommitStrategy,
+      bool aggregateResult,
+      std::shared_ptr aggregationNode) {
+    const bool addScaleWriterExchange =
+        scaleWriter_ && (bucketProperty != nullptr);
+    auto insertPlan = inputPlan;
+    if (addScaleWriterExchange) {
+      if (!partitionedBy.empty()) {
+        insertPlan.scaleWriterlocalPartition(
+            inputColumnNames(partitionedBy, tableRowType, inputRowType));
+      } else {
+        insertPlan.scaleWriterlocalPartitionRoundRobin();
+      }
-    auto localPartitionBucketProperty = std::make_shared(
-        bucketProperty->kind(),
-        bucketProperty->bucketCount(),
-        bucketColumns,
-        bucketProperty->bucketedTypes(),
-        bucketProperty->sortedBy());
-    auto insertPlan =
-        inputPlan.localPartitionByBucket(localPartitionBucketProperty)
-            .addNode(addTableWriter(
-                inputRowType,
-                tableRowType->names(),
-                nullptr,
-                createInsertTableHandle(
-                    tableRowType,
-                    outputTableType,
-                    outputDirectoryPath,
-                    partitionedBy,
-                    bucketProperty,
-                    compressionKind),
-                false,
-                outputCommitStrategy))
-            .capturePlanNodeId(tableWriteNodeId_)
-            .localPartition({})
-            .tableWriteMerge();
-    if (aggregateResult) {
-      insertPlan.project({TableWriteTraits::rowCountColumnName()})
-          .singleAggregation(
-              {},
-              {fmt::format(
-                  "sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    insertPlan
+        .addNode(addTableWriter(
+            inputRowType,
+            tableRowType->names(),
+            aggregationNode,
+            createInsertTableHandle(
+                tableRowType,
+                outputTableType,
+                outputDirectoryPath,
+                partitionedBy,
+                bucketProperty,
+                compressionKind),
+            false,
+            outputCommitStrategy))
+        .capturePlanNodeId(tableWriteNodeId_);
+    if (addScaleWriterExchange) {
+      if (!partitionedBy.empty()) {
+        insertPlan.scaleWriterlocalPartition(
+            inputColumnNames(partitionedBy, tableRowType, inputRowType));
+      } else {
+        insertPlan.scaleWriterlocalPartitionRoundRobin();
+      }
+    }
+    if (aggregateResult) {
+      insertPlan.project({TableWriteTraits::rowCountColumnName()})
+          .singleAggregation(
+              {},
+              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    return insertPlan.planNode();
+  }
+
+  PlanNodePtr createInsertPlanForBucketTable(
+      PlanBuilder& inputPlan,
+      const RowTypePtr& inputRowType,
+      const RowTypePtr& tableRowType,
+      const std::string& outputDirectoryPath,
+      const std::vector& partitionedBy,
+      std::shared_ptr bucketProperty,
+      const std::optional compressionKind,
+      const connector::hive::LocationHandle::TableType& outputTableType,
+      const CommitStrategy& outputCommitStrategy,
+      bool aggregateResult,
+      std::shared_ptr aggregationNode) {
+    // Since we might rename columns, generate the bucket property based on
+    // the data types from 'inputPlan'.
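+    // Bucket columns are resolved by position via 'tableRowType', so renamed
+    // input columns still map to the table's bucket-by keys.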
+ std::vector bucketColumns; + bucketColumns.reserve(bucketProperty->bucketedBy().size()); + for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) { + bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx( + bucketProperty->bucketedBy()[i])]); + } + auto localPartitionBucketProperty = std::make_shared( + bucketProperty->kind(), + bucketProperty->bucketCount(), + bucketColumns, + bucketProperty->bucketedTypes(), + bucketProperty->sortedBy()); + auto insertPlan = + inputPlan.localPartitionByBucket(localPartitionBucketProperty) + .addNode(addTableWriter( + inputRowType, + tableRowType->names(), + nullptr, + createInsertTableHandle( + tableRowType, + outputTableType, + outputDirectoryPath, + partitionedBy, + bucketProperty, + compressionKind), + false, + outputCommitStrategy)) + .capturePlanNodeId(tableWriteNodeId_) + .localPartition({}) + .tableWriteMerge(); + if (aggregateResult) { + insertPlan.project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}); + } + return insertPlan.planNode(); + } + + // Return the corresponding column names in 'inputRowType' of + // 'tableColumnNames' from 'tableRowType'. + static std::vector inputColumnNames( + const std::vector& tableColumnNames, + const RowTypePtr& tableRowType, + const RowTypePtr& inputRowType) { + std::vector inputNames; + inputNames.reserve(tableColumnNames.size()); + for (const auto& tableColumnName : tableColumnNames) { + const auto columnIdx = tableRowType->getChildIdx(tableColumnName); + inputNames.push_back(inputRowType->nameOf(columnIdx)); + } + return inputNames; + } + + PlanNodePtr createInsertPlanWithForNonBucketedTable( + PlanBuilder& inputPlan, + const RowTypePtr& inputRowType, + const RowTypePtr& tableRowType, + const std::string& outputDirectoryPath, + const std::vector& partitionedBy, + const std::optional compressionKind, + const connector::hive::LocationHandle::TableType& outputTableType, + const CommitStrategy& outputCommitStrategy, + bool aggregateResult, + std::shared_ptr aggregationNode) { + auto insertPlan = inputPlan; + if (scaleWriter_) { + if (!partitionedBy.empty()) { + insertPlan.scaleWriterlocalPartition( + inputColumnNames(partitionedBy, tableRowType, inputRowType)); + } else { + insertPlan.scaleWriterlocalPartitionRoundRobin(); } - return insertPlan.planNode(); } + insertPlan + .addNode(addTableWriter( + inputRowType, + tableRowType->names(), + nullptr, + createInsertTableHandle( + tableRowType, + outputTableType, + outputDirectoryPath, + partitionedBy, + nullptr, + compressionKind), + false, + outputCommitStrategy)) + .capturePlanNodeId(tableWriteNodeId_) + .localPartition(std::vector{}) + .tableWriteMerge(); + if (aggregateResult) { + insertPlan.project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}); + } + return insertPlan.planNode(); } // Parameter partitionName is string formatted in the Hive style @@ -1032,6 +1203,7 @@ class TableWriteTest : public HiveConnectorTestBase { RowTypePtr tableSchema_; CommitStrategy commitStrategy_; std::optional compressionKind_; + bool scaleWriter_; std::vector partitionedBy_; std::vector partitionTypes_; std::vector partitionChannels_; @@ -1141,60 +1313,68 @@ class PartitionedTableWriterTest } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - 
CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); + for (bool scaleWriter : {false, true}) { + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + } } } return testParams; @@ -1216,24 +1396,28 @@ class UnpartitionedTableWriterTest } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { - testParams.push_back(TestParam{ - fileFormat, - TestMode::kUnpartitioned, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_NONE} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kUnpartitioned, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_NONE} - .value); + for (bool scaleWriter : {false, true}) { + testParams.push_back(TestParam{ + fileFormat, + TestMode::kUnpartitioned, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_NONE, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kUnpartitioned, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, 
+ false, + multiDrivers, + CompressionKind_NONE, + scaleWriter} + .value); + } } } return testParams; @@ -1265,7 +1449,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kHiveCompatible, false, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1274,7 +1459,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kHiveCompatible, true, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1283,7 +1469,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kHiveCompatible, false, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1292,7 +1479,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kHiveCompatible, true, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1301,7 +1489,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kPrestoNative, false, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1310,7 +1499,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kPrestoNative, true, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1319,7 +1509,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kPrestoNative, false, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1328,7 +1519,8 @@ class BucketedTableOnlyWriteTest HiveBucketProperty::Kind::kPrestoNative, true, multiDrivers, - CompressionKind_ZSTD} + CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); } } @@ -1360,7 +1552,8 @@ class BucketSortOnlyTableWriterTest HiveBucketProperty::Kind::kHiveCompatible, true, multiDrivers, - facebook::velox::common::CompressionKind_ZSTD} + facebook::velox::common::CompressionKind_ZSTD, + /*scaleWriter=*/false} .value); testParams.push_back(TestParam{ fileFormat, @@ -1369,7 +1562,8 @@ class BucketSortOnlyTableWriterTest HiveBucketProperty::Kind::kHiveCompatible, true, multiDrivers, - facebook::velox::common::CompressionKind_NONE} + facebook::velox::common::CompressionKind_NONE, + /*scaleWriter=*/false} .value); } } @@ -1393,24 +1587,28 @@ class PartitionedWithoutBucketTableWriterTest } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - true, - CompressionKind_ZSTD} - .value); + for (bool scaleWriter : {false, true}) { + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + 
false, + true, + CompressionKind_ZSTD, + scaleWriter} + .value); + } } } return testParams; @@ -1431,114 +1629,128 @@ class AllTableWriterTest : public TableWriteTest, } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { - testParams.push_back(TestParam{ - fileFormat, - TestMode::kUnpartitioned, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kUnpartitioned, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kPartitioned, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kOnlyBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kOnlyBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kHiveCompatible, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kOnlyBucketed, - CommitStrategy::kNoCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); - testParams.push_back(TestParam{ - fileFormat, - TestMode::kOnlyBucketed, - CommitStrategy::kTaskCommit, - HiveBucketProperty::Kind::kPrestoNative, - false, - multiDrivers, - CompressionKind_ZSTD} - .value); + for (bool scaleWriter : {false, true}) { + testParams.push_back(TestParam{ + fileFormat, + TestMode::kUnpartitioned, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kUnpartitioned, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kPartitioned, + 
CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kOnlyBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kOnlyBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kHiveCompatible, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kOnlyBucketed, + CommitStrategy::kNoCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + testParams.push_back(TestParam{ + fileFormat, + TestMode::kOnlyBucketed, + CommitStrategy::kTaskCommit, + HiveBucketProperty::Kind::kPrestoNative, + false, + multiDrivers, + CompressionKind_ZSTD, + scaleWriter} + .value); + } } } return testParams; diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp index 554ee2bb34838..427193fa67c52 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.cpp +++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp @@ -57,6 +57,12 @@ AssertQueryBuilder& AssertQueryBuilder::maxDrivers(int32_t maxDrivers) { return *this; } +AssertQueryBuilder& AssertQueryBuilder::maxQueryCapacity( + int64_t maxQueryCapacity) { + params_.maxQueryCapacity = maxQueryCapacity; + return *this; +} + AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) { params_.destination = destination; return *this; @@ -256,15 +262,19 @@ AssertQueryBuilder::readCursor() { // NOTE: the destructor of 'executor_' will wait for all the async task // activities to finish on AssertQueryBuilder dtor. 
static std::atomic cursorQueryId{0}; + const std::string queryId = + fmt::format("TaskCursorQuery_{}", cursorQueryId++); + auto queryPool = memory::memoryManager()->addRootPool( + queryId, params_.maxQueryCapacity); params_.queryCtx = core::QueryCtx::create( executor_.get(), core::QueryConfig({}), std:: unordered_map>{}, cache::AsyncDataCache::getInstance(), + std::move(queryPool), nullptr, - nullptr, - fmt::format("TaskCursorQuery_{}", cursorQueryId++)); + queryId); } } if (!configs_.empty()) { diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 4d5d4299d177b..b8bdcbb63e36a 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -35,6 +35,9 @@ class AssertQueryBuilder { /// Change requested number of drivers. Default is 1. AssertQueryBuilder& maxDrivers(int32_t maxDrivers); + /// Change the query memory pool capacity. Default has no limit + AssertQueryBuilder& maxQueryCapacity(int64_t maxCapacity); + /// Change task's 'destination', the partition number assigned to the task. /// Default is 0. AssertQueryBuilder& destination(int32_t destination); diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index 9c8cfaf7d7fa0..51fe73eb403c4 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -28,23 +28,26 @@ bool waitForTaskDriversToFinish( exec::Task* task, uint64_t maxWaitMicros = 1'000'000); -// Parameters for initializing a TaskCursor or RowCursor. +/// Parameters for initializing a TaskCursor or RowCursor. struct CursorParameters { - // Root node of the plan tree + /// Root node of the plan tree std::shared_ptr planNode; - int32_t destination = 0; + int32_t destination{0}; - // Maximum number of drivers per pipeline. - int32_t maxDrivers = 1; + /// Maximum number of drivers per pipeline. + int32_t maxDrivers{1}; - // Maximum number of split groups processed concurrently. - int32_t numConcurrentSplitGroups = 1; + /// The max capacity of the query memory pool. + int64_t maxQueryCapacity{memory::kMaxMemory}; - // Optional, created if not present. + /// Maximum number of split groups processed concurrently. + int32_t numConcurrentSplitGroups{1}; + + /// Optional, created if not present. std::shared_ptr queryCtx; - uint64_t bufferedBytes = 512 * 1024; + uint64_t bufferedBytes{512 * 1024}; /// Ungrouped (by default) or grouped (bucketed) execution. core::ExecutionStrategy executionStrategy{ @@ -57,17 +60,18 @@ struct CursorParameters { /// ungrouped execution. int numSplitGroups{1}; - /// Spilling directory, if not empty, then the task's spilling directory would - /// be built from it. + /// Spilling directory, if not empty, then the task's spilling directory + /// would be built from it. std::string spillDirectory; bool copyResult = true; - /// If true, use serial execution mode. Use parallel execution mode otherwise. + /// If true, use serial execution mode. Use parallel execution mode + /// otherwise. bool serialExecution = false; - /// If both 'queryConfigs' and 'queryCtx' are specified, the configurations in - /// 'queryCtx' will be overridden by 'queryConfig'. + /// If both 'queryConfigs' and 'queryCtx' are specified, the configurations + /// in 'queryCtx' will be overridden by 'queryConfig'. 
std::unordered_map queryConfigs; }; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 76a7301e4d58a..e2eeceda311a6 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1175,6 +1175,7 @@ RowTypePtr rename( core::PlanNodePtr createLocalPartitionNode( const core::PlanNodeId& planNodeId, const std::vector& keys, + bool scaleWriter, const std::vector& sources, memory::MemoryPool* pool) { auto partitionFunctionFactory = @@ -1183,6 +1184,7 @@ core::PlanNodePtr createLocalPartitionNode( planNodeId, keys.empty() ? core::LocalPartitionNode::Type::kGather : core::LocalPartitionNode::Type::kRepartition, + scaleWriter, partitionFunctionFactory, sources); } @@ -1271,7 +1273,11 @@ PlanBuilder& PlanBuilder::localPartition( const std::vector& sources) { VELOX_CHECK_NULL(planNode_, "localPartition() must be the first call"); planNode_ = createLocalPartitionNode( - nextPlanNodeId(), exprs(keys, sources[0]->outputType()), sources, pool_); + nextPlanNodeId(), + exprs(keys, sources[0]->outputType()), + /*scaleWriter=*/false, + sources, + pool_); return *this; } @@ -1279,6 +1285,18 @@ PlanBuilder& PlanBuilder::localPartition(const std::vector& keys) { planNode_ = createLocalPartitionNode( nextPlanNodeId(), exprs(keys, planNode_->outputType()), + /*scaleWriter=*/false, + {planNode_}, + pool_); + return *this; +} + +PlanBuilder& PlanBuilder::scaleWriterlocalPartition( + const std::vector& keys) { + planNode_ = createLocalPartitionNode( + nextPlanNodeId(), + exprs(keys, planNode_->outputType()), + /*scaleWriter=*/true, {planNode_}, pool_); return *this; @@ -1294,6 +1312,7 @@ PlanBuilder& PlanBuilder::localPartition( planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, + /*scaleWriter=*/false, std::move(hivePartitionFunctionFactory), std::vector{planNode_}); return *this; @@ -1316,6 +1335,7 @@ PlanBuilder& PlanBuilder::localPartitionByBucket( planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, + /*scaleWriter=*/false, std::move(hivePartitionFunctionFactory), std::vector{planNode_}); return *this; @@ -1324,10 +1344,12 @@ PlanBuilder& PlanBuilder::localPartitionByBucket( namespace { core::PlanNodePtr createLocalPartitionRoundRobinNode( const core::PlanNodeId& planNodeId, + bool scaleWriter, const std::vector& sources) { return std::make_shared( planNodeId, core::LocalPartitionNode::Type::kRepartition, + scaleWriter, std::make_shared(), sources); } @@ -1337,12 +1359,20 @@ PlanBuilder& PlanBuilder::localPartitionRoundRobin( const std::vector& sources) { VELOX_CHECK_NULL( planNode_, "localPartitionRoundRobin() must be the first call"); - planNode_ = createLocalPartitionRoundRobinNode(nextPlanNodeId(), sources); + planNode_ = createLocalPartitionRoundRobinNode( + nextPlanNodeId(), /*scaleWriter=*/false, sources); return *this; } PlanBuilder& PlanBuilder::localPartitionRoundRobin() { - planNode_ = createLocalPartitionRoundRobinNode(nextPlanNodeId(), {planNode_}); + planNode_ = createLocalPartitionRoundRobinNode( + nextPlanNodeId(), /*scaleWriter=*/false, {planNode_}); + return *this; +} + +PlanBuilder& PlanBuilder::scaleWriterlocalPartitionRoundRobin() { + planNode_ = createLocalPartitionRoundRobinNode( + nextPlanNodeId(), /*scaleWriter=*/true, {planNode_}); return *this; } @@ -1399,6 +1429,7 @@ PlanBuilder& PlanBuilder::localPartitionRoundRobinRow() { planNode_ = std::make_shared( nextPlanNodeId(), core::LocalPartitionNode::Type::kRepartition, + 
/*scaleWriter=*/false, std::make_shared(), std::vector{planNode_}); return *this; diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index d47930e430404..264df5d2ea632 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -872,6 +872,14 @@ class PlanBuilder { /// current plan node). PlanBuilder& localPartitionRoundRobin(); + /// A convenience method to add a LocalPartitionNode for scale writer with + /// hash partitioning. + PlanBuilder& scaleWriterlocalPartition(const std::vector& keys); + + /// A convenience method to add a LocalPartitionNode for scale writer with + /// round-robin partitioning. + PlanBuilder& scaleWriterlocalPartitionRoundRobin(); + /// Add a LocalPartitionNode to partition the input using row-wise /// round-robin. Number of partitions is determined at runtime based on /// parallelism of the downstream pipeline.
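
For reference, here is a minimal sketch of how the pieces added by this patch compose: the scale-writer local exchange sits between the source and the TableWriter, and the rebalance knobs are plain query configs. This fragment is illustrative rather than part of the patch; it assumes a test fixture that provides 'inputVectors', 'outputDirectory', and 'pool()' like the fixtures above, and the driver count and threshold values are arbitrary. The PlanBuilder methods, AssertQueryBuilder::maxQueryCapacity(), and QueryConfig keys are the APIs added by this change.

// Illustrative sketch (not part of this patch). Builds a table-write plan
// whose number of active writer drivers scales with local exchange
// backpressure.
auto plan = PlanBuilder()
                .values(inputVectors)
                // Scale-writer exchange: starts with a single writer (or one
                // writer per logical partition) and adds writers when the
                // exchange queue buffers more than half its memory budget.
                .scaleWriterlocalPartition({"c1"})
                .tableWrite(outputDirectory->getPath())
                .planNode();

AssertQueryBuilder(plan)
    .maxDrivers(8)
    // Cap the query memory pool at 1GB (new in this patch).
    .maxQueryCapacity(1LL << 30)
    // Only rebalance while query memory usage stays below 70% of capacity.
    .config(
        core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "0.7")
    // Require 256MB of processed data before the first rebalance.
    .config(
        core::QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold,
        std::to_string(256 << 20))
    .copyResults(pool());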