diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index e6c4032768b1..8031145115f0 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -472,6 +472,24 @@ class QueryConfig { static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold = "scaled_writer_min_processed_bytes_rebalance_threshold"; + /// If true, enables the scaled table scan processing. For each table scan + /// plan node, a scan controller is used to control the number of running scan + /// threads based on the query memory usage. It keeps increasing the number of + /// running threads until the query memory usage exceeds the threshold defined + /// by 'table_scan_scale_up_memory_usage_ratio'. + static constexpr const char* kTableScanScaledProcessingEnabled = + "table_scan_scaled_processing_enabled"; + + /// The query memory usage ratio used by scan controller to decide if it can + /// increase the number of running scan threads. When the query memory usage + /// is below this ratio, the scan controller keeps increasing the running scan + /// thread for scale up, and stop once exceeds this ratio. The value is in the + /// range of [0, 1]. + /// + /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true. 
+ static constexpr const char* kTableScanScaleUpMemoryUsageRatio = + "table_scan_scale_up_memory_usage_ratio"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -880,6 +898,14 @@ class QueryConfig { kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20); } + bool tableScanScaledProcessingEnabled() const { + return get(kTableScanScaledProcessingEnabled, true); + } + + double tableScanScaleUpMemoryUsageRatio() const { + return get(kTableScanScaleUpMemoryUsageRatio, 0.5); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index a932ecd57558..9fe2534b1262 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -390,6 +390,23 @@ Table Scan - integer - 2 - Maximum number of splits to preload per driver. Set to 0 to disable preloading. + * - table_scan_scaled_processing_enabled + - bool + - true + - If true, enables the scaled table scan processing. For each table scan + plan node, a scan controller is used to control the number of running scan + threads based on the query memory usage. It keeps increasing the number of + running threads until the query memory usage exceeds the threshold defined + by 'table_scan_scale_up_memory_usage_ratio'. + * - table_scan_scale_up_memory_usage_ratio + - double + - 0.5 + - The query memory usage ratio used by scan controller to decide if it can + increase the number of running scan threads. When the query memory usage + is below this ratio, the scan controller scales up the scan processing by + increasing the number of running scan threads, and stops once it exceeds + this ratio. The value is in the range of [0, 1). This only applies if + 'table_scan_scaled_processing_enabled' is true.
Table Writer ------------ @@ -414,26 +431,26 @@ Table Writer - double - 0.7 - The max ratio of a query used memory to its max capacity, and the scale - - writer exchange stops scaling writer processing if the query's current - - memory usage exceeds this ratio. The value is in the range of (0, 1]. + writer exchange stops scaling writer processing if the query's current + memory usage exceeds this ratio. The value is in the range of (0, 1]. * - scaled_writer_max_partitions_per_writer - integer - 128 - The max number of logical table partitions that can be assigned to a - - single table writer thread. The logical table partition is used by local - - exchange writer for writer scaling, and multiple physical table - - partitions can be mapped to the same logical table partition based on the - - hash value of calculated partitioned ids. + single table writer thread. The logical table partition is used by local + exchange writer for writer scaling, and multiple physical table + partitions can be mapped to the same logical table partition based on the + hash value of calculated partitioned ids. + * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - integer - 128MB - * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - Minimum amount of data processed by a logical table partition to trigger - - writer scaling if it is detected as overloaded by scale wrirer exchange. + writer scaling if it is detected as overloaded by scale writer exchange. * - scaled_writer_min_processed_bytes_rebalance_threshold - - Minimum amount of data processed by all the logical table partitions to - - trigger skewed partition rebalancing by scale writer exchange. - integer - 256MB + - Minimum amount of data processed by all the logical table partitions to + trigger skewed partition rebalancing by scale writer exchange.
Hive Connector -------------- diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index 53538d854974..a6feb1d32cb0 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators. - Time spent on building the hash table from rows collected by all the hash build operators. This stat is only reported by the HashBuild operator. +TableScan +--------- +These stats are reported only by TableScan operator + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numRunningScaleThreads + - + - The number of running table scan drivers. + TableWriter ----------- These stats are reported only by TableWriter operator diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e5c4656d6521..9a6a4740ddd8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -631,11 +631,10 @@ StopReason Driver::runInternal( nextOp, curOperatorId_ + 1, kOpMethodAddInput); - - // The next iteration will see if operators_[i + 1] has - // output now that it got input. - i += 2; }); + // The next iteration will see if operators_[i + 1] has + // output now that it got input. + i += 2; continue; } else { stop = task()->shouldStop(); @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) { return "kYield"; case BlockingReason::kWaitForArbitration: return "kWaitForArbitration"; + case BlockingReason::kWaitForScanScaleUp: + return "kWaitForScanScaleUp"; default: VELOX_UNREACHABLE( fmt::format("Unknown blocking reason {}", static_cast(reason))); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7d46017187e4..64d0a9b219b6 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -210,6 +210,10 @@ enum class BlockingReason { /// Operator is blocked waiting for its associated query memory arbitration to /// finish.
kWaitForArbitration, + /// For a table scan operator, it is blocked waiting for the scan controller + /// to increase the number of table scan processing threads to start + /// processing. + kWaitForScanScaleUp, }; std::string blockingReasonToString(BlockingReason reason); diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 7586a52f763f..0b100d95d48b 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -37,19 +37,29 @@ TableScan::TableScan( tableHandle_(tableScanNode->tableHandle()), columnHandles_(tableScanNode->assignments()), driverCtx_(driverCtx), + maxSplitPreloadPerDriver_( + driverCtx_->queryConfig().maxSplitPreloadPerDriver()), + maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), connectorPool_(driverCtx_->task->addConnectorPoolLocked( planNodeId(), driverCtx_->pipelineId, driverCtx_->driverId, operatorType(), tableHandle_->connectorId())), - maxSplitPreloadPerDriver_( - driverCtx_->queryConfig().maxSplitPreloadPerDriver()), - readBatchSize_(driverCtx_->queryConfig().preferredOutputBatchRows()), - maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), + connector_(connector::getConnector(tableHandle_->connectorId())), getOutputTimeLimitMs_( driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()) { - connector_ = connector::getConnector(tableHandle_->connectorId()); + readBatchSize_ = driverCtx_->queryConfig().preferredOutputBatchRows(); +} + +void TableScan::initialize() { + Operator::initialize(); + VELOX_CHECK_NULL(scaledController_); + scaledController_ = driverCtx_->task->getOrAddScaledScanController( + driverCtx_->splitGroupId, + driverCtx_->pipelineId, + planNodeId(), + pool()->parent()); } folly::dynamic TableScan::toJson() const { @@ -76,10 +86,20 @@ bool TableScan::shouldStop(StopReason taskStopReason) const { RowVectorPtr TableScan::getOutput() { auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); + VELOX_CHECK(!blockingFuture_.valid()); + blockingReason_ = 
BlockingReason::kNotBlocked; + if (noMoreSplits_) { return nullptr; } + // Check if we need to wait for scale up. We expect only wait once on startup. + if (shouldWaitForScaleUp()) { + VELOX_CHECK(blockingFuture_.valid()); + VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp); + return nullptr; + } + curStatus_ = "getOutput: enter"; const auto startTimeMs = getCurrentTimeMs(); for (;;) { @@ -259,6 +279,7 @@ RowVectorPtr TableScan::getOutput() { curStatus_ = "getOutput: updating stats_.rawInput"; lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); + RowVectorPtr data = std::move(dataOptional).value(); if (data != nullptr) { if (data->size() > 0) { @@ -274,6 +295,7 @@ RowVectorPtr TableScan::getOutput() { } } + uint64_t currNumRawInputRows{0}; { curStatus_ = "getOutput: updating stats_.preloadedSplits"; auto lockedStats = stats_.wlock(); @@ -287,12 +309,52 @@ RowVectorPtr TableScan::getOutput() { "readyPreloadedSplits", RuntimeCounter(numReadyPreloadedSplits_)); numReadyPreloadedSplits_ = 0; } + currNumRawInputRows = lockedStats->rawInputPositions; } + VELOX_CHECK_LE(rawInputRowsSinceLastSplit_, currNumRawInputRows); + const bool emptySplit = currNumRawInputRows == rawInputRowsSinceLastSplit_; + rawInputRowsSinceLastSplit_ = currNumRawInputRows; curStatus_ = "getOutput: task->splitFinished"; driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; + + // We only update scaled controller when we have finished a non-empty split. + // Otherwise, it can lead to the wrong scale up decisions if the first few + // splits are empty. Then we only report the memory usage for the file + // footer read which is much smaller the actual memory usage when read from + // a non-empty split. This can cause query OOM as we run too many scan + // drivers. 
+ if (!emptySplit) { + tryScaleUp(); + } + TestValue::adjust( + "facebook::velox::exec::TableScan::getOutput::finishSplit", this); + } +} + +bool TableScan::shouldWaitForScaleUp() { + if (scaledController_ == nullptr) { + return false; + } + + curStatus_ = "getOutput: shouldWaitForScaleUp"; + if (!scaledController_->shouldStop( + operatorCtx_->driverCtx()->driverId, &blockingFuture_)) { + VELOX_CHECK(!blockingFuture_.valid()); + return false; + } + blockingReason_ = BlockingReason::kWaitForScanScaleUp; + return true; +} + +void TableScan::tryScaleUp() { + if (scaledController_ == nullptr) { + return; } + + scaledController_->updateAndTryScale( + operatorCtx_->driverCtx()->driverId, pool()->peakBytes()); } void TableScan::preload( @@ -380,4 +442,183 @@ void TableScan::addDynamicFilter( stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } +void TableScan::close() { + Operator::close(); + + if (scaledController_ == nullptr) { + return; + } + + // Report the scaled controller stats by the first finished scan operator at + // which point all the splits have been dispatched. 
+ if (!scaledController_->close()) { + return; + } + + const auto scaledStats = scaledController_->stats(); + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + TableScan::kNumRunningScaleThreads, + RuntimeCounter(scaledStats.numRunningDrivers)); +} + +ScaledScanController::ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio) + : nodePool_(nodePool), + queryPool_(nodePool_->root()), + numDrivers_(numDrivers), + scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio) { + VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0); + VELOX_CHECK_LT(scaleUpMemoryUsageRatio_, 1.0); + driverPromises_.resize(numDrivers_, std::nullopt); +} + +bool ScaledScanController::shouldStop( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::lock_guard l(lock_); + if (closed_) { + return false; + } + return shouldStopLocked(driverIdx, future); +} + +bool ScaledScanController::shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK(!closed_); + if (driverIdx < numRunningDrivers_) { + return false; + } + + VELOX_CHECK(!driverPromises_[driverIdx].has_value()); + auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract( + fmt::format("Table scan driver {} scale promise", driverIdx)); + driverPromises_[driverIdx] = std::move(driverPromise); + *future = std::move(driverFuture); + return true; +} + +void ScaledScanController::updateAndTryScale( + uint32_t driverIdx, + uint64_t memoryUsage) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::optional driverPromise; + SCOPE_EXIT { + if (driverPromise.has_value()) { + driverPromise->setValue(); + } + }; + { + std::lock_guard l(lock_); + VELOX_CHECK_LT(driverIdx, numRunningDrivers_); + + if (closed_) { + return; + } + + updateDriverScanUsageLocked(driverIdx, memoryUsage); + + tryScaleLocked(driverPromise); + } +} + +void 
ScaledScanController::updateDriverScanUsageLocked( + uint32_t driverIdx, + uint64_t memoryUsage) { + if (estimatedDriverUsage_ == 0) { + estimatedDriverUsage_ = memoryUsage; + } else { + estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4; + } + + if (numDriverUsageReports_ == numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(numDriverUsageReports_ + 1, numRunningDrivers_); + + if (driverIdx + 1 < numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1); + ++numDriverUsageReports_; +} + +void ScaledScanController::tryScaleLocked( + std::optional& driverPromise) { + VELOX_CHECK_LE(numDriverUsageReports_, numRunningDrivers_); + + if (numRunningDrivers_ == numDrivers_) { + return; + } + if (numDriverUsageReports_ < numRunningDrivers_) { + // We shall only make the next scale up decision until we have received + // the memory usage updates from all the running scan drivers. + return; + } + + const uint64_t maxQueryCapacity = queryPool_->maxCapacity(); + + const uint64_t currQueryUsage = queryPool_->reservedBytes(); + const uint64_t currNodeUsage = nodePool_->reservedBytes(); + const uint64_t currOtherUsage = + currQueryUsage > currNodeUsage ? 
currQueryUsage - currNodeUsage : 0; + + const uint64_t estimatedNodeUsageAfterScale = std::max( + currNodeUsage + estimatedDriverUsage_, + estimatedDriverUsage_ * (numRunningDrivers_ + 1)); + if ((estimatedNodeUsageAfterScale + currOtherUsage) >= + maxQueryCapacity * scaleUpMemoryUsageRatio_) { + return; + } + + scaleUpLocked(driverPromise); +} + +void ScaledScanController::scaleUpLocked( + std::optional& driverPromise) { + VELOX_CHECK_LT(numRunningDrivers_, numDrivers_); + + ++numRunningDrivers_; + if (driverPromises_[numRunningDrivers_ - 1].has_value()) { + driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]); + } +} + +ScaledScanController::~ScaledScanController() { + close(); +} + +bool ScaledScanController::close() { + std::vector promises; + { + std::lock_guard l(lock_); + if (closed_) { + return false; + } + + promises.reserve(driverPromises_.size()); + for (auto& promise : driverPromises_) { + if (promise.has_value()) { + promises.emplace_back(std::move(promise.value())); + promise.reset(); + } + } + closed_ = true; + } + + for (auto& promise : promises) { + promise.setValue(); + } + return true; +} + +std::string ScaledScanController::Stats::toString() const { + return fmt::format("numRunningDrivers: {}", numRunningDrivers); +} } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index deec109b02c1..e050aa34234c 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -20,6 +20,102 @@ namespace facebook::velox::exec { +/// Controller used to scales table scan processing based on the query memory +/// usage. +class ScaledScanController { + public: + /// 'nodePool' is the memory pool of the table scan node. 'numDrivers' is + /// number of the table scan drivers. 'scaleUpMemoryUsageRatio' specifies the + /// memory usage ratio used to make scan scale up decision. 
+ ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio); + + ~ScaledScanController(); + + ScaledScanController(const ScaledScanController&) = delete; + ScaledScanController(ScaledScanController&&) = delete; + ScaledScanController& operator=(const ScaledScanController&) = delete; + ScaledScanController& operator=(ScaledScanController&&) = delete; + + /// Invoked by a scan operator to check if it needs to stop waiting for scale + /// up to start processing. If so, 'future' is set to a future that will be + /// ready when the controller decides to start this scan operator processing, + /// or all the splits from the scan node have been dispatched to finish all + /// the scan operators. 'driverIdx' is the driver id of the scan operator. + /// Initially, only the first scan operator at driver index 0 is allowed to + /// run. + bool shouldStop(uint32_t driverIdx, ContinueFuture* future); + + /// Invoked by a scan operator to update per-driver memory usage estimation + /// after finish processing a non-empty split. 'driverIdx' is the driver id of + /// the scan operator. 'driverMemoryUsage' is the peak memory usage of the + /// scan operator. + void updateAndTryScale(uint32_t driverIdx, uint64_t driverMemoryUsage); + + struct Stats { + uint32_t numRunningDrivers{0}; + + std::string toString() const; + }; + + Stats stats() const { + std::lock_guard l(lock_); + return {.numRunningDrivers = numRunningDrivers_}; + } + + /// Invoked by the closed scan operator to close the controller. It returns + /// true on the first invocation, and otherwise false. + bool close(); + + void testingSetMemoryRatio(double scaleUpMemoryUsageRatio) { + std::lock_guard l(lock_); + *const_cast(&scaleUpMemoryUsageRatio_) = scaleUpMemoryUsageRatio; + } + + private: + // Invoked to check if we can scale up scan processing. If so, call + // 'scaleUpLocked' for scale up processing.
+ void tryScaleLocked(std::optional& driverPromise); + + // Invoked to scale up scan processing by bumping up the number of running + // scan drivers by one. 'driverPromise' returns the promise to fulfill if the + // scaled scan driver has been stopped. + void scaleUpLocked(std::optional& driverPromise); + + // Invoked to check if we need to stop waiting for scale up processing of the + // specified scan driver. If 'driverIdx' is beyond 'numRunningDrivers_', then + // a promise is set in 'driverPromises_' and the associated future is returned + // in 'future'. + bool shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future); + + /// Invoked to update the per-driver memory usage estimation with a new driver + /// report. + void updateDriverScanUsageLocked(uint32_t driverIdx, uint64_t memoryUsage); + + memory::MemoryPool* const nodePool_; + memory::MemoryPool* const queryPool_; + const uint32_t numDrivers_; + const double scaleUpMemoryUsageRatio_; + + mutable std::mutex lock_; + uint32_t numRunningDrivers_{1}; + + // The estimated per-driver memory usage of the table scan node. + uint64_t estimatedDriverUsage_{0}; + + // The number of drivers that have reported memory usage. + uint32_t numDriverUsageReports_{0}; + + // The driver resume promises with one per each driver index.
+ std::vector> driverPromises_; + + bool closed_{false}; +}; + class TableScan : public SourceOperator { public: TableScan( @@ -27,20 +123,34 @@ class TableScan : public SourceOperator { DriverCtx* driverCtx, const std::shared_ptr& tableScanNode); + void initialize() override; + folly::dynamic toJson() const override; RowVectorPtr getOutput() override; BlockingReason isBlocked(ContinueFuture* future) override { - if (blockingFuture_.valid()) { - *future = std::move(blockingFuture_); - return blockingReason_; + if (!blockingFuture_.valid()) { + return BlockingReason::kNotBlocked; + } + + if (blockingReason_ == BlockingReason::kWaitForScanScaleUp) { + // If any operators from the same driver pipeline has pending data to + // flush, then we need to flush them first before stop this driver. This + // ensures that we don't hold significant amount of memory in this driver + // pipeline while it is stopped. + if (driverCtx_->driver->needsFlush(FlushReason::kScanScaleDown)) { + return BlockingReason::kNotBlocked; + } } - return BlockingReason::kNotBlocked; + *future = std::move(blockingFuture_); + return blockingReason_; } bool isFinished() override; + void close() override; + bool canAddDynamicFilter() const override { return connector_->canAddDynamicFilter(); } @@ -50,6 +160,18 @@ class TableScan : public SourceOperator { column_index_t outputChannel, const std::shared_ptr& filter) override; + /// The name of runtime stats specific to table scan. + /// The number of running table scan drivers. + /// + /// NOTE: we only report the number of running scan drivers at the point that + /// all the splits have been dispatched. + static inline const std::string kNumRunningScaleThreads{ + "numRunningScaleThreads"}; + + std::shared_ptr testingScaledController() const { + return scaledController_; + } + private: // Checks if this table scan operator needs to yield before processing the // next split. 
@@ -70,17 +192,37 @@ class TableScan : public SourceOperator { // done, it will be made when needed. void preload(const std::shared_ptr& split); + // Invoked by scan operator to check if it needs to stop to wait for scale up. + bool shouldWaitForScaleUp(); + + // Invoked after this scan operator finishes processing a split to update the + // scan driver memory usage and check to see if we need to scale up scan + // processing or not. + void tryScaleUp(); + const std::shared_ptr tableHandle_; const std:: unordered_map> columnHandles_; DriverCtx* const driverCtx_; + const int32_t maxSplitPreloadPerDriver_{0}; + const vector_size_t maxReadBatchSize_; memory::MemoryPool* const connectorPool_; + const std::shared_ptr connector_; + // Exits getOutput() method after this many milliseconds. Zero means 'no + // limit'. + const size_t getOutputTimeLimitMs_{0}; + + // If set, used for scan scale processing. It is shared by all the scan + // operators instantiated from the same table scan node. + std::shared_ptr scaledController_; + + vector_size_t readBatchSize_; + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; - std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSource_; bool noMoreSplits_ = false; @@ -90,8 +232,6 @@ class TableScan : public SourceOperator { int32_t maxPreloadedSplits_{0}; - const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async preload. The // callback's lifetime is the lifetime of 'this'. This callback can schedule // preloads on an executor. These preloads may outlive the Task and therefore @@ -105,13 +245,6 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. 
int32_t numReadyPreloadedSplits_{0}; - vector_size_t readBatchSize_; - vector_size_t maxReadBatchSize_; - - // Exits getOutput() method after this many milliseconds. Zero means 'no - // limit'. - size_t getOutputTimeLimitMs_{0}; - double maxFilteringRatio_{0}; // String shown in ExceptionContext inside DataSource and LazyVector loading. @@ -120,5 +253,8 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. std::atomic curStatus_{""}; + + // The total number of raw input rows read up till the last finished split. + uint64_t rawInputRowsSinceLastSplit_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fbe57f3e34a7..b66848f564b0 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1586,6 +1586,33 @@ exec::Split Task::getSplitLocked( return split; } +std::shared_ptr Task::getOrAddScaledScanController( + uint32_t splitGroupId, + uint32_t pipelineId, + const core::PlanNodeId& planNodeId, + memory::MemoryPool* nodePool) { + std::lock_guard l(mutex_); + auto& splitGroupState = splitGroupStates_[splitGroupId]; + auto it = splitGroupState.scaledScanControllers.find(planNodeId); + if (it != splitGroupState.scaledScanControllers.end()) { + VELOX_CHECK_NOT_NULL(it->second); + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + return it->second; + } + + if (!queryCtx_->queryConfig().tableScanScaledProcessingEnabled()) { + return nullptr; + } + + splitGroupState.scaledScanControllers.emplace( + planNodeId, + std::make_shared( + nodePool, + numDrivers(pipelineId), + queryCtx_->queryConfig().tableScanScaleUpMemoryUsageRatio())); + return splitGroupState.scaledScanControllers[planNodeId]; +} + void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 
8c11bb6493a9..7176c297d008 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,7 @@ #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" #include "velox/exec/Split.h" +#include "velox/exec/TableScan.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" #include "velox/exec/TaskTraceWriter.h" @@ -502,6 +503,15 @@ class Task : public std::enable_shared_from_this { int32_t maxPreloadSplits = 0, const ConnectorSplitPreloadFunc& preload = nullptr); + /// Returns the scaled scan controller for a given table scan node if the + /// query has configured. This function is called by the table scan operator + /// initialization, and the first invocation creates the controller. + std::shared_ptr getOrAddScaledScanController( + uint32_t splitGroupId, + uint32_t pipelineId, + const core::PlanNodeId& planNodeId, + memory::MemoryPool* nodePool); + void splitFinished(bool fromTableScan, int64_t splitWeight); void multipleSplitsFinished( diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 7c193240689a..cf3a39703fd4 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -116,6 +116,10 @@ struct SplitGroupState { /// Map of local exchanges keyed on LocalPartition plan node ID. std::unordered_map localExchanges; + /// Map of scaled scan controllers keyed on TableScan plan node ID. + std::unordered_map> + scaledScanControllers; + /// Drivers created and still running for this split group. /// The split group is finished when this numbers reaches zero. 
uint32_t numRunningDrivers{0}; diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index b57a55e8fee9..8389aae5ba22 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -2481,6 +2481,226 @@ TEST_P(MultiFragmentTest, compression) { test("local://t2", 0.0000001, true); } +TEST_P(MultiFragmentTest, tableScanScaleUp) { + const int numSplits = 20; + std::vector> splitFiles; + std::vector splitVectors; + for (auto i = 0; i < numSplits; ++i) { + auto vectors = makeVectors(10, 1'000); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end()); + } + + createDuckDbTable(splitVectors); + + struct { + bool scaleEnabled; + double scaleUpMemoryUsageRatio; + double scaleDownMemoryUsageRatio; + bool expectScaleUp; + + std::string debugString() const { + return fmt::format( + "scaleEnabled {}, scaleUpMemoryUsageRatio {}, scaleDownMemoryUsageRatio {}, expectScaleUp {}", + scaleEnabled, + scaleUpMemoryUsageRatio, + scaleDownMemoryUsageRatio, + expectScaleUp); + } + } testSettings[] = { + {false, 0.9, 0.95, false}, + {true, 0.9, 0.95, true}, + {false, 0.00001, 0.2, false}, + {true, 0.00001, 0.2, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId; + configSettings_[core::QueryConfig::kTableScanScaleUpEnabled] = + testData.scaleEnabled ? 
"true" : "false"; + configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = + std::to_string(testData.scaleUpMemoryUsageRatio); + configSettings_[core::QueryConfig::kTableScanScaleDownMemoryUsageRatio] = + std::to_string(testData.scaleDownMemoryUsageRatio); + + const auto leafPlan = + PlanBuilder() + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .partialAggregation( + {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto leafTaskId = "local://leaf-0"; + auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20); + const auto numLeafDrivers{4}; + leafTask->start(numLeafDrivers); + addHiveSplits(leafTask, splitFiles); + + const auto finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam()) + .finalAggregation( + {"c5"}, + {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"}, + {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto finalAggTaskId = "local://final-agg-0"; + auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0); + const auto numFinalAggrDrivers{1}; + finalAggTask->start(numFinalAggrDrivers); + addRemoteSplits(finalAggTask, {leafTaskId}); + + const auto resultPlan = + PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam()) + .planNode(); + + assertQuery( + resultPlan, + {finalAggTaskId}, + "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); + + ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get())) + << finalAggTask->taskId(); + + auto planStats = toPlanStats(leafTask->taskStats()); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleDown), + 0); + if (testData.expectScaleUp) { + ASSERT_EQ( + 
planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), + 1); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).count, + 1); + ASSERT_GT( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).sum, + 1); + } else { + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), + 0); + } + } +} + +TEST_P(MultiFragmentTest, tableScanScaleDown) { + const int numSplits = 20; + std::vector> splitFiles; + std::vector splitVectors; + for (auto i = 0; i < numSplits; ++i) { + auto vectors = makeVectors(10, 1'000); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end()); + } + + createDuckDbTable(splitVectors); + + const auto numLeafDrivers{4}; + std::mutex lock; + std::unordered_set runningDriverIds; + std::atomic_bool blockDriverFlag{true}; + folly::EventCount blockDriverRunning; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput::finishSplit", + std::function(([&](Operator* op) { + bool waitForOtherDriver{true}; + { + std::lock_guard l(lock); + if (runningDriverIds.size() == numLeafDrivers) { + return; + } + runningDriverIds.insert( + op->testingOperatorCtx()->driverCtx()->driverId); + if (runningDriverIds.size() == numLeafDrivers) { + waitForOtherDriver = false; + } + } + if (waitForOtherDriver) { + blockDriverRunning.await([&] { return !blockDriverFlag.load(); }); + } else { + auto* scanOp = static_cast(op); + scanOp->testingScaler()->testingSetMemoryRatios(0.00001, 0.00002); + blockDriverFlag = false; + blockDriverRunning.notifyAll(); + } + }))); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId; + configSettings_[core::QueryConfig::kTableScanScaleUpEnabled] = "true"; + configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = "0.9"; + 
configSettings_[core::QueryConfig::kTableScanScaleDownMemoryUsageRatio] = + "0.95"; + + const auto leafPlan = + PlanBuilder() + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .partialAggregation( + {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto leafTaskId = "local://leaf-0"; + auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20); + leafTask->start(numLeafDrivers); + addHiveSplits(leafTask, splitFiles); + + const auto finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam()) + .finalAggregation( + {"c5"}, + {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"}, + {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto finalAggTaskId = "local://final-agg-0"; + auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0); + const auto numFinalAggrDrivers{1}; + finalAggTask->start(numFinalAggrDrivers); + addRemoteSplits(finalAggTask, {leafTaskId}); + + const auto resultPlan = + PlanBuilder().exchange(finalAggPlan->outputType(), GetParam()).planNode(); + + assertQuery( + resultPlan, + {finalAggTaskId}, + "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); + + ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get())) + << finalAggTask->taskId(); + + auto planStats = toPlanStats(leafTask->taskStats()); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleUp), 1); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).count, 1); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleUp).sum, 3); + ASSERT_EQ( + planStats.at(scanNodeId).customStats.count(TableScan::kNumScaleDown), 1); + ASSERT_GT( + 
planStats.at(scanNodeId).customStats.at(TableScan::kNumScaleDown).count, + 0); +} + VELOX_INSTANTIATE_TEST_SUITE_P( MultiFragmentTest, MultiFragmentTest, diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 69300dafb0b3..abca5d66d57b 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -117,7 +117,7 @@ void OperatorTestBase::setupMemory( } void OperatorTestBase::resetMemory() { - OperatorTestBase::setupMemory(8L << 30, 6L << 30, 0, 512 << 20, 0, 0, 0); + OperatorTestBase::setupMemory(256L << 30, 6L << 30, 0, 512 << 20, 0, 0, 0); } void OperatorTestBase::SetUp() {