From d83e8beae66f83e9e6e159eb446f91b109d9c802 Mon Sep 17 00:00:00 2001
From: Xiaoxuan Meng
Date: Mon, 16 Dec 2024 22:00:32 -0800
Subject: [PATCH] feat: Add auto table scan scaling based on memory usage
 (#11879)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/11879

Adds auto table scan scaling support to solve query OOMs caused by highly
concurrent, memory-intensive table scan operations. Instead of running all
the table scan threads at the start of the query, we start with a single
table scan thread and gradually schedule more table scan threads when there
is sufficient free memory capacity for the query (measured as the current
used memory versus the query's max capacity). When the query approaches its
max limit, we stop scheduling more table scan threads and even stop currently
running scan threads to free up memory and prevent OOM.

The scale up/down decision happens when a table scan operator finishes
processing a non-empty split. A scale controller is added to each table scan
plan node for this coordinated control, and two memory ratios are defined for
the scale up/down decisions. The controller estimates the per-scan-driver
memory usage from the memory usage reported when a table scan thread finishes
a non-empty split.

A Meta-internal test shows a query that failed with OOM right after 1 min of
execution with 10 leaf threads and finished in 2 hrs when the leaf thread
count was reduced to 5; Java took 1 hour to finish with persistent shuffle
(LBM). With auto scaling, the query finishes on Prestissimo in 30 mins.

We don't expect this feature to be enabled by default, as ad-hoc small
queries need to run all the scan threads in parallel at the start. It will
only be used for some large pipelines and can be enabled by a query config in
Velox and session properties in Prestissimo.

Differential Revision: D67114511
---
 velox/core/QueryConfig.h               |  26 +++
 velox/docs/configs.rst                 |  37 +++-
 velox/docs/monitoring/stats.rst        |  15 ++
 velox/exec/Driver.cpp                  |   9 +-
 velox/exec/Driver.h                    |   4 +
 velox/exec/TableScan.cpp               | 250 ++++++++++++++++++++++++-
 velox/exec/TableScan.h                 | 147 ++++++++++++++-
 velox/exec/Task.cpp                    |  27 +++
 velox/exec/Task.h                      |  10 +
 velox/exec/TaskStructs.h               |   4 +
 velox/exec/tests/MultiFragmentTest.cpp | 121 ++++++++++++
 11 files changed, 621 insertions(+), 29 deletions(-)

diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index e6c4032768b1..8031145115f0 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -472,6 +472,24 @@ class QueryConfig {
   static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
       "scaled_writer_min_processed_bytes_rebalance_threshold";
+  /// If true, enables scaled table scan processing. For each table scan plan
+  /// node, a scan controller is used to control the number of running scan
+  /// threads based on the query memory usage. It keeps increasing the number
+  /// of running threads until the query memory usage exceeds the threshold
+  /// defined by 'table_scan_scale_up_memory_usage_ratio'.
+  static constexpr const char* kTableScanScaledProcessingEnabled =
+      "table_scan_scaled_processing_enabled";
+
+  /// The query memory usage ratio used by the scan controller to decide
+  /// whether it can increase the number of running scan threads. When the
+  /// query memory usage is below this ratio, the scan controller keeps
+  /// increasing the number of running scan threads for scale up, and stops
+  /// once the usage exceeds this ratio. The value is in the range of [0, 1].
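// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: one way a query could opt in
// to scaled table scan processing. It only assumes the string-keyed config
// map that QueryConfig is constructed from; the helper name is hypothetical.
// ---------------------------------------------------------------------------
#include <string>
#include <unordered_map>

// Config overrides that enable scaled table scan processing and allow scale
// up while the estimated memory usage stays below 50% of the query's max
// memory capacity.
std::unordered_map<std::string, std::string> scaledScanConfigOverrides() {
  return {
      {"table_scan_scaled_processing_enabled", "true"},
      {"table_scan_scale_up_memory_usage_ratio", "0.5"},
  };
}
// ----------------------------- end of sketch -------------------------------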
+ /// + /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true. + static constexpr const char* kTableScanScaleUpMemoryUsageRatio = + "table_scan_scale_up_memory_usage_ratio"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -880,6 +898,14 @@ class QueryConfig { kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20); } + bool tableScanScaledProcessingEnabled() const { + return get(kTableScanScaledProcessingEnabled, true); + } + + double tableScanScaleUpMemoryUsageRatio() const { + return get(kTableScanScaleUpMemoryUsageRatio, 0.5); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index a932ecd57558..9fe2534b1262 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -390,6 +390,23 @@ Table Scan - integer - 2 - Maximum number of splits to preload per driver. Set to 0 to disable preloading. + * - table_scan_scaled_processing_enabled + - bool + - false + - If true, enables the scaled table scan processing. For each table scan + plan node, a scan controller is used to control the number of running scan + threads based on the query memory usage. It keeps increasing the number of + running threads until the query memory usage exceeds the threshold defined + by 'table_scan_scale_up_memory_usage_ratio'. + * - table_scan_scale_up_memory_usage_ratio + - double + - 0.5 + - The query memory usage ratio used by scan controller to decide if it can + increase the number of running scan threads. When the query memory usage + is below this ratio, the scan controller scale up the scan processing by + increasing the number of running scan threads, and stop once exceeds this + ratio. The value is in the range of [0, 1]. This only applies if + 'table_scan_scaled_processing_enabled' is true. Table Writer ------------ @@ -414,26 +431,26 @@ Table Writer - double - 0.7 - The max ratio of a query used memory to its max capacity, and the scale - - writer exchange stops scaling writer processing if the query's current - - memory usage exceeds this ratio. The value is in the range of (0, 1]. + writer exchange stops scaling writer processing if the query's current + memory usage exceeds this ratio. The value is in the range of (0, 1]. * - scaled_writer_max_partitions_per_writer - integer - 128 - The max number of logical table partitions that can be assigned to a - - single table writer thread. The logical table partition is used by local - - exchange writer for writer scaling, and multiple physical table - - partitions can be mapped to the same logical table partition based on the - - hash value of calculated partitioned ids. + single table writer thread. The logical table partition is used by local + exchange writer for writer scaling, and multiple physical table + partitions can be mapped to the same logical table partition based on the + hash value of calculated partitioned ids. + * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - integer - 128MB - * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - Minimum amount of data processed by a logical table partition to trigger - - writer scaling if it is detected as overloaded by scale wrirer exchange. + writer scaling if it is detected as overloaded by scale wrirer exchange. 
* - scaled_writer_min_processed_bytes_rebalance_threshold - - Minimum amount of data processed by all the logical table partitions to - - trigger skewed partition rebalancing by scale writer exchange. - integer - 256MB + - Minimum amount of data processed by all the logical table partitions to + trigger skewed partition rebalancing by scale writer exchange. Hive Connector -------------- diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index 53538d854974..a6feb1d32cb0 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators. - Time spent on building the hash table from rows collected by all the hash build operators. This stat is only reported by the HashBuild operator. +TableScan +--------- +These stats are reported only by TableScan operator + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numRunningScanThreads + - + - The number of running table scan drivers. + TableWriter ----------- These stats are reported only by TableWriter operator diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e5c4656d6521..9a6a4740ddd8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -631,11 +631,10 @@ StopReason Driver::runInternal( nextOp, curOperatorId_ + 1, kOpMethodAddInput); - - // The next iteration will see if operators_[i + 1] has - // output now that it got input. - i += 2; }); + // The next iteration will see if operators_[i + 1] has + // output now that it got input. + i += 2; continue; } else { stop = task()->shouldStop(); @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) { return "kYield"; case BlockingReason::kWaitForArbitration: return "kWaitForArbitration"; + case BlockingReason::kWaitForScanScaleUp: + return "kWaitForScanScaleUp"; default: VELOX_UNREACHABLE( fmt::format("Unknown blocking reason {}", static_cast(reason))); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7d46017187e4..64d0a9b219b6 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -210,6 +210,10 @@ enum class BlockingReason { /// Operator is blocked waiting for its associated query memory arbitration to /// finish. kWaitForArbitration, + /// For a table scan operator, it is blocked waiting for the scan controller + /// to increase the number of table scan processing threads to start + /// processing. 
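// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: how an operator typically
// surfaces a blocking reason such as kWaitForScanScaleUp to the driver.
// TableScan::isBlocked() is not shown in this diff; this is a generic sketch
// of the Operator blocking contract, with blockingReason/blockingFuture
// standing in for the operator members of the same names.
// ---------------------------------------------------------------------------
BlockingReason isBlockedSketch(
    ContinueFuture* future,
    BlockingReason& blockingReason,
    ContinueFuture& blockingFuture) {
  if (blockingFuture.valid()) {
    // Hand the stored future to the driver. The driver parks this thread and
    // resumes it once the scan controller fulfills the matching promise.
    *future = std::move(blockingFuture);
    return blockingReason; // kWaitForScanScaleUp while waiting for scale up.
  }
  return BlockingReason::kNotBlocked;
}
// ----------------------------- end of sketch -------------------------------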
+ kWaitForScanScaleUp, }; std::string blockingReasonToString(BlockingReason reason); diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 7586a52f763f..628724327292 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -37,19 +37,29 @@ TableScan::TableScan( tableHandle_(tableScanNode->tableHandle()), columnHandles_(tableScanNode->assignments()), driverCtx_(driverCtx), + maxSplitPreloadPerDriver_( + driverCtx_->queryConfig().maxSplitPreloadPerDriver()), + maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), connectorPool_(driverCtx_->task->addConnectorPoolLocked( planNodeId(), driverCtx_->pipelineId, driverCtx_->driverId, operatorType(), tableHandle_->connectorId())), - maxSplitPreloadPerDriver_( - driverCtx_->queryConfig().maxSplitPreloadPerDriver()), - readBatchSize_(driverCtx_->queryConfig().preferredOutputBatchRows()), - maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), + connector_(connector::getConnector(tableHandle_->connectorId())), getOutputTimeLimitMs_( driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()) { - connector_ = connector::getConnector(tableHandle_->connectorId()); + readBatchSize_ = driverCtx_->queryConfig().preferredOutputBatchRows(); +} + +void TableScan::initialize() { + Operator::initialize(); + VELOX_CHECK_NULL(scaledController_); + scaledController_ = driverCtx_->task->getOrAddScaledScanController( + driverCtx_->splitGroupId, + driverCtx_->pipelineId, + planNodeId(), + pool()->parent()); } folly::dynamic TableScan::toJson() const { @@ -76,10 +86,20 @@ bool TableScan::shouldStop(StopReason taskStopReason) const { RowVectorPtr TableScan::getOutput() { auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); + VELOX_CHECK(!blockingFuture_.valid()); + blockingReason_ = BlockingReason::kNotBlocked; + if (noMoreSplits_) { return nullptr; } + // Check if we need to wait for scale up. We expect only wait once on startup. + if (shouldWaitForScaleUp()) { + VELOX_CHECK(blockingFuture_.valid()); + VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp); + return nullptr; + } + curStatus_ = "getOutput: enter"; const auto startTimeMs = getCurrentTimeMs(); for (;;) { @@ -259,6 +279,7 @@ RowVectorPtr TableScan::getOutput() { curStatus_ = "getOutput: updating stats_.rawInput"; lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); + RowVectorPtr data = std::move(dataOptional).value(); if (data != nullptr) { if (data->size() > 0) { @@ -274,6 +295,7 @@ RowVectorPtr TableScan::getOutput() { } } + uint64_t currNumRawInputRows{0}; { curStatus_ = "getOutput: updating stats_.preloadedSplits"; auto lockedStats = stats_.wlock(); @@ -287,12 +309,50 @@ RowVectorPtr TableScan::getOutput() { "readyPreloadedSplits", RuntimeCounter(numReadyPreloadedSplits_)); numReadyPreloadedSplits_ = 0; } + currNumRawInputRows = lockedStats->rawInputPositions; } + VELOX_CHECK_LE(rawInputRowsSinceLastSplit_, currNumRawInputRows); + const bool emptySplit = currNumRawInputRows == rawInputRowsSinceLastSplit_; + rawInputRowsSinceLastSplit_ = currNumRawInputRows; curStatus_ = "getOutput: task->splitFinished"; driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; + + // We only update scaled controller when we have finished a non-empty split. + // Otherwise, it can lead to the wrong scale up decisions if the first few + // splits are empty. 
Then we only report the memory usage for the file + // footer read which is much smaller the actual memory usage when read from + // a non-empty split. This can cause query OOM as we run too many scan + // drivers with each use non-trivial amount of memory. + if (!emptySplit) { + tryScaleUp(); + } + } +} + +bool TableScan::shouldWaitForScaleUp() { + if (scaledController_ == nullptr) { + return false; + } + + curStatus_ = "getOutput: shouldWaitForScaleUp"; + if (!scaledController_->shouldStop( + operatorCtx_->driverCtx()->driverId, &blockingFuture_)) { + VELOX_CHECK(!blockingFuture_.valid()); + return false; + } + blockingReason_ = BlockingReason::kWaitForScanScaleUp; + return true; +} + +void TableScan::tryScaleUp() { + if (scaledController_ == nullptr) { + return; } + + scaledController_->updateAndTryScale( + operatorCtx_->driverCtx()->driverId, pool()->peakBytes()); } void TableScan::preload( @@ -380,4 +440,184 @@ void TableScan::addDynamicFilter( stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } +void TableScan::close() { + Operator::close(); + + if (scaledController_ == nullptr) { + return; + } + + // Report the scaled controller stats by the first finished scan operator at + // which point all the splits have been dispatched. + if (!scaledController_->close()) { + return; + } + + const auto scaledStats = scaledController_->stats(); + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + TableScan::kNumRunningScaleThreads, + RuntimeCounter(scaledStats.numRunningDrivers)); +} + +ScaledScanController::ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio) + : nodePool_(nodePool), + queryPool_(nodePool_->root()), + numDrivers_(numDrivers), + scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio), + driverPromises_(numDrivers_) { + VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0); + VELOX_CHECK_LE(scaleUpMemoryUsageRatio_, 1.0); +} + +bool ScaledScanController::shouldStop( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::lock_guard l(lock_); + if (closed_) { + return false; + } + return shouldStopLocked(driverIdx, future); +} + +bool ScaledScanController::shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK(!closed_); + if (driverIdx < numRunningDrivers_) { + return false; + } + + VELOX_CHECK(!driverPromises_[driverIdx].has_value()); + auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract( + fmt::format("Table scan driver {} scale promise", driverIdx)); + driverPromises_[driverIdx] = std::move(driverPromise); + *future = std::move(driverFuture); + return true; +} + +void ScaledScanController::updateAndTryScale( + uint32_t driverIdx, + uint64_t memoryUsage) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::optional driverPromise; + SCOPE_EXIT { + if (driverPromise.has_value()) { + driverPromise->setValue(); + } + }; + { + std::lock_guard l(lock_); + VELOX_CHECK_LT(driverIdx, numRunningDrivers_); + + if (closed_) { + return; + } + + updateDriverScanUsageLocked(driverIdx, memoryUsage); + + tryScaleLocked(driverPromise); + } +} + +void ScaledScanController::updateDriverScanUsageLocked( + uint32_t driverIdx, + uint64_t memoryUsage) { + if (estimatedDriverUsage_ == 0) { + estimatedDriverUsage_ = memoryUsage; + } else { + estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4; + } + + if (numDriverUsageReports_ == numRunningDrivers_) { + return; + } + 
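// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the per-driver usage estimate
// above is an exponential moving average that weights the newest report by
// 1/4, so a single outlier split only moves the estimate by a quarter of the
// difference. The same arithmetic as a standalone function:
// ---------------------------------------------------------------------------
#include <cstdint>

uint64_t smoothDriverUsage(uint64_t currentEstimate, uint64_t newReport) {
  if (currentEstimate == 0) {
    return newReport; // The first report seeds the estimate.
  }
  // 3/4 of the previous estimate plus 1/4 of the new report.
  return (currentEstimate * 3 + newReport) / 4;
}
// Example: a 100 MB estimate combined with a 200 MB report yields 125 MB.
// ----------------------------- end of sketch -------------------------------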
VELOX_CHECK_EQ(numDriverUsageReports_ + 1, numRunningDrivers_); + + if (driverIdx + 1 < numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1); + ++numDriverUsageReports_; +} + +void ScaledScanController::tryScaleLocked( + std::optional& driverPromise) { + VELOX_CHECK_LE(numDriverUsageReports_, numRunningDrivers_); + + if (numRunningDrivers_ == numDrivers_) { + return; + } + if (numDriverUsageReports_ < numRunningDrivers_) { + // We shall only make the next scale up decision until we have received + // the memory usage updates from all the running scan drivers. + return; + } + + const uint64_t maxQueryCapacity = queryPool_->maxCapacity(); + + const uint64_t currQueryUsage = queryPool_->reservedBytes(); + const uint64_t currNodeUsage = nodePool_->reservedBytes(); + const uint64_t currOtherUsage = + currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0; + + const uint64_t estimatedNodeUsageAfterScale = std::max( + currNodeUsage + estimatedDriverUsage_, + estimatedDriverUsage_ * (numRunningDrivers_ + 1)); + if ((estimatedNodeUsageAfterScale + currOtherUsage) >= + maxQueryCapacity * scaleUpMemoryUsageRatio_) { + return; + } + + scaleUpLocked(driverPromise); +} + +void ScaledScanController::scaleUpLocked( + std::optional& driverPromise) { + VELOX_CHECK_LT(numRunningDrivers_, numDrivers_); + + ++numRunningDrivers_; + if (driverPromises_[numRunningDrivers_ - 1].has_value()) { + driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]); + driverPromises_[numRunningDrivers_ - 1].reset(); + } +} + +ScaledScanController::~ScaledScanController() { + close(); +} + +bool ScaledScanController::close() { + std::vector promises; + { + std::lock_guard l(lock_); + if (closed_) { + return false; + } + + promises.reserve(driverPromises_.size()); + for (auto& promise : driverPromises_) { + if (promise.has_value()) { + promises.emplace_back(std::move(promise.value())); + promise.reset(); + } + } + closed_ = true; + } + + for (auto& promise : promises) { + promise.setValue(); + } + return true; +} + +std::string ScaledScanController::Stats::toString() const { + return fmt::format("numRunningDrivers: {}", numRunningDrivers); +} } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index deec109b02c1..af9ffb07f109 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -20,6 +20,102 @@ namespace facebook::velox::exec { +/// Controller used to scales table scan processing based on the query memory +/// usage. +class ScaledScanController { + public: + /// 'nodePool' is the memory pool of the table scan node. 'numDrivers' is + /// number of the table scan drivers. 'scaleUpMemoryUsageRatio' specifies the + /// memory usage ratio used to make scan scale up decision. + ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio); + + ~ScaledScanController(); + + ScaledScanController(const ScaledScanController&) = delete; + ScaledScanController(ScaledScanController&&) = delete; + ScaledScanController& operator=(const ScaledScanController&) = delete; + ScaledScanController& operator=(ScaledScanController&&) = delete; + + /// Invoked by a scan operator to check if it needs to stop waiting for scan + /// up to start processing. If so, 'future' is set to a future that will be + /// ready when the controller decides to start this scan operator processing, + /// or all the splits from the scan node have been dispatched to finish all + /// the scan operators. 
'driverIcx' is the driver id of the scan operator. + /// Initially, only the first scan operator at driver index 0 is allowed to + /// run. + bool shouldStop(uint32_t driverIdx, ContinueFuture* future); + + /// Invoked by a scan operator to update per-driver memory usage estimation + /// after finish processing a non-empty split. 'driverIdx' is the driver id of + /// the scan operator. 'driverMemoryUsage' is the peak memory usage of the + /// scan operator. + void updateAndTryScale(uint32_t driverIdx, uint64_t driverMemoryUsage); + + struct Stats { + uint32_t numRunningDrivers{0}; + + std::string toString() const; + }; + + Stats stats() const { + std::lock_guard l(lock_); + return {.numRunningDrivers = numRunningDrivers_}; + } + + /// Invoked by the closed scan operator to close the controller. It returns + /// true on the first invocation, and otherwise false. + bool close(); + + void testingSetMemoryRatio(double scaleUpMemoryUsageRatio) { + std::lock_guard l(lock_); + *const_cast(&scaleUpMemoryUsageRatio_) = scaleUpMemoryUsageRatio; + } + + private: + // Invoked to check if we can scale up scan processing. If so, call + // 'scaleUpLocked' for scale up processing. + void tryScaleLocked(std::optional& driverePromise); + + // Invoked to scale up scan processing by bumping up the number of running + // scan drivers by one. 'drverPromise' returns the promise to fulfill if the + // scaled scan driver has been stopped. + void scaleUpLocked(std::optional& driverePromise); + + // Invoked to check if we need to stop waiting for scale up processing of the + // specified scan driver. If 'driverIdx' is beyond 'numRunningDrivers_', then + // a promise is set in 'driverPromises_' and the associated future is returned + // in 'future'. + bool shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future); + + /// Invoked to update the per-driver memory usage estimation with a new driver + /// report. + void updateDriverScanUsageLocked(uint32_t driverIdx, uint64_t memoryUsage); + + memory::MemoryPool* const nodePool_; + memory::MemoryPool* const queryPool_; + const uint32_t numDrivers_; + const double scaleUpMemoryUsageRatio_; + + mutable std::mutex lock_; + uint32_t numRunningDrivers_{1}; + + // The estimated per-driver memory usage of the table scan node. + uint64_t estimatedDriverUsage_{0}; + + // The number of drivers has reported memory usage. + uint32_t numDriverUsageReports_{0}; + + // The driver resume promises with one per each driver index. + std::vector> driverPromises_; + + bool closed_{false}; +}; + class TableScan : public SourceOperator { public: TableScan( @@ -27,6 +123,8 @@ class TableScan : public SourceOperator { DriverCtx* driverCtx, const std::shared_ptr& tableScanNode); + void initialize() override; + folly::dynamic toJson() const override; RowVectorPtr getOutput() override; @@ -41,6 +139,8 @@ class TableScan : public SourceOperator { bool isFinished() override; + void close() override; + bool canAddDynamicFilter() const override { return connector_->canAddDynamicFilter(); } @@ -50,6 +150,18 @@ class TableScan : public SourceOperator { column_index_t outputChannel, const std::shared_ptr& filter) override; + /// The name of runtime stats specific to table scan. + /// The number of running table scan drivers. + /// + /// NOTE: we only report the number of running scan drivers at the point that + /// all the splits have been dispatched. 
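// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the per-driver protocol a scan
// operator follows against ScaledScanController, condensed into one helper.
// The controller, driverIdx, peakBytes and future arguments stand in for the
// members the real TableScan operator uses; split processing is elided.
// ---------------------------------------------------------------------------
#include <cstdint>
#include <memory>

bool runOneSplitWithScaling(
    const std::shared_ptr<ScaledScanController>& controller,
    uint32_t driverIdx,
    uint64_t peakBytes,
    ContinueFuture* future) {
  // Ask the controller whether this driver may run. If not, 'future' is set
  // and the caller should block with BlockingReason::kWaitForScanScaleUp.
  if (controller != nullptr && controller->shouldStop(driverIdx, future)) {
    return false;
  }
  // ... process one split here ...
  // After a non-empty split, report the peak memory usage so the controller
  // can decide whether to wake up one more scan driver.
  if (controller != nullptr) {
    controller->updateAndTryScale(driverIdx, peakBytes);
  }
  return true;
}
// ----------------------------- end of sketch -------------------------------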
+ static inline const std::string kNumRunningScaleThreads{ + "numRunningScaleThreads"}; + + std::shared_ptr testingScaledController() const { + return scaledController_; + } + private: // Checks if this table scan operator needs to yield before processing the // next split. @@ -70,17 +182,37 @@ class TableScan : public SourceOperator { // done, it will be made when needed. void preload(const std::shared_ptr& split); + // Invoked by scan operator to check if it needs to stop to wait for scale up. + bool shouldWaitForScaleUp(); + + // Invoked after scan operator finishes processing a non-empty split to update + // the scan driver memory usage and check to see if we need to scale up scan + // processing or not. + void tryScaleUp(); + const std::shared_ptr tableHandle_; const std:: unordered_map> columnHandles_; DriverCtx* const driverCtx_; + const int32_t maxSplitPreloadPerDriver_{0}; + const vector_size_t maxReadBatchSize_; memory::MemoryPool* const connectorPool_; + const std::shared_ptr connector_; + // Exits getOutput() method after this many milliseconds. Zero means 'no + // limit'. + const size_t getOutputTimeLimitMs_{0}; + + // If set, used for scan scale processing. It is shared by all the scan + // operators instantiated from the same table scan node. + std::shared_ptr scaledController_; + + vector_size_t readBatchSize_; + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; - std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSource_; bool noMoreSplits_ = false; @@ -90,8 +222,6 @@ class TableScan : public SourceOperator { int32_t maxPreloadedSplits_{0}; - const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async preload. The // callback's lifetime is the lifetime of 'this'. This callback can schedule // preloads on an executor. These preloads may outlive the Task and therefore @@ -105,13 +235,6 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - vector_size_t readBatchSize_; - vector_size_t maxReadBatchSize_; - - // Exits getOutput() method after this many milliseconds. Zero means 'no - // limit'. - size_t getOutputTimeLimitMs_{0}; - double maxFilteringRatio_{0}; // String shown in ExceptionContext inside DataSource and LazyVector loading. @@ -120,5 +243,9 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. std::atomic curStatus_{""}; + + // The total number of raw input rows read up till the last finished split. + // This is used to detect if a finished split is empty or not. 
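// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: how the raw-input-row counter
// classifies a finished split, mirroring the bookkeeping in
// TableScan::getOutput() above. Names are simplified.
// ---------------------------------------------------------------------------
#include <cstdint>

struct SplitBoundaryTracker {
  uint64_t rawRowsAtLastSplit{0};

  // Returns true if the split that just finished produced no raw input rows,
  // i.e. the cumulative raw-row counter did not advance.
  bool finishedSplitWasEmpty(uint64_t currentRawRows) {
    const bool empty = currentRawRows == rawRowsAtLastSplit;
    rawRowsAtLastSplit = currentRawRows;
    return empty;
  }
};
// Empty splits are excluded from scaling updates so that footer-only reads do
// not drag the per-driver memory estimate down and cause over-aggressive
// scale up.
// ----------------------------- end of sketch -------------------------------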
+ uint64_t rawInputRowsSinceLastSplit_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fbe57f3e34a7..b66848f564b0 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1586,6 +1586,33 @@ exec::Split Task::getSplitLocked( return split; } +std::shared_ptr Task::getOrAddScaledScanController( + uint32_t splitGroupId, + uint32_t pipelineId, + const core::PlanNodeId& planNodeId, + memory::MemoryPool* nodePool) { + std::lock_guard l(mutex_); + auto& splitGroupState = splitGroupStates_[splitGroupId]; + auto it = splitGroupState.scaledScanControllers.find(planNodeId); + if (it != splitGroupState.scaledScanControllers.end()) { + VELOX_CHECK_NOT_NULL(it->second); + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + return it->second; + } + + if (!queryCtx_->queryConfig().tableScanScaledProcessingEnabled()) { + return nullptr; + } + + splitGroupState.scaledScanControllers.emplace( + planNodeId, + std::make_shared( + nodePool, + numDrivers(pipelineId), + queryCtx_->queryConfig().tableScanScaleUpMemoryUsageRatio())); + return splitGroupState.scaledScanControllers[planNodeId]; +} + void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 8c11bb6493a9..7176c297d008 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,7 @@ #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" #include "velox/exec/Split.h" +#include "velox/exec/TableScan.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" #include "velox/exec/TaskTraceWriter.h" @@ -502,6 +503,15 @@ class Task : public std::enable_shared_from_this { int32_t maxPreloadSplits = 0, const ConnectorSplitPreloadFunc& preload = nullptr); + /// Returns the scaled scan controller for a given table scan node if the + /// query has configured. This function is called by the table scan operator + /// initialization, and the first invocation creates the controller. + std::shared_ptr getOrAddScaledScanController( + uint32_t splitGroupId, + uint32_t pipelineId, + const core::PlanNodeId& planNodeId, + memory::MemoryPool* nodePool); + void splitFinished(bool fromTableScan, int64_t splitWeight); void multipleSplitsFinished( diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 7c193240689a..cf3a39703fd4 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -116,6 +116,10 @@ struct SplitGroupState { /// Map of local exchanges keyed on LocalPartition plan node ID. std::unordered_map localExchanges; + /// Map of scaled scan controllers keyed on TableScan plan node ID. + std::unordered_map> + scaledScanControllers; + /// Drivers created and still running for this split group. /// The split group is finished when this numbers reaches zero. 
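// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this patch: the get-or-create pattern used
// by Task::getOrAddScaledScanController() above, reduced to a standalone
// registry. All scan drivers of one table scan plan node share the same
// controller instance; it is created on the first call under the lock.
// ---------------------------------------------------------------------------
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

template <typename Controller>
class ControllerRegistry {
 public:
  template <typename Factory>
  std::shared_ptr<Controller> getOrAdd(
      const std::string& planNodeId,
      Factory makeController) {
    std::lock_guard<std::mutex> l(mutex_);
    auto it = controllers_.find(planNodeId);
    if (it != controllers_.end()) {
      return it->second;
    }
    std::shared_ptr<Controller> controller = makeController();
    controllers_.emplace(planNodeId, controller);
    return controller;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Controller>> controllers_;
};
// ----------------------------- end of sketch -------------------------------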
uint32_t numRunningDrivers{0}; diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index b57a55e8fee9..ecc10739a4c7 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -2481,6 +2481,127 @@ TEST_P(MultiFragmentTest, compression) { test("local://t2", 0.0000001, true); } +TEST_P(MultiFragmentTest, tableScanScaleUp) { + const int numSplits = 20; + std::vector> splitFiles; + std::vector splitVectors; + for (auto i = 0; i < numSplits; ++i) { + auto vectors = makeVectors(10, 1'000); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end()); + } + + createDuckDbTable(splitVectors); + + struct { + bool scaleEnabled; + double scaleUpMemoryUsageRatio; + bool expectScaleUp; + + std::string debugString() const { + return fmt::format( + "scaleEnabled {}, scaleUpMemoryUsageRatio {}, expectScaleUp {}", + scaleEnabled, + scaleUpMemoryUsageRatio, + expectScaleUp); + } + } testSettings[] = { + {false, 0.9, false}, + {true, 0.9, true}, + {false, 1.0, false}, + {true, 1.0, true}, + {false, 0.00001, false}, + {true, 0.00001, false}, + {false, 0.0, false}, + {true, 0.0, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId; + configSettings_[core::QueryConfig::kTableScanScaledProcessingEnabled] = + testData.scaleEnabled ? "true" : "false"; + configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = + std::to_string(testData.scaleUpMemoryUsageRatio); + + const auto leafPlan = + PlanBuilder() + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .partialAggregation( + {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto leafTaskId = "local://leaf-0"; + auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20); + const auto numLeafDrivers{4}; + leafTask->start(numLeafDrivers); + addHiveSplits(leafTask, splitFiles); + + const auto finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam()) + .finalAggregation( + {"c5"}, + {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"}, + {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto finalAggTaskId = "local://final-agg-0"; + auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0); + const auto numFinalAggrDrivers{1}; + finalAggTask->start(numFinalAggrDrivers); + addRemoteSplits(finalAggTask, {leafTaskId}); + + const auto resultPlan = + PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam()) + .planNode(); + + assertQuery( + resultPlan, + {finalAggTaskId}, + "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); + + ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get())) + << finalAggTask->taskId(); + + auto planStats = toPlanStats(leafTask->taskStats()); + if (testData.scaleEnabled) { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 1); + if (testData.expectScaleUp) { + ASSERT_GT( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 1); + ASSERT_LE( + 
planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + numLeafDrivers); + } else { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 1); + } + } else { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 0); + } + } +} + VELOX_INSTANTIATE_TEST_SUITE_P( MultiFragmentTest, MultiFragmentTest,