From a82450a395803da3dd41f84c5bab2ef39aed91ee Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Wed, 18 Dec 2024 17:23:10 -0800 Subject: [PATCH] feat: Add auto table scan scaling based on memory usage (#11879) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11879 Adds auto table scan scaling support to solve the query OOM caused by high concurrent memory intensive table scan operations. Instead of running all the table scan threads at the start of the query, we start from running one single table scan thread and gradually scheduling more table scan thread when there is sufficient free available memory capacity for the query (measured as the current used memory versus the query max capacity). When the query is approaching its max limit, we stop scheduling more table scan threads to prevent OOM caused by table scan. The scale decision happens when a table scan operator finishes process a non-empty split. A scale controller is added to each table scan plan node for this coordinated control and two memory ratios are defined for scale up decision, and it estimate the per-scan driver memory usage based on the memory usage report when a table scan thread finishes a non-empty split. The Meta internal test shows that a query failed with OOM right after 1 min execution with 10 leaf threads, and finished in 2 hrs if reduced leaf thread count to 5. Java took 1 hour to finish with persistent shuffle (LBM). With auto scale, the query finish on Prestissimo in 30 mins. We don't expect this feature to be enabled by default as adhoc small query needs to run all the scan threads in parallel at the start. This will only be used for some large pipeline that could be enabled by query config in velox and session properties in Prestissimo Reviewed By: Yuhta, oerling Differential Revision: D67114511 fbshipit-source-id: f45e8be2b76b0383ff6e0ac1b8d8219adbde60ef --- velox/core/QueryConfig.h | 26 ++ velox/docs/configs.rst | 37 +- velox/docs/monitoring/stats.rst | 15 + velox/exec/CMakeLists.txt | 1 + velox/exec/Driver.cpp | 9 +- velox/exec/Driver.h | 16 + velox/exec/ScaledScanController.cpp | 190 ++++++++++ velox/exec/ScaledScanController.h | 123 +++++++ velox/exec/TableScan.cpp | 84 ++++- velox/exec/TableScan.h | 50 ++- velox/exec/Task.cpp | 39 ++ velox/exec/Task.h | 13 + velox/exec/TaskStructs.h | 4 + velox/exec/tests/CMakeLists.txt | 1 + velox/exec/tests/MultiFragmentTest.cpp | 123 +++++++ velox/exec/tests/ScaledScanControllerTest.cpp | 338 ++++++++++++++++++ velox/tool/trace/TraceReplayRunner.h | 1 - 17 files changed, 1039 insertions(+), 31 deletions(-) create mode 100644 velox/exec/ScaledScanController.cpp create mode 100644 velox/exec/ScaledScanController.h create mode 100644 velox/exec/tests/ScaledScanControllerTest.cpp diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index e6c4032768b1..90105af887e6 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -472,6 +472,24 @@ class QueryConfig { static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold = "scaled_writer_min_processed_bytes_rebalance_threshold"; + /// If true, enables the scaled table scan processing. For each table scan + /// plan node, a scan controller is used to control the number of running scan + /// threads based on the query memory usage. It keeps increasing the number of + /// running threads until the query memory usage exceeds the threshold defined + /// by 'table_scan_scale_up_memory_usage_ratio'. + static constexpr const char* kTableScanScaledProcessingEnabled = + "table_scan_scaled_processing_enabled"; + + /// The query memory usage ratio used by scan controller to decide if it can + /// increase the number of running scan threads. When the query memory usage + /// is below this ratio, the scan controller keeps increasing the running scan + /// thread for scale up, and stop once exceeds this ratio. The value is in the + /// range of [0, 1]. + /// + /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true. + static constexpr const char* kTableScanScaleUpMemoryUsageRatio = + "table_scan_scale_up_memory_usage_ratio"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -880,6 +898,14 @@ class QueryConfig { kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20); } + bool tableScanScaledProcessingEnabled() const { + return get(kTableScanScaledProcessingEnabled, false); + } + + double tableScanScaleUpMemoryUsageRatio() const { + return get(kTableScanScaleUpMemoryUsageRatio, 0.7); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index a932ecd57558..9fe2534b1262 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -390,6 +390,23 @@ Table Scan - integer - 2 - Maximum number of splits to preload per driver. Set to 0 to disable preloading. + * - table_scan_scaled_processing_enabled + - bool + - false + - If true, enables the scaled table scan processing. For each table scan + plan node, a scan controller is used to control the number of running scan + threads based on the query memory usage. It keeps increasing the number of + running threads until the query memory usage exceeds the threshold defined + by 'table_scan_scale_up_memory_usage_ratio'. + * - table_scan_scale_up_memory_usage_ratio + - double + - 0.5 + - The query memory usage ratio used by scan controller to decide if it can + increase the number of running scan threads. When the query memory usage + is below this ratio, the scan controller scale up the scan processing by + increasing the number of running scan threads, and stop once exceeds this + ratio. The value is in the range of [0, 1]. This only applies if + 'table_scan_scaled_processing_enabled' is true. Table Writer ------------ @@ -414,26 +431,26 @@ Table Writer - double - 0.7 - The max ratio of a query used memory to its max capacity, and the scale - - writer exchange stops scaling writer processing if the query's current - - memory usage exceeds this ratio. The value is in the range of (0, 1]. + writer exchange stops scaling writer processing if the query's current + memory usage exceeds this ratio. The value is in the range of (0, 1]. * - scaled_writer_max_partitions_per_writer - integer - 128 - The max number of logical table partitions that can be assigned to a - - single table writer thread. The logical table partition is used by local - - exchange writer for writer scaling, and multiple physical table - - partitions can be mapped to the same logical table partition based on the - - hash value of calculated partitioned ids. + single table writer thread. The logical table partition is used by local + exchange writer for writer scaling, and multiple physical table + partitions can be mapped to the same logical table partition based on the + hash value of calculated partitioned ids. + * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - integer - 128MB - * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - Minimum amount of data processed by a logical table partition to trigger - - writer scaling if it is detected as overloaded by scale wrirer exchange. + writer scaling if it is detected as overloaded by scale wrirer exchange. * - scaled_writer_min_processed_bytes_rebalance_threshold - - Minimum amount of data processed by all the logical table partitions to - - trigger skewed partition rebalancing by scale writer exchange. - integer - 256MB + - Minimum amount of data processed by all the logical table partitions to + trigger skewed partition rebalancing by scale writer exchange. Hive Connector -------------- diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index 53538d854974..a6feb1d32cb0 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators. - Time spent on building the hash table from rows collected by all the hash build operators. This stat is only reported by the HashBuild operator. +TableScan +--------- +These stats are reported only by TableScan operator + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numRunningScanThreads + - + - The number of running table scan drivers. + TableWriter ----------- These stats are reported only by TableWriter operator diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 80bd83d2084e..d519410745c9 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -74,6 +74,7 @@ velox_add_library( RowsStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp + ScaledScanController.cpp ScaleWriterLocalPartition.cpp SortBuffer.cpp SortedAggregations.cpp diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e5c4656d6521..9a6a4740ddd8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -631,11 +631,10 @@ StopReason Driver::runInternal( nextOp, curOperatorId_ + 1, kOpMethodAddInput); - - // The next iteration will see if operators_[i + 1] has - // output now that it got input. - i += 2; }); + // The next iteration will see if operators_[i + 1] has + // output now that it got input. + i += 2; continue; } else { stop = task()->shouldStop(); @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) { return "kYield"; case BlockingReason::kWaitForArbitration: return "kWaitForArbitration"; + case BlockingReason::kWaitForScanScaleUp: + return "kWaitForScanScaleUp"; default: VELOX_UNREACHABLE( fmt::format("Unknown blocking reason {}", static_cast(reason))); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7d46017187e4..0028f9ac438a 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -210,6 +210,10 @@ enum class BlockingReason { /// Operator is blocked waiting for its associated query memory arbitration to /// finish. kWaitForArbitration, + /// For a table scan operator, it is blocked waiting for the scan controller + /// to increase the number of table scan processing threads to start + /// processing. + kWaitForScanScaleUp, }; std::string blockingReasonToString(BlockingReason reason); @@ -702,6 +706,18 @@ struct DriverFactory { return false; } + /// Returns true if the pipeline gets data from a table scan. The function + /// sets plan node id in 'planNodeId'. + bool needsTableScan(core::PlanNodeId& planNodeId) const { + VELOX_CHECK(!planNodes.empty()); + if (auto scanNode = std::dynamic_pointer_cast( + planNodes.front())) { + planNodeId = scanNode->id(); + return true; + } + return false; + } + /// Returns plan node IDs for which Hash Join Bridges must be created based /// on this pipeline. std::vector needsHashJoinBridges() const; diff --git a/velox/exec/ScaledScanController.cpp b/velox/exec/ScaledScanController.cpp new file mode 100644 index 000000000000..85bd3537120b --- /dev/null +++ b/velox/exec/ScaledScanController.cpp @@ -0,0 +1,190 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/ScaledScanController.h" + +using facebook::velox::common::testutil::TestValue; + +namespace facebook::velox::exec { + +ScaledScanController::ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio) + : queryPool_(nodePool->root()), + nodePool_(nodePool), + numDrivers_(numDrivers), + scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio), + driverPromises_(numDrivers_) { + VELOX_CHECK_NOT_NULL(queryPool_); + VELOX_CHECK_NOT_NULL(nodePool_); + VELOX_CHECK_GT(numDrivers_, 0); + VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0); + VELOX_CHECK_LE(scaleUpMemoryUsageRatio_, 1.0); +} + +bool ScaledScanController::shouldStop( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::lock_guard l(lock_); + if (closed_) { + return false; + } + return shouldStopLocked(driverIdx, future); +} + +bool ScaledScanController::shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK(!closed_); + if (driverIdx < numRunningDrivers_) { + return false; + } + + VELOX_CHECK(!driverPromises_[driverIdx].has_value()); + auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract( + fmt::format("Table scan driver {} scale promise", driverIdx)); + driverPromises_[driverIdx] = std::move(driverPromise); + *future = std::move(driverFuture); + return true; +} + +void ScaledScanController::updateAndTryScale( + uint32_t driverIdx, + uint64_t memoryUsage) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::optional driverPromise; + SCOPE_EXIT { + if (driverPromise.has_value()) { + driverPromise->setValue(); + } + }; + { + std::lock_guard l(lock_); + VELOX_CHECK_LT(driverIdx, numRunningDrivers_); + + if (closed_) { + return; + } + + updateDriverScanUsageLocked(driverIdx, memoryUsage); + + tryScaleLocked(driverPromise); + } +} + +void ScaledScanController::updateDriverScanUsageLocked( + uint32_t driverIdx, + uint64_t memoryUsage) { + if (estimatedDriverUsage_ == 0) { + estimatedDriverUsage_ = memoryUsage; + } else { + estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4; + } + + if (numDriverReportedUsage_ == numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(numDriverReportedUsage_ + 1, numRunningDrivers_); + + if (driverIdx + 1 < numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1); + ++numDriverReportedUsage_; +} + +void ScaledScanController::tryScaleLocked( + std::optional& driverPromise) { + VELOX_CHECK_LE(numDriverReportedUsage_, numRunningDrivers_); + + if (numRunningDrivers_ == numDrivers_) { + return; + } + if (numDriverReportedUsage_ < numRunningDrivers_) { + // We shall only make the next scale up decision until we have received + // the memory usage updates from all the running scan drivers. + return; + } + + const uint64_t peakNodeUsage = nodePool_->peakBytes(); + const uint64_t estimatedPeakNodeUsageAfterScale = std::max( + estimatedDriverUsage_ * (numRunningDrivers_ + 1), + peakNodeUsage + estimatedDriverUsage_); + + const uint64_t currNodeUsage = nodePool_->reservedBytes(); + const uint64_t currQueryUsage = queryPool_->reservedBytes(); + const uint64_t currOtherUsage = + currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0; + + const uint64_t estimatedQueryUsageAfterScale = std::max( + currQueryUsage + estimatedDriverUsage_, + currOtherUsage + estimatedPeakNodeUsageAfterScale); + + const uint64_t maxQueryCapacity = queryPool_->maxCapacity(); + if (estimatedQueryUsageAfterScale > + maxQueryCapacity * scaleUpMemoryUsageRatio_) { + return; + } + + scaleUpLocked(driverPromise); +} + +void ScaledScanController::scaleUpLocked( + std::optional& driverPromise) { + VELOX_CHECK_LT(numRunningDrivers_, numDrivers_); + + ++numRunningDrivers_; + if (driverPromises_[numRunningDrivers_ - 1].has_value()) { + driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]); + driverPromises_[numRunningDrivers_ - 1].reset(); + } +} + +ScaledScanController::~ScaledScanController() { + close(); +} + +bool ScaledScanController::close() { + std::vector promises; + { + std::lock_guard l(lock_); + if (closed_) { + return false; + } + + promises.reserve(driverPromises_.size()); + for (auto& promise : driverPromises_) { + if (promise.has_value()) { + promises.emplace_back(std::move(promise.value())); + promise.reset(); + } + } + closed_ = true; + } + + for (auto& promise : promises) { + promise.setValue(); + } + return true; +} + +std::string ScaledScanController::Stats::toString() const { + return fmt::format("numRunningDrivers: {}", numRunningDrivers); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/ScaledScanController.h b/velox/exec/ScaledScanController.h new file mode 100644 index 000000000000..9b046225a0d2 --- /dev/null +++ b/velox/exec/ScaledScanController.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/memory/Memory.h" + +namespace facebook::velox::exec { + +namespace test { +class ScaledScanControllerTestHelper; +} + +/// Controller used to scales table scan processing based on the query memory +/// usage. +class ScaledScanController { + public: + /// 'nodePool' is the table scan node pool. 'numDrivers' is number of the + /// table scan drivers. 'scaleUpMemoryUsageRatio' specifies the memory usage + /// ratio used to make scan scale up decision. + ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio); + + ~ScaledScanController(); + + ScaledScanController(const ScaledScanController&) = delete; + ScaledScanController(ScaledScanController&&) = delete; + ScaledScanController& operator=(const ScaledScanController&) = delete; + ScaledScanController& operator=(ScaledScanController&&) = delete; + + /// Invoked by a scan operator to check if it needs to stop waiting for scan + /// up to start processing. If so, 'future' is set to a future that will be + /// ready when the controller decides to start this scan operator processing, + /// or all the splits from the scan node have been dispatched to finish all + /// the scan operators. 'driverIcx' is the driver id of the scan operator. + /// Initially, only the first scan operator at driver index 0 is allowed to + /// run. + bool shouldStop(uint32_t driverIdx, ContinueFuture* future); + + /// Invoked by a scan operator to update per-driver memory usage estimation + /// after finish processing a non-empty split. 'driverIdx' is the driver id of + /// the scan operator. 'driverMemoryUsage' is the peak memory usage of the + /// scan operator. + void updateAndTryScale(uint32_t driverIdx, uint64_t driverMemoryUsage); + + struct Stats { + uint32_t numRunningDrivers{0}; + + std::string toString() const; + }; + + Stats stats() const { + std::lock_guard l(lock_); + return {.numRunningDrivers = numRunningDrivers_}; + } + + /// Invoked by the closed scan operator to close the controller. It returns + /// true on the first invocation, and otherwise false. + bool close(); + + void testingSetMemoryRatio(double scaleUpMemoryUsageRatio) { + std::lock_guard l(lock_); + *const_cast(&scaleUpMemoryUsageRatio_) = scaleUpMemoryUsageRatio; + } + + private: + // Invoked to check if we can scale up scan processing. If so, call + // 'scaleUpLocked' for scale up processing. + void tryScaleLocked(std::optional& driverePromise); + + // Invoked to scale up scan processing by bumping up the number of running + // scan drivers by one. 'drverPromise' returns the promise to fulfill if the + // scaled scan driver has been stopped. + void scaleUpLocked(std::optional& driverePromise); + + // Invoked to check if we need to stop waiting for scale up processing of the + // specified scan driver. If 'driverIdx' is beyond 'numRunningDrivers_', then + // a promise is set in 'driverPromises_' and the associated future is returned + // in 'future'. + bool shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future); + + /// Invoked to update the per-driver memory usage estimation with a new driver + /// report. + void updateDriverScanUsageLocked(uint32_t driverIdx, uint64_t memoryUsage); + + memory::MemoryPool* const queryPool_; + memory::MemoryPool* const nodePool_; + const uint32_t numDrivers_; + const double scaleUpMemoryUsageRatio_; + + mutable std::mutex lock_; + uint32_t numRunningDrivers_{1}; + + // The estimated per-driver memory usage of the table scan node. + uint64_t estimatedDriverUsage_{0}; + + // The number of drivers that have reported memory usage. + uint32_t numDriverReportedUsage_{0}; + + // The driver resume promises with one per each driver index. + std::vector> driverPromises_; + + bool closed_{false}; + + friend class test::ScaledScanControllerTestHelper; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 7586a52f763f..232786f14ce1 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -37,19 +37,22 @@ TableScan::TableScan( tableHandle_(tableScanNode->tableHandle()), columnHandles_(tableScanNode->assignments()), driverCtx_(driverCtx), + maxSplitPreloadPerDriver_( + driverCtx_->queryConfig().maxSplitPreloadPerDriver()), + maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), connectorPool_(driverCtx_->task->addConnectorPoolLocked( planNodeId(), driverCtx_->pipelineId, driverCtx_->driverId, operatorType(), tableHandle_->connectorId())), - maxSplitPreloadPerDriver_( - driverCtx_->queryConfig().maxSplitPreloadPerDriver()), - readBatchSize_(driverCtx_->queryConfig().preferredOutputBatchRows()), - maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), + connector_(connector::getConnector(tableHandle_->connectorId())), getOutputTimeLimitMs_( - driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()) { - connector_ = connector::getConnector(tableHandle_->connectorId()); + driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()), + scaledController_(driverCtx_->task->getScaledScanControllerLocked( + driverCtx_->splitGroupId, + planNodeId())) { + readBatchSize_ = driverCtx_->queryConfig().preferredOutputBatchRows(); } folly::dynamic TableScan::toJson() const { @@ -76,10 +79,20 @@ bool TableScan::shouldStop(StopReason taskStopReason) const { RowVectorPtr TableScan::getOutput() { auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); + VELOX_CHECK(!blockingFuture_.valid()); + blockingReason_ = BlockingReason::kNotBlocked; + if (noMoreSplits_) { return nullptr; } + // Check if we need to wait for scale up. We expect only wait once on startup. + if (shouldWaitForScaleUp()) { + VELOX_CHECK(blockingFuture_.valid()); + VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp); + return nullptr; + } + curStatus_ = "getOutput: enter"; const auto startTimeMs = getCurrentTimeMs(); for (;;) { @@ -259,6 +272,7 @@ RowVectorPtr TableScan::getOutput() { curStatus_ = "getOutput: updating stats_.rawInput"; lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); + RowVectorPtr data = std::move(dataOptional).value(); if (data != nullptr) { if (data->size() > 0) { @@ -274,6 +288,7 @@ RowVectorPtr TableScan::getOutput() { } } + uint64_t currNumRawInputRows{0}; { curStatus_ = "getOutput: updating stats_.preloadedSplits"; auto lockedStats = stats_.wlock(); @@ -287,12 +302,50 @@ RowVectorPtr TableScan::getOutput() { "readyPreloadedSplits", RuntimeCounter(numReadyPreloadedSplits_)); numReadyPreloadedSplits_ = 0; } + currNumRawInputRows = lockedStats->rawInputPositions; } + VELOX_CHECK_LE(rawInputRowsSinceLastSplit_, currNumRawInputRows); + const bool emptySplit = currNumRawInputRows == rawInputRowsSinceLastSplit_; + rawInputRowsSinceLastSplit_ = currNumRawInputRows; curStatus_ = "getOutput: task->splitFinished"; driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; + + // We only update scaled controller when we have finished a non-empty split. + // Otherwise, it can lead to the wrong scale up decisions if the first few + // splits are empty. Then we only report the memory usage for the file + // footer read which is much smaller the actual memory usage when read from + // a non-empty split. This can cause query OOM as we run too many scan + // drivers with each use non-trivial amount of memory. + if (!emptySplit) { + tryScaleUp(); + } + } +} + +bool TableScan::shouldWaitForScaleUp() { + if (scaledController_ == nullptr) { + return false; + } + + curStatus_ = "getOutput: shouldWaitForScaleUp"; + if (!scaledController_->shouldStop( + operatorCtx_->driverCtx()->driverId, &blockingFuture_)) { + VELOX_CHECK(!blockingFuture_.valid()); + return false; + } + blockingReason_ = BlockingReason::kWaitForScanScaleUp; + return true; +} + +void TableScan::tryScaleUp() { + if (scaledController_ == nullptr) { + return; } + + scaledController_->updateAndTryScale( + operatorCtx_->driverCtx()->driverId, pool()->peakBytes()); } void TableScan::preload( @@ -380,4 +433,23 @@ void TableScan::addDynamicFilter( stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } +void TableScan::close() { + Operator::close(); + + if (scaledController_ == nullptr) { + return; + } + + // Report the scaled controller stats by the first finished scan operator at + // which point all the splits have been dispatched. + if (!scaledController_->close()) { + return; + } + + const auto scaledStats = scaledController_->stats(); + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + TableScan::kNumRunningScaleThreads, + RuntimeCounter(scaledStats.numRunningDrivers)); +} } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index deec109b02c1..548ae249ffb9 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -17,6 +17,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" +#include "velox/exec/ScaledScanController.h" namespace facebook::velox::exec { @@ -41,6 +42,8 @@ class TableScan : public SourceOperator { bool isFinished() override; + void close() override; + bool canAddDynamicFilter() const override { return connector_->canAddDynamicFilter(); } @@ -50,6 +53,18 @@ class TableScan : public SourceOperator { column_index_t outputChannel, const std::shared_ptr& filter) override; + /// The name of runtime stats specific to table scan. + /// The number of running table scan drivers. + /// + /// NOTE: we only report the number of running scan drivers at the point that + /// all the splits have been dispatched. + static inline const std::string kNumRunningScaleThreads{ + "numRunningScaleThreads"}; + + std::shared_ptr testingScaledController() const { + return scaledController_; + } + private: // Checks if this table scan operator needs to yield before processing the // next split. @@ -70,17 +85,37 @@ class TableScan : public SourceOperator { // done, it will be made when needed. void preload(const std::shared_ptr& split); + // Invoked by scan operator to check if it needs to stop to wait for scale up. + bool shouldWaitForScaleUp(); + + // Invoked after scan operator finishes processing a non-empty split to update + // the scan driver memory usage and check to see if we need to scale up scan + // processing or not. + void tryScaleUp(); + const std::shared_ptr tableHandle_; const std:: unordered_map> columnHandles_; DriverCtx* const driverCtx_; + const int32_t maxSplitPreloadPerDriver_{0}; + const vector_size_t maxReadBatchSize_; memory::MemoryPool* const connectorPool_; + const std::shared_ptr connector_; + // Exits getOutput() method after this many milliseconds. Zero means 'no + // limit'. + const size_t getOutputTimeLimitMs_{0}; + + // If set, used for scan scale processing. It is shared by all the scan + // operators instantiated from the same table scan node. + const std::shared_ptr scaledController_; + + vector_size_t readBatchSize_; + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; - std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSource_; bool noMoreSplits_ = false; @@ -90,8 +125,6 @@ class TableScan : public SourceOperator { int32_t maxPreloadedSplits_{0}; - const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async preload. The // callback's lifetime is the lifetime of 'this'. This callback can schedule // preloads on an executor. These preloads may outlive the Task and therefore @@ -105,13 +138,6 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - vector_size_t readBatchSize_; - vector_size_t maxReadBatchSize_; - - // Exits getOutput() method after this many milliseconds. Zero means 'no - // limit'. - size_t getOutputTimeLimitMs_{0}; - double maxFilteringRatio_{0}; // String shown in ExceptionContext inside DataSource and LazyVector loading. @@ -120,5 +146,9 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. std::atomic curStatus_{""}; + + // The total number of raw input rows read up till the last finished split. + // This is used to detect if a finished split is empty or not. + uint64_t rawInputRowsSinceLastSplit_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fe97ad27dc4f..587c571b76f9 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1122,6 +1122,14 @@ void Task::createSplitGroupStateLocked(uint32_t splitGroupId) { addNestedLoopJoinBridgesLocked( splitGroupId, factory->needsNestedLoopJoinBridges()); addCustomJoinBridgesLocked(splitGroupId, factory->planNodes); + + core::PlanNodeId tableScanNodeId; + if (queryCtx_->queryConfig().tableScanScaledProcessingEnabled() && + factory->needsTableScan(tableScanNodeId)) { + VELOX_CHECK(!tableScanNodeId.empty()); + addScaledScanControllerLocked( + splitGroupId, tableScanNodeId, factory->numDrivers); + } } } @@ -1632,6 +1640,37 @@ exec::Split Task::getSplitLocked( return split; } +std::shared_ptr Task::getScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId) { + auto& splitGroupState = splitGroupStates_[splitGroupId]; + auto it = splitGroupState.scaledScanControllers.find(planNodeId); + if (it == splitGroupState.scaledScanControllers.end()) { + VELOX_CHECK(!queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + return nullptr; + } + + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + VELOX_CHECK_NOT_NULL(it->second); + return it->second; +} + +void Task::addScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId, + uint32_t numDrivers) { + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + + auto& splitGroupState = splitGroupStates_[splitGroupId]; + VELOX_CHECK_EQ(splitGroupState.scaledScanControllers.count(planNodeId), 0); + splitGroupState.scaledScanControllers.emplace( + planNodeId, + std::make_shared( + getOrAddNodePool(planNodeId), + numDrivers, + queryCtx_->queryConfig().tableScanScaleUpMemoryUsageRatio())); +} + void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 84680e90d420..e31b64435d59 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,7 @@ #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" #include "velox/exec/Split.h" +#include "velox/exec/TableScan.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" #include "velox/exec/TaskTraceWriter.h" @@ -430,6 +431,12 @@ class Task : public std::enable_shared_from_this { int32_t maxPreloadSplits = 0, const ConnectorSplitPreloadFunc& preload = nullptr); + /// Returns the scaled scan controller for a given table scan node if the + /// query has configured. + std::shared_ptr getScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId); + void splitFinished(bool fromTableScan, int64_t splitWeight); void multipleSplitsFinished( @@ -779,6 +786,12 @@ class Task : public std::enable_shared_from_this { // Invoked to initialize the memory pool for this task on creation. void initTaskPool(); + // Creates a scaled scan controller for a given table scan node. + void addScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId, + uint32_t numDrivers); + // Creates new instance of memory pool for a plan node, stores it in the task // to ensure lifetime and returns a raw pointer. memory::MemoryPool* getOrAddNodePool(const core::PlanNodeId& planNodeId); diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 7c193240689a..cf3a39703fd4 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -116,6 +116,10 @@ struct SplitGroupState { /// Map of local exchanges keyed on LocalPartition plan node ID. std::unordered_map localExchanges; + /// Map of scaled scan controllers keyed on TableScan plan node ID. + std::unordered_map> + scaledScanControllers; + /// Drivers created and still running for this split group. /// The split group is finished when this numbers reaches zero. uint32_t numRunningDrivers{0}; diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 3e8fb5094bca..9892ce4afab1 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -74,6 +74,7 @@ add_executable( RoundRobinPartitionFunctionTest.cpp RowContainerTest.cpp RowNumberTest.cpp + ScaledScanControllerTest.cpp ScaleWriterLocalPartitionTest.cpp SortBufferTest.cpp SpillerTest.cpp diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index b57a55e8fee9..3baff771ea9d 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -2481,6 +2481,129 @@ TEST_P(MultiFragmentTest, compression) { test("local://t2", 0.0000001, true); } +TEST_P(MultiFragmentTest, scaledTableScan) { + const int numSplits = 20; + std::vector> splitFiles; + std::vector splitVectors; + for (auto i = 0; i < numSplits; ++i) { + auto vectors = makeVectors(10, 1'000); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), vectors); + splitFiles.push_back(std::move(filePath)); + splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end()); + } + + createDuckDbTable(splitVectors); + + struct { + bool scaleEnabled; + double scaleUpMemoryUsageRatio; + bool expectScaleUp; + + std::string debugString() const { + return fmt::format( + "scaleEnabled {}, scaleUpMemoryUsageRatio {}, expectScaleUp {}", + scaleEnabled, + scaleUpMemoryUsageRatio, + expectScaleUp); + } + } testSettings[] = { + {false, 0.9, false}, + {true, 0.9, true}, + {false, 1.0, false}, + {true, 1.0, true}, + {false, 0.00001, false}, + {true, 0.00001, false}, + {false, 0.0, false}, + {true, 0.0, false}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId; + configSettings_[core::QueryConfig::kTableScanScaledProcessingEnabled] = + testData.scaleEnabled ? "true" : "false"; + configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] = + std::to_string(testData.scaleUpMemoryUsageRatio); + + const auto leafPlan = + PlanBuilder() + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .partialAggregation( + {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto leafTaskId = "local://leaf-0"; + auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20); + const auto numLeafDrivers{4}; + leafTask->start(numLeafDrivers); + addHiveSplits(leafTask, splitFiles); + + const auto finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType(), GetParam()) + .finalAggregation( + {"c5"}, + {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"}, + {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}}) + .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam()) + .planNode(); + + const auto finalAggTaskId = "local://final-agg-0"; + auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0); + const auto numFinalAggrDrivers{1}; + finalAggTask->start(numFinalAggrDrivers); + addRemoteSplits(finalAggTask, {leafTaskId}); + + const auto resultPlan = + PlanBuilder() + .exchange(finalAggPlan->outputType(), GetParam()) + .planNode(); + + assertQuery( + resultPlan, + {finalAggTaskId}, + "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5"); + + ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get())) + << finalAggTask->taskId(); + + auto planStats = toPlanStats(leafTask->taskStats()); + if (testData.scaleEnabled) { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 1); + if (testData.expectScaleUp) { + ASSERT_GE( + planStats.at(scanNodeId) + .customStats[TableScan::kNumRunningScaleThreads] + .sum, + 1); + ASSERT_LE( + planStats.at(scanNodeId) + .customStats[TableScan::kNumRunningScaleThreads] + .sum, + numLeafDrivers); + } else { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 1); + } + } else { + ASSERT_EQ( + planStats.at(scanNodeId) + .customStats.count(TableScan::kNumRunningScaleThreads), + 0); + } + } +} + VELOX_INSTANTIATE_TEST_SUITE_P( MultiFragmentTest, MultiFragmentTest, diff --git a/velox/exec/tests/ScaledScanControllerTest.cpp b/velox/exec/tests/ScaledScanControllerTest.cpp new file mode 100644 index 000000000000..fbd68548ef37 --- /dev/null +++ b/velox/exec/tests/ScaledScanControllerTest.cpp @@ -0,0 +1,338 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/ScaledScanController.h" + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/core/PlanNode.h" +#include "velox/exec/PlanNodeStats.h" + +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; + +namespace facebook::velox::exec::test { +class ScaledScanControllerTestHelper { + public: + explicit ScaledScanControllerTestHelper(ScaledScanController* controller) + : controller_(controller) { + VELOX_CHECK_NOT_NULL(controller_); + } + + uint64_t estimatedDriverUsage() const { + return controller_->estimatedDriverUsage_; + } + + private: + ScaledScanController* const controller_; +}; + +class ScaledScanControllerTest : public OperatorTestBase { + protected: + std::shared_ptr rootPool(uint64_t maxCapacity) { + return memory::memoryManager()->addRootPool("", maxCapacity); + } +}; + +TEST_F(ScaledScanControllerTest, basic) { + struct { + // Specifies the query max capacity. + uint64_t queryCapacity; + // Specifies the scale up memory usage ratio. + double scaleUpMemoryUsageRatio; + // Specifies how much memory has been used by the query when we test for + // scale. + uint64_t queryMemoryUsage; + // Specifies how much memory has been used by the scan node when we test for + // scale. + uint64_t nodeMemoryUsage; + // Specifies the peak memory usage of the scan node when we test for scale. + uint64_t nodePeakMemoryUsage; + // Specifies the number of scan drivers in the query. + uint32_t numDrivers; + // Specifies the updates of the memory usage of the scan drivers. The test + // will invoke scale controller update API for each update in the order of + // the update vector list. + std::vector> driverMemoryUsageUpdates; + // Specifies the expected number of running scan drivers after the update or + // the end of the test. + uint32_t expectedNumRunningDrivers; + + std::string debugString() const { + return fmt::format( + "queryCapacity {}, scaleUpMemoryUsageRatio {}, queryMemoryUsage {}, nodeMemoryUsage {}, nodePeakMemoryUsage {}, numDrivers {}, expectedNumRunningDrivers {}", + succinctBytes(queryCapacity), + scaleUpMemoryUsageRatio, + succinctBytes(queryMemoryUsage), + succinctBytes(nodeMemoryUsage), + succinctBytes(nodePeakMemoryUsage), + numDrivers, + expectedNumRunningDrivers); + } + } testSettings[] = { + // Test case that we can't scale up because of the query memory usage + // ratio which is set to 0. + // 1 scan drivers in total. + {256 << 20, 0.0, 1 << 20, 1 << 20, 1 << 20, 1, {{0, 1 << 20}}, 1}, + // 4 scan drivers in total. + {256 << 20, + 0.0, + 32 << 20, + 16 << 20, + 16 << 20, + 4, + {{0, 1 << 20}, {0, 4 << 20}}, + 1}, + + // Test case that we can only scale up to two drivers as we only update + // stats from one driver so can't scale up beyond two. + {256 << 20, + 0.9, + 32 << 20, + 32 << 20, + 32 << 20, + 4, + {{0, 4 << 20}, {0, 4 << 20}}, + 2}, + + // Test case that we can only scale up to three drivers as we only update + // stats from two drivers so can't scale up beyond three. + {256 << 20, + 0.9, + 32 << 20, + 32 << 20, + 32 << 20, + 4, + {{0, 4 << 20}, {0, 4 << 20}, {1, 4 << 20}}, + 3}, + + // Test case that we can only scale up to six drivers as hit the query + // meomry usage ratio limit. + {256 << 20, + 0.5, + 64 << 20, + 32 << 20, + 32 << 20, + 8, + {{0, 16 << 20}, + {1, 16 << 20}, + {2, 16 << 20}, + {3, 16 << 20}, + {4, 16 << 20}, + {5, 16 << 20}}, + 6}, + + // Test cases that we can't scale up because of the peak memory usage of + // scan node. + {256 << 20, 0.5, 64 << 20, 32 << 20, 128 << 20, 8, {{0, 16 << 20}}, 1}, + {256 << 20, 0.5, 64 << 20, 32 << 20, 96 << 20, 8, {{0, 16 << 20}}, 1}, + + // Test cases that we can't scale up because of the sum of the estimated + // driver memory usage. + {256 << 20, + 0.5, + 64 << 20, + 32 << 20, + 80 << 20, + 8, + {{0, 16 << 20}, + {1, 16 << 20}, + {2, 16 << 20}, + {3, 16 << 20}, + {4, 16 << 20}, + {5, 16 << 20}}, + 6}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + auto root = rootPool(testData.queryCapacity); + auto node = root->addAggregateChild("test"); + auto pool = node->addLeafChild("test"); + if (testData.nodePeakMemoryUsage > 0) { + void* tmpBuffer = pool->allocate(testData.nodePeakMemoryUsage); + pool->free(tmpBuffer, testData.nodePeakMemoryUsage); + } + void* buffer{nullptr}; + if (testData.nodeMemoryUsage > 0) { + buffer = pool->allocate(testData.nodeMemoryUsage); + } + SCOPE_EXIT { + if (buffer != nullptr) { + pool->free(buffer, testData.nodeMemoryUsage); + } + }; + + auto otherNode = root->addAggregateChild("other"); + auto otherPool = otherNode->addLeafChild("other"); + void* otherBuffer{nullptr}; + if (testData.queryMemoryUsage > testData.nodeMemoryUsage) { + otherBuffer = otherPool->allocate( + testData.queryMemoryUsage - testData.nodeMemoryUsage); + } + SCOPE_EXIT { + if (otherBuffer != nullptr) { + otherPool->free( + otherBuffer, testData.queryMemoryUsage - testData.nodeMemoryUsage); + } + }; + + auto controller = std::make_shared( + node.get(), testData.numDrivers, testData.scaleUpMemoryUsageRatio); + for (auto& [driverIdx, memoryUsage] : testData.driverMemoryUsageUpdates) { + controller->updateAndTryScale(driverIdx, memoryUsage); + } + + ASSERT_EQ( + testData.expectedNumRunningDrivers, + controller->stats().numRunningDrivers); + + std::vector futures; + futures.reserve(testData.numDrivers); + for (auto i = 0; i < testData.numDrivers; ++i) { + ContinueFuture future{ContinueFuture::makeEmpty()}; + if (i < testData.expectedNumRunningDrivers) { + ASSERT_FALSE(controller->shouldStop(i, &future)); + ASSERT_FALSE(future.valid()); + } else { + ASSERT_TRUE(controller->shouldStop(i, &future)); + ASSERT_TRUE(future.valid()); + futures.push_back(std::move(future)); + } + } + + for (const auto& future : futures) { + ASSERT_FALSE(future.isReady()); + } + + ASSERT_TRUE(controller->close()); + ASSERT_FALSE(controller->close()); + ASSERT_FALSE(controller->close()); + + for (const auto& future : futures) { + ASSERT_TRUE(future.isReady()); + } + } +} + +TEST_F(ScaledScanControllerTest, estimateDriverUsage) { + auto root = rootPool(256 << 20); + auto node = root->addAggregateChild("test"); + auto pool = node->addLeafChild("test"); + const int numDrivers{4}; + auto controller = + std::make_shared(node.get(), numDrivers, 0.5); + std::vector futures; + futures.resize(numDrivers); + for (auto i = 0; i < numDrivers; ++i) { + ContinueFuture future{ContinueFuture::makeEmpty()}; + if (i == 0) { + ASSERT_FALSE(controller->shouldStop(i, &future)); + ASSERT_FALSE(future.valid()); + } else { + ASSERT_TRUE(controller->shouldStop(i, &future)); + ASSERT_TRUE(future.valid()); + futures.push_back(std::move(future)); + } + } + + ScaledScanControllerTestHelper helper(controller.get()); + uint64_t expectedDriverMemoryUsage{0}; + for (auto i = 0; i < numDrivers - 1; ++i) { + controller->updateAndTryScale(i, (i + 1) << 20); + if (i == 0) { + expectedDriverMemoryUsage = (i + 1) << 20; + } else { + expectedDriverMemoryUsage = + (expectedDriverMemoryUsage * 3 + ((i + 1) << 20)) / 4; + } + ASSERT_EQ(helper.estimatedDriverUsage(), expectedDriverMemoryUsage); + } + + controller.reset(); + for (auto& future : futures) { + ASSERT_TRUE(future.isReady()); + } +} + +TEST_F(ScaledScanControllerTest, error) { + auto root = rootPool(256 << 20); + auto node = root->addAggregateChild("test"); + + VELOX_ASSERT_THROW( + std::make_shared(node.get(), 0, 0.5), ""); + VELOX_ASSERT_THROW( + std::make_shared(node.get(), 0, -1), ""); + VELOX_ASSERT_THROW( + std::make_shared(node.get(), 0, 2), ""); + VELOX_ASSERT_THROW( + std::make_shared(root.get(), 0, 2), ""); +} + +TEST_F(ScaledScanControllerTest, fuzzer) { + auto root = rootPool(256 << 20); + auto node = root->addAggregateChild("fuzzer"); + const int numDrivers{4}; + std::vector> pools; + for (int i = 0; i < numDrivers; ++i) { + pools.push_back(node->addLeafChild(fmt::format("fuzzer{}", i))); + } + auto controller = + std::make_shared(node.get(), numDrivers, 0.7); + std::vector futures{numDrivers}; + for (int i = 0; i < numDrivers; ++i) { + controller->shouldStop(i, &futures[i]); + } + + std::vector driverThreads; + std::atomic_int closeCount{0}; + for (int i = 0; i < numDrivers; ++i) { + driverThreads.push_back(std::thread([&, i]() { + auto rng = folly::Random::DefaultGenerator(i); + auto pool = pools[i]; + if (futures[i].valid()) { + futures[i].wait(); + } + for (int j = 0; j < 1'000; ++j) { + uint64_t allocationBytes{0}; + switch (folly::Random::rand32(rng) % 3) { + case 0: + allocationBytes = 256 / numDrivers; + break; + case 1: + allocationBytes = 256 / numDrivers / 2; + break; + case 2: + allocationBytes = 256 / numDrivers / 4; + break; + } + void* buffer = pool->allocate(allocationBytes); + SCOPE_EXIT { + pool->free(buffer, allocationBytes); + }; + controller->updateAndTryScale(i, pool->peakBytes()); + } + if (controller->close()) { + ++closeCount; + } + })); + } + for (auto& thread : driverThreads) { + thread.join(); + } + ASSERT_EQ(closeCount.load(), 1); +} +} // namespace facebook::velox::exec::test diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h index 65640357e533..75bc8ad05f8e 100644 --- a/velox/tool/trace/TraceReplayRunner.h +++ b/velox/tool/trace/TraceReplayRunner.h @@ -31,7 +31,6 @@ DECLARE_string(node_id); DECLARE_int32(driver_id); DECLARE_string(driver_ids); DECLARE_string(table_writer_output_dir); -DECLARE_double(hiveConnectorExecutorHwMultiplier); DECLARE_int32(shuffle_serialization_format); namespace facebook::velox::tool::trace {