diff --git a/velox/common/base/SkewedPartitionBalancer.h b/velox/common/base/SkewedPartitionBalancer.h
index 592d50c8711e6..6c3c00a7cc68e 100644
--- a/velox/common/base/SkewedPartitionBalancer.h
+++ b/velox/common/base/SkewedPartitionBalancer.h
@@ -60,7 +60,6 @@ class SkewedPartitionRebalancer {
   /// processed bytes of a partition.
   void addPartitionRowCount(uint32_t partition, uint32_t numRows) {
     VELOX_CHECK_LT(partition, partitionCount_);
-    VELOX_CHECK_GT(numRows, 0);
     partitionRowCount_[partition] += numRows;
   }
diff --git a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp
index 99281f2db522f..2b50892631397 100644
--- a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp
+++ b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp
@@ -320,7 +320,7 @@ TEST_F(SkewedPartitionRebalancerTest, error) {
   auto balancer = createBalancer(32, 4, 128, 256);
   VELOX_ASSERT_THROW(balancer->addProcessedBytes(0), "");
   VELOX_ASSERT_THROW(balancer->addPartitionRowCount(32, 4), "");
-  VELOX_ASSERT_THROW(balancer->addPartitionRowCount(0, 0), "");
+  balancer->addPartitionRowCount(0, 0);
   VELOX_ASSERT_THROW(createBalancer(0, 4, 128, 256), "");
   VELOX_ASSERT_THROW(createBalancer(0, 4, 0, 0), "");
 }
diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp
index e46ac2bf2f3b8..ba30a0c19e758 100644
--- a/velox/connectors/hive/HiveConfig.cpp
+++ b/velox/connectors/hive/HiveConfig.cpp
@@ -64,7 +64,7 @@ uint32_t HiveConfig::maxPartitionsPerWriters(
     const config::ConfigBase* session) const {
   return session->get<uint32_t>(
       kMaxPartitionsPerWritersSession,
-      config_->get<uint32_t>(kMaxPartitionsPerWriters, 100));
+      config_->get<uint32_t>(kMaxPartitionsPerWriters, 128));
 }
 
 bool HiveConfig::immutablePartitions() const {
diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp
index 4151ac89693c6..5bc2932e911cf 100644
--- a/velox/connectors/hive/tests/HiveConfigTest.cpp
+++ b/velox/connectors/hive/tests/HiveConfigTest.cpp
@@ -31,7 +31,7 @@ TEST(HiveConfigTest, defaultConfig) {
       hiveConfig.insertExistingPartitionsBehavior(emptySession.get()),
       facebook::velox::connector::hive::HiveConfig::
           InsertExistingPartitionsBehavior::kError);
-  ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 100);
+  ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(emptySession.get()), 128);
   ASSERT_EQ(hiveConfig.immutablePartitions(), false);
   ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
   ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
@@ -172,7 +172,7 @@ TEST(HiveConfigTest, overrideSession) {
       hiveConfig.insertExistingPartitionsBehavior(session.get()),
       facebook::velox::connector::hive::HiveConfig::
          InsertExistingPartitionsBehavior::kOverwrite);
-  ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 100);
+  ASSERT_EQ(hiveConfig.maxPartitionsPerWriters(session.get()), 128);
   ASSERT_EQ(hiveConfig.immutablePartitions(), false);
   ASSERT_EQ(hiveConfig.gcsEndpoint(), "");
   ASSERT_EQ(hiveConfig.gcsCredentialsPath(), "");
diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp
index cdb724f1ef0f0..16546128e65af 100644
--- a/velox/core/PlanNode.cpp
+++ b/velox/core/PlanNode.cpp
@@ -1998,11 +1998,15 @@ void LocalPartitionNode::addDetails(std::stringstream& stream) const {
   if (type_ != Type::kGather) {
     stream << " " << partitionFunctionSpec_->toString();
   }
+  if (scaleWriter_) {
+    stream << " scaleWriter";
+  }
 }
 
 folly::dynamic LocalPartitionNode::serialize() const {
   auto obj = PlanNode::serialize();
   obj["type"] = typeName(type_);
obj["scaleWriter"] = scaleWriter_; obj["partitionFunctionSpec"] = partitionFunctionSpec_->serialize(); return obj; } @@ -2014,6 +2018,7 @@ PlanNodePtr LocalPartitionNode::create( return std::make_shared( deserializePlanNodeId(obj), typeFromName(obj["type"].asString()), + obj["scaleWriter"].asBool(), ISerializable::deserialize( obj["partitionFunctionSpec"]), deserializeSources(obj, context)); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 6f910aab1a425..ecb84d81c769a 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -735,8 +735,8 @@ class TableWriteNode : public PlanNode { hasPartitioningScheme_(hasPartitioningScheme), outputType_(std::move(outputType)), commitStrategy_(commitStrategy) { - VELOX_USER_CHECK_EQ(columns->size(), columnNames.size()); - for (const auto& column : columns->names()) { + VELOX_USER_CHECK_EQ(columns_->size(), columnNames_.size()); + for (const auto& column : columns_->names()) { VELOX_USER_CHECK( source->outputType()->containsChild(column), "Column {} not found in TableWriter input: {}", @@ -1184,13 +1184,31 @@ class LocalPartitionNode : public PlanNode { static Type typeFromName(const std::string& name); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY LocalPartitionNode( const PlanNodeId& id, Type type, PartitionFunctionSpecPtr partitionFunctionSpec, std::vector sources) + : LocalPartitionNode( + id, + std::move(type), + /*scaleWriter=*/false, + std::move(partitionFunctionSpec), + std::move(sources)) {} +#endif + + /// If 'scaleWriter' is true, the local partition is used to scale the table + /// writer prcessing. + LocalPartitionNode( + const PlanNodeId& id, + Type type, + bool scaleWriter, + PartitionFunctionSpecPtr partitionFunctionSpec, + std::vector sources) : PlanNode(id), type_{type}, + scaleWriter_(scaleWriter), sources_{std::move(sources)}, partitionFunctionSpec_{std::move(partitionFunctionSpec)} { VELOX_USER_CHECK_GT( @@ -1215,10 +1233,16 @@ class LocalPartitionNode : public PlanNode { return std::make_shared( id, Type::kGather, + /*scaleWriter=*/false, std::make_shared(), std::move(sources)); } + /// Returns true if this is for table writer scaling. + bool scaleWriter() const { + return scaleWriter_; + } + Type type() const { return type_; } @@ -1247,6 +1271,7 @@ class LocalPartitionNode : public PlanNode { void addDetails(std::stringstream& stream) const override; const Type type_; + const bool scaleWriter_; const std::vector sources_; const PartitionFunctionSpecPtr partitionFunctionSpec_; }; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 105062de81308..8bdcd72ebc67f 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -437,6 +437,31 @@ class QueryConfig { static constexpr const char* kSelectiveNimbleReaderEnabled = "selective_nimble_reader_enabled"; + /// The max ratio of a query used memory to its max capacity, and the scale + /// writer exchange stops scaling writer processing if the query's current + /// memory usage exceeds this ratio. The value is in the range of (0, 1]. + static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio = + "scaled_writer_rebalance_max_memory_usage_ratio"; + + /// The max number of logical table partitions that can be assigned to a + /// single table writer thread. The logical table partition is used by local + /// exchange writer for writer scaling, and multiple physical table + /// partitions can be mapped to the same logical table partition based on the + /// hash value of calculated partitioned ids. 
+ static constexpr const char* kScaleWriterMaxPartitionsPerWriter = + "scaled_writer_max_partitions_per_writer"; + + /// Minimum amount of data processed by a logical table partition to trigger + /// writer scaling if it is detected as overloaded by scale wrirer exchange. + static constexpr const char* + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold = + "scaled_writer_min_partition_processed_bytes_rebalance_threshold"; + + /// Minimum amount of data processed by all the logical table partitions to + /// trigger skewed partition rebalancing by scale writer exchange. + static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold = + "scaled_writer_min_processed_bytes_rebalance_threshold"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -819,6 +844,24 @@ class QueryConfig { return get(kPrefixSortMinRows, 130); } + double scaleWriterRebalanceMaxMemoryUsageRatio() const { + return get(kScaleWriterRebalanceMaxMemoryUsageRatio, 0.7); + } + + uint32_t scaleWriterMaxPartitionsPerWriter() const { + return get(kScaleWriterMaxPartitionsPerWriter, 128); + } + + uint64_t scaleWriterMinPartitionProcessedBytesRebalanceThreshold() const { + return get( + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, 128 << 20); + } + + uint64_t scaleWriterMinProcessedBytesRebalanceThreshold() const { + return get( + kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 761ab60436007..e6ce8423b28b0 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -400,7 +400,32 @@ Table Writer * - task_partitioned_writer_count - integer - task_writer_count - - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default. + - The number of parallel table writer threads per task for partitioned + table writes. If not set, use 'task_writer_count' as default. + * - scaled_writer_rebalance_max_memory_usage_ratio + - double + - 0.7 + - The max ratio of a query used memory to its max capacity, and the scale + - writer exchange stops scaling writer processing if the query's current + - memory usage exceeds this ratio. The value is in the range of (0, 1]. + * - scaled_writer_max_partitions_per_writer + - integer + - 128 + - The max number of logical table partitions that can be assigned to a + - single table writer thread. The logical table partition is used by local + - exchange writer for writer scaling, and multiple physical table + - partitions can be mapped to the same logical table partition based on the + - hash value of calculated partitioned ids. + - integer + - 128MB + * - scaled_writer_min_partition_processed_bytes_rebalance_threshold + - Minimum amount of data processed by a logical table partition to trigger + - writer scaling if it is detected as overloaded by scale wrirer exchange. + * - scaled_writer_min_processed_bytes_rebalance_threshold + - Minimum amount of data processed by all the logical table partitions to + - trigger skewed partition rebalancing by scale writer exchange. 
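A minimal usage sketch of the four knobs above, read through the typed accessors added later in this header. The `ScaleWriterKnobs` struct and `readKnobs` helper are illustrative assumptions, not part of the patch:

// Bundles the writer-scaling settings an operator reads at construction time.
struct ScaleWriterKnobs {
  double maxMemoryUsageRatio;
  uint32_t maxPartitionsPerWriter;
  uint64_t minPartitionRebalanceBytes;
  uint64_t minTotalRebalanceBytes;
};

ScaleWriterKnobs readKnobs(const facebook::velox::core::QueryConfig& config) {
  return {
      config.scaleWriterRebalanceMaxMemoryUsageRatio(),
      config.scaleWriterMaxPartitionsPerWriter(),
      config.scaleWriterMinPartitionProcessedBytesRebalanceThreshold(),
      config.scaleWriterMinProcessedBytesRebalanceThreshold()};
}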
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 761ab60436007..e6ce8423b28b0 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -400,7 +400,32 @@ Table Writer
    * - task_partitioned_writer_count
      - integer
      - task_writer_count
-     - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default.
+     - The number of parallel table writer threads per task for partitioned
+       table writes. If not set, use 'task_writer_count' as default.
+   * - scaled_writer_rebalance_max_memory_usage_ratio
+     - double
+     - 0.7
+     - The max ratio of a query's used memory to its max capacity. The scale
+       writer exchange stops scaling writer processing if the query's current
+       memory usage exceeds this ratio. The value is in the range of (0, 1].
+   * - scaled_writer_max_partitions_per_writer
+     - integer
+     - 128
+     - The max number of logical table partitions that can be assigned to a
+       single table writer thread. The logical table partition is used by the
+       local exchange writer for writer scaling, and multiple physical table
+       partitions can be mapped to the same logical table partition based on
+       the hash value of calculated partition ids.
+   * - scaled_writer_min_partition_processed_bytes_rebalance_threshold
+     - integer
+     - 128MB
+     - Minimum amount of data processed by a logical table partition to
+       trigger writer scaling if it is detected as overloaded by scale writer
+       exchange.
+   * - scaled_writer_min_processed_bytes_rebalance_threshold
+     - integer
+     - 256MB
+     - Minimum amount of data processed by all the logical table partitions
+       to trigger skewed partition rebalancing by scale writer exchange.
 
 Hive Connector
 --------------
diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst
index 6fea60a7f03f4..870b77f2e0ae4 100644
--- a/velox/docs/monitoring/metrics.rst
+++ b/velox/docs/monitoring/metrics.rst
@@ -438,6 +438,7 @@ Storage
    * - storage_network_throttled_count
      - Count
      - The number of times that storage IOs get throttled in a storage cluster because of network.
+
 Spilling
 --------
diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst
index 4958587e7bcfd..f77d2ba824ca0 100644
--- a/velox/docs/monitoring/stats.rst
+++ b/velox/docs/monitoring/stats.rst
@@ -104,6 +104,17 @@ These stats are reported only by TableWriter operator
    * - earlyFlushedRawBytes
      - bytes
      - Number of bytes pre-maturely flushed from file writers because of memory reclaiming.
+   * - rebalanceTriggers
+     -
+     - The number of times that we trigger rebalancing of table partitions
+       for a non-bucketed partitioned table.
+   * - scaledPartitions
+     -
+     - The number of times that we scale the processing of a partition for a
+       non-bucketed partitioned table.
+   * - scaledWriters
+     -
+     - The number of times that we scale writers for a non-partitioned table.
 
 Spilling
 --------
diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt
index 73ca666358956..25882fd44f146 100644
--- a/velox/exec/CMakeLists.txt
+++ b/velox/exec/CMakeLists.txt
@@ -74,6 +74,7 @@ velox_add_library(
   RowsStreamingWindowBuild.cpp
   RowContainer.cpp
   RowNumber.cpp
+  ScaleWriterLocalPartition.cpp
   SortBuffer.cpp
   SortedAggregations.cpp
   SortWindowBuild.cpp
diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp
index 2ebb469caf64d..dce6dff0ad38e 100644
--- a/velox/exec/LocalPartition.cpp
+++ b/velox/exec/LocalPartition.cpp
@@ -166,6 +166,11 @@ bool LocalExchangeQueue::isFinished() {
   return queue_.withWLock([&](auto& queue) { return isFinishedLocked(queue); });
 }
 
+bool LocalExchangeQueue::testingProducersDone() const {
+  return queue_.withRLock(
+      [&](auto& queue) { return noMoreProducers_ && pendingProducers_ == 0; });
+}
+
 void LocalExchangeQueue::close() {
   std::vector<ContinuePromise> consumerPromises;
   std::vector<ContinuePromise> memoryPromises;
@@ -256,54 +261,48 @@ LocalPartition::LocalPartition(
   for (auto& queue : queues_) {
     queue->addProducer();
   }
-}
-
-namespace {
-std::vector<BufferPtr> allocateIndexBuffers(
-    const std::vector<vector_size_t>& sizes,
-    memory::MemoryPool* pool) {
-  std::vector<BufferPtr> indexBuffers;
-  indexBuffers.reserve(sizes.size());
-  for (auto size : sizes) {
-    indexBuffers.push_back(allocateIndices(size, pool));
+  if (numPartitions_ > 0) {
+    indexBuffers_.resize(numPartitions_);
+    rawIndices_.resize(numPartitions_);
   }
-  return indexBuffers;
 }
 
-std::vector<vector_size_t*> getRawIndices(
-    const std::vector<BufferPtr>& indexBuffers) {
-  std::vector<vector_size_t*> rawIndices;
-  rawIndices.reserve(indexBuffers.size());
-  for (auto& buffer : indexBuffers) {
-    rawIndices.emplace_back(buffer->asMutable<vector_size_t>());
+void LocalPartition::allocateIndexBuffers(
+    const std::vector<vector_size_t>& sizes) {
+  VELOX_CHECK_EQ(indexBuffers_.size(), sizes.size());
+  VELOX_CHECK_EQ(rawIndices_.size(), sizes.size());
+
+  for (auto i = 0; i < sizes.size(); ++i) {
+    const auto indicesBufferBytes = sizes[i] * sizeof(vector_size_t);
+    if ((indexBuffers_[i] == nullptr) ||
+        (indexBuffers_[i]->capacity() < indicesBufferBytes) ||
+        !indexBuffers_[i]->unique()) {
+      indexBuffers_[i] = allocateIndices(sizes[i], pool());
+    } else {
+      indexBuffers_[i]->setSize(indicesBufferBytes);
+    }
+    rawIndices_[i] = indexBuffers_[i]->asMutable<vector_size_t>();
   }
-  return rawIndices;
 }
 
-RowVectorPtr
-wrapChildren(const RowVectorPtr& input, vector_size_t size, BufferPtr indices) {
-  std::vector<VectorPtr> wrappedChildren;
-  wrappedChildren.reserve(input->type()->size());
-  for (auto i = 0; i < input->type()->size(); i++) {
-    wrappedChildren.emplace_back(BaseVector::wrapInDictionary(
-        BufferPtr(nullptr), indices, size, input->childAt(i)));
+RowVectorPtr LocalPartition::wrapChildren(
+    const RowVectorPtr& input,
+    vector_size_t size,
+    BufferPtr indices) {
+  VELOX_CHECK_EQ(childVectors_.size(), input->type()->size());
+
+  for (auto i = 0; i < input->type()->size(); ++i) {
+    childVectors_[i] = BaseVector::wrapInDictionary(
+        BufferPtr(nullptr), indices, size, input->childAt(i));
   }
 
   return std::make_shared<RowVector>(
-      input->pool(), input->type(), BufferPtr(nullptr), size, wrappedChildren);
+      input->pool(), input->type(), BufferPtr(nullptr), size, childVectors_);
 }
-} // namespace
 
 void LocalPartition::addInput(RowVectorPtr input) {
-  {
-    auto lockedStats = stats_.wlock();
-    lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
-  }
-
-  // Lazy vectors must be loaded or processed.
-  for (auto& child : input->children()) {
-    child->loadedVector();
-  }
+  prepareForInput(input);
 
   if (numPartitions_ == 1) {
     ContinueFuture future;
@@ -334,13 +333,12 @@ void LocalPartition::addInput(RowVectorPtr input) {
   for (auto i = 0; i < numInput; ++i) {
     ++maxIndex[partitions_[i]];
   }
-  auto indexBuffers = allocateIndexBuffers(maxIndex, pool());
-  auto rawIndices = getRawIndices(indexBuffers);
+  allocateIndexBuffers(maxIndex);
 
   std::fill(maxIndex.begin(), maxIndex.end(), 0);
   for (auto i = 0; i < numInput; ++i) {
     auto partition = partitions_[i];
-    rawIndices[partition][maxIndex[partition]] = i;
+    rawIndices_[partition][maxIndex[partition]] = i;
     ++maxIndex[partition];
   }
 
@@ -351,8 +349,7 @@ void LocalPartition::addInput(RowVectorPtr input) {
       // Do not enqueue empty partitions.
       continue;
     }
-    auto partitionData =
-        wrapChildren(input, partitionSize, std::move(indexBuffers[i]));
+    auto partitionData = wrapChildren(input, partitionSize, indexBuffers_[i]);
     ContinueFuture future;
     auto reason = queues_[i]->enqueue(
         partitionData, totalSize * partitionSize / numInput, &future);
@@ -363,6 +360,22 @@ void LocalPartition::addInput(RowVectorPtr input) {
   }
 }
 
+void LocalPartition::prepareForInput(RowVectorPtr& input) {
+  {
+    auto lockedStats = stats_.wlock();
+    lockedStats->addOutputVector(input->estimateFlatSize(), input->size());
+  }
+
+  // Lazy vectors must be loaded or processed to ensure they are materialized
+  // in order.
+  for (auto& child : input->children()) {
+    child->loadedVector();
+  }
+  if (childVectors_.empty()) {
+    childVectors_.resize(input->type()->size());
+  }
+}
+
 BlockingReason LocalPartition::isBlocked(ContinueFuture* future) {
   if (!futures_.empty()) {
     auto blockingReason = blockingReasons_.front();
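The reuse test in the new LocalPartition::allocateIndexBuffers() (null, capacity, and uniqueness checks) is the general Velox pattern for recycling an indices buffer across batches. A self-contained sketch of the same logic, with the memory pool assumed to come from the operator:

// Reuse an indices buffer only when it is large enough and no previously
// enqueued batch still references it; otherwise allocate a fresh one so the
// earlier batch is not corrupted.
BufferPtr reuseOrAllocateIndices(
    BufferPtr indices,
    vector_size_t size,
    memory::MemoryPool* pool) {
  const auto bytes = size * sizeof(vector_size_t);
  if (indices == nullptr || indices->capacity() < bytes || !indices->unique()) {
    return allocateIndices(size, pool);
  }
  indices->setSize(bytes);
  return indices;
}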
diff --git a/velox/exec/LocalPartition.h b/velox/exec/LocalPartition.h
index 75d4098a1915c..a6f4d6bbf5976 100644
--- a/velox/exec/LocalPartition.h
+++ b/velox/exec/LocalPartition.h
@@ -36,10 +36,20 @@ class LocalExchangeMemoryManager {
   /// caller to fulfill.
   std::vector<ContinuePromise> decreaseMemoryUsage(int64_t removed);
 
+  /// Returns the maximum buffer size in bytes.
+  int64_t maxBufferBytes() const {
+    return maxBufferSize_;
+  }
+
+  /// Returns the current buffer size in bytes.
+  int64_t bufferedBytes() const {
+    return bufferedBytes_;
+  }
+
  private:
   const int64_t maxBufferSize_;
   std::mutex mutex_;
-  int64_t bufferedBytes_{0};
+  tsan_atomic<int64_t> bufferedBytes_{0};
   std::vector<ContinuePromise> promises_;
 };
@@ -89,13 +99,17 @@ class LocalExchangeQueue {
   /// called before all the data has been processed. No-op otherwise.
   void close();
 
+  /// Returns true if all producers have sent the no-more-data signal.
+  bool testingProducersDone() const;
+
  private:
   using Queue = std::queue<RowVectorPtr>;
 
   bool isFinishedLocked(const Queue& queue) const;
 
-  std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
+  const std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
   const int partition_;
+
   folly::Synchronized<Queue> queue_;
   // Satisfied when data becomes available or all producers report that they
   // finished producing, e.g. queue_ is not empty or noMoreProducers_ is true
@@ -162,8 +176,8 @@ class LocalPartition : public Operator {
     return nullptr;
   }
 
-  // Always true but the caller will check isBlocked before adding input, hence
-  // the blocked state does not accumulate input.
+  /// Always true. The caller is expected to check isBlocked() before adding
+  /// input, so the blocked state does not accumulate input.
   bool needsInput() const override {
     return true;
   }
@@ -174,7 +188,16 @@ class LocalPartition : public Operator {
 
   bool isFinished() override;
 
- private:
+ protected:
+  void prepareForInput(RowVectorPtr& input);
+
+  void allocateIndexBuffers(const std::vector<vector_size_t>& sizes);
+
+  RowVectorPtr wrapChildren(
+      const RowVectorPtr& input,
+      vector_size_t size,
+      BufferPtr indices);
+
   const std::vector<std::shared_ptr<LocalExchangeQueue>> queues_;
   const size_t numPartitions_;
   std::unique_ptr<core::PartitionFunction> partitionFunction_;
@@ -184,6 +207,10 @@ class LocalPartition : public Operator {
 
   /// Reusable memory for hash calculation.
   std::vector<uint32_t> partitions_;
+  /// Reusable buffers for input partitioning.
+  std::vector<BufferPtr> indexBuffers_;
+  std::vector<vector_size_t*> rawIndices_;
+  std::vector<VectorPtr> childVectors_;
 };
 
 } // namespace facebook::velox::exec
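The two accessors added to LocalExchangeMemoryManager are what the scale writer operators poll to detect back-pressure. A hedged sketch of that check, using the 50% threshold the operators apply later in this patch:

// Returns true when the local exchange buffer is under enough pressure that
// adding a writer could help drain it faster; the helper name is illustrative.
bool bufferUnderPressure(const LocalExchangeMemoryManager& manager) {
  return manager.bufferedBytes() > manager.maxBufferBytes() / 2;
}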
diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp
index aec3104be3a2c..3645e737b83e8 100644
--- a/velox/exec/LocalPlanner.cpp
+++ b/velox/exec/LocalPlanner.cpp
@@ -35,7 +35,9 @@
 #include "velox/exec/OperatorTraceScan.h"
 #include "velox/exec/OrderBy.h"
 #include "velox/exec/PartitionedOutput.h"
+#include "velox/exec/RoundRobinPartitionFunction.h"
 #include "velox/exec/RowNumber.h"
+#include "velox/exec/ScaleWriterLocalPartition.h"
 #include "velox/exec/StreamingAggregation.h"
 #include "velox/exec/TableScan.h"
 #include "velox/exec/TableWriteMerge.h"
@@ -69,6 +71,23 @@ bool mustStartNewPipeline(
   return sourceId != 0;
 }
 
+// Creates the customized local partition operator for table writer scaling.
+std::unique_ptr<Operator> createScaleWriterLocalPartition(
+    const std::shared_ptr<const core::LocalPartitionNode>& localPartitionNode,
+    int32_t operatorId,
+    DriverCtx* ctx) {
+  if (dynamic_cast<const RoundRobinPartitionFunctionSpec*>(
+          &localPartitionNode->partitionFunctionSpec())) {
+    return std::make_unique<ScaleWriterLocalPartition>(
+        operatorId, ctx, localPartitionNode);
+  }
+
+  VELOX_CHECK_NOT_NULL(dynamic_cast<const HashPartitionFunctionSpec*>(
+      &localPartitionNode->partitionFunctionSpec()));
+  return std::make_unique<ScaleWriterPartitioningLocalPartition>(
+      operatorId, ctx, localPartitionNode);
+}
+
 OperatorSupplier makeConsumerSupplier(ConsumerSupplier consumerSupplier) {
   if (consumerSupplier) {
     return [consumerSupplier = std::move(consumerSupplier)](
@@ -98,6 +117,12 @@ OperatorSupplier makeConsumerSupplier(
   if (auto localPartitionNode =
           std::dynamic_pointer_cast<const core::LocalPartitionNode>(planNode)) {
+    if (localPartitionNode->scaleWriter()) {
+      return [localPartitionNode](int32_t operatorId, DriverCtx* ctx) {
+        return createScaleWriterLocalPartition(
+            localPartitionNode, operatorId, ctx);
+      };
+    }
     return [localPartitionNode](int32_t operatorId, DriverCtx* ctx) {
       return std::make_unique<LocalPartition>(
           operatorId, ctx, localPartitionNode);
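The dispatch above keys off the scaleWriter flag and the partition function spec carried by the plan node. A sketch of how a planner builds such a node directly, using the new constructor; `spec` and `source` are placeholders:

// A repartition node flagged for writer scaling; LocalPlanner translates it
// into one of the two ScaleWriter* operators referenced above.
auto node = std::make_shared<core::LocalPartitionNode>(
    "scale-writer-exchange",
    core::LocalPartitionNode::Type::kRepartition,
    /*scaleWriter=*/true,
    spec, // e.g. a hash partition function spec over the partition keys.
    std::vector<core::PlanNodePtr>{source});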
diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp
new file mode 100644
index 0000000000000..c4995d0975281
--- /dev/null
+++ b/velox/exec/ScaleWriterLocalPartition.cpp
@@ -0,0 +1,326 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/ScaleWriterLocalPartition.h"
+
+#include "velox/exec/HashPartitionFunction.h"
+#include "velox/exec/RoundRobinPartitionFunction.h"
+#include "velox/exec/Task.h"
+
+namespace facebook::velox::exec {
+ScaleWriterPartitioningLocalPartition::ScaleWriterPartitioningLocalPartition(
+    int32_t operatorId,
+    DriverCtx* ctx,
+    const std::shared_ptr<const core::LocalPartitionNode>& planNode)
+    : LocalPartition(operatorId, ctx, planNode),
+      maxQueryMemoryUsageRatio_(
+          ctx->queryConfig().scaleWriterRebalanceMaxMemoryUsageRatio()),
+      maxTablePartitionsPerWriter_(
+          ctx->queryConfig().scaleWriterMaxPartitionsPerWriter()),
+      numTablePartitions_(maxTablePartitionsPerWriter_ * numPartitions_),
+      queryPool_(pool()->root()),
+      tablePartitionRebalancer_(std::make_unique<SkewedPartitionRebalancer>(
+          numTablePartitions_,
+          numPartitions_,
+          ctx->queryConfig()
+              .scaleWriterMinPartitionProcessedBytesRebalanceThreshold(),
+          ctx->queryConfig()
+              .scaleWriterMinProcessedBytesRebalanceThreshold())) {
+  VELOX_CHECK_GT(maxTablePartitionsPerWriter_, 0);
+
+  writerAssignmentCounts_.resize(numPartitions_, 0);
+  tablePartitionRowCounts_.resize(numTablePartitions_, 0);
+  tablePartitionWriterIds_.resize(numTablePartitions_, -1);
+  tablePartitionWriterIndexes_.resize(numTablePartitions_, 0);
+  writerAssignmentIndicesBuffers_.resize(numPartitions_);
+  rawWriterAssignmentIndicesBuffers_.resize(numPartitions_);
+
+  // Reset the hash partition function with the number of logical table
+  // partitions instead of the number of table writers.
+  // 'tablePartitionRebalancer_' is responsible for maintaining the mapping
+  // from logical table partition id to the assigned table writer ids.
+  partitionFunction_ = numPartitions_ == 1
+      ? nullptr
+      : planNode->partitionFunctionSpec().create(
+            numTablePartitions_,
+            /*localExchange=*/true);
+  if (partitionFunction_ != nullptr) {
+    VELOX_CHECK_NOT_NULL(
+        dynamic_cast<HashPartitionFunction*>(partitionFunction_.get()));
+  }
+}
+
+void ScaleWriterPartitioningLocalPartition::initialize() {
+  LocalPartition::initialize();
+  VELOX_CHECK_NULL(memoryManager_);
+  memoryManager_ =
+      operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+          operatorCtx_->driverCtx()->splitGroupId, planNodeId());
+}
+
+void ScaleWriterPartitioningLocalPartition::prepareForWriterAssignments(
+    vector_size_t numInput) {
+  const auto maxIndicesBufferBytes = numInput * sizeof(vector_size_t);
+  for (auto writerId = 0; writerId < numPartitions_; ++writerId) {
+    if (writerAssignmentIndicesBuffers_[writerId] == nullptr ||
+        !writerAssignmentIndicesBuffers_[writerId]->unique() ||
+        writerAssignmentIndicesBuffers_[writerId]->size() <
+            maxIndicesBufferBytes) {
+      writerAssignmentIndicesBuffers_[writerId] =
+          allocateIndices(numInput, pool());
+      rawWriterAssignmentIndicesBuffers_[writerId] =
+          writerAssignmentIndicesBuffers_[writerId]
+              ->asMutable<vector_size_t>();
+    }
+  }
+  std::fill(writerAssignmentCounts_.begin(), writerAssignmentCounts_.end(), 0);
+  // Reset the value of partition writer id assignments for the new input.
+  std::fill(
+      tablePartitionWriterIds_.begin(), tablePartitionWriterIds_.end(), -1);
+}
+
+void ScaleWriterPartitioningLocalPartition::addInput(RowVectorPtr input) {
+  prepareForInput(input);
+
+  if (numPartitions_ == 1) {
+    ContinueFuture future;
+    auto blockingReason =
+        queues_[0]->enqueue(input, input->retainedSize(), &future);
+    if (blockingReason != BlockingReason::kNotBlocked) {
+      blockingReasons_.push_back(blockingReason);
+      futures_.push_back(std::move(future));
+    }
+    return;
+  }
+
+  // Scale up writers when the current buffer memory utilization is more than
+  // 50% of the maximum. This also means that we won't scale local writers if
+  // the writing speed can keep up with the incoming data; in other words,
+  // buffer utilization stays below 50%.
+  //
+  // TODO: investigate using the consumer/producer queue time ratio as an
+  // additional signal to trigger rebalance, to avoid unnecessary rebalancing
+  // when the worker is overloaded, which might cause a lot of queuing on both
+  // producer and consumer sides. The buffered memory ratio is not a reliable
+  // signal in that case.
+  if ((memoryManager_->bufferedBytes() >
+       memoryManager_->maxBufferBytes() * 0.5) &&
+      // Do not scale up if the total memory used is greater than
+      // 'maxQueryMemoryUsageRatio_' of the max query memory capacity. We have
+      // to be conservative here, otherwise scaling of writers will happen
+      // first before we hit the query memory capacity limit, and then we
+      // won't be able to do anything to prevent query OOM.
+      queryPool_->reservedBytes() <
+          queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_) {
+    tablePartitionRebalancer_->rebalance();
+  }
+
+  const auto singlePartition =
+      partitionFunction_->partition(*input, partitions_);
+  const auto numInput = input->size();
+  const int64_t totalInputBytes = input->retainedSize();
+  // Reset the value of partition row count for the new input.
+  std::fill(
+      tablePartitionRowCounts_.begin(), tablePartitionRowCounts_.end(), 0);
+
+  // Assign each row to a writer by looking at the logical table partition
+  // assignments maintained by 'tablePartitionRebalancer_'.
+  //
+  // Get the partition id, which is limited to 'numTablePartitions_'. If there
+  // are more physical table partitions than the logical
+  // 'numTablePartitions_', then it is possible that multiple physical table
+  // partitions will get assigned the same logical partition id. Thus,
+  // multiple table partitions will be scaled together, since we track the
+  // written bytes of a logical table partition.
+  if (singlePartition.has_value()) {
+    const auto partitionId = singlePartition.value();
+    tablePartitionRowCounts_[partitionId] = numInput;
+
+    VELOX_CHECK_EQ(tablePartitionWriterIds_[partitionId], -1);
+    const auto writerId = getNextWriterId(partitionId);
+    ContinueFuture future;
+    auto blockingReason =
+        queues_[writerId]->enqueue(input, totalInputBytes, &future);
+    if (blockingReason != BlockingReason::kNotBlocked) {
+      blockingReasons_.push_back(blockingReason);
+      futures_.push_back(std::move(future));
+    }
+  } else {
+    prepareForWriterAssignments(numInput);
+
+    for (auto row = 0; row < numInput; ++row) {
+      const auto partitionId = partitions_[row];
+      ++tablePartitionRowCounts_[partitionId];
+
+      // Get the writer id for this partition by looking at the scaling state.
+      auto writerId = tablePartitionWriterIds_[partitionId];
+      if (writerId == -1) {
+        writerId = getNextWriterId(partitionId);
+        tablePartitionWriterIds_[partitionId] = writerId;
+      }
+      rawWriterAssignmentIndicesBuffers_[writerId]
+                                        [writerAssignmentCounts_[writerId]++] =
+          row;
+    }
+
+    for (auto i = 0; i < numPartitions_; ++i) {
+      const auto writerRowCount = writerAssignmentCounts_[i];
+      if (writerRowCount == 0) {
+        continue;
+      }
+
+      auto writerInput = wrapChildren(
+          input,
+          writerRowCount,
+          std::move(writerAssignmentIndicesBuffers_[i]));
+      ContinueFuture future;
+      auto reason = queues_[i]->enqueue(
+          writerInput, totalInputBytes * writerRowCount / numInput, &future);
+      if (reason != BlockingReason::kNotBlocked) {
+        blockingReasons_.push_back(reason);
+        futures_.push_back(std::move(future));
+      }
+    }
+  }
+
+  // Only update the scaling state if the memory used is below the
+  // 'maxQueryMemoryUsageRatio_' limit. Otherwise, if we keep updating the
+  // scaling state while the memory used is fluctuating around the limit, we
+  // could do massive scaling in a single rebalancing cycle, which could cause
+  // query OOM.
+  if (queryPool_->reservedBytes() <
+      queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_) {
+    for (auto tablePartition = 0; tablePartition < numTablePartitions_;
+         ++tablePartition) {
+      tablePartitionRebalancer_->addPartitionRowCount(
+          tablePartition, tablePartitionRowCounts_[tablePartition]);
+    }
+    tablePartitionRebalancer_->addProcessedBytes(totalInputBytes);
+  }
+}
+
+uint32_t ScaleWriterPartitioningLocalPartition::getNextWriterId(
+    uint32_t partitionId) {
+  return tablePartitionRebalancer_->getTaskId(
+      partitionId, tablePartitionWriterIndexes_[partitionId]++);
+}
+
+void ScaleWriterPartitioningLocalPartition::close() {
+  LocalPartition::close();
+
+  const auto scaleStats = tablePartitionRebalancer_->stats();
+  auto lockedStats = stats_.wlock();
+  if (scaleStats.numScaledPartitions != 0) {
+    lockedStats->addRuntimeStat(
+        kScaledPartitions, RuntimeCounter(scaleStats.numScaledPartitions));
+  }
+  if (scaleStats.numBalanceTriggers != 0) {
+    lockedStats->addRuntimeStat(
+        kRebalanceTriggers, RuntimeCounter(scaleStats.numBalanceTriggers));
+  }
+}
+
+ScaleWriterLocalPartition::ScaleWriterLocalPartition(
+    int32_t operatorId,
+    DriverCtx* ctx,
+    const std::shared_ptr<const core::LocalPartitionNode>& planNode)
+    : LocalPartition(operatorId, ctx, planNode),
+      maxQueryMemoryUsageRatio_(
+          ctx->queryConfig().scaleWriterRebalanceMaxMemoryUsageRatio()),
+      queryPool_(pool()->root()),
+      minDataProcessedBytes_(
+          ctx->queryConfig()
+              .scaleWriterMinPartitionProcessedBytesRebalanceThreshold()) {
+  if (partitionFunction_ != nullptr) {
+    VELOX_CHECK_NOT_NULL(
+        dynamic_cast<RoundRobinPartitionFunction*>(partitionFunction_.get()));
+  }
+}
+
+void ScaleWriterLocalPartition::initialize() {
+  LocalPartition::initialize();
+  VELOX_CHECK_NULL(memoryManager_);
+  memoryManager_ =
+      operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+          operatorCtx_->driverCtx()->splitGroupId, planNodeId());
+}
+
+void ScaleWriterLocalPartition::addInput(RowVectorPtr input) {
+  prepareForInput(input);
+
+  const int64_t totalInputBytes = input->retainedSize();
+  processedDataBytes_ += totalInputBytes;
+
+  uint32_t writerId = 0;
+  if (numPartitions_ > 1) {
+    writerId = getNextWriterId();
+  }
+  VELOX_CHECK_LT(writerId, numPartitions_);
+
+  ContinueFuture future;
+  auto blockingReason =
+      queues_[writerId]->enqueue(input, input->retainedSize(), &future);
+  if (blockingReason != BlockingReason::kNotBlocked) {
+    blockingReasons_.push_back(blockingReason);
+    futures_.push_back(std::move(future));
+  }
+}
+
+uint32_t ScaleWriterLocalPartition::getNextWriterId() {
+  VELOX_CHECK_LE(numWriters_, numPartitions_);
+  VELOX_CHECK_GE(processedDataBytes_, processedBytesAtLastScale_);
+
+  // Scale up writers when the current buffer memory utilization is more than
+  // 50% of the maximum. This also means that we won't scale local writers if
+  // the writing speed can keep up with the incoming data; in other words,
+  // buffer utilization stays below 50%.
+  //
+  // TODO: investigate using the consumer/producer queue time ratio as an
+  // additional signal to trigger rebalance, to avoid unnecessary rebalancing
+  // when the worker is overloaded, which might cause a lot of queuing on both
+  // producer and consumer sides. The buffered memory ratio is not a reliable
+  // signal in that case.
+  if ((numWriters_ < numPartitions_) &&
+      (memoryManager_->bufferedBytes() >=
+       memoryManager_->maxBufferBytes() / 2) &&
+      // Do not scale up if the total memory used is greater than
+      // 'maxQueryMemoryUsageRatio_' of the max query memory capacity. We have
+      // to be conservative here, otherwise scaling of writers will happen
+      // first before we hit the query memory capacity limit, and then we
+      // won't be able to do anything to prevent query OOM.
+      (processedDataBytes_ - processedBytesAtLastScale_ >=
+       numWriters_ * minDataProcessedBytes_) &&
+      (queryPool_->reservedBytes() <
+       queryPool_->maxCapacity() * maxQueryMemoryUsageRatio_)) {
+    ++numWriters_;
+    processedBytesAtLastScale_ = processedDataBytes_;
+    LOG(INFO) << "Scaled task writer count to: " << numWriters_
+              << " with max of " << numPartitions_;
+  }
+  return (nextWriterIndex_++) % numWriters_;
+}
+
+void ScaleWriterLocalPartition::close() {
+  LocalPartition::close();
+
+  if (numWriters_ == 1) {
+    return;
+  }
+  stats_.wlock()->addRuntimeStat(
+      kScaledWriters, RuntimeCounter(numWriters_ - 1));
+}
+} // namespace facebook::velox::exec
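Taken together, ScaleWriterLocalPartition grows `numWriters_` one writer at a time and round-robins batches over the currently active writers with a monotonically increasing index. A minimal standalone model of that assignment, with the scaling gate abstracted into a boolean; the names are illustrative:

// Models the writer assignment: writers join one by one, and batches are
// spread only over the writers activated so far.
struct WriterScaler {
  uint32_t numWriters{1};
  uint32_t nextIndex{0};

  // 'canScale' stands in for the buffer-pressure, processed-bytes, and
  // memory-ratio checks performed by getNextWriterId() above.
  uint32_t next(bool canScale, uint32_t maxWriters) {
    if (canScale && numWriters < maxWriters) {
      ++numWriters;
    }
    return nextIndex++ % numWriters;
  }
};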
diff --git a/velox/exec/ScaleWriterLocalPartition.h b/velox/exec/ScaleWriterLocalPartition.h
new file mode 100644
index 0000000000000..03b16517c9f16
--- /dev/null
+++ b/velox/exec/ScaleWriterLocalPartition.h
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "velox/common/base/SkewedPartitionBalancer.h"
+#include "velox/exec/LocalPartition.h"
+
+namespace facebook::velox::exec {
+
+using namespace facebook::velox::common;
+
+/// Customized local partition for writer scaling for (non-bucketed)
+/// partitioned table writes.
+class ScaleWriterPartitioningLocalPartition : public LocalPartition {
+ public:
+  ScaleWriterPartitioningLocalPartition(
+      int32_t operatorId,
+      DriverCtx* ctx,
+      const std::shared_ptr<const core::LocalPartitionNode>& planNode);
+
+  std::string toString() const override {
+    return fmt::format(
+        "ScaleWriterPartitioningLocalPartition({})", numPartitions_);
+  }
+
+  void initialize() override;
+
+  void addInput(RowVectorPtr input) override;
+
+  void close() override;
+
+  /// The names of the runtime stats for writer scaling.
+  /// The number of times that we trigger rebalancing of table partitions.
+  static inline const std::string kRebalanceTriggers{"rebalanceTriggers"};
+  /// The number of times that we scale the processing of a partition.
+  static inline const std::string kScaledPartitions{"scaledPartitions"};
+
+ private:
+  void prepareForWriterAssignments(vector_size_t numInput);
+
+  uint32_t getNextWriterId(uint32_t partitionId);
+
+  // The max query memory usage ratio before we stop writer scaling.
+  const double maxQueryMemoryUsageRatio_;
+  // The max number of logical table partitions that can be assigned to a
+  // single table writer thread. Multiple physical table partitions can be
+  // mapped to one logical table partition.
+  const uint32_t maxTablePartitionsPerWriter_;
+  // The total number of logical table partitions that can be served by all
+  // the table writer threads.
+  const uint32_t numTablePartitions_;
+
+  memory::MemoryPool* const queryPool_;
+
+  // The skewed partition balancer for writer scaling.
+  const std::unique_ptr<SkewedPartitionRebalancer> tablePartitionRebalancer_;
+
+  std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
+
+  // Reusable memory for writer assignment processing.
+  std::vector<uint32_t> tablePartitionRowCounts_;
+  std::vector<int32_t> tablePartitionWriterIds_;
+  std::vector<uint32_t> tablePartitionWriterIndexes_;
+
+  // Reusable memory for writer assignment processing.
+  std::vector<vector_size_t> writerAssignmentCounts_;
+  std::vector<BufferPtr> writerAssignmentIndicesBuffers_;
+  std::vector<vector_size_t*> rawWriterAssignmentIndicesBuffers_;
+};
+
+/// Customized local partition for writer scaling for un-partitioned table
+/// writes.
+class ScaleWriterLocalPartition : public LocalPartition {
+ public:
+  ScaleWriterLocalPartition(
+      int32_t operatorId,
+      DriverCtx* ctx,
+      const std::shared_ptr<const core::LocalPartitionNode>& planNode);
+
+  void addInput(RowVectorPtr input) override;
+
+  void initialize() override;
+
+  void close() override;
+
+  /// The name of the runtime stat of writer scaling.
+  /// The number of scaled writers.
+  static inline const std::string kScaledWriters{"scaledWriters"};
+
+ private:
+  // Gets the writer id to process the next input in a round-robin manner.
+  uint32_t getNextWriterId();
+
+  // The max query memory usage ratio before we stop writer scaling.
+  const double maxQueryMemoryUsageRatio_;
+  memory::MemoryPool* const queryPool_;
+  // The minimum amount of processed data bytes before we trigger the next
+  // writer scaling.
+  const uint64_t minDataProcessedBytes_;
+
+  std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
+
+  // The number of assigned writers.
+  uint32_t numWriters_{1};
+  // The monotonically increasing writer index to find the next writer id in a
+  // round-robin manner.
+  uint32_t nextWriterIndex_{0};
+  // The total processed data bytes from all writers.
+  uint64_t processedDataBytes_{0};
+  // The total processed data bytes at the last writer scaling.
+  uint64_t processedBytesAtLastScale_{0};
+};
+} // namespace facebook::velox::exec
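For reference, the SkewedPartitionRebalancer API that ScaleWriterPartitioningLocalPartition drives per input batch, condensed into a sketch with made-up sizes; the argument names in the comments are assumptions, while the call sequence mirrors the operator code above:

// Construction mirrors the operator: logical partition count, writer (task)
// count, and the two rebalance thresholds from the query config.
common::SkewedPartitionRebalancer rebalancer(
    /*partitionCount=*/128,
    /*taskCount=*/4,
    /*minPartitionProcessedBytesThreshold=*/128 << 20,
    /*minProcessedBytesThreshold=*/256 << 20);

rebalancer.rebalance(); // A no-op until the thresholds are crossed.
uint32_t partitionWriterIndex = 0;
const uint32_t writerId =
    rebalancer.getTaskId(/*partition=*/7, partitionWriterIndex++);
rebalancer.addPartitionRowCount(/*partition=*/7, /*numRows=*/1024);
rebalancer.addProcessedBytes(1 << 20);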
diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp
index 4899e4b4ebf89..5feb60e42c1dd 100644
--- a/velox/exec/TableWriter.cpp
+++ b/velox/exec/TableWriter.cpp
@@ -272,8 +272,10 @@ void TableWriter::updateStats(const connector::DataSink::Stats& stats) {
     VELOX_CHECK(stats.spillStats.empty());
     return;
   }
-  lockedStats->addRuntimeStat(
-      "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles));
+  if (stats.numWrittenFiles != 0) {
+    lockedStats->addRuntimeStat(
+        "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles));
+  }
   lockedStats->addRuntimeStat(
       "writeIOTime",
       RuntimeCounter(
diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 7b10948b53a08..e0ab89a46fcd8 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -2587,6 +2587,22 @@ Task::getLocalExchangeQueues(
   return it->second.queues;
 }
 
+const std::shared_ptr<LocalExchangeMemoryManager>&
+Task::getLocalExchangeMemoryManager(
+    uint32_t splitGroupId,
+    const core::PlanNodeId& planNodeId) {
+  auto& splitGroupState = splitGroupStates_[splitGroupId];
+
+  auto it = splitGroupState.localExchanges.find(planNodeId);
+  VELOX_CHECK(
+      it != splitGroupState.localExchanges.end(),
+      "Incorrect local exchange ID {} for group {}, task {}",
+      planNodeId,
+      splitGroupId,
+      taskId());
+  return it->second.memoryManager;
+}
+
 void Task::setError(const std::exception_ptr& exception) {
   TestValue::adjust("facebook::velox::exec::Task::setError", this);
   {
diff --git a/velox/exec/Task.h b/velox/exec/Task.h
index d205b15178aff..ddd146a63fae1 100644
--- a/velox/exec/Task.h
+++ b/velox/exec/Task.h
@@ -467,6 +467,11 @@ class Task : public std::enable_shared_from_this<Task> {
       uint32_t splitGroupId,
       const core::PlanNodeId& planNodeId);
 
+  const std::shared_ptr<LocalExchangeMemoryManager>&
+  getLocalExchangeMemoryManager(
+      uint32_t splitGroupId,
+      const core::PlanNodeId& planNodeId);
+
   void setError(const std::exception_ptr& exception);
 
   void setError(const std::string& message);
diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt
index ccd1d9caa81dd..4bc601ade56ed 100644
--- a/velox/exec/tests/CMakeLists.txt
+++ b/velox/exec/tests/CMakeLists.txt
@@ -73,6 +73,7 @@ add_executable(
   RoundRobinPartitionFunctionTest.cpp
   RowContainerTest.cpp
   RowNumberTest.cpp
+  ScaleWriterLocalPartitionTest.cpp
   SortBufferTest.cpp
   SpillerTest.cpp
   SpillTest.cpp
diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp
index 03159c2e276f5..f815cb0360d5e 100644
--- a/velox/exec/tests/PlanNodeSerdeTest.cpp
+++ b/velox/exec/tests/PlanNodeSerdeTest.cpp
@@ -228,6 +228,20 @@ TEST_F(PlanNodeSerdeTest, localPartition) {
   testSerde(plan);
 }
 
+TEST_F(PlanNodeSerdeTest, scaleWriterlocalPartition) {
+  auto plan = PlanBuilder()
+                  .values({data_})
+                  .scaleWriterlocalPartition(std::vector<std::string>{"c0"})
+                  .planNode();
+  testSerde(plan);
+
+  plan = PlanBuilder()
+             .values({data_})
+             .scaleWriterlocalPartitionRoundRobin()
+             .planNode();
+  testSerde(plan);
+}
+
 TEST_F(PlanNodeSerdeTest, limit) {
   auto plan = PlanBuilder().values({data_}).limit(0, 10, true).planNode();
   testSerde(plan);
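The serde test above exercises the two PlanBuilder entry points added for writer scaling. A sketch of how they appear in a plan under construction, with `data` as a placeholder input vector:

// Hash variant: scale writers while keeping rows with the same partition key
// together. Round-robin variant: scale writers for un-partitioned writes.
auto hashPlan = PlanBuilder()
                    .values({data})
                    .scaleWriterlocalPartition(std::vector<std::string>{"c0"})
                    .planNode();
auto roundRobinPlan = PlanBuilder()
                          .values({data})
                          .scaleWriterlocalPartitionRoundRobin()
                          .planNode();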
diff --git a/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp b/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp
new file mode 100644
index 0000000000000..07017cb2e628b
--- /dev/null
+++ b/velox/exec/tests/ScaleWriterLocalPartitionTest.cpp
@@ -0,0 +1,963 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/ScaleWriterLocalPartition.h"
+
+#include "velox/core/PlanNode.h"
+#include "velox/exec/PlanNodeStats.h"
+#include "velox/exec/tests/utils/AssertQueryBuilder.h"
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+#include "velox/exec/tests/utils/QueryAssertions.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::exec::test;
+
+namespace {
+using BlockingCallback = std::function;
+using FinishCallback = std::function;
+
+// The object used to coordinate the behavior of the consumer and producer of
+// the scale writer exchange for testing purposes.
+class TestExchangeController {
+ public:
+  TestExchangeController(
+      uint32_t numProducers,
+      uint32_t numConsumers,
+      uint64_t holdBufferBytes,
+      std::optional<double> producerTargetBufferMemoryRatio,
+      std::optional<double> consumerTargetBufferMemoryRatio,
+      std::optional<uint64_t> producerMaxDelayUs,
+      std::optional<uint64_t> consumerMaxDelayUs,
+      const std::vector<RowVectorPtr>& inputVectors,
+      bool keepConsumerInput = true)
+      : numProducers_(numProducers),
+        numConsumers_(numConsumers),
+        holdBufferBytes_(holdBufferBytes),
+        producerTargetBufferMemoryRatio_(producerTargetBufferMemoryRatio),
+        consumerTargetBufferMemoryRatio_(consumerTargetBufferMemoryRatio),
+        producerMaxDelayUs_(producerMaxDelayUs),
+        consumerMaxDelayUs_(consumerMaxDelayUs),
+        keepConsumerInput_(keepConsumerInput),
+        producerInputVectors_(inputVectors),
+        consumerInputs_(numConsumers_) {}
+
+  // The number of drivers of the producer pipeline.
+  uint32_t numProducers() const {
+    return numProducers_;
+  }
+
+  // The number of drivers of the consumer pipeline.
+  uint32_t numConsumers() const {
+    return numConsumers_;
+  }
+
+  // Specifies the size of the buffer to hold, to test the effect of the query
+  // memory usage ratio on writer scaling control.
+  uint64_t holdBufferBytes() const {
+    return holdBufferBytes_;
+  }
+
+  std::optional<double> producerTargetBufferMemoryRatio() const {
+    return producerTargetBufferMemoryRatio_;
+  }
+
+  std::optional<double> consumerTargetBufferMemoryRatio() const {
+    return consumerTargetBufferMemoryRatio_;
+  }
+
+  std::optional<uint64_t> producerMaxDelayUs() const {
+    return producerMaxDelayUs_;
+  }
+
+  std::optional<uint64_t> consumerMaxDelayUs() const {
+    return consumerMaxDelayUs_;
+  }
+
+  RowVectorPtr getInput() {
+    std::lock_guard<std::mutex> l(mutex_);
+    if (nextInput_ >= producerInputVectors_.size()) {
+      return nullptr;
+    }
+    return producerInputVectors_[nextInput_++];
+  }
+
+  core::PlanNodeId exchangeNodeId() const {
+    std::lock_guard<std::mutex> l(mutex_);
+    return exchangeNodeId_;
+  }
+
+  void setExchangeNodeId(const core::PlanNodeId& nodeId) {
+    std::lock_guard<std::mutex> l(mutex_);
+    VELOX_CHECK(exchangeNodeId_.empty());
+    exchangeNodeId_ = nodeId;
+  }
+
+  void addConsumerInput(uint32_t consumerId, const RowVectorPtr& input) {
+    if (!keepConsumerInput_) {
+      return;
+    }
+    consumerInputs_[consumerId].push_back(input);
+  }
+
+  const std::vector<std::vector<RowVectorPtr>> consumerInputs() const {
+    std::lock_guard<std::mutex> l(mutex_);
+    return consumerInputs_;
+  }
+
+  void clearConsumerInputs() {
+    std::lock_guard<std::mutex> l(mutex_);
+    for (auto& input : consumerInputs_) {
+      input.clear();
+    }
+  }
+
+ private:
+  const uint32_t numProducers_;
+  const uint32_t numConsumers_;
+  const uint64_t holdBufferBytes_;
+  const std::optional<double> producerTargetBufferMemoryRatio_;
+  const std::optional<double> consumerTargetBufferMemoryRatio_;
+  const std::optional<uint64_t> producerMaxDelayUs_;
+  const std::optional<uint64_t> consumerMaxDelayUs_;
+  const bool keepConsumerInput_;
+
+  mutable std::mutex mutex_;
+  core::PlanNodeId exchangeNodeId_;
+  uint32_t nextInput_{0};
+  std::vector<RowVectorPtr> producerInputVectors_;
+  std::vector<std::vector<RowVectorPtr>> consumerInputs_;
+};
+
+class FakeSourceNode : public core::PlanNode {
+ public:
+  FakeSourceNode(const core::PlanNodeId& id, const RowTypePtr& outputType)
+      : PlanNode(id), outputType_{outputType} {}
+
+  const RowTypePtr& outputType() const override {
+    return outputType_;
+  }
+
+  const std::vector<core::PlanNodePtr>& sources() const override {
+    static const std::vector<core::PlanNodePtr> kEmptySources;
+    return kEmptySources;
+  }
+
+  std::string_view name() const override {
+    return "FakeSourceNode";
+  }
+
+ private:
+  void addDetails(std::stringstream& /* stream */) const override {}
+
+  const RowTypePtr outputType_;
+};
+
+class FakeSourceOperator : public SourceOperator {
+ public:
+  FakeSourceOperator(
+      DriverCtx* ctx,
+      int32_t id,
+      const std::shared_ptr<const FakeSourceNode>& node,
+      const std::shared_ptr<TestExchangeController>& testController)
+      : SourceOperator(
+            ctx,
+            node->outputType(),
+            id,
+            node->id(),
+            "FakeSourceOperator"),
+        testController_(testController) {
+    VELOX_CHECK_NOT_NULL(testController_);
+    rng_.seed(id);
+  }
+
+  RowVectorPtr getOutput() override {
+    VELOX_CHECK(!finished_);
+    auto output = testController_->getInput();
+    if (output == nullptr) {
+      finished_ = true;
+      return nullptr;
+    }
+    waitForOutput();
+    return output;
+  }
+
+  bool isFinished() override {
+    return finished_;
+  }
+
+  BlockingReason isBlocked(ContinueFuture* /*unused*/) override {
+    return BlockingReason::kNotBlocked;
+  }
+
+ private:
+  void waitForOutput() {
+    if (FOLLY_UNLIKELY(memoryManager_ == nullptr)) {
+      memoryManager_ =
+          operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+              operatorCtx_->driverCtx()->splitGroupId,
+              testController_->exchangeNodeId());
+    }
+
+    if (testController_->producerTargetBufferMemoryRatio().has_value()) {
+      while (memoryManager_->bufferedBytes() >
+             (memoryManager_->maxBufferBytes() *
+              testController_->producerTargetBufferMemoryRatio().value())) {
+        std::this_thread::sleep_for(std::chrono::microseconds(100)); // NOLINT
+      }
+    }
+    if (testController_->producerMaxDelayUs().has_value()) {
+      const auto delayUs = folly::Random::rand32(rng_) %
+          testController_->producerMaxDelayUs().value();
+      std::this_thread::sleep_for(std::chrono::microseconds(delayUs)); // NOLINT
+    }
+  }
+
+  const std::shared_ptr<TestExchangeController> testController_;
+  folly::Random::DefaultGenerator rng_;
+
+  std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
+  bool finished_{false};
+};
+
+class FakeSourceNodeFactory : public Operator::PlanNodeTranslator {
+ public:
+  explicit FakeSourceNodeFactory(
+      const std::shared_ptr<TestExchangeController>& testController)
+      : testController_(testController) {}
+
+  std::unique_ptr<Operator> toOperator(
+      DriverCtx* ctx,
+      int32_t id,
+      const core::PlanNodePtr& node) override {
+    auto fakeSourceNode = std::dynamic_pointer_cast<const FakeSourceNode>(node);
+    if (fakeSourceNode == nullptr) {
+      return nullptr;
+    }
+    return std::make_unique<FakeSourceOperator>(
+        ctx,
+        id,
+        std::dynamic_pointer_cast<const FakeSourceNode>(node),
+        testController_);
+  }
+
+  std::optional<uint32_t> maxDrivers(const core::PlanNodePtr& node) override {
+    auto fakeSourceNode = std::dynamic_pointer_cast<const FakeSourceNode>(node);
+    if (fakeSourceNode == nullptr) {
+      return std::nullopt;
+    }
+    return testController_->numProducers();
+  }
+
+ private:
+  const std::shared_ptr<TestExchangeController> testController_;
+};
+
+class FakeWriteNode : public core::PlanNode {
+ public:
+  FakeWriteNode(const core::PlanNodeId& id, const core::PlanNodePtr& input)
+      : PlanNode(id), sources_{input} {}
+
+  const RowTypePtr& outputType() const override {
+    return sources_[0]->outputType();
+  }
+
+  const std::vector<core::PlanNodePtr>& sources() const override {
+    return sources_;
+  }
+
+  std::string_view name() const override {
+    return "FakeWriteNode";
+  }
+
+ private:
+  void addDetails(std::stringstream& /* stream */) const override {}
+  std::vector<core::PlanNodePtr> sources_;
+};
+
+class FakeWriteOperator : public Operator {
+ public:
+  FakeWriteOperator(
+      DriverCtx* ctx,
+      int32_t id,
+      const std::shared_ptr<const FakeWriteNode>& node,
+      const std::shared_ptr<TestExchangeController>& testController)
+      : Operator(ctx, node->outputType(), id, node->id(), "FakeWriteOperator"),
+        testController_(testController) {
+    VELOX_CHECK_NOT_NULL(testController_);
+    rng_.seed(id);
+  }
+
+  // Customize the operator initialization to hold the buffer on the first
+  // fake write operator.
+  void initialize() override {
+    Operator::initialize();
+    VELOX_CHECK(exchangeQueues_.empty());
+    exchangeQueues_ = operatorCtx_->driver()->task()->getLocalExchangeQueues(
+        operatorCtx_->driverCtx()->splitGroupId,
+        testController_->exchangeNodeId());
+
+    if (operatorCtx_->driverCtx()->driverId != 0) {
+      return;
+    }
+    if (testController_->holdBufferBytes() == 0) {
+      return;
+    }
+    holdBuffer_ = pool()->allocate(testController_->holdBufferBytes());
+  }
+
+  bool needsInput() const override {
+    return !noMoreInput_ && !input_;
+  }
+
+  void addInput(RowVectorPtr input) override {
+    waitForConsume();
+    testController_->addConsumerInput(
+        operatorCtx_->driverCtx()->driverId, input);
+    input_ = std::move(input);
+  }
+
+  RowVectorPtr getOutput() override {
+    return std::move(input_);
+  }
+
+  bool isFinished() override {
+    return noMoreInput_ && input_ == nullptr;
+  }
+
+  BlockingReason isBlocked(ContinueFuture* /*unused*/) override {
+    return BlockingReason::kNotBlocked;
+  }
+
+  void close() override {
+    Operator::close();
+    if (holdBuffer_ == nullptr) {
+      return;
+    }
+    pool()->free(holdBuffer_, testController_->holdBufferBytes());
+  }
+
+ private:
+  // Returns true if the producers of all the exchange queues are done.
+  bool exchangeQueueClosed() const {
+    for (const auto& queue : exchangeQueues_) {
+      if (!queue->testingProducersDone()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  void waitForConsume() {
+    if (FOLLY_UNLIKELY(memoryManager_ == nullptr)) {
+      memoryManager_ =
+          operatorCtx_->driver()->task()->getLocalExchangeMemoryManager(
+              operatorCtx_->driverCtx()->splitGroupId,
+              testController_->exchangeNodeId());
+    }
+    if (testController_->consumerTargetBufferMemoryRatio().has_value()) {
+      while (!exchangeQueueClosed() &&
+             memoryManager_->bufferedBytes() <
+                 (memoryManager_->maxBufferBytes() *
+                  testController_->consumerTargetBufferMemoryRatio().value())) {
+        std::this_thread::sleep_for(std::chrono::microseconds(100)); // NOLINT
+      }
+    }
+    if (testController_->consumerMaxDelayUs().has_value()) {
+      const auto delayUs = folly::Random::rand32(rng_) %
+          testController_->consumerMaxDelayUs().value();
+      std::this_thread::sleep_for(std::chrono::microseconds(delayUs)); // NOLINT
+    }
+  }
+
+  const std::shared_ptr<TestExchangeController> testController_;
+  folly::Random::DefaultGenerator rng_;
+
+  std::shared_ptr<LocalExchangeMemoryManager> memoryManager_;
+  std::vector<std::shared_ptr<LocalExchangeQueue>> exchangeQueues_;
+
+  void* holdBuffer_{nullptr};
+};
+
+class FakeWriteNodeFactory : public Operator::PlanNodeTranslator {
+ public:
+  explicit FakeWriteNodeFactory(
+      const std::shared_ptr<TestExchangeController>& testController)
+      : testController_(testController) {}
+
+  std::unique_ptr<Operator> toOperator(
+      DriverCtx* ctx,
+      int32_t id,
+      const core::PlanNodePtr& node) override {
+    auto fakeWriteNode = std::dynamic_pointer_cast<const FakeWriteNode>(node);
+    if (fakeWriteNode == nullptr) {
+      return nullptr;
+    }
+    return std::make_unique<FakeWriteOperator>(
+        ctx,
+        id,
+        std::dynamic_pointer_cast<const FakeWriteNode>(node),
+        testController_);
+  }
+
+  std::optional<uint32_t> maxDrivers(const core::PlanNodePtr& node) override {
+    auto fakeWriteNode = std::dynamic_pointer_cast<const FakeWriteNode>(node);
+    if (fakeWriteNode == nullptr) {
+      return std::nullopt;
+    }
+    return testController_->numConsumers();
+  }
+
+ private:
+  const std::shared_ptr<TestExchangeController> testController_;
+};
+} // namespace
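The fixture below installs these fakes through the standard Operator::PlanNodeTranslator hook. Shown in isolation, with `testController` assumed to be an already constructed TestExchangeController:

// Register the custom translators so LocalPlanner can turn the fake plan
// nodes into the fake operators defined above.
Operator::registerOperator(
    std::make_unique<FakeSourceNodeFactory>(testController));
Operator::registerOperator(
    std::make_unique<FakeWriteNodeFactory>(testController));
// ... build and run the plan; between test iterations the fixture calls:
Operator::unregisterAllOperators();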
class ScaleWriterLocalPartitionTest : public HiveConnectorTestBase {
 protected:
  void SetUp() override {
    HiveConnectorTestBase::SetUp();

    rng_.seed(123);
    rowType_ = ROW({"c0", "c1", "c3"}, {BIGINT(), INTEGER(), BIGINT()});
  }

  void TearDown() override {
    Operator::unregisterAllOperators();
    HiveConnectorTestBase::TearDown();
  }

  std::vector<RowVectorPtr> makeVectors(
      uint32_t numVectors,
      uint32_t vectorSize,
      const std::vector<int32_t>& partitionKeyValues = {}) {
    VectorFuzzer::Options options;
    options.vectorSize = vectorSize;
    options.allowLazyVector = false;
    // NOTE: the order by used to sort data doesn't handle null rows.
    options.nullRatio = 0.0;
    VectorFuzzer fuzzer(options, pool_.get());

    std::vector<RowVectorPtr> vectors;
    for (auto i = 0; i < numVectors; ++i) {
      vectors.push_back(fuzzer.fuzzRow(rowType_));
    }
    if (partitionKeyValues.empty()) {
      return vectors;
    }
    for (auto i = 0; i < numVectors; ++i) {
      auto partitionVector = BaseVector::create(
          vectors[i]->childAt(partitionChannel_)->type(),
          vectors[i]->childAt(partitionChannel_)->size(),
          pool_.get());
      auto* partitionVectorFlat = partitionVector->asFlatVector<int32_t>();
      for (auto j = 0; j < partitionVector->size(); ++j) {
        partitionVectorFlat->set(
            j,
            partitionKeyValues
                [folly::Random::rand32(rng_) % partitionKeyValues.size()]);
      }
      vectors[i]->childAt(partitionChannel_) = partitionVector;
    }
    return vectors;
  }

  std::string partitionColumnName() const {
    return rowType_->nameOf(partitionChannel_);
  }

  RowVectorPtr sortData(const std::vector<RowVectorPtr>& inputVectors) {
    std::vector<std::string> orderByKeys;
    orderByKeys.reserve(rowType_->size());
    for (const auto& name : rowType_->names()) {
      orderByKeys.push_back(fmt::format("{} ASC NULLS FIRST", name));
    }
    AssertQueryBuilder queryBuilder(PlanBuilder()
                                        .values(inputVectors)
                                        .orderBy(orderByKeys, false)
                                        .planNode());
    return queryBuilder.copyResults(pool_.get());
  }

  void verifyResults(
      const std::vector<RowVectorPtr>& actual,
      const std::vector<RowVectorPtr>& expected) {
    const auto actualSorted = sortData(actual);
    const auto expectedSorted = sortData(expected);
    exec::test::assertEqualResults({actualSorted}, {expectedSorted});
  }

  // Returns the set of partition keys of the input 'vectors'.
  std::set<int32_t> partitionKeys(const std::vector<RowVectorPtr>& vectors) {
    DecodedVector partitionColumnDecoder;
    std::set<int32_t> keys;
    for (const auto& vector : vectors) {
      partitionColumnDecoder.decode(*vector->childAt(partitionChannel_));
      for (auto i = 0; i < partitionColumnDecoder.size(); ++i) {
        keys.insert(partitionColumnDecoder.valueAt<int32_t>(i));
      }
    }
    return keys;
  }

  // Verifies the partition keys of the consumer inputs from 'controller' are
  // disjoint.
  void verifyDisjointPartitionKeys(TestExchangeController* controller) {
    std::set<int32_t> allPartitionKeys;
    for (const auto& consumerInput : controller->consumerInputs()) {
      std::set<int32_t> consumerPartitionKeys = partitionKeys(consumerInput);
      std::set<int32_t> diffPartitionKeys;
      std::set_difference(
          consumerPartitionKeys.begin(),
          consumerPartitionKeys.end(),
          allPartitionKeys.begin(),
          allPartitionKeys.end(),
          std::inserter(diffPartitionKeys, diffPartitionKeys.end()));
      ASSERT_EQ(diffPartitionKeys.size(), consumerPartitionKeys.size())
          << "diffPartitionKeys: " << folly::join(",", diffPartitionKeys)
          << " consumerPartitionKeys: "
          << folly::join(",", consumerPartitionKeys)
          << ", allPartitionKeys: " << folly::join(",", allPartitionKeys);
      allPartitionKeys.insert(
          consumerPartitionKeys.begin(), consumerPartitionKeys.end());
    }
  }

  const column_index_t partitionChannel_{1};
  folly::Random::DefaultGenerator rng_;
  RowTypePtr rowType_;
};

TEST_F(ScaleWriterLocalPartitionTest, unpartitionBasic) {
  const std::vector<RowVectorPtr> inputVectors = makeVectors(32, 1024);
  const uint64_t queryCapacity = 256 << 20;
  const uint32_t maxDrivers = 32;
  const uint32_t maxExchangeBufferSize = 2 << 20;

  struct {
    uint32_t numProducers;
    uint32_t numConsumers;
    uint64_t rebalanceProcessBytesThreshold;
    double scaleWriterRebalanceMaxMemoryUsageRatio;
    uint64_t holdBufferBytes;
    double producerBufferedMemoryRatio;
    double consumerBufferedMemoryRatio;
    bool expectedRebalance;

    std::string debugString() const {
      return fmt::format(
          "numProducers {}, numConsumers {}, rebalanceProcessBytesThreshold {}, scaleWriterRebalanceMaxMemoryUsageRatio {}, holdBufferBytes {}, producerBufferedMemoryRatio {}, consumerBufferedMemoryRatio {}, expectedRebalance {}",
          numProducers,
          numConsumers,
          succinctBytes(rebalanceProcessBytesThreshold),
          scaleWriterRebalanceMaxMemoryUsageRatio,
          succinctBytes(holdBufferBytes),
          producerBufferedMemoryRatio,
          consumerBufferedMemoryRatio,
          expectedRebalance);
    }
  } testSettings[] = {
      {1, 1, 0, 1.0, 0, 0.8, 0.6, false},
      {4, 1, 0, 1.0, 0, 0.8, 0.6, false},
      {1, 4, 1ULL << 30, 1.0, 0, 0.8, 0.6, false},
      {4, 4, 1ULL << 30, 1.0, 0, 0.8, 0.6, false},
      {1, 4, 0, 1.0, 0, 0.3, 0.2, false},
      {4, 4, 0, 1.0, 0, 0.3, 0.2, false},
      {1, 4, 0, 0.1, queryCapacity / 2, 0.8, 0.6, false},
      {4, 4, 0, 0.1, queryCapacity / 2, 0.8, 0.6, false},
      {1, 4, 0, 1.0, 0, 0.8, 0.6, true},
      {4, 4, 0, 1.0, 0, 0.8, 0.6, true}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());
    ASSERT_LE(testData.numProducers, maxDrivers);
    ASSERT_LE(testData.numConsumers, maxDrivers);

    Operator::unregisterAllOperators();

    auto testController = std::make_shared<TestExchangeController>(
        testData.numProducers,
        testData.numConsumers,
        testData.holdBufferBytes,
        testData.producerBufferedMemoryRatio,
        testData.consumerBufferedMemoryRatio,
        std::nullopt,
        std::nullopt,
        inputVectors);
    Operator::registerOperator(
        std::make_unique<FakeSourceNodeFactory>(testController));
    Operator::registerOperator(
        std::make_unique<FakeWriteNodeFactory>(testController));

    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
    core::PlanNodeId exchangeNodeId;
    auto plan = PlanBuilder(planNodeIdGenerator)
                    .addNode([&](const core::PlanNodeId& id,
                                 const core::PlanNodePtr& input) {
                      return std::make_shared<FakeSourceNode>(id, rowType_);
                    })
                    .scaleWriterlocalPartitionRoundRobin()
                    .capturePlanNodeId(exchangeNodeId)
                    .addNode([](const core::PlanNodeId& id,
                                const core::PlanNodePtr& input) {
                      return std::make_shared<FakeWriteNode>(id, input);
                    })
                    .planNode();
    testController->setExchangeNodeId(exchangeNodeId);

    AssertQueryBuilder queryBuilder(plan);
    std::shared_ptr<Task> task;
    const auto result =
        // The consumer and producer override the max drivers of their
        // associated pipelines. Set the query's max drivers to make sure it
        // is larger than the customized driver counts of the consumer and
        // producer.
        queryBuilder.maxDrivers(maxDrivers)
            .maxQueryCapacity(queryCapacity)
            .config(
                core::QueryConfig::kMaxLocalExchangeBufferSize,
                std::to_string(maxExchangeBufferSize))
            .config(
                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
                std::to_string(
                    testData.scaleWriterRebalanceMaxMemoryUsageRatio))
            .config(
                core::QueryConfig::
                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
                std::to_string(testData.rebalanceProcessBytesThreshold))
            .copyResults(pool_.get(), task);

    uint32_t nonEmptyConsumers{0};
    for (const auto& consumerInput : testController->consumerInputs()) {
      if (!consumerInput.empty()) {
        ++nonEmptyConsumers;
      }
    }
    auto planStats = toPlanStats(task->taskStats());
    if (testData.expectedRebalance) {
      ASSERT_GT(
          planStats.at(exchangeNodeId)
              .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
              .sum,
          0);
      ASSERT_LE(
          planStats.at(exchangeNodeId)
              .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
              .sum,
          planStats.at(exchangeNodeId)
                  .customStats.at(ScaleWriterLocalPartition::kScaledWriters)
                  .count *
              (testData.numConsumers - 1));
      ASSERT_GT(nonEmptyConsumers, 1);
    } else {
      ASSERT_EQ(
          planStats.at(exchangeNodeId)
              .customStats.count(ScaleWriterLocalPartition::kScaledWriters),
          0);
      ASSERT_EQ(nonEmptyConsumers, 1);
    }
    testController->clearConsumerInputs();

    task.reset();

    verifyResults(inputVectors, {result});
    waitForAllTasksToBeDeleted();
  }
}

TEST_F(ScaleWriterLocalPartitionTest, unpartitionFuzzer) {
  const std::vector<RowVectorPtr> inputVectors = makeVectors(256, 512);
  const uint64_t queryCapacity = 256 << 20;
  const uint32_t maxDrivers = 32;
  const uint32_t maxExchangeBufferSize = 2 << 20;

  for (bool fastConsumer : {false, true}) {
    SCOPED_TRACE(fmt::format("fastConsumer: {}", fastConsumer));
    Operator::unregisterAllOperators();

    auto testController = std::make_shared<TestExchangeController>(
        fastConsumer ? 1 : 4,
        4,
        0,
        std::nullopt,
        std::nullopt,
        fastConsumer ? 4 : 64,
        fastConsumer ? 64 : 4,
        inputVectors,
        /*keepConsumerInput=*/false);
    Operator::registerOperator(
        std::make_unique<FakeSourceNodeFactory>(testController));
    Operator::registerOperator(
        std::make_unique<FakeWriteNodeFactory>(testController));

    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
    core::PlanNodeId exchangeNodeId;
    auto plan = PlanBuilder(planNodeIdGenerator)
                    .addNode([&](const core::PlanNodeId& id,
                                 const core::PlanNodePtr& input) {
                      return std::make_shared<FakeSourceNode>(id, rowType_);
                    })
                    .scaleWriterlocalPartitionRoundRobin()
                    .capturePlanNodeId(exchangeNodeId)
                    .addNode([](const core::PlanNodeId& id,
                                const core::PlanNodePtr& input) {
                      return std::make_shared<FakeWriteNode>(id, input);
                    })
                    .planNode();
    testController->setExchangeNodeId(exchangeNodeId);

    AssertQueryBuilder queryBuilder(plan);
    std::shared_ptr<Task> task;
    const auto result =
        // The consumer and producer override the max drivers of their
        // associated pipelines. Set the query's max drivers to make sure it
        // is larger than the customized driver counts of the consumer and
        // producer.
        queryBuilder.maxDrivers(32)
            .maxQueryCapacity(queryCapacity)
            .config(
                core::QueryConfig::kMaxLocalExchangeBufferSize,
                std::to_string(maxExchangeBufferSize))
            .config(
                core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio,
                "1.0")
            .config(
                core::QueryConfig::
                    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
                "256")
            .copyResults(pool_.get());
    verifyResults(inputVectors, {result});
    waitForAllTasksToBeDeleted();
  }
}

TEST_F(ScaleWriterLocalPartitionTest, partitionBasic) {
  const uint64_t queryCapacity = 256 << 20;
  const uint32_t maxDrivers = 32;
  const uint32_t maxExchangeBufferSize = 2 << 20;

  struct {
    uint32_t numProducers;
    uint32_t numConsumers;
    uint32_t numPartitionsPerWriter;
    uint64_t rebalanceProcessBytesThreshold;
    double scaleWriterRebalanceMaxMemoryUsageRatio;
    uint64_t holdBufferBytes;
    std::vector<int32_t> partitionKeys;
    double producerBufferedMemoryRatio;
    double consumerBufferedMemoryRatio;
    bool expectedRebalance;

    std::string debugString() const {
      return fmt::format(
          "numProducers {}, numConsumers {}, numPartitionsPerWriter {}, rebalanceProcessBytesThreshold {}, scaleWriterRebalanceMaxMemoryUsageRatio {}, holdBufferBytes {}, partitionKeys {}, producerBufferedMemoryRatio {}, consumerBufferedMemoryRatio {}, expectedRebalance {}",
          numProducers,
          numConsumers,
          numPartitionsPerWriter,
          succinctBytes(rebalanceProcessBytesThreshold),
          scaleWriterRebalanceMaxMemoryUsageRatio,
          succinctBytes(holdBufferBytes),
          folly::join(":", partitionKeys),
          producerBufferedMemoryRatio,
          consumerBufferedMemoryRatio,
          expectedRebalance);
    }
  } testSettings[] = {
      {1, 1, 4, 0, 1.0, 0, {1, 2}, 0.8, 0.6, false},
      {4, 1, 4, 0, 1.0, 0, {1, 2}, 0.8, 0.6, false},
      {1, 4, 4, 1ULL << 30, 1.0, 0, {1, 2}, 0.8, 0.6, false},
      {4, 4, 4, 1ULL << 30, 1.0, 0, {1, 2}, 0.8, 0.6, false},
      {1, 4, 4, 0, 1.0, 0, {1, 2}, 0.3, 0.2, false},
      {4, 4, 4, 0, 1.0, 0, {1, 2}, 0.3, 0.2, false},
      {1, 4, 4, 0, 0.1, queryCapacity / 2, {1, 2}, 0.8, 0.6, false},
      {4, 4, 4, 0, 0.1, queryCapacity / 2, {1, 2}, 0.8, 0.6, false},
      {1, 32, 128, 0, 1.0, 0, {1, 2, 3, 4, 5, 6, 7, 8}, 0.8, 0.6, true},
      {4, 32, 128, 0, 1.0, 0, {1, 2, 3, 4, 5, 6, 7, 8}, 0.8, 0.6, true},
  };

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());
    ASSERT_LE(testData.numProducers, maxDrivers);
    ASSERT_LE(testData.numConsumers, maxDrivers);

    Operator::unregisterAllOperators();

    const std::vector<RowVectorPtr> inputVectors =
        makeVectors(32, 2048, testData.partitionKeys);

    auto testController = std::make_shared<TestExchangeController>(
        testData.numProducers,
        testData.numConsumers,
        testData.holdBufferBytes,
        testData.producerBufferedMemoryRatio,
        testData.consumerBufferedMemoryRatio,
        std::nullopt,
        std::nullopt,
        inputVectors);
    Operator::registerOperator(
        std::make_unique<FakeSourceNodeFactory>(testController));
    Operator::registerOperator(
        std::make_unique<FakeWriteNodeFactory>(testController));

    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
    core::PlanNodeId exchangeNodeId;
    auto plan = PlanBuilder(planNodeIdGenerator)
                    .addNode([&](const core::PlanNodeId& id,
                                 const core::PlanNodePtr& input) {
                      return std::make_shared<FakeSourceNode>(id, rowType_);
                    })
                    .scaleWriterlocalPartition({partitionColumnName()})
                    .capturePlanNodeId(exchangeNodeId)
                    .addNode([](const core::PlanNodeId& id,
                                const core::PlanNodePtr& input) {
                      return std::make_shared<FakeWriteNode>(id, input);
                    })
                    .planNode();
    testController->setExchangeNodeId(exchangeNodeId);

    AssertQueryBuilder queryBuilder(plan);
    std::shared_ptr<Task> task;
    const
auto result = + // Consumer and producer have overload the max drivers of their + // associated pipelines. We set max driver for the query to make sure it + // is larger than the customiized driver count for consumer and + // producer. + queryBuilder.maxDrivers(maxDrivers) + .maxQueryCapacity(queryCapacity) + .config( + core::QueryConfig::kMaxLocalExchangeBufferSize, + std::to_string(maxExchanegBufferSize)) + .config( + core::QueryConfig::kScaleWriterMaxPartitionsPerWriter, + std::to_string(testData.numPartitionsPerWriter)) + .config( + core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, + std::to_string( + testData.scaleWriterRebalanceMaxMemoryUsageRatio)) + .config( + core::QueryConfig:: + kScaleWriterMinProcessedBytesRebalanceThreshold, + std::to_string(testData.rebalanceProcessBytesThreshold)) + .config( + core::QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") + .copyResults(pool_.get(), task); + auto planStats = toPlanStats(task->taskStats()); + if (testData.expectedRebalance) { + ASSERT_GT( + planStats.at(exchnangeNodeId) + .customStats + .at(ScaleWriterPartitioningLocalPartition::kScaledPartitions) + .sum, + 0); + ASSERT_GT( + planStats.at(exchnangeNodeId) + .customStats + .at(ScaleWriterPartitioningLocalPartition::kRebalanceTriggers) + .sum, + 0); + } else { + ASSERT_EQ( + planStats.at(exchnangeNodeId) + .customStats.count( + ScaleWriterPartitioningLocalPartition::kScaledPartitions), + 0); + ASSERT_EQ( + planStats.at(exchnangeNodeId) + .customStats.count( + ScaleWriterPartitioningLocalPartition::kRebalanceTriggers), + 0); + verifyDisjointPartitionKeys(testController.get()); + } + testController->clearConsumerInputs(); + task.reset(); + + verifyResults(inputVectors, {result}); + waitForAllTasksToBeDeleted(); + } +} + +TEST_F(ScaleWriterLocalPartitionTest, partitionFuzzer) { + const std::vector inputVectors = + makeVectors(1024, 256, {1, 2, 3, 4, 5, 6, 7, 8}); + const uint64_t queryCapacity = 256 << 20; + const uint32_t maxDrivers = 32; + const uint32_t maxExchanegBufferSize = 2 << 20; + + for (bool fastConsumer : {false, true}) { + SCOPED_TRACE(fmt::format("fastConsumer: {}", fastConsumer)); + Operator::unregisterAllOperators(); + + auto testController = std::make_shared( + fastConsumer ? 1 : 4, + 4, + 0, + std::nullopt, + std::nullopt, + fastConsumer ? 4 : 64, + fastConsumer ? 64 : 4, + inputVectors, + /*keepConsumerInput=*/false); + Operator::registerOperator( + std::make_unique(testController)); + Operator::registerOperator( + std::make_unique(testController)); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId exchnangeNodeId; + auto plan = PlanBuilder(planNodeIdGenerator) + .addNode([&](const core::PlanNodeId& id, + const core::PlanNodePtr& input) { + return std::make_shared(id, rowType_); + }) + .scaleWriterlocalPartition({partitionColumnName()}) + .capturePlanNodeId(exchnangeNodeId) + .addNode([](const core::PlanNodeId& id, + const core::PlanNodePtr& input) { + return std::make_shared(id, input); + }) + .planNode(); + testController->setExchangeNodeId(exchnangeNodeId); + + AssertQueryBuilder queryBuilder(plan); + std::shared_ptr task; + const auto result = + // Consumer and producer have overload the max drivers of their + // associated pipelines. We set max driver for the query to make sure it + // is larger than the customiized driver count for consumer and + // producer. 
+ queryBuilder.maxDrivers(32) + .maxQueryCapacity(queryCapacity) + .config( + core::QueryConfig::kMaxLocalExchangeBufferSize, + std::to_string(maxExchanegBufferSize)) + .config( + core::QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, + "1.0") + .config( + core::QueryConfig:: + kScaleWriterMinProcessedBytesRebalanceThreshold, + "256") + .config( + core::QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "32") + .copyResults(pool_.get()); + + verifyResults(inputVectors, {result}); + waitForAllTasksToBeDeleted(); + } +} diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 6e4a171a3c2cb..507159493a5a5 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -27,7 +27,6 @@ #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/vector/fuzzer/VectorFuzzer.h" #include @@ -156,8 +155,10 @@ struct TestParam { HiveBucketProperty::Kind bucketKind, bool bucketSort, bool multiDrivers, - CompressionKind compressionKind) { - value = static_cast(compressionKind) << 48 | + CompressionKind compressionKind, + bool scaleWriter) { + value = (scaleWriter ? 1ULL << 56 : 0) | + static_cast(compressionKind) << 48 | static_cast(!!multiDrivers) << 40 | static_cast(fileFormat) << 32 | static_cast(testMode) << 24 | @@ -195,16 +196,21 @@ struct TestParam { return (value & ((1L << 8) - 1)) != 0; } + bool scaleWriter() const { + return (value >> 56) != 0; + } + std::string toString() const { return fmt::format( - "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}]", + "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}] scaleWriter[{}]", dwio::common::toString((fileFormat())), testModeString(testMode()), commitStrategyToString(commitStrategy()), HiveBucketProperty::kindString(bucketKind()), bucketSort(), multiDrivers(), - compressionKindToString(compressionKind())); + compressionKindToString(compressionKind()), + scaleWriter()); } }; @@ -219,7 +225,8 @@ class TableWriteTest : public HiveConnectorTestBase { numPartitionedTableWriterCount_( testParam_.multiDrivers() ? kNumPartitionedTableWriterCount : 1), commitStrategy_(testParam_.commitStrategy()), - compressionKind_(testParam_.compressionKind()) { + compressionKind_(testParam_.compressionKind()), + scaleWriter_(testParam_.scaleWriter()) { LOG(INFO) << testParam_.toString(); auto rowType = @@ -281,6 +288,14 @@ class TableWriteTest : public HiveConnectorTestBase { .config( QueryConfig::kTaskPartitionedWriterCount, std::to_string(numPartitionedTableWriterCount_)) + // Scale writer settings to trigger partition rebalancing. + .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0") + .config( + QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0") + .config( + QueryConfig:: + kScaleWriterMinPartitionProcessedBytesRebalanceThreshold, + "0") .splits(splits) .assertResults(duckDbSql); } @@ -297,6 +312,14 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") + // Scale writer settings to trigger partition rebalancing. 
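The disjointness check in verifyDisjointPartitionKeys() above hinges on a property of std::set_difference: if no element of set A appears in set B, then A minus B has the same size as A. A minimal standalone sketch of that idea (the function name and key type here are illustrative, not part of the patch):

    #include <algorithm>
    #include <cstdint>
    #include <iterator>
    #include <set>

    // True iff 'a' and 'b' share no element: removing 'b' from 'a'
    // keeps every element of 'a'.
    bool disjoint(const std::set<int32_t>& a, const std::set<int32_t>& b) {
      std::set<int32_t> diff;
      std::set_difference(
          a.begin(),
          a.end(),
          b.begin(),
          b.end(),
          std::inserter(diff, diff.end()));
      return diff.size() == a.size();
    }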
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp
index 6e4a171a3c2cb..507159493a5a5 100644
--- a/velox/exec/tests/TableWriteTest.cpp
+++ b/velox/exec/tests/TableWriteTest.cpp
@@ -27,7 +27,6 @@
 #include "velox/exec/tests/utils/HiveConnectorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
 #include "velox/exec/tests/utils/TempDirectoryPath.h"
-#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
 #include "velox/vector/fuzzer/VectorFuzzer.h"
 
 #include
@@ -156,8 +155,10 @@ struct TestParam {
       HiveBucketProperty::Kind bucketKind,
       bool bucketSort,
       bool multiDrivers,
-      CompressionKind compressionKind) {
-    value = static_cast<uint64_t>(compressionKind) << 48 |
+      CompressionKind compressionKind,
+      bool scaleWriter) {
+    value = (scaleWriter ? 1ULL << 56 : 0) |
+        static_cast<uint64_t>(compressionKind) << 48 |
         static_cast<uint64_t>(!!multiDrivers) << 40 |
         static_cast<uint64_t>(fileFormat) << 32 |
         static_cast<uint64_t>(testMode) << 24 |
@@ -195,16 +196,21 @@ struct TestParam {
     return (value & ((1L << 8) - 1)) != 0;
   }
 
+  bool scaleWriter() const {
+    return (value >> 56) != 0;
+  }
+
   std::string toString() const {
     return fmt::format(
-        "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}]",
+        "FileFormat[{}] TestMode[{}] commitStrategy[{}] bucketKind[{}] bucketSort[{}] multiDrivers[{}] compression[{}] scaleWriter[{}]",
         dwio::common::toString((fileFormat())),
         testModeString(testMode()),
         commitStrategyToString(commitStrategy()),
         HiveBucketProperty::kindString(bucketKind()),
         bucketSort(),
         multiDrivers(),
-        compressionKindToString(compressionKind()));
+        compressionKindToString(compressionKind()),
+        scaleWriter());
   }
 };
@@ -219,7 +225,8 @@ class TableWriteTest : public HiveConnectorTestBase {
         numPartitionedTableWriterCount_(
             testParam_.multiDrivers() ? kNumPartitionedTableWriterCount : 1),
         commitStrategy_(testParam_.commitStrategy()),
-        compressionKind_(testParam_.compressionKind()) {
+        compressionKind_(testParam_.compressionKind()),
+        scaleWriter_(testParam_.scaleWriter()) {
     LOG(INFO) << testParam_.toString();
 
     auto rowType =
@@ -281,6 +288,14 @@ class TableWriteTest : public HiveConnectorTestBase {
         .config(
             QueryConfig::kTaskPartitionedWriterCount,
             std::to_string(numPartitionedTableWriterCount_))
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .splits(splits)
         .assertResults(duckDbSql);
   }
@@ -297,6 +312,14 @@ class TableWriteTest : public HiveConnectorTestBase {
             std::to_string(numPartitionedTableWriterCount_))
         .config(core::QueryConfig::kSpillEnabled, "true")
         .config(QueryConfig::kWriterSpillEnabled, "true")
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .splits(splits)
         .assertResults(duckDbSql);
   }
@@ -319,6 +342,14 @@ class TableWriteTest : public HiveConnectorTestBase {
             std::to_string(numPartitionedTableWriterCount_))
         .config(core::QueryConfig::kSpillEnabled, "true")
         .config(QueryConfig::kWriterSpillEnabled, "true")
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .assertResults(duckDbSql);
   }
@@ -335,6 +366,14 @@ class TableWriteTest : public HiveConnectorTestBase {
             std::to_string(numPartitionedTableWriterCount_))
         .config(core::QueryConfig::kSpillEnabled, "true")
         .config(QueryConfig::kWriterSpillEnabled, "true")
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .assertResults(duckDbSql);
   }
@@ -352,6 +391,14 @@ class TableWriteTest : public HiveConnectorTestBase {
         .config(
             QueryConfig::kTaskPartitionedWriterCount,
             std::to_string(numPartitionedTableWriterCount_))
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .copyResults(pool());
   }
@@ -368,6 +415,14 @@ class TableWriteTest : public HiveConnectorTestBase {
             std::to_string(numPartitionedTableWriterCount_))
         .config(core::QueryConfig::kSpillEnabled, "true")
         .config(QueryConfig::kWriterSpillEnabled, "true")
+        // Scale writer settings to trigger partition rebalancing.
+        .config(QueryConfig::kScaleWriterRebalanceMaxMemoryUsageRatio, "1.0")
+        .config(
+            QueryConfig::kScaleWriterMinProcessedBytesRebalanceThreshold, "0")
+        .config(
+            QueryConfig::
+                kScaleWriterMinPartitionProcessedBytesRebalanceThreshold,
+            "0")
         .copyResults(pool());
   }
@@ -594,97 +649,213 @@ class TableWriteTest : public HiveConnectorTestBase {
       bool aggregateResult = true,
      std::shared_ptr<core::AggregationNode> aggregationNode = nullptr) {
     if (numTableWriters == 1) {
-      auto insertPlan = inputPlan
-                            .addNode(addTableWriter(
-                                inputRowType,
-                                tableRowType->names(),
-                                aggregationNode,
-                                createInsertTableHandle(
-                                    tableRowType,
-                                    outputTableType,
-                                    outputDirectoryPath,
-                                    partitionedBy,
-                                    bucketProperty,
-                                    compressionKind),
-                                false,
-                                outputCommitStrategy))
-                            .capturePlanNodeId(tableWriteNodeId_);
-      if (aggregateResult) {
-        insertPlan.project({TableWriteTraits::rowCountColumnName()})
-            .singleAggregation(
-                {},
-                {fmt::format(
-                    "sum({})", TableWriteTraits::rowCountColumnName())});
-      }
-      return insertPlan.planNode();
+      return createInsertPlanWithSingleWriter(
+          inputPlan,
+          inputRowType,
+          tableRowType,
+          outputDirectoryPath,
+          partitionedBy,
+          bucketProperty,
+          compressionKind,
+          outputTableType,
+          outputCommitStrategy,
+          aggregateResult,
+          aggregationNode);
     } else if (bucketProperty_ == nullptr) {
-      auto insertPlan = inputPlan.localPartitionRoundRobin()
-                            .addNode(addTableWriter(
-                                inputRowType,
-                                tableRowType->names(),
-                                nullptr,
-                                createInsertTableHandle(
-                                    tableRowType,
-                                    outputTableType,
-                                    outputDirectoryPath,
-                                    partitionedBy,
-                                    bucketProperty,
-                                    compressionKind),
-                                false,
-                                outputCommitStrategy))
-                            .capturePlanNodeId(tableWriteNodeId_)
-                            .localPartition(std::vector<std::string>{})
-                            .tableWriteMerge();
-      if (aggregateResult) {
-        insertPlan.project({TableWriteTraits::rowCountColumnName()})
-            .singleAggregation(
-                {},
-                {fmt::format(
-                    "sum({})", TableWriteTraits::rowCountColumnName())});
-      }
-      return insertPlan.planNode();
+      return createInsertPlanForNonBucketedTable(
+          inputPlan,
+          inputRowType,
+          tableRowType,
+          outputDirectoryPath,
+          partitionedBy,
+          compressionKind,
+          outputTableType,
+          outputCommitStrategy,
+          aggregateResult,
+          aggregationNode);
     } else {
-      // Since we might do column rename, so generate bucket property based on
-      // the data type from 'inputPlan'.
-      std::vector<std::string> bucketColumns;
-      bucketColumns.reserve(bucketProperty->bucketedBy().size());
-      for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) {
-        bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx(
-            bucketProperty->bucketedBy()[i])]);
+      return createInsertPlanForBucketTable(
+          inputPlan,
+          inputRowType,
+          tableRowType,
+          outputDirectoryPath,
+          partitionedBy,
+          bucketProperty,
+          compressionKind,
+          outputTableType,
+          outputCommitStrategy,
+          aggregateResult,
+          aggregationNode);
+    }
+  }
+
+  PlanNodePtr createInsertPlanWithSingleWriter(
+      PlanBuilder& inputPlan,
+      const RowTypePtr& inputRowType,
+      const RowTypePtr& tableRowType,
+      const std::string& outputDirectoryPath,
+      const std::vector<std::string>& partitionedBy,
+      std::shared_ptr<HiveBucketProperty> bucketProperty,
+      const std::optional<CompressionKind> compressionKind,
+      const connector::hive::LocationHandle::TableType& outputTableType,
+      const CommitStrategy& outputCommitStrategy,
+      bool aggregateResult,
+      std::shared_ptr<core::AggregationNode> aggregationNode) {
+    const bool addScaleWriterExchange =
+        scaleWriter_ && (bucketProperty != nullptr);
+    auto insertPlan = inputPlan;
+    if (addScaleWriterExchange) {
+      if (!partitionedBy.empty()) {
+        insertPlan.scaleWriterlocalPartition(
+            inputColumnNames(partitionedBy, tableRowType, inputRowType));
+      } else {
+        insertPlan.scaleWriterlocalPartitionRoundRobin();
       }
-      auto localPartitionBucketProperty = std::make_shared<HiveBucketProperty>(
-          bucketProperty->kind(),
-          bucketProperty->bucketCount(),
-          bucketColumns,
-          bucketProperty->bucketedTypes(),
-          bucketProperty->sortedBy());
-      auto insertPlan =
-          inputPlan.localPartitionByBucket(localPartitionBucketProperty)
-              .addNode(addTableWriter(
-                  inputRowType,
-                  tableRowType->names(),
-                  nullptr,
-                  createInsertTableHandle(
-                      tableRowType,
-                      outputTableType,
-                      outputDirectoryPath,
-                      partitionedBy,
-                      bucketProperty,
-                      compressionKind),
-                  false,
-                  outputCommitStrategy))
-              .capturePlanNodeId(tableWriteNodeId_)
-              .localPartition({})
-              .tableWriteMerge();
-      if (aggregateResult) {
-        insertPlan.project({TableWriteTraits::rowCountColumnName()})
-            .singleAggregation(
-                {},
-                {fmt::format(
-                    "sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    insertPlan
+        .addNode(addTableWriter(
+            inputRowType,
+            tableRowType->names(),
+            aggregationNode,
+            createInsertTableHandle(
+                tableRowType,
+                outputTableType,
+                outputDirectoryPath,
+                partitionedBy,
+                bucketProperty,
+                compressionKind),
+            false,
+            outputCommitStrategy))
+        .capturePlanNodeId(tableWriteNodeId_);
+    if (addScaleWriterExchange) {
+      if (!partitionedBy.empty()) {
+        insertPlan.scaleWriterlocalPartition(
+            inputColumnNames(partitionedBy, tableRowType, inputRowType));
+      } else {
+        insertPlan.scaleWriterlocalPartitionRoundRobin();
+      }
+    }
+    if (aggregateResult) {
+      insertPlan.project({TableWriteTraits::rowCountColumnName()})
+          .singleAggregation(
+              {},
+              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    return insertPlan.planNode();
+  }
+
+  PlanNodePtr createInsertPlanForBucketTable(
+      PlanBuilder& inputPlan,
+      const RowTypePtr& inputRowType,
+      const RowTypePtr& tableRowType,
+      const std::string& outputDirectoryPath,
+      const std::vector<std::string>& partitionedBy,
+      std::shared_ptr<HiveBucketProperty> bucketProperty,
+      const std::optional<CompressionKind> compressionKind,
+      const connector::hive::LocationHandle::TableType& outputTableType,
+      const CommitStrategy& outputCommitStrategy,
+      bool aggregateResult,
+      std::shared_ptr<core::AggregationNode> aggregationNode) {
+    // Since we might rename columns, generate the bucket property based on
+    // the data types from 'inputPlan'.
+    std::vector<std::string> bucketColumns;
+    bucketColumns.reserve(bucketProperty->bucketedBy().size());
+    for (int i = 0; i < bucketProperty->bucketedBy().size(); ++i) {
+      bucketColumns.push_back(inputRowType->names()[tableRowType->getChildIdx(
+          bucketProperty->bucketedBy()[i])]);
+    }
+    auto localPartitionBucketProperty = std::make_shared<HiveBucketProperty>(
+        bucketProperty->kind(),
+        bucketProperty->bucketCount(),
+        bucketColumns,
+        bucketProperty->bucketedTypes(),
+        bucketProperty->sortedBy());
+    auto insertPlan =
+        inputPlan.localPartitionByBucket(localPartitionBucketProperty)
+            .addNode(addTableWriter(
+                inputRowType,
+                tableRowType->names(),
+                nullptr,
+                createInsertTableHandle(
+                    tableRowType,
+                    outputTableType,
+                    outputDirectoryPath,
+                    partitionedBy,
+                    bucketProperty,
+                    compressionKind),
+                false,
+                outputCommitStrategy))
+            .capturePlanNodeId(tableWriteNodeId_)
+            .localPartition({})
+            .tableWriteMerge();
+    if (aggregateResult) {
+      insertPlan.project({TableWriteTraits::rowCountColumnName()})
+          .singleAggregation(
+              {},
+              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    return insertPlan.planNode();
+  }
+
+  // Returns the column names in 'inputRowType' that correspond to
+  // 'tableColumnNames' from 'tableRowType'.
+  static std::vector<std::string> inputColumnNames(
+      const std::vector<std::string>& tableColumnNames,
+      const RowTypePtr& tableRowType,
+      const RowTypePtr& inputRowType) {
+    std::vector<std::string> inputNames;
+    inputNames.reserve(tableColumnNames.size());
+    for (const auto& tableColumnName : tableColumnNames) {
+      const auto columnIdx = tableRowType->getChildIdx(tableColumnName);
+      inputNames.push_back(inputRowType->nameOf(columnIdx));
+    }
+    return inputNames;
+  }
+
+  PlanNodePtr createInsertPlanForNonBucketedTable(
+      PlanBuilder& inputPlan,
+      const RowTypePtr& inputRowType,
+      const RowTypePtr& tableRowType,
+      const std::string& outputDirectoryPath,
+      const std::vector<std::string>& partitionedBy,
+      const std::optional<CompressionKind> compressionKind,
+      const connector::hive::LocationHandle::TableType& outputTableType,
+      const CommitStrategy& outputCommitStrategy,
+      bool aggregateResult,
+      std::shared_ptr<core::AggregationNode> aggregationNode) {
+    auto insertPlan = inputPlan;
+    if (scaleWriter_) {
+      if (!partitionedBy.empty()) {
+        insertPlan.scaleWriterlocalPartition(
+            inputColumnNames(partitionedBy, tableRowType, inputRowType));
+      } else {
+        insertPlan.scaleWriterlocalPartitionRoundRobin();
       }
-      return insertPlan.planNode();
     }
+    insertPlan
+        .addNode(addTableWriter(
+            inputRowType,
+            tableRowType->names(),
+            nullptr,
+            createInsertTableHandle(
+                tableRowType,
+                outputTableType,
+                outputDirectoryPath,
+                partitionedBy,
+                nullptr,
+                compressionKind),
+            false,
+            outputCommitStrategy))
+        .capturePlanNodeId(tableWriteNodeId_)
+        .localPartition(std::vector<std::string>{})
+        .tableWriteMerge();
+    if (aggregateResult) {
+      insertPlan.project({TableWriteTraits::rowCountColumnName()})
+          .singleAggregation(
+              {},
+              {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())});
+    }
+    return insertPlan.planNode();
   }
 
   // Parameter partitionName is string formatted in the Hive style
@@ -1032,6 +1203,7 @@ class TableWriteTest : public HiveConnectorTestBase {
   RowTypePtr tableSchema_;
   CommitStrategy commitStrategy_;
   std::optional<CompressionKind> compressionKind_;
+  bool scaleWriter_;
   std::vector<std::string> partitionedBy_;
   std::vector<TypePtr> partitionTypes_;
   std::vector<column_index_t> partitionChannels_;
@@ -1141,60 +1313,68 @@ class PartitionedTableWriterTest
     }
     for (bool multiDrivers : multiDriverOptions) {
       for (FileFormat fileFormat : fileFormats) {
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
+        for (bool scaleWriter : {false, true}) {
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+        }
       }
     }
     return testParams;
@@ -1216,24 +1396,28 @@ class UnpartitionedTableWriterTest
     }
     for (bool multiDrivers : multiDriverOptions) {
      for (FileFormat fileFormat : fileFormats) {
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kUnpartitioned,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_NONE}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kUnpartitioned,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_NONE}
-                .value);
+        for (bool scaleWriter : {false, true}) {
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kUnpartitioned,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_NONE,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kUnpartitioned,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_NONE,
+              scaleWriter}
+                  .value);
+        }
       }
     }
     return testParams;
@@ -1265,7 +1449,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kHiveCompatible,
           false,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
       testParams.push_back(TestParam{
           fileFormat,
@@ -1274,7 +1459,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kHiveCompatible,
           true,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1283,7 +1469,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kHiveCompatible,
           false,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1292,7 +1479,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kHiveCompatible,
           true,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1301,7 +1489,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kPrestoNative,
           false,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1310,7 +1499,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kPrestoNative,
           true,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1319,7 +1509,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kPrestoNative,
           false,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1328,7 +1519,8 @@ class BucketedTableOnlyWriteTest
           HiveBucketProperty::Kind::kPrestoNative,
           true,
           multiDrivers,
-          CompressionKind_ZSTD}
+          CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
     }
   }
@@ -1360,7 +1552,8 @@ class BucketSortOnlyTableWriterTest
           HiveBucketProperty::Kind::kHiveCompatible,
           true,
           multiDrivers,
-          facebook::velox::common::CompressionKind_ZSTD}
+          facebook::velox::common::CompressionKind_ZSTD,
+          /*scaleWriter=*/false}
              .value);
      testParams.push_back(TestParam{
          fileFormat,
@@ -1369,7 +1562,8 @@ class BucketSortOnlyTableWriterTest
           HiveBucketProperty::Kind::kHiveCompatible,
           true,
           multiDrivers,
-          facebook::velox::common::CompressionKind_NONE}
+          facebook::velox::common::CompressionKind_NONE,
+          /*scaleWriter=*/false}
              .value);
     }
   }
@@ -1393,24 +1587,28 @@ class PartitionedWithoutBucketTableWriterTest
     }
     for (bool multiDrivers : multiDriverOptions) {
       for (FileFormat fileFormat : fileFormats) {
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            true,
-            CompressionKind_ZSTD}
-                .value);
+        for (bool scaleWriter : {false, true}) {
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              true,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+        }
      }
    }
    return testParams;
@@ -1431,114 +1629,128 @@ class AllTableWriterTest : public TableWriteTest,
     }
     for (bool multiDrivers : multiDriverOptions) {
       for (FileFormat fileFormat : fileFormats) {
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kUnpartitioned,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kUnpartitioned,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kPartitioned,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kOnlyBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kOnlyBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kHiveCompatible,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kOnlyBucketed,
-            CommitStrategy::kNoCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
-        testParams.push_back(TestParam{
-            fileFormat,
-            TestMode::kOnlyBucketed,
-            CommitStrategy::kTaskCommit,
-            HiveBucketProperty::Kind::kPrestoNative,
-            false,
-            multiDrivers,
-            CompressionKind_ZSTD}
-                .value);
+        for (bool scaleWriter : {false, true}) {
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kUnpartitioned,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kUnpartitioned,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kPartitioned,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kOnlyBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kOnlyBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kHiveCompatible,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kOnlyBucketed,
+              CommitStrategy::kNoCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+          testParams.push_back(TestParam{
+              fileFormat,
+              TestMode::kOnlyBucketed,
+              CommitStrategy::kTaskCommit,
+              HiveBucketProperty::Kind::kPrestoNative,
+              false,
+              multiDrivers,
+              CompressionKind_ZSTD,
+              scaleWriter}
+                  .value);
+        }
      }
    }
    return testParams;
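TestParam packs all the test dimensions into a single uint64_t so that gtest can parameterize on one value; the new scaleWriter flag takes the byte above compressionKind. A hedged sketch of the layout implied by the shifts in the constructor above (the top four fields follow directly from the diff; the widths of the lower fields are inferred, not stated in the source):

    #include <cstdint>

    // bits 56..63: scaleWriter     (new in this change)
    // bits 48..55: compressionKind
    // bits 40..47: multiDrivers
    // bits 32..39: fileFormat
    // bits 24..31: testMode
    // lower bits : commitStrategy, bucketKind, bucketSort (inferred)
    uint64_t packScaleWriter(uint64_t value, bool scaleWriter) {
      return value | (scaleWriter ? 1ULL << 56 : 0);
    }

    // Mirrors TestParam::scaleWriter(): true iff any bit at or above 56 is set.
    bool unpackScaleWriter(uint64_t value) {
      return (value >> 56) != 0;
    }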
diff --git a/velox/exec/tests/utils/AssertQueryBuilder.cpp b/velox/exec/tests/utils/AssertQueryBuilder.cpp
index 554ee2bb34838..427193fa67c52 100644
--- a/velox/exec/tests/utils/AssertQueryBuilder.cpp
+++ b/velox/exec/tests/utils/AssertQueryBuilder.cpp
@@ -57,6 +57,12 @@ AssertQueryBuilder& AssertQueryBuilder::maxDrivers(int32_t maxDrivers) {
   return *this;
 }
 
+AssertQueryBuilder& AssertQueryBuilder::maxQueryCapacity(
+    int64_t maxQueryCapacity) {
+  params_.maxQueryCapacity = maxQueryCapacity;
+  return *this;
+}
+
 AssertQueryBuilder& AssertQueryBuilder::destination(int32_t destination) {
   params_.destination = destination;
   return *this;
@@ -256,15 +262,19 @@ AssertQueryBuilder::readCursor() {
       // NOTE: the destructor of 'executor_' will wait for all the async task
       // activities to finish on AssertQueryBuilder dtor.
       static std::atomic<uint64_t> cursorQueryId{0};
+      const std::string queryId =
+          fmt::format("TaskCursorQuery_{}", cursorQueryId++);
+      auto queryPool = memory::memoryManager()->addRootPool(
+          queryId, params_.maxQueryCapacity);
       params_.queryCtx = core::QueryCtx::create(
           executor_.get(),
           core::QueryConfig({}),
           std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>{},
           cache::AsyncDataCache::getInstance(),
+          std::move(queryPool),
           nullptr,
-          nullptr,
-          fmt::format("TaskCursorQuery_{}", cursorQueryId++));
+          queryId);
     }
   }
   if (!configs_.empty()) {
diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h
index 4d5d4299d177b..b8bdcbb63e36a 100644
--- a/velox/exec/tests/utils/AssertQueryBuilder.h
+++ b/velox/exec/tests/utils/AssertQueryBuilder.h
@@ -35,6 +35,9 @@ class AssertQueryBuilder {
   /// Change requested number of drivers. Default is 1.
   AssertQueryBuilder& maxDrivers(int32_t maxDrivers);
 
+  /// Change the query memory pool capacity. Default has no limit.
+  AssertQueryBuilder& maxQueryCapacity(int64_t maxCapacity);
+
   /// Change task's 'destination', the partition number assigned to the task.
   /// Default is 0.
   AssertQueryBuilder& destination(int32_t destination);
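With this addition a test can cap the root memory pool that readCursor() creates for the query; a minimal usage sketch (the plan variable and the pool() accessor are assumed to come from the surrounding test fixture):

    // Run a plan with a 256MB cap on the query's root memory pool.
    auto result = AssertQueryBuilder(plan)
                      .maxDrivers(4)
                      .maxQueryCapacity(256 << 20)
                      .copyResults(pool());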
diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h
index 9c8cfaf7d7fa0..51fe73eb403c4 100644
--- a/velox/exec/tests/utils/Cursor.h
+++ b/velox/exec/tests/utils/Cursor.h
@@ -28,23 +28,26 @@ bool waitForTaskDriversToFinish(
     exec::Task* task,
     uint64_t maxWaitMicros = 1'000'000);
 
-// Parameters for initializing a TaskCursor or RowCursor.
+/// Parameters for initializing a TaskCursor or RowCursor.
 struct CursorParameters {
-  // Root node of the plan tree
+  /// Root node of the plan tree.
   std::shared_ptr<const core::PlanNode> planNode;
 
-  int32_t destination = 0;
+  int32_t destination{0};
 
-  // Maximum number of drivers per pipeline.
-  int32_t maxDrivers = 1;
+  /// Maximum number of drivers per pipeline.
+  int32_t maxDrivers{1};
 
-  // Maximum number of split groups processed concurrently.
-  int32_t numConcurrentSplitGroups = 1;
+  /// The max capacity of the query memory pool.
+  int64_t maxQueryCapacity{memory::kMaxMemory};
 
-  // Optional, created if not present.
+  /// Maximum number of split groups processed concurrently.
+  int32_t numConcurrentSplitGroups{1};
+
+  /// Optional, created if not present.
   std::shared_ptr<core::QueryCtx> queryCtx;
 
-  uint64_t bufferedBytes = 512 * 1024;
+  uint64_t bufferedBytes{512 * 1024};
 
   /// Ungrouped (by default) or grouped (bucketed) execution.
   core::ExecutionStrategy executionStrategy{
@@ -57,17 +60,18 @@ struct CursorParameters {
   /// ungrouped execution.
   int numSplitGroups{1};
 
-  /// Spilling directory, if not empty, then the task's spilling directory would
-  /// be built from it.
+  /// Spilling directory, if not empty, then the task's spilling directory
+  /// would be built from it.
   std::string spillDirectory;
 
   bool copyResult = true;
 
-  /// If true, use serial execution mode. Use parallel execution mode otherwise.
+  /// If true, use serial execution mode. Use parallel execution mode
+  /// otherwise.
   bool serialExecution = false;
 
-  /// If both 'queryConfigs' and 'queryCtx' are specified, the configurations in
-  /// 'queryCtx' will be overridden by 'queryConfig'.
+  /// If both 'queryConfigs' and 'queryCtx' are specified, the configurations
+  /// in 'queryCtx' will be overridden by 'queryConfig'.
   std::unordered_map<std::string, std::string> queryConfigs;
 };
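The same capacity flows through CursorParameters for tests that drive a TaskCursor directly instead of going through AssertQueryBuilder; a sketch with illustrative values (the plan variable is assumed):

    CursorParameters params;
    params.planNode = plan;
    params.maxDrivers = 4;
    // Defaults to memory::kMaxMemory, i.e. effectively unlimited.
    params.maxQueryCapacity = 256 << 20;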
diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp
index b17547e9554bd..6a346c5c4c007 100644
--- a/velox/exec/tests/utils/PlanBuilder.cpp
+++ b/velox/exec/tests/utils/PlanBuilder.cpp
@@ -1174,6 +1174,7 @@ RowTypePtr rename(
 core::PlanNodePtr createLocalPartitionNode(
     const core::PlanNodeId& planNodeId,
     const std::vector<core::TypedExprPtr>& keys,
+    bool scaleWriter,
     const std::vector<core::PlanNodePtr>& sources,
     memory::MemoryPool* pool) {
   auto partitionFunctionFactory =
@@ -1182,6 +1183,7 @@ core::PlanNodePtr createLocalPartitionNode(
       planNodeId,
       keys.empty() ? core::LocalPartitionNode::Type::kGather
                    : core::LocalPartitionNode::Type::kRepartition,
+      scaleWriter,
       partitionFunctionFactory,
       sources);
 }
@@ -1270,7 +1272,11 @@ PlanBuilder& PlanBuilder::localPartition(
     const std::vector<core::PlanNodePtr>& sources) {
   VELOX_CHECK_NULL(planNode_, "localPartition() must be the first call");
   planNode_ = createLocalPartitionNode(
-      nextPlanNodeId(), exprs(keys, sources[0]->outputType()), sources, pool_);
+      nextPlanNodeId(),
+      exprs(keys, sources[0]->outputType()),
+      /*scaleWriter=*/false,
+      sources,
+      pool_);
   return *this;
 }
@@ -1278,6 +1284,18 @@ PlanBuilder& PlanBuilder::localPartition(const std::vector<std::string>& keys) {
   planNode_ = createLocalPartitionNode(
       nextPlanNodeId(),
       exprs(keys, planNode_->outputType()),
+      /*scaleWriter=*/false,
+      {planNode_},
+      pool_);
+  return *this;
+}
+
+PlanBuilder& PlanBuilder::scaleWriterlocalPartition(
+    const std::vector<std::string>& keys) {
+  planNode_ = createLocalPartitionNode(
+      nextPlanNodeId(),
+      exprs(keys, planNode_->outputType()),
+      /*scaleWriter=*/true,
       {planNode_},
       pool_);
   return *this;
@@ -1293,6 +1311,7 @@ PlanBuilder& PlanBuilder::localPartition(
   planNode_ = std::make_shared<core::LocalPartitionNode>(
       nextPlanNodeId(),
       core::LocalPartitionNode::Type::kRepartition,
+      /*scaleWriter=*/false,
       std::move(hivePartitionFunctionFactory),
       std::vector<core::PlanNodePtr>{planNode_});
   return *this;
@@ -1315,6 +1334,7 @@ PlanBuilder& PlanBuilder::localPartitionByBucket(
   planNode_ = std::make_shared<core::LocalPartitionNode>(
       nextPlanNodeId(),
       core::LocalPartitionNode::Type::kRepartition,
+      /*scaleWriter=*/false,
       std::move(hivePartitionFunctionFactory),
       std::vector<core::PlanNodePtr>{planNode_});
   return *this;
@@ -1323,10 +1343,12 @@ PlanBuilder& PlanBuilder::localPartitionByBucket(
 namespace {
 core::PlanNodePtr createLocalPartitionRoundRobinNode(
     const core::PlanNodeId& planNodeId,
+    bool scaleWriter,
     const std::vector<core::PlanNodePtr>& sources) {
   return std::make_shared<core::LocalPartitionNode>(
       planNodeId,
       core::LocalPartitionNode::Type::kRepartition,
+      scaleWriter,
       std::make_shared(),
       sources);
 }
@@ -1336,12 +1358,20 @@ PlanBuilder& PlanBuilder::localPartitionRoundRobin(
     const std::vector<core::PlanNodePtr>& sources) {
   VELOX_CHECK_NULL(
       planNode_, "localPartitionRoundRobin() must be the first call");
-  planNode_ = createLocalPartitionRoundRobinNode(nextPlanNodeId(), sources);
+  planNode_ = createLocalPartitionRoundRobinNode(
+      nextPlanNodeId(), /*scaleWriter=*/false, sources);
   return *this;
 }
 
 PlanBuilder& PlanBuilder::localPartitionRoundRobin() {
-  planNode_ = createLocalPartitionRoundRobinNode(nextPlanNodeId(), {planNode_});
+  planNode_ = createLocalPartitionRoundRobinNode(
+      nextPlanNodeId(), /*scaleWriter=*/false, {planNode_});
+  return *this;
+}
+
+PlanBuilder& PlanBuilder::scaleWriterlocalPartitionRoundRobin() {
+  planNode_ = createLocalPartitionRoundRobinNode(
+      nextPlanNodeId(), /*scaleWriter=*/true, {planNode_});
   return *this;
 }
@@ -1398,6 +1428,7 @@ PlanBuilder& PlanBuilder::localPartitionRoundRobinRow() {
   planNode_ = std::make_shared<core::LocalPartitionNode>(
       nextPlanNodeId(),
       core::LocalPartitionNode::Type::kRepartition,
+      /*scaleWriter=*/false,
       std::make_shared(),
       std::vector<core::PlanNodePtr>{planNode_});
   return *this;
diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h
index bcd152a37de16..8347d38627f14 100644
--- a/velox/exec/tests/utils/PlanBuilder.h
+++ b/velox/exec/tests/utils/PlanBuilder.h
@@ -869,6 +869,14 @@ class PlanBuilder {
   /// current plan node).
   PlanBuilder& localPartitionRoundRobin();
 
+  /// A convenience method to add a scale-writer LocalPartitionNode with hash
+  /// partitioning.
+  PlanBuilder& scaleWriterlocalPartition(const std::vector<std::string>& keys);
+
+  /// A convenience method to add a scale-writer LocalPartitionNode with
+  /// round-robin partitioning.
+  PlanBuilder& scaleWriterlocalPartitionRoundRobin();
+
   /// Add a LocalPartitionNode to partition the input using row-wise
   /// round-robin. Number of partitions is determined at runtime based on
   /// parallelism of the downstream pipeline.
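Taken together, the two new PlanBuilder methods wrap a writer stage in a scaling local exchange, as the tests above wire up with addNode(). A sketch of the resulting plan shape, with placeholder stages (the values() input and the tableWrite() call here stand in for whatever feeds and consumes the exchange, so treat this as an assumption rather than a prescribed plan):

    auto plan = PlanBuilder()
                    .values(inputVectors)
                    // Hash variant when the table is partitioned:
                    // .scaleWriterlocalPartition({"partitionKey"})
                    .scaleWriterlocalPartitionRoundRobin()
                    .tableWrite(outputDirectoryPath)
                    .planNode();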