From 975bf226e627722279e21da923c899b08189eb8f Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Mon, 16 Dec 2024 11:34:26 -0800
Subject: [PATCH] feat: Add auto table scan scaling based on memory usage

Summary:
Adds auto table scan scaling support to solve query OOMs caused by highly
concurrent, memory-intensive table scan operations. Instead of running all the
table scan threads at the start of the query, we start with a single table
scan thread and gradually schedule more table scan threads when there is
sufficient free memory capacity for the query (measured as the current used
memory versus the query max capacity). When the query is approaching its max
limit, we stop scheduling more table scan threads and even stop currently
running scan threads to free up memory and prevent OOM. The scale up/down
decision happens when a table scan operator finishes processing a non-empty
split. A scale controller is added to each table scan plan node for this
coordinated control, and two memory ratios are defined for the scale up/down
decision. The controller estimates the per-scan-driver memory usage based on
the memory usage reported when a table scan thread finishes a non-empty split.

A Meta internal test shows that a query failed with OOM right after 1 min of
execution with 10 leaf threads, and finished in 2 hrs when the leaf thread
count was reduced to 5. Java took 1 hour to finish with persistent shuffle
(LBM). With auto scaling, the query finishes on Prestissimo in 30 mins.

We don't expect this feature to be enabled by default, as ad-hoc small queries
need to run all the scan threads in parallel from the start. It is intended
for large pipelines, and can be enabled by a query config in Velox and by
session properties in Prestissimo.

Differential Revision: D67114511
---
 velox/core/QueryConfig.h                    |  41 +++
 velox/docs/configs.rst                      |  47 ++-
 velox/docs/monitoring/stats.rst             |  21 ++
 velox/exec/Driver.cpp                       |  30 +-
 velox/exec/Driver.h                         |  30 ++
 velox/exec/FilterProject.cpp                |   4 +-
 velox/exec/HashAggregation.cpp              |  31 +-
 velox/exec/HashAggregation.h                |  17 +-
 velox/exec/Operator.h                       |   6 +
 velox/exec/PartitionedOutput.cpp            |  38 ++-
 velox/exec/PartitionedOutput.h              |  10 +
 velox/exec/TableScan.cpp                    | 310 +++++++++++++++++++-
 velox/exec/TableScan.h                      | 189 +++++++++++-
 velox/exec/Task.cpp                         |  28 ++
 velox/exec/Task.h                           |  10 +
 velox/exec/TaskStructs.h                    |   4 +
 velox/exec/tests/MultiFragmentTest.cpp      | 220 ++++++++++++++
 velox/exec/tests/utils/OperatorTestBase.cpp |   2 +-
 18 files changed, 984 insertions(+), 54 deletions(-)
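For intuition, the policy described in the summary boils down to the sketch
below. All names here are illustrative; the actual logic lives in
ScaledScanController::tryScaleLocked in velox/exec/TableScan.cpp further down
in this patch.

    // Minimal sketch of the scaling policy, assuming hypothetical inputs.
    #include <cstdint>

    enum class ScanScaleDecision { kScaleDown, kScaleUp, kHold };

    ScanScaleDecision decide(
        uint64_t usedBytes,            // current query memory usage
        uint64_t maxCapacity,          // query max memory capacity
        uint64_t estimatedDriverBytes, // estimated per-scan-driver usage
        double scaleUpRatio,           // e.g. 0.5
        double scaleDownRatio) {       // e.g. 0.7, must be > scaleUpRatio
      if (usedBytes > maxCapacity * scaleDownRatio) {
        // Stop some running scan drivers to free memory and avoid OOM.
        return ScanScaleDecision::kScaleDown;
      }
      // Only resume one more scan driver if the projected usage still leaves
      // headroom below the scale-up watermark.
      if (usedBytes + estimatedDriverBytes < maxCapacity * scaleUpRatio) {
        return ScanScaleDecision::kScaleUp;
      }
      return ScanScaleDecision::kHold;
    }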
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index e6c4032768b1..2e8daa8fe77b 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -472,6 +472,35 @@ class QueryConfig {
   static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
       "scaled_writer_min_processed_bytes_rebalance_threshold";
 
+  /// If true, enables scaled table scan processing. For each table scan plan
+  /// node, a scan controller is used to dynamically adjust the number of
+  /// running scan threads in response to query memory usage changes. It
+  /// increases the number of running threads when the query has sufficient
+  /// free memory available and reduces the number when the query has less
+  /// available memory.
+  static constexpr const char* kTableScanScaledProcessingEnabled =
+      "table_scan_scaled_processing_enabled";
+
+  /// The query memory usage ratio defined for scaled scan processing. When
+  /// the query memory usage is below this ratio, the scan controller keeps
+  /// scaling up the scan processing. Once the usage exceeds this ratio, it
+  /// stops scaling up. The value is in the range of [0, 1).
+  ///
+  /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
+  static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
+      "table_scan_scale_up_memory_usage_ratio";
+
+  /// The query memory usage ratio defined to control scan scale down. When
+  /// the query memory usage is above this ratio, we start to scale down table
+  /// scan processing to free up memory. The value is in the range of (0, 1].
+  ///
+  /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
+  /// 'table_scan_scale_down_memory_usage_ratio' needs to be set to a value
+  /// higher than 'table_scan_scale_up_memory_usage_ratio' to avoid scan
+  /// scaling fluctuation around a certain query memory usage ratio.
+  static constexpr const char* kTableScanScaleDownMemoryUsageRatio =
+      "table_scan_scale_down_memory_usage_ratio";
+
   bool selectiveNimbleReaderEnabled() const {
     return get<bool>(kSelectiveNimbleReaderEnabled, false);
   }
@@ -880,6 +909,18 @@ class QueryConfig {
         kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
   }
 
+  bool tableScanScaledProcessingEnabled() const {
+    return get<bool>(kTableScanScaledProcessingEnabled, false);
+  }
+
+  double tableScanScaleUpMemoryUsageRatio() const {
+    return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.5);
+  }
+
+  double tableScanScaleDownMemoryUsageRatio() const {
+    return get<double>(kTableScanScaleDownMemoryUsageRatio, 0.7);
+  }
+
   template <typename T>
   T get(const std::string& key, const T& defaultValue) const {
     return config_->get<T>(key, defaultValue);
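The new configs can be set through the standard QueryConfig string map. A
minimal sketch, assuming the map is then handed to the QueryCtx (or exposed as
session properties in Prestissimo); the keys are the constants added above:

    // Sketch: enabling scaled table scan processing via query configs.
    #include <string>
    #include <unordered_map>
    #include "velox/core/QueryConfig.h"

    using facebook::velox::core::QueryConfig;

    std::unordered_map<std::string, std::string> configs{
        {QueryConfig::kTableScanScaledProcessingEnabled, "true"},
        // Keep scaling up while query usage is below 50% of max capacity.
        {QueryConfig::kTableScanScaleUpMemoryUsageRatio, "0.5"},
        // Start scaling down once usage exceeds 70% of max capacity.
        {QueryConfig::kTableScanScaleDownMemoryUsageRatio, "0.7"}};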
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index a932ecd57558..041f06bc04dc 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -390,6 +390,33 @@ Table Scan
    - integer
    - 2
    - Maximum number of splits to preload per driver. Set to 0 to disable preloading.
+  * - table_scan_scaled_processing_enabled
+    - bool
+    - false
+    - If true, enables scaled table scan processing. For each table scan plan
+      node, a scan controller is used to dynamically adjust the number of
+      running scan threads in response to query memory usage changes. It
+      increases the number of running threads when the query has sufficient
+      free memory available and reduces the number when the query has less
+      available memory.
+  * - table_scan_scale_up_memory_usage_ratio
+    - double
+    - 0.5
+    - The query memory usage ratio defined for scaled scan processing. When
+      the query memory usage is below this ratio, the scan controller keeps
+      scaling up the scan processing. Once the usage exceeds this ratio, it
+      stops scaling up. The value is in the range of [0, 1). This only applies
+      if 'table_scan_scaled_processing_enabled' is set to true.
+  * - table_scan_scale_down_memory_usage_ratio
+    - double
+    - 0.7
+    - The query memory usage ratio defined to control scan scale down. When
+      the query memory usage is above this ratio, we start to scale down table
+      scan processing to free up memory. The value is in the range of (0, 1].
+      This only applies if 'table_scan_scaled_processing_enabled' is set to
+      true. 'table_scan_scale_down_memory_usage_ratio' needs to be set to a
+      value higher than 'table_scan_scale_up_memory_usage_ratio' to avoid scan
+      scaling fluctuation around a certain query memory usage ratio.
 
 Table Writer
 ------------
@@ -414,26 +441,26 @@ Table Writer
    - double
    - 0.7
    - The max ratio of a query used memory to its max capacity, and the scale
-     - writer exchange stops scaling writer processing if the query's current
-     - memory usage exceeds this ratio. The value is in the range of (0, 1].
+     writer exchange stops scaling writer processing if the query's current
+     memory usage exceeds this ratio. The value is in the range of (0, 1].
  * - scaled_writer_max_partitions_per_writer
    - integer
    - 128
    - The max number of logical table partitions that can be assigned to a
-     - single table writer thread. The logical table partition is used by local
-     - exchange writer for writer scaling, and multiple physical table
-     - partitions can be mapped to the same logical table partition based on the
-     - hash value of calculated partitioned ids.
+     single table writer thread. The logical table partition is used by local
+     exchange writer for writer scaling, and multiple physical table
+     partitions can be mapped to the same logical table partition based on the
+     hash value of calculated partitioned ids.
+ * - scaled_writer_min_partition_processed_bytes_rebalance_threshold
+   - integer
+   - 128MB
-  * - scaled_writer_min_partition_processed_bytes_rebalance_threshold
    - Minimum amount of data processed by a logical table partition to trigger
-     - writer scaling if it is detected as overloaded by scale wrirer exchange.
+     writer scaling if it is detected as overloaded by scale writer exchange.
  * - scaled_writer_min_processed_bytes_rebalance_threshold
-   - Minimum amount of data processed by all the logical table partitions to
-     - trigger skewed partition rebalancing by scale writer exchange.
   - integer
   - 256MB
+  - Minimum amount of data processed by all the logical table partitions to
+    trigger skewed partition rebalancing by scale writer exchange.
 
 Hive Connector
 --------------
diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst
index 53538d854974..e6bb5245575d 100644
--- a/velox/docs/monitoring/stats.rst
+++ b/velox/docs/monitoring/stats.rst
@@ -90,6 +90,27 @@ These stats are reported only by HashBuild and HashAggregation operators.
   - Time spent on building the hash table from rows collected by all the
     hash build operators. This stat is only reported by the HashBuild
     operator.
 
+TableScan
+---------
+These stats are reported only by the TableScan operator.
+
+.. list-table::
+   :widths: 50 25 50
+   :header-rows: 1
+
+   * - Stats
+     - Unit
+     - Description
+   * - numScaleUp
+     -
+     - Number of times the scaled scan controller triggers scale up processing.
+   * - numScaleDown
+     -
+     - Number of times the scaled scan controller triggers scale down processing.
+   * - numRunningScaleThreads
+     -
+     - The number of running table scan drivers.
+
 TableWriter
 -----------
 These stats are reported only by TableWriter operator
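These counters surface as operator custom stats. A minimal sketch of reading
them from a finished task, mirroring the pattern the tests later in this patch
use; 'task' and 'scanNodeId' are assumed to come from the surrounding code,
and toPlanStats() is the existing helper in velox/exec/PlanNodeStats.h:

    // Sketch: reading the new TableScan runtime stats once a task finishes.
    #include <iostream>
    #include "velox/exec/PlanNodeStats.h"

    void printScanScaleStats(
        const facebook::velox::exec::Task& task,
        const facebook::velox::core::PlanNodeId& scanNodeId) {
      auto planStats = facebook::velox::exec::toPlanStats(task.taskStats());
      const auto& custom = planStats.at(scanNodeId).customStats;
      if (custom.count("numScaleUp") > 0) {
        std::cout << "scan scale ups: " << custom.at("numScaleUp").sum << "\n";
      }
      if (custom.count("numScaleDown") > 0) {
        std::cout << "scan scale downs: " << custom.at("numScaleDown").sum
                  << "\n";
      }
    }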
diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp
index e5c4656d6521..e04c81d637a8 100644
--- a/velox/exec/Driver.cpp
+++ b/velox/exec/Driver.cpp
@@ -631,11 +631,10 @@ StopReason Driver::runInternal(
               nextOp,
               curOperatorId_ + 1,
               kOpMethodAddInput);
-
-          // The next iteration will see if operators_[i + 1] has
-          // output now that it got input.
-          i += 2;
         });
+        // The next iteration will see if operators_[i + 1] has
+        // output now that it got input.
+        i += 2;
         continue;
       } else {
         stop = task()->shouldStop();
@@ -934,6 +933,17 @@ std::unordered_set<column_index_t> Driver::canPushdownFilters(
   return supportedChannels;
 }
 
+bool Driver::needsFlush(FlushReason reason) const {
+  // Check the upstream operators first.
+  for (auto i = 0; i < operators_.size(); ++i) {
+    auto op = operators_[i].get();
+    if (op->needsFlush(reason)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 Operator* Driver::findOperator(std::string_view planNodeId) const {
   for (auto& op : operators_) {
     if (op->planNodeId() == planNodeId) {
@@ -1129,12 +1139,24 @@ std::string blockingReasonToString(BlockingReason reason) {
       return "kYield";
     case BlockingReason::kWaitForArbitration:
       return "kWaitForArbitration";
+    case BlockingReason::kWaitForScanScaleUp:
+      return "kWaitForScanScaleUp";
     default:
       VELOX_UNREACHABLE(
           fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
   }
 }
 
+std::string flushReasonToString(FlushReason reason) {
+  switch (reason) {
+    case FlushReason::kScanScaleDown:
+      return "kScanScaleDown";
+    default:
+      VELOX_UNREACHABLE(
+          fmt::format("Unknown flush reason {}", static_cast<int>(reason)));
+  }
+}
+
 DriverThreadContext* driverThreadContext() {
   return driverThreadCtx;
 }
diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h
index 7d46017187e4..7f3c6fa520a4 100644
--- a/velox/exec/Driver.h
+++ b/velox/exec/Driver.h
@@ -210,6 +210,14 @@ enum class BlockingReason {
   /// Operator is blocked waiting for its associated query memory arbitration
   /// to finish.
   kWaitForArbitration,
+  /// A table scan operator is blocked waiting for the scan controller to
+  /// increase the number of table scan processing threads so that it can
+  /// start or resume processing. This only applies when scaled table scan
+  /// processing is enabled. The scan controller dynamically adjusts the
+  /// number of running scan threads in response to query memory usage
+  /// changes. It increases the number of running threads when the query has
+  /// sufficient free memory available and reduces the number when the query
+  /// has less available memory.
+  kWaitForScanScaleUp,
 };
 
 std::string blockingReasonToString(BlockingReason reason);
@@ -258,6 +266,16 @@ class BlockingState {
   static std::atomic_uint64_t numBlockedDrivers_;
 };
 
+/// Defines the reason for a driver to be flushed.
+enum class FlushReason : uint8_t {
+  /// Triggered by a table scan operator that stops its processing in response
+  /// to a scan scale down. Correspondingly, all the operators in the driver
+  /// pipeline need to flush out their buffered data before the driver stops.
+  kScanScaleDown,
+};
+
+std::string flushReasonToString(FlushReason reason);
+
 /// Special group id to reflect the ungrouped execution.
 constexpr uint32_t kUngroupedGroupId{std::numeric_limits<uint32_t>::max()};
 
@@ -420,6 +438,9 @@ class Driver : public std::enable_shared_from_this<Driver> {
       const Operator* filterSource,
       const std::vector<column_index_t>& channels) const;
 
+  /// Returns true if any operator in the pipeline has pending data to flush.
+  bool needsFlush(FlushReason reason) const;
+
   /// Returns the Operator with 'planNodeId' or nullptr if not found. For
   /// example, hash join probe accesses the corresponding build by id.
Operator* findOperator(std::string_view planNodeId) const; @@ -759,6 +780,15 @@ struct fmt::formatter } }; +template <> +struct fmt::formatter + : formatter { + auto format(facebook::velox::exec::FlushReason f, format_context& ctx) const { + return formatter::format( + facebook::velox::exec::flushReasonToString(f), ctx); + } +}; + template <> struct fmt::formatter : formatter { diff --git a/velox/exec/FilterProject.cpp b/velox/exec/FilterProject.cpp index 9c2c991ffe7e..3bb279a3f2a9 100644 --- a/velox/exec/FilterProject.cpp +++ b/velox/exec/FilterProject.cpp @@ -133,8 +133,8 @@ RowVectorPtr FilterProject::getOutput() { rows->setAll(); EvalCtx evalCtx(operatorCtx_->execCtx(), exprs_.get(), input_.get()); - // Pre-load lazy vectors which are referenced by both expressions and identity - // projections. + // Pre-load lazy vectors which are referenced by both expressions and + // identity projections. for (auto fieldIdx : multiplyReferencedFieldIndices_) { evalCtx.ensureFieldLoaded(fieldIdx, *rows); } diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 14e87f336ec2..1c2cb242500a 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -166,6 +166,23 @@ void HashAggregation::setupGroupingKeyChannelProjections( } } +bool HashAggregation::needsFlush(FlushReason reason) { + VELOX_CHECK_EQ(reason, FlushReason::kScanScaleDown); + if (needsFlush_) { + return true; + } + + // We only support flush on a partial aggregation for now. + if (!isPartialOutput_) { + return false; + } + if (groupingSet_->numRows() == 0) { + return false; + } + needsFlush_ = true; + return true; +} + bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const { VELOX_CHECK(isPartialOutput_ && !isGlobal_); return numInputRows_ > abandonPartialAggregationMinRows_ && @@ -256,7 +273,7 @@ void HashAggregation::prepareOutput(vector_size_t size) { } void HashAggregation::resetPartialOutputIfNeed() { - if (!partialFull_) { + if (!partialFull_ && !needsFlush_) { return; } VELOX_DCHECK(!isGlobal_); @@ -272,9 +289,10 @@ void HashAggregation::resetPartialOutputIfNeed() { } groupingSet_->resetTable(/*freeTable=*/false); partialFull_ = false; - if (!finished_) { + if (!finished_ && !needsFlush_) { maybeIncreasePartialAggregationMemoryUsage(aggregationPct); } + needsFlush_ = false; numOutputRows_ = 0; numInputRows_ = 0; } @@ -339,16 +357,19 @@ RowVectorPtr HashAggregation::getOutput() { // Produce results if one of the following is true: // - received no-more-input message; // - partial aggregation reached memory limit; + // - received flush request; // - distinct aggregation has new keys; // - running in partial streaming mode and have some output ready. - if (!noMoreInput_ && !partialFull_ && !newDistincts_ && + if (!noMoreInput_ && !partialFull_ && !needsFlush_ && !newDistincts_ && !groupingSet_->hasOutput()) { input_ = nullptr; return nullptr; } if (isDistinct_) { - return getDistinctOutput(); + auto output = getDistinctOutput(); + resetPartialOutputIfNeed(); + return output; } const auto& queryConfig = operatorCtx_->driverCtx()->queryConfig(); @@ -393,8 +414,6 @@ RowVectorPtr HashAggregation::getDistinctOutput() { // Drop reference to input_ to make it singly-referenced at the producer and // allow for memory reuse. 
   input_ = nullptr;
-
-  resetPartialOutputIfNeed();
     return output;
   }
   VELOX_CHECK(!newDistincts_);
diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h
index 5cd44f77cb4b..e3f416909771 100644
--- a/velox/exec/HashAggregation.h
+++ b/velox/exec/HashAggregation.h
@@ -33,6 +33,8 @@ class HashAggregation : public Operator {
 
   RowVectorPtr getOutput() override;
 
+  bool needsFlush(FlushReason reason) override;
+
   bool needsInput() const override {
     return !noMoreInput_ && !partialFull_;
   }
@@ -105,22 +107,23 @@ class HashAggregation : public Operator {
   // 'groupingSet_->estimateRowSize()' across all accumulated data set.
   std::optional<int64_t> estimatedOutputRowSize_;
 
-  bool partialFull_ = false;
-  bool newDistincts_ = false;
-  bool finished_ = false;
+  bool partialFull_{false};
+  bool needsFlush_{false};
+  bool newDistincts_{false};
+  bool finished_{false};
 
   // True if partial aggregation has been found to be non-reducing.
   bool abandonedPartialAggregation_{false};
 
   RowContainerIterator resultIterator_;
-  bool pushdownChecked_ = false;
-  bool mayPushdown_ = false;
+  bool pushdownChecked_{false};
+  bool mayPushdown_{false};
 
   // Count the number of input rows. It is reset on partial aggregation output
   // flush.
-  int64_t numInputRows_ = 0;
+  int64_t numInputRows_{0};
 
   // Count the number of output rows. It is reset on partial aggregation output
   // flush.
-  int64_t numOutputRows_ = 0;
+  int64_t numOutputRows_{0};
 
   // Possibly reusable output vector.
   RowVectorPtr output_;
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index de06d78da65a..f34b39061f0f 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -436,6 +436,12 @@ class Operator : public BaseRuntimeStatWriter {
   /// build side is empty.
   virtual bool isFinished() = 0;
 
+  /// Returns true if this operator has pending buffered data to flush for the
+  /// given flush 'reason'.
+  virtual bool needsFlush(FlushReason reason) {
+    return false;
+  }
+
   /// Traces input batch of the operator.
   virtual void traceInput(const RowVectorPtr&);
 
diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp
index 67504891ec9e..4ae769b58115 100644
--- a/velox/exec/PartitionedOutput.cpp
+++ b/velox/exec/PartitionedOutput.cpp
@@ -253,6 +253,27 @@ void PartitionedOutput::initializeSizeBuffers() {
   }
 }
 
+bool PartitionedOutput::needsFlush(FlushReason reason) {
+  VELOX_CHECK_EQ(reason, FlushReason::kScanScaleDown);
+
+  if (needsFlush_) {
+    return true;
+  }
+
+  if (input_ != nullptr) {
+    needsFlush_ = true;
+    return true;
+  }
+
+  for (const auto& destination : destinations_) {
+    if (destination->needsFlush()) {
+      needsFlush_ = true;
+      return true;
+    }
+  }
+  return false;
+}
+
 void PartitionedOutput::estimateRowSizes() {
   const auto numInput = input_->size();
   std::fill(rowSize_.begin(), rowSize_.end(), 0);
@@ -417,22 +438,29 @@ RowVectorPtr PartitionedOutput::getOutput() {
     }
     return nullptr;
   }
+
   // All of 'output_' is written into the destinations. We are finishing, hence
   // move all the destinations to the output queue. This will not grow memory
   // and hence does not need blocking.
- if (noMoreInput_) { + if (noMoreInput_ || needsFlush_) { for (auto& destination : destinations_) { if (destination->isFinished()) { continue; } destination->flush(*bufferManager, bufferReleaseFn_, nullptr); - destination->setFinished(); - destination->updateStats(this); + if (noMoreInput_) { + destination->setFinished(); + destination->updateStats(this); + } } - bufferManager->noMoreData(operatorCtx_->task()->taskId()); - finished_ = true; + if (noMoreInput_) { + bufferManager->noMoreData(operatorCtx_->task()->taskId()); + finished_ = true; + } + needsFlush_ = false; } + // The input is fully processed, drop the reference to allow reuse. input_ = nullptr; output_ = nullptr; diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index fecdaf57c9c5..81e60c683025 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -45,6 +45,12 @@ class Destination { setTargetSizePct(); } + /// Returns true if 'this' destination has pending buffer to flush to output + /// buffer. + bool needsFlush() const { + return current_ && rowsInCurrent_ != 0; + } + /// Resets the destination before starting a new batch. void beginBatch() { rows_.clear(); @@ -172,6 +178,8 @@ class PartitionedOutput : public Operator { /// a non-blocked state, otherwise blocked. RowVectorPtr getOutput() override; + bool needsFlush(FlushReason reason) override; + /// always true but the caller will check isBlocked before adding input, hence /// the blocked state does not accumulate input. bool needsInput() const override { @@ -229,6 +237,8 @@ class PartitionedOutput : public Operator { BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; + // Indicates if this operator needs to flush buffered data to output buffer. + bool needsFlush_{false}; bool finished_{false}; // Contains pointers to 'rowSize_' elements. 'sizePointers_[i]' contains a // pointer to 'rowSize_[i]'. 
diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 7586a52f763f..39a0eb75d015 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -23,6 +23,12 @@ using facebook::velox::common::testutil::TestValue; namespace facebook::velox::exec { +namespace { +#define RETURN_IF_CLOSED(ret) \ + if (FOLLY_UNLIKELY(closed_)) { \ + return ret; \ + } +} // namespace TableScan::TableScan( int32_t operatorId, @@ -37,19 +43,29 @@ TableScan::TableScan( tableHandle_(tableScanNode->tableHandle()), columnHandles_(tableScanNode->assignments()), driverCtx_(driverCtx), + maxSplitPreloadPerDriver_( + driverCtx_->queryConfig().maxSplitPreloadPerDriver()), + maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), connectorPool_(driverCtx_->task->addConnectorPoolLocked( planNodeId(), driverCtx_->pipelineId, driverCtx_->driverId, operatorType(), tableHandle_->connectorId())), - maxSplitPreloadPerDriver_( - driverCtx_->queryConfig().maxSplitPreloadPerDriver()), - readBatchSize_(driverCtx_->queryConfig().preferredOutputBatchRows()), - maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), + connector_(connector::getConnector(tableHandle_->connectorId())), getOutputTimeLimitMs_( driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()) { - connector_ = connector::getConnector(tableHandle_->connectorId()); + readBatchSize_ = driverCtx_->queryConfig().preferredOutputBatchRows(); +} + +void TableScan::initialize() { + Operator::initialize(); + VELOX_CHECK_NULL(scaledController_); + scaledController_ = driverCtx_->task->getOrAddScaledScanController( + driverCtx_->splitGroupId, + driverCtx_->pipelineId, + planNodeId(), + pool()->parent()); } folly::dynamic TableScan::toJson() const { @@ -79,6 +95,12 @@ RowVectorPtr TableScan::getOutput() { if (noMoreSplits_) { return nullptr; } + if (blockingFuture_.valid()) { + VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp); + return nullptr; + } + + blockingReason_ = BlockingReason::kNotBlocked; curStatus_ = "getOutput: enter"; const auto startTimeMs = getCurrentTimeMs(); @@ -102,6 +124,12 @@ RowVectorPtr TableScan::getOutput() { // A point for test code injection. 
  TestValue::adjust("facebook::velox::exec::TableScan::getOutput", this);
 
+    if (checkForScaleDownOnNewSplit()) {
+      VELOX_CHECK(blockingFuture_.valid());
+      VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp);
+      return nullptr;
+    }
+
     exec::Split split;
     curStatus_ = "getOutput: task->getSplitOrFuture";
     blockingReason_ = driverCtx_->task->getSplitOrFuture(
@@ -259,6 +287,7 @@ RowVectorPtr TableScan::getOutput() {
       curStatus_ = "getOutput: updating stats_.rawInput";
       lockedStats->rawInputPositions = dataSource_->getCompletedRows();
       lockedStats->rawInputBytes = dataSource_->getCompletedBytes();
+
       RowVectorPtr data = std::move(dataOptional).value();
       if (data != nullptr) {
         if (data->size() > 0) {
@@ -274,6 +303,7 @@ RowVectorPtr TableScan::getOutput() {
       }
     }
 
+    uint64_t currNumRawInputRows{0};
     {
       curStatus_ = "getOutput: updating stats_.preloadedSplits";
       auto lockedStats = stats_.wlock();
@@ -287,14 +317,64 @@ RowVectorPtr TableScan::getOutput() {
             "readyPreloadedSplits", RuntimeCounter(numReadyPreloadedSplits_));
         numReadyPreloadedSplits_ = 0;
       }
+      currNumRawInputRows = lockedStats->rawInputPositions;
     }
+    VELOX_CHECK_LE(rawInputRowsSinceLastSplit_, currNumRawInputRows);
+    const bool emptySplit = currNumRawInputRows == rawInputRowsSinceLastSplit_;
+    rawInputRowsSinceLastSplit_ = currNumRawInputRows;
 
     curStatus_ = "getOutput: task->splitFinished";
     driverCtx_->task->splitFinished(true, currentSplitWeight_);
     needNewSplit_ = true;
+
+    // We only update the scaled controller when we have finished a non-empty
+    // split. Otherwise, it could lead to wrong scale up decisions if the
+    // first few splits are empty: we would only report the memory usage of
+    // the file footer read, which is much smaller than the actual memory
+    // usage of reading a non-empty split. That could cause query OOM because
+    // we would run too many scan drivers.
+    if (!emptySplit && checkForScaleDownOnFinishedSplit()) {
+      VELOX_CHECK(blockingFuture_.valid());
+      VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp);
+      return nullptr;
+    }
+
+    TestValue::adjust(
+        "facebook::velox::exec::TableScan::getOutput::finishSplit", this);
   }
 }
 
+bool TableScan::checkForScaleDownOnNewSplit() {
+  if (scaledController_ == nullptr) {
+    return false;
+  }
+
+  curStatus_ = "getOutput: checkForScaleDownOnNewSplit";
+  if (!scaledController_->shouldStop(
+          operatorCtx_->driverCtx()->driverId, &blockingFuture_)) {
+    VELOX_CHECK(!blockingFuture_.valid());
+    return false;
+  }
+  blockingReason_ = BlockingReason::kWaitForScanScaleUp;
+  return true;
+}
+
+bool TableScan::checkForScaleDownOnFinishedSplit() {
+  if (scaledController_ == nullptr) {
+    return false;
+  }
+
+  curStatus_ = "getOutput: checkForScaleDownOnFinishedSplit";
+  if (!scaledController_->updateAndTryScale(
+          operatorCtx_->driverCtx()->driverId,
+          pool()->peakBytes(),
+          &blockingFuture_)) {
+    VELOX_CHECK(!blockingFuture_.valid());
+    return false;
+  }
+  blockingReason_ = BlockingReason::kWaitForScanScaleUp;
+  return true;
+}
+
 void TableScan::preload(
     const std::shared_ptr<connector::ConnectorSplit>& split) {
   // The AsyncSource returns a unique_ptr to the shared_ptr of the
@@ -380,4 +460,224 @@ void TableScan::addDynamicFilter(
   stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer);
 }
 
+void TableScan::close() {
+  Operator::close();
+
+  if (scaledController_ == nullptr) {
+    return;
+  }
+
+  // Report the scaled controller stats from the first scan operator that
+  // finishes, at which point all the splits have been dispatched.
+ if (!scaledController_->close()) { + return; + } + + const auto scaledStats = scaledController_->stats(); + auto lockedStats = stats_.wlock(); + if (scaledStats.numScaleDown != 0) { + lockedStats->addRuntimeStat( + TableScan::kNumScaleDown, RuntimeCounter(scaledStats.numScaleDown)); + } + if (scaledStats.numScaleUp != 0) { + lockedStats->addRuntimeStat( + TableScan::kNumScaleUp, RuntimeCounter(scaledStats.numScaleUp)); + } + lockedStats->addRuntimeStat( + TableScan::kNumRunningScaleThreads, + RuntimeCounter(scaledStats.numRunningDrivers)); +} + +ScaledScanController::ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio, + double scaleDownMemoryUsageRatio) + : nodePool_(nodePool), + queryPool_(nodePool_->root()), + scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio), + scaleDownMemoryUsageRatio_(scaleDownMemoryUsageRatio), + numDrivers_(numDrivers) { + VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0); + VELOX_CHECK_LT(scaleUpMemoryUsageRatio_, 1.0); + VELOX_CHECK_LT(scaleUpMemoryUsageRatio_, scaleDownMemoryUsageRatio_); + hasDriverReportedUsage_.resize(numDrivers_, false); +} + +bool ScaledScanController::shouldStop( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::lock_guard l(lock_); + RETURN_IF_CLOSED(false); + return shouldStopLocked(driverIdx, future); +} + +bool ScaledScanController::shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK(!closed_); + if (driverIdx < numRunningDrivers_) { + return false; + } + + VELOX_CHECK_EQ(driverPromises_.count(driverIdx), 0); + auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract( + fmt::format("Table scan driver {} scale promise", driverIdx)); + driverPromises_.emplace(driverIdx, std::move(driverPromise)); + *future = std::move(driverFuture); + return true; +} + +bool ScaledScanController::updateAndTryScale( + uint32_t driverIdx, + uint64_t memoryUsage, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::vector driverPromises; + driverPromises.reserve(numDrivers_); + SCOPE_EXIT { + for (auto& driverPromise : driverPromises) { + driverPromise.setValue(); + } + }; + { + std::lock_guard l(lock_); + RETURN_IF_CLOSED(false); + + updateDriverScanUsageLocked(driverIdx, memoryUsage); + + if (shouldStopLocked(driverIdx, future)) { + return true; + } + + tryScaleLocked(driverPromises); + } + return false; +} + +void ScaledScanController::updateDriverScanUsageLocked( + uint32_t driverIdx, + uint64_t memoryUsage) { + if (estimatedDriverUsage_ == 0) { + estimatedDriverUsage_ = memoryUsage; + } else { + estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4; + } + + if (driverIdx >= numRunningDrivers_) { + return; + } + + if (!hasDriverReportedUsage_[driverIdx]) { + hasDriverReportedUsage_[driverIdx] = true; + ++numDriverUsageReportsSinceLastScale_; + } +} + +void ScaledScanController::tryScaleLocked( + std::vector& driverPromises) { + VELOX_CHECK_LE(numDriverUsageReportsSinceLastScale_, numDrivers_); + + const uint64_t maxQueryCapacity = queryPool_->maxCapacity(); + const uint64_t currQueryUsage = queryPool_->reservedBytes(); + // Check if we need to scale down based on the current query memory usage. 
+  if (currQueryUsage > maxQueryCapacity * scaleDownMemoryUsageRatio_) {
+    scaleDownLocked();
+    return;
+  }
+
+  if (numRunningDrivers_ == numDrivers_) {
+    return;
+  }
+
+  if (numDriverUsageReportsSinceLastScale_ < numRunningDrivers_) {
+    // We only make the next scale up decision after we have received memory
+    // usage updates from all the running scan drivers.
+    return;
+  }
+
+  const uint64_t currNodeUsage = nodePool_->reservedBytes();
+  const uint64_t currOtherUsage =
+      currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0;
+  const uint64_t estimatedNodeUsageAfterScale = std::max(
+      currNodeUsage + estimatedDriverUsage_,
+      estimatedDriverUsage_ * (numRunningDrivers_ + 1));
+  if ((estimatedNodeUsageAfterScale + currOtherUsage) >=
+      maxQueryCapacity * scaleUpMemoryUsageRatio_) {
+    return;
+  }
+
+  scaleUpLocked(driverPromises);
+}
+
+void ScaledScanController::scaleDownLocked() {
+  ++stats_.numScaleDown;
+
+  numRunningDrivers_ = std::max<uint32_t>(1, numDrivers_ / 2);
+  std::fill(
+      hasDriverReportedUsage_.begin() + numRunningDrivers_,
+      hasDriverReportedUsage_.end(),
+      false);
+  numDriverUsageReportsSinceLastScale_ = std::count(
+      hasDriverReportedUsage_.begin(), hasDriverReportedUsage_.end(), true);
+}
+
+void ScaledScanController::scaleUpLocked(
+    std::vector<ContinuePromise>& driverPromises) {
+  VELOX_CHECK_LT(numRunningDrivers_, numDrivers_);
+
+  ++stats_.numScaleUp;
+
+  std::fill(
+      hasDriverReportedUsage_.begin() + numRunningDrivers_,
+      hasDriverReportedUsage_.end(),
+      false);
+  numDriverUsageReportsSinceLastScale_ = std::count(
+      hasDriverReportedUsage_.begin(), hasDriverReportedUsage_.end(), true);
+  ++numRunningDrivers_;
+
+  auto it = driverPromises_.begin();
+  while (it != driverPromises_.end()) {
+    if (it->first >= numRunningDrivers_) {
+      break;
+    }
+    driverPromises.emplace_back(std::move(it->second));
+    it = driverPromises_.erase(it);
+  }
+}
+
+ScaledScanController::~ScaledScanController() {
+  close();
+}
+
+bool ScaledScanController::close() {
+  std::vector<ContinuePromise> promises;
+  {
+    std::lock_guard<std::mutex> l(lock_);
+    RETURN_IF_CLOSED(false);
+
+    promises.reserve(driverPromises_.size());
+    for (auto& [_, promise] : driverPromises_) {
+      promises.emplace_back(std::move(promise));
+    }
+    driverPromises_.clear();
+    closed_ = true;
+  }
+
+  for (auto& promise : promises) {
+    promise.setValue();
+  }
+  return true;
+}
+
+std::string ScaledScanController::Stats::toString() const {
+  return fmt::format(
+      "scaleup: {}, scaledown: {}, numRunningDrivers: {}",
+      numScaleUp,
+      numScaleDown,
+      numRunningDrivers);
+}
 } // namespace facebook::velox::exec
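To make the scale-up check in tryScaleLocked() concrete, here is a worked
example with illustrative numbers; all values are assumptions used only for
the arithmetic, not measurements from the patch:

    // Worked example of the scale-up headroom estimate in tryScaleLocked().
    #include <algorithm>
    #include <cstdint>

    bool wouldScaleUp() {
      const uint64_t maxQueryCapacity = 8ULL << 30;       // 8GB query capacity.
      const double scaleUpRatio = 0.5;                    // watermark at 4GB.
      const uint64_t currQueryUsage = 2ULL << 30;         // 2GB total usage.
      const uint64_t currNodeUsage = 1536ULL << 20;       // 1.5GB by scan node.
      const uint64_t estimatedDriverUsage = 512ULL << 20; // 512MB per driver.
      const uint32_t numRunningDrivers = 3;

      // Memory used by operators other than this scan node: 0.5GB.
      const uint64_t otherUsage =
          currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0;
      // Projected node usage with one more driver:
      // max(1.5GB + 0.5GB, 0.5GB * 4) = 2GB.
      const uint64_t nodeUsageAfterScale = std::max(
          currNodeUsage + estimatedDriverUsage,
          estimatedDriverUsage * (numRunningDrivers + 1));
      // 2GB + 0.5GB = 2.5GB is below the 4GB watermark, so one more stopped
      // scan driver gets resumed.
      return nodeUsageAfterScale + otherUsage <
          maxQueryCapacity * scaleUpRatio;
    }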
diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h
index deec109b02c1..acf2fee083ee 100644
--- a/velox/exec/TableScan.h
+++ b/velox/exec/TableScan.h
@@ -20,6 +20,122 @@
 namespace facebook::velox::exec {
 
+/// Controller used to scale table scan processing in response to memory
+/// pressure.
+class ScaledScanController {
+ public:
+  /// 'nodePool' is the memory pool of the table scan node. 'numDrivers' is
+  /// the number of table scan drivers. 'scaleUpMemoryUsageRatio' and
+  /// 'scaleDownMemoryUsageRatio' specify the memory usage ratios used to make
+  /// scan scaling decisions.
+  ScaledScanController(
+      memory::MemoryPool* nodePool,
+      uint32_t numDrivers,
+      double scaleUpMemoryUsageRatio,
+      double scaleDownMemoryUsageRatio);
+
+  ~ScaledScanController();
+
+  ScaledScanController(const ScaledScanController&) = delete;
+  ScaledScanController(ScaledScanController&&) = delete;
+  ScaledScanController& operator=(const ScaledScanController&) = delete;
+  ScaledScanController& operator=(ScaledScanController&&) = delete;
+
+  /// Invoked by a scan operator to check if it needs to stop its scan
+  /// processing. If so, 'future' is set to a future that becomes ready when
+  /// scale up processing resumes the scan operator, or when all the splits
+  /// have been processed and the scan operator finishes. 'driverIdx' is the
+  /// driver id of the scan operator.
+  bool shouldStop(uint32_t driverIdx, ContinueFuture* future);
+
+  /// Invoked by a scan operator to update the per-driver memory usage
+  /// estimation after it finishes processing a split. 'driverIdx' is the
+  /// driver id of the scan operator. 'driverMemoryUsage' is the updated peak
+  /// memory usage of the scan operator. Similar to 'shouldStop', this
+  /// function returns true if the scan operator needs to stop processing for
+  /// scan scale down.
+  bool updateAndTryScale(
+      uint32_t driverIdx,
+      uint64_t driverMemoryUsage,
+      ContinueFuture* future);
+
+  struct Stats {
+    uint32_t numScaleUp{0};
+    uint32_t numScaleDown{0};
+    uint32_t numRunningDrivers{0};
+
+    std::string toString() const;
+  };
+
+  Stats stats() const {
+    std::lock_guard<std::mutex> l(lock_);
+    return stats_;
+  }
+
+  /// Invoked by the closed scan operator to close the controller. It returns
+  /// true on the first invocation, and false otherwise.
+  bool close();
+
+  void testingSetMemoryRatios(
+      double scaleUpMemoryUsageRatio,
+      double scaleDownMemoryUsageRatio) {
+    std::lock_guard<std::mutex> l(lock_);
+    *const_cast<double*>(&scaleUpMemoryUsageRatio_) = scaleUpMemoryUsageRatio;
+    *const_cast<double*>(&scaleDownMemoryUsageRatio_) =
+        scaleDownMemoryUsageRatio;
+  }
+
+ private:
+  // Invoked to check if we can scale up scan processing. If so, calls
+  // 'scaleUpLocked' to do the scale up.
+  void tryScaleLocked(std::vector<ContinuePromise>& driverPromises);
+
+  // Invoked to scale up scan processing by bumping up the number of running
+  // scan drivers by one. 'driverPromises' returns the list of promises to
+  // fulfill to resume the previously stopped scan drivers from
+  // 'driverPromises_'.
+  void scaleUpLocked(std::vector<ContinuePromise>& driverPromises);
+
+  // Invoked to scale down scan processing by reducing the number of running
+  // scan drivers by half.
+  void scaleDownLocked();
+
+  // Invoked to check if we need to stop the scan processing of the specified
+  // scan driver because 'driverIdx' is beyond 'numRunningDrivers_' after a
+  // scan scale down. If so, a promise is added to 'driverPromises_' and the
+  // associated future is returned in 'future'.
+  bool shouldStopLocked(
+      uint32_t driverIdx,
+      facebook::velox::ContinueFuture* future);
+
+  // Invoked to update the per-driver memory usage estimation with a new
+  // driver report.
+  void updateDriverScanUsageLocked(uint32_t driverIdx, uint64_t memoryUsage);
+
+  memory::MemoryPool* const nodePool_;
+  memory::MemoryPool* const queryPool_;
+  const double scaleUpMemoryUsageRatio_;
+  const double scaleDownMemoryUsageRatio_;
+  const uint32_t numDrivers_;
+
+  mutable std::mutex lock_;
+  uint32_t numRunningDrivers_{1};
+  // The estimated per-driver memory usage of the table scan node.
+  uint64_t estimatedDriverUsage_{0};
+  // The number of drivers that have reported memory usage since the start or
+  // the last scale processing.
+  uint32_t numDriverUsageReportsSinceLastScale_{0};
+  // Indicates whether a given scan driver has reported memory usage since the
+  // start or the last scale processing.
+  std::vector<bool> hasDriverReportedUsage_;
+
+  // The map from stopped driver id to the resume promise.
+  std::map<uint32_t, ContinuePromise> driverPromises_;
+
+  Stats stats_;
+
+  bool closed_{false};
+};
+
 class TableScan : public SourceOperator {
  public:
   TableScan(
       int32_t operatorId,
       DriverCtx* driverCtx,
       const std::shared_ptr<const core::TableScanNode>& tableScanNode);
 
+  void initialize() override;
+
   folly::dynamic toJson() const override;
 
   RowVectorPtr getOutput() override;
 
   BlockingReason isBlocked(ContinueFuture* future) override {
-    if (blockingFuture_.valid()) {
-      *future = std::move(blockingFuture_);
-      return blockingReason_;
+    if (!blockingFuture_.valid()) {
+      return BlockingReason::kNotBlocked;
+    }
+
+    if (blockingReason_ == BlockingReason::kWaitForScanScaleUp) {
+      // If any operator in the same driver pipeline has pending data to
+      // flush, we need to flush it first before stopping this driver. This
+      // ensures that we don't hold a significant amount of memory in this
+      // driver pipeline while it is stopped.
+      if (driverCtx_->driver->needsFlush(FlushReason::kScanScaleDown)) {
+        return BlockingReason::kNotBlocked;
+      }
     }
-    return BlockingReason::kNotBlocked;
+    *future = std::move(blockingFuture_);
+    return blockingReason_;
   }
 
   bool isFinished() override;
 
+  void close() override;
+
   bool canAddDynamicFilter() const override {
     return connector_->canAddDynamicFilter();
   }
@@ -50,6 +180,24 @@ class TableScan : public SourceOperator {
       column_index_t outputChannel,
       const std::shared_ptr<common::Filter>& filter) override;
 
+  /// The names of the runtime stats specific to table scan.
+  /// The number of times the scaled scan controller triggers scale up
+  /// processing.
+  static inline const std::string kNumScaleUp{"numScaleUp"};
+  /// The number of times the scaled scan controller triggers scale down
+  /// processing.
+  static inline const std::string kNumScaleDown{"numScaleDown"};
+  /// The number of running table scan drivers.
+  ///
+  /// NOTE: we only report the number of running scan drivers at the point
+  /// that all the splits have been processed.
+  static inline const std::string kNumRunningScaleThreads{
+      "numRunningScaleThreads"};
+
+  std::shared_ptr<ScaledScanController> testingScaledController() const {
+    return scaledController_;
+  }
+
  private:
   // Checks if this table scan operator needs to yield before processing the
   // next split.
@@ -70,17 +218,36 @@ class TableScan : public SourceOperator {
   // done, it will be made when needed.
   void preload(const std::shared_ptr<connector::ConnectorSplit>& split);
 
+  // Invoked before this scan operator starts processing a new split. It
+  // returns true if this scan operator needs to stop processing for scan
+  // scale down.
+  bool checkForScaleDownOnNewSplit();
+
+  // Invoked after this scan operator finishes processing a split. It returns
+  // true if this scan operator needs to stop processing for scan scale down.
+  bool checkForScaleDownOnFinishedSplit();
+
   const std::shared_ptr<connector::ConnectorTableHandle> tableHandle_;
   const std::
       unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
           columnHandles_;
   DriverCtx* const driverCtx_;
+  const int32_t maxSplitPreloadPerDriver_{0};
+  const vector_size_t maxReadBatchSize_;
   memory::MemoryPool* const connectorPool_;
+  const std::shared_ptr<connector::Connector> connector_;
+  // Exits getOutput() method after this many milliseconds. Zero means 'no
+  // limit'.
+ const size_t getOutputTimeLimitMs_{0}; + + // If set, used for scan scale processing. It is shared by all the scan + // operators instantiated from the same table scan node. + std::shared_ptr scaledController_; + + vector_size_t readBatchSize_; + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; - std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSource_; bool noMoreSplits_ = false; @@ -90,8 +257,6 @@ class TableScan : public SourceOperator { int32_t maxPreloadedSplits_{0}; - const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async preload. The // callback's lifetime is the lifetime of 'this'. This callback can schedule // preloads on an executor. These preloads may outlive the Task and therefore @@ -105,13 +270,6 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - vector_size_t readBatchSize_; - vector_size_t maxReadBatchSize_; - - // Exits getOutput() method after this many milliseconds. Zero means 'no - // limit'. - size_t getOutputTimeLimitMs_{0}; - double maxFilteringRatio_{0}; // String shown in ExceptionContext inside DataSource and LazyVector loading. @@ -120,5 +278,8 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. std::atomic curStatus_{""}; + + // The total number of raw input rows read up till the last finished split. + uint64_t rawInputRowsSinceLastSplit_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fbe57f3e34a7..17d96ecc7d18 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1586,6 +1586,34 @@ exec::Split Task::getSplitLocked( return split; } +std::shared_ptr Task::getOrAddScaledScanController( + uint32_t splitGroupId, + uint32_t pipelineId, + const core::PlanNodeId& planNodeId, + memory::MemoryPool* nodePool) { + std::lock_guard l(mutex_); + auto& splitGroupState = splitGroupStates_[splitGroupId]; + auto it = splitGroupState.scaledScanControllers.find(planNodeId); + if (it != splitGroupState.scaledScanControllers.end()) { + VELOX_CHECK_NOT_NULL(it->second); + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + return it->second; + } + + if (!queryCtx_->queryConfig().tableScanScaledProcessingEnabled()) { + return nullptr; + } + + splitGroupState.scaledScanControllers.emplace( + planNodeId, + std::make_shared( + nodePool, + numDrivers(pipelineId), + queryCtx_->queryConfig().tableScanScaleUpMemoryUsageRatio(), + queryCtx_->queryConfig().tableScanScaleDownMemoryUsageRatio())); + return splitGroupState.scaledScanControllers[planNodeId]; +} + void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 8c11bb6493a9..7176c297d008 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,7 @@ #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" #include "velox/exec/Split.h" +#include "velox/exec/TableScan.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" #include "velox/exec/TaskTraceWriter.h" @@ -502,6 +503,15 @@ class Task : public std::enable_shared_from_this { int32_t maxPreloadSplits = 0, const 
ConnectorSplitPreloadFunc& preload = nullptr);
 
+  /// Returns the scaled scan controller for a given table scan node if scaled
+  /// scan processing is enabled for the query. This function is called during
+  /// table scan operator initialization, and the first invocation creates the
+  /// controller.
+  std::shared_ptr<ScaledScanController> getOrAddScaledScanController(
+      uint32_t splitGroupId,
+      uint32_t pipelineId,
+      const core::PlanNodeId& planNodeId,
+      memory::MemoryPool* nodePool);
+
   void splitFinished(bool fromTableScan, int64_t splitWeight);
 
   void multipleSplitsFinished(
diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h
index 7c193240689a..cf3a39703fd4 100644
--- a/velox/exec/TaskStructs.h
+++ b/velox/exec/TaskStructs.h
@@ -116,6 +116,10 @@ struct SplitGroupState {
   /// Map of local exchanges keyed on LocalPartition plan node ID.
   std::unordered_map<core::PlanNodeId, LocalExchangeState> localExchanges;
 
+  /// Map of scaled scan controllers keyed on TableScan plan node ID.
+  std::unordered_map<core::PlanNodeId, std::shared_ptr<ScaledScanController>>
+      scaledScanControllers;
+
   /// Drivers created and still running for this split group.
   /// The split group is finished when this number reaches zero.
   uint32_t numRunningDrivers{0};
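All drivers of one table scan node share a single controller, created lazily
by Task::getOrAddScaledScanController above. A minimal standalone sketch of
that get-or-add pattern, with types simplified (Controller and Registry are
stand-ins for ScaledScanController and the split-group state):

    // Sketch: one controller per scan plan node, created on first use and
    // shared by all of that node's drivers.
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct Controller {};  // stand-in for ScaledScanController.

    class Registry {
     public:
      std::shared_ptr<Controller> getOrAdd(const std::string& planNodeId) {
        std::lock_guard<std::mutex> l(mu_);
        auto it = controllers_.find(planNodeId);
        if (it != controllers_.end()) {
          return it->second;  // later drivers reuse the first instance.
        }
        auto controller = std::make_shared<Controller>();
        controllers_.emplace(planNodeId, controller);
        return controller;
      }

     private:
      std::mutex mu_;
      std::map<std::string, std::shared_ptr<Controller>> controllers_;
    };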
"true" : "false"; + configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = + std::to_string(testData.scaleUpMemoryUsageRatio); + configSettings_[core::QueryConfig::kTableScanScaleDownMemoryUsageRatio] = + std::to_string(testData.scaleDownMemoryUsageRatio); + + const auto leafPlan = + PlanBuilder() + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .partialAggregation( + {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto leafTaskId = "local://leaf-0"; + auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20); + const auto numLeafDrivers{4}; + leafTask->start(numLeafDrivers); + addHiveSplits(leafTask, splitFiles); + + const auto finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam()) + .finalAggregation( + {"c5"}, + {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"}, + {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto finalAggTaskId = "local://final-agg-0"; + auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0); + const auto numFinalAggrDrivers{1}; + finalAggTask->start(numFinalAggrDrivers); + addRemoteSplits(finalAggTask, {leafTaskId}); + + const auto resultPlan = + PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam()) + .planNode(); + + assertQuery( + resultPlan, + {finalAggTaskId}, + "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); + + ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get())) + << finalAggTask->taskId(); + + auto planStats = toPlanStats(leafTask->taskStats()); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleDown), + 0); + if (testData.expectScaleUp) { + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), + 1); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).count, + 1); + ASSERT_GT( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).sum, + 1); + } else { + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), + 0); + } + } +} + +TEST_P(MultiFragmentTest, tableScanScaleDown) { + const int numSplits = 20; + std::vector> splitFiles; + std::vector splitVectors; + for (auto i = 0; i < numSplits; ++i) { + auto vectors = makeVectors(10, 1'000); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end()); + } + + createDuckDbTable(splitVectors); + + const auto numLeafDrivers{4}; + std::mutex lock; + std::unordered_set runningDriverIds; + std::atomic_bool blockDriverFlag{true}; + folly::EventCount blockDriverRunning; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput::finishSplit", + std::function(([&](Operator* op) { + bool waitForOtherDriver{true}; + { + std::lock_guard l(lock); + if (runningDriverIds.size() == numLeafDrivers) { + return; + } + runningDriverIds.insert( + op->testingOperatorCtx()->driverCtx()->driverId); + if (runningDriverIds.size() == numLeafDrivers) { + waitForOtherDriver = false; + } + } + if (waitForOtherDriver) { + blockDriverRunning.await([&] { return !blockDriverFlag.load(); }); + } else { + auto* scanOp = static_cast(op); + 
+TEST_P(MultiFragmentTest, tableScanScaleDown) {
+  const int numSplits = 20;
+  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
+  std::vector<RowVectorPtr> splitVectors;
+  for (auto i = 0; i < numSplits; ++i) {
+    auto vectors = makeVectors(10, 1'000);
+    auto filePath = TempFilePath::create();
+    writeToFile(filePath->getPath(), vectors);
+    splitFiles.push_back(std::move(filePath));
+    splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end());
+  }
+
+  createDuckDbTable(splitVectors);
+
+  const auto numLeafDrivers{4};
+  std::mutex lock;
+  std::unordered_set<int> runningDriverIds;
+  std::atomic_bool blockDriverFlag{true};
+  folly::EventCount blockDriverRunning;
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::TableScan::getOutput::finishSplit",
+      std::function<void(Operator*)>(([&](Operator* op) {
+        bool waitForOtherDriver{true};
+        {
+          std::lock_guard<std::mutex> l(lock);
+          if (runningDriverIds.size() == numLeafDrivers) {
+            return;
+          }
+          runningDriverIds.insert(
+              op->testingOperatorCtx()->driverCtx()->driverId);
+          if (runningDriverIds.size() == numLeafDrivers) {
+            waitForOtherDriver = false;
+          }
+        }
+        if (waitForOtherDriver) {
+          blockDriverRunning.await([&] { return !blockDriverFlag.load(); });
+        } else {
+          auto* scanOp = static_cast<TableScan*>(op);
+          scanOp->testingScaledController()->testingSetMemoryRatios(
+              0.00001, 0.00002);
+          blockDriverFlag = false;
+          blockDriverRunning.notifyAll();
+        }
+      })));
+
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  core::PlanNodeId scanNodeId;
+  configSettings_[core::QueryConfig::kTableScanScaledProcessingEnabled] =
+      "true";
+  configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = "0.9";
+  configSettings_[core::QueryConfig::kTableScanScaleDownMemoryUsageRatio] =
+      "0.95";
+
+  const auto leafPlan =
+      PlanBuilder()
+          .tableScan(rowType_)
+          .capturePlanNodeId(scanNodeId)
+          .partialAggregation(
+              {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .planNode();
+
+  const auto leafTaskId = "local://leaf-0";
+  auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20);
+  leafTask->start(numLeafDrivers);
+  addHiveSplits(leafTask, splitFiles);
+
+  const auto finalAggPlan =
+      PlanBuilder()
+          .exchange(leafPlan->outputType(), GetParam())
+          .finalAggregation(
+              {"c5"},
+              {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"},
+              {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}})
+          .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+          .planNode();
+
+  const auto finalAggTaskId = "local://final-agg-0";
+  auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0);
+  const auto numFinalAggrDrivers{1};
+  finalAggTask->start(numFinalAggrDrivers);
+  addRemoteSplits(finalAggTask, {leafTaskId});
+
+  const auto resultPlan =
+      PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode();
+
+  assertQuery(
+      resultPlan,
+      {finalAggTaskId},
+      "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
+
+  ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
+  ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get()))
+      << finalAggTask->taskId();
+
+  auto planStats = toPlanStats(leafTask->taskStats());
+  ASSERT_EQ(
+      planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), 1);
+  ASSERT_EQ(
+      planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).count, 1);
+  ASSERT_EQ(
+      planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).sum, 3);
+  ASSERT_EQ(
+      planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleDown), 1);
+  ASSERT_GT(
+      planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleDown).count,
+      0);
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     MultiFragmentTest,
     MultiFragmentTest,
diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp
index 69300dafb0b3..abca5d66d57b 100644
--- a/velox/exec/tests/utils/OperatorTestBase.cpp
+++ b/velox/exec/tests/utils/OperatorTestBase.cpp
@@ -117,7 +117,7 @@ void OperatorTestBase::setupMemory(
 }
 
 void OperatorTestBase::resetMemory() {
-  OperatorTestBase::setupMemory(8L << 30, 6L << 30, 0, 512 << 20, 0, 0, 0);
+  OperatorTestBase::setupMemory(256L << 30, 6L << 30, 0, 512 << 20, 0, 0, 0);
 }
 
 void OperatorTestBase::SetUp() {