diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index e6c4032768b1..90105af887e6 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -472,6 +472,24 @@ class QueryConfig { static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold = "scaled_writer_min_processed_bytes_rebalance_threshold"; + /// If true, enables the scaled table scan processing. For each table scan + /// plan node, a scan controller is used to control the number of running scan + /// threads based on the query memory usage. It keeps increasing the number of + /// running threads until the query memory usage exceeds the threshold defined + /// by 'table_scan_scale_up_memory_usage_ratio'. + static constexpr const char* kTableScanScaledProcessingEnabled = + "table_scan_scaled_processing_enabled"; + + /// The query memory usage ratio used by scan controller to decide if it can + /// increase the number of running scan threads. When the query memory usage + /// is below this ratio, the scan controller keeps increasing the running scan + /// thread for scale up, and stop once exceeds this ratio. The value is in the + /// range of [0, 1]. + /// + /// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true. 
+ static constexpr const char* kTableScanScaleUpMemoryUsageRatio = + "table_scan_scale_up_memory_usage_ratio"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -880,6 +898,14 @@ class QueryConfig { kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20); } + bool tableScanScaledProcessingEnabled() const { + return get(kTableScanScaledProcessingEnabled, false); + } + + double tableScanScaleUpMemoryUsageRatio() const { + return get(kTableScanScaleUpMemoryUsageRatio, 0.7); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index a932ecd57558..9fe2534b1262 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -390,6 +390,23 @@ Table Scan - integer - 2 - Maximum number of splits to preload per driver. Set to 0 to disable preloading. + * - table_scan_scaled_processing_enabled + - bool + - false + - If true, enables the scaled table scan processing. For each table scan + plan node, a scan controller is used to control the number of running scan + threads based on the query memory usage. It keeps increasing the number of + running threads until the query memory usage exceeds the threshold defined + by 'table_scan_scale_up_memory_usage_ratio'. + * - table_scan_scale_up_memory_usage_ratio + - double + - 0.7 + - The query memory usage ratio used by the scan controller to decide if it can + increase the number of running scan threads. When the query memory usage + is below this ratio, the scan controller scales up the scan processing by + increasing the number of running scan threads, and stops once the usage + exceeds this ratio. The value is in the range of [0, 1]. This only applies if + 'table_scan_scaled_processing_enabled' is true. 
Table Writer ------------ @@ -414,26 +431,26 @@ Table Writer - double - 0.7 - The max ratio of a query used memory to its max capacity, and the scale - - writer exchange stops scaling writer processing if the query's current - - memory usage exceeds this ratio. The value is in the range of (0, 1]. + writer exchange stops scaling writer processing if the query's current + memory usage exceeds this ratio. The value is in the range of (0, 1]. * - scaled_writer_max_partitions_per_writer - integer - 128 - The max number of logical table partitions that can be assigned to a - - single table writer thread. The logical table partition is used by local - - exchange writer for writer scaling, and multiple physical table - - partitions can be mapped to the same logical table partition based on the - - hash value of calculated partitioned ids. + single table writer thread. The logical table partition is used by local + exchange writer for writer scaling, and multiple physical table + partitions can be mapped to the same logical table partition based on the + hash value of calculated partitioned ids. + * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - integer - 128MB - * - scaled_writer_min_partition_processed_bytes_rebalance_threshold - Minimum amount of data processed by a logical table partition to trigger - - writer scaling if it is detected as overloaded by scale wrirer exchange. + writer scaling if it is detected as overloaded by scale writer exchange. * - scaled_writer_min_processed_bytes_rebalance_threshold - - Minimum amount of data processed by all the logical table partitions to - - trigger skewed partition rebalancing by scale writer exchange. - integer - 256MB + - Minimum amount of data processed by all the logical table partitions to + trigger skewed partition rebalancing by scale writer exchange. 
Hive Connector -------------- diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index 53538d854974..a6feb1d32cb0 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators. - Time spent on building the hash table from rows collected by all the hash build operators. This stat is only reported by the HashBuild operator. +TableScan +--------- +These stats are reported only by TableScan operator + +.. list-table:: + :widths: 50 25 50 + :header-rows: 1 + + * - Stats + - Unit + - Description + * - numRunningScaleThreads + - + - The number of running table scan drivers. + TableWriter ----------- These stats are reported only by TableWriter operator diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 80bd83d2084e..d519410745c9 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -74,6 +74,7 @@ velox_add_library( RowsStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp + ScaledScanController.cpp ScaleWriterLocalPartition.cpp SortBuffer.cpp SortedAggregations.cpp diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e5c4656d6521..9a6a4740ddd8 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -631,11 +631,10 @@ StopReason Driver::runInternal( nextOp, curOperatorId_ + 1, kOpMethodAddInput); - - // The next iteration will see if operators_[i + 1] has - // output now that it got input. - i += 2; }); + // The next iteration will see if operators_[i + 1] has + // output now that it got input. 
+ i += 2; continue; } else { stop = task()->shouldStop(); @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) { return "kYield"; case BlockingReason::kWaitForArbitration: return "kWaitForArbitration"; + case BlockingReason::kWaitForScanScaleUp: + return "kWaitForScanScaleUp"; default: VELOX_UNREACHABLE( fmt::format("Unknown blocking reason {}", static_cast(reason))); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 7d46017187e4..0028f9ac438a 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -210,6 +210,10 @@ enum class BlockingReason { /// Operator is blocked waiting for its associated query memory arbitration to /// finish. kWaitForArbitration, + /// For a table scan operator, it is blocked waiting for the scan controller + /// to increase the number of table scan processing threads to start + /// processing. + kWaitForScanScaleUp, }; std::string blockingReasonToString(BlockingReason reason); @@ -702,6 +706,18 @@ struct DriverFactory { return false; } + /// Returns true if the pipeline gets data from a table scan. The function + /// sets plan node id in 'planNodeId'. + bool needsTableScan(core::PlanNodeId& planNodeId) const { + VELOX_CHECK(!planNodes.empty()); + if (auto scanNode = std::dynamic_pointer_cast( + planNodes.front())) { + planNodeId = scanNode->id(); + return true; + } + return false; + } + /// Returns plan node IDs for which Hash Join Bridges must be created based /// on this pipeline. std::vector needsHashJoinBridges() const; diff --git a/velox/exec/ScaledScanController.cpp b/velox/exec/ScaledScanController.cpp new file mode 100644 index 000000000000..85bd3537120b --- /dev/null +++ b/velox/exec/ScaledScanController.cpp @@ -0,0 +1,190 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/ScaledScanController.h" + +using facebook::velox::common::testutil::TestValue; + +namespace facebook::velox::exec { + +ScaledScanController::ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio) + : queryPool_(nodePool->root()), + nodePool_(nodePool), + numDrivers_(numDrivers), + scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio), + driverPromises_(numDrivers_) { + VELOX_CHECK_NOT_NULL(queryPool_); + VELOX_CHECK_NOT_NULL(nodePool_); + VELOX_CHECK_GT(numDrivers_, 0); + VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0); + VELOX_CHECK_LE(scaleUpMemoryUsageRatio_, 1.0); +} + +bool ScaledScanController::shouldStop( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + std::lock_guard l(lock_); + if (closed_) { + return false; + } + return shouldStopLocked(driverIdx, future); +} + +bool ScaledScanController::shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future) { + VELOX_CHECK(!closed_); + if (driverIdx < numRunningDrivers_) { + return false; + } + + VELOX_CHECK(!driverPromises_[driverIdx].has_value()); + auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract( + fmt::format("Table scan driver {} scale promise", driverIdx)); + driverPromises_[driverIdx] = std::move(driverPromise); + *future = std::move(driverFuture); + return true; +} + +void ScaledScanController::updateAndTryScale( + uint32_t driverIdx, + uint64_t memoryUsage) { + VELOX_CHECK_LT(driverIdx, numDrivers_); + + 
std::optional driverPromise; + SCOPE_EXIT { + if (driverPromise.has_value()) { + driverPromise->setValue(); + } + }; + { + std::lock_guard l(lock_); + VELOX_CHECK_LT(driverIdx, numRunningDrivers_); + + if (closed_) { + return; + } + + updateDriverScanUsageLocked(driverIdx, memoryUsage); + + tryScaleLocked(driverPromise); + } +} + +void ScaledScanController::updateDriverScanUsageLocked( + uint32_t driverIdx, + uint64_t memoryUsage) { + if (estimatedDriverUsage_ == 0) { + estimatedDriverUsage_ = memoryUsage; + } else { + estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4; + } + + if (numDriverReportedUsage_ == numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(numDriverReportedUsage_ + 1, numRunningDrivers_); + + if (driverIdx + 1 < numRunningDrivers_) { + return; + } + VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1); + ++numDriverReportedUsage_; +} + +void ScaledScanController::tryScaleLocked( + std::optional& driverPromise) { + VELOX_CHECK_LE(numDriverReportedUsage_, numRunningDrivers_); + + if (numRunningDrivers_ == numDrivers_) { + return; + } + if (numDriverReportedUsage_ < numRunningDrivers_) { + // We shall only make the next scale up decision until we have received + // the memory usage updates from all the running scan drivers. + return; + } + + const uint64_t peakNodeUsage = nodePool_->peakBytes(); + const uint64_t estimatedPeakNodeUsageAfterScale = std::max( + estimatedDriverUsage_ * (numRunningDrivers_ + 1), + peakNodeUsage + estimatedDriverUsage_); + + const uint64_t currNodeUsage = nodePool_->reservedBytes(); + const uint64_t currQueryUsage = queryPool_->reservedBytes(); + const uint64_t currOtherUsage = + currQueryUsage > currNodeUsage ? 
currQueryUsage - currNodeUsage : 0; + + const uint64_t estimatedQueryUsageAfterScale = std::max( + currQueryUsage + estimatedDriverUsage_, + currOtherUsage + estimatedPeakNodeUsageAfterScale); + + const uint64_t maxQueryCapacity = queryPool_->maxCapacity(); + if (estimatedQueryUsageAfterScale > + maxQueryCapacity * scaleUpMemoryUsageRatio_) { + return; + } + + scaleUpLocked(driverPromise); +} + +void ScaledScanController::scaleUpLocked( + std::optional& driverPromise) { + VELOX_CHECK_LT(numRunningDrivers_, numDrivers_); + + ++numRunningDrivers_; + if (driverPromises_[numRunningDrivers_ - 1].has_value()) { + driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]); + driverPromises_[numRunningDrivers_ - 1].reset(); + } +} + +ScaledScanController::~ScaledScanController() { + close(); +} + +bool ScaledScanController::close() { + std::vector promises; + { + std::lock_guard l(lock_); + if (closed_) { + return false; + } + + promises.reserve(driverPromises_.size()); + for (auto& promise : driverPromises_) { + if (promise.has_value()) { + promises.emplace_back(std::move(promise.value())); + promise.reset(); + } + } + closed_ = true; + } + + for (auto& promise : promises) { + promise.setValue(); + } + return true; +} + +std::string ScaledScanController::Stats::toString() const { + return fmt::format("numRunningDrivers: {}", numRunningDrivers); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/ScaledScanController.h b/velox/exec/ScaledScanController.h new file mode 100644 index 000000000000..9b046225a0d2 --- /dev/null +++ b/velox/exec/ScaledScanController.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/memory/Memory.h" + +namespace facebook::velox::exec { + +namespace test { +class ScaledScanControllerTestHelper; +} + +/// Controller used to scale table scan processing based on the query memory +/// usage. +class ScaledScanController { + public: + /// 'nodePool' is the table scan node pool. 'numDrivers' is the number of + /// table scan drivers. 'scaleUpMemoryUsageRatio' specifies the memory usage + /// ratio used to make the scan scale up decision. + ScaledScanController( + memory::MemoryPool* nodePool, + uint32_t numDrivers, + double scaleUpMemoryUsageRatio); + + ~ScaledScanController(); + + ScaledScanController(const ScaledScanController&) = delete; + ScaledScanController(ScaledScanController&&) = delete; + ScaledScanController& operator=(const ScaledScanController&) = delete; + ScaledScanController& operator=(ScaledScanController&&) = delete; + + /// Invoked by a scan operator to check if it needs to stop and wait for scale + /// up before it starts processing. If so, 'future' is set to a future that will be + /// ready when the controller decides to start this scan operator processing, + /// or all the splits from the scan node have been dispatched to finish all + /// the scan operators. 'driverIdx' is the driver id of the scan operator. + /// Initially, only the first scan operator at driver index 0 is allowed to + /// run. 
+ bool shouldStop(uint32_t driverIdx, ContinueFuture* future); + + /// Invoked by a scan operator to update per-driver memory usage estimation + /// after finish processing a non-empty split. 'driverIdx' is the driver id of + /// the scan operator. 'driverMemoryUsage' is the peak memory usage of the + /// scan operator. + void updateAndTryScale(uint32_t driverIdx, uint64_t driverMemoryUsage); + + struct Stats { + uint32_t numRunningDrivers{0}; + + std::string toString() const; + }; + + Stats stats() const { + std::lock_guard l(lock_); + return {.numRunningDrivers = numRunningDrivers_}; + } + + /// Invoked by the closed scan operator to close the controller. It returns + /// true on the first invocation, and otherwise false. + bool close(); + + void testingSetMemoryRatio(double scaleUpMemoryUsageRatio) { + std::lock_guard l(lock_); + *const_cast(&scaleUpMemoryUsageRatio_) = scaleUpMemoryUsageRatio; + } + + private: + // Invoked to check if we can scale up scan processing. If so, call + // 'scaleUpLocked' for scale up processing. + void tryScaleLocked(std::optional& driverePromise); + + // Invoked to scale up scan processing by bumping up the number of running + // scan drivers by one. 'drverPromise' returns the promise to fulfill if the + // scaled scan driver has been stopped. + void scaleUpLocked(std::optional& driverePromise); + + // Invoked to check if we need to stop waiting for scale up processing of the + // specified scan driver. If 'driverIdx' is beyond 'numRunningDrivers_', then + // a promise is set in 'driverPromises_' and the associated future is returned + // in 'future'. + bool shouldStopLocked( + uint32_t driverIdx, + facebook::velox::ContinueFuture* future); + + /// Invoked to update the per-driver memory usage estimation with a new driver + /// report. 
+ void updateDriverScanUsageLocked(uint32_t driverIdx, uint64_t memoryUsage); + + memory::MemoryPool* const queryPool_; + memory::MemoryPool* const nodePool_; + const uint32_t numDrivers_; + const double scaleUpMemoryUsageRatio_; + + mutable std::mutex lock_; + uint32_t numRunningDrivers_{1}; + + // The estimated per-driver memory usage of the table scan node. + uint64_t estimatedDriverUsage_{0}; + + // The number of drivers that have reported memory usage. + uint32_t numDriverReportedUsage_{0}; + + // The driver resume promises with one per each driver index. + std::vector> driverPromises_; + + bool closed_{false}; + + friend class test::ScaledScanControllerTestHelper; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 7586a52f763f..232786f14ce1 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -37,19 +37,22 @@ TableScan::TableScan( tableHandle_(tableScanNode->tableHandle()), columnHandles_(tableScanNode->assignments()), driverCtx_(driverCtx), + maxSplitPreloadPerDriver_( + driverCtx_->queryConfig().maxSplitPreloadPerDriver()), + maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), connectorPool_(driverCtx_->task->addConnectorPoolLocked( planNodeId(), driverCtx_->pipelineId, driverCtx_->driverId, operatorType(), tableHandle_->connectorId())), - maxSplitPreloadPerDriver_( - driverCtx_->queryConfig().maxSplitPreloadPerDriver()), - readBatchSize_(driverCtx_->queryConfig().preferredOutputBatchRows()), - maxReadBatchSize_(driverCtx_->queryConfig().maxOutputBatchRows()), + connector_(connector::getConnector(tableHandle_->connectorId())), getOutputTimeLimitMs_( - driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()) { - connector_ = connector::getConnector(tableHandle_->connectorId()); + driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()), + scaledController_(driverCtx_->task->getScaledScanControllerLocked( + driverCtx_->splitGroupId, + planNodeId())) { + 
readBatchSize_ = driverCtx_->queryConfig().preferredOutputBatchRows(); } folly::dynamic TableScan::toJson() const { @@ -76,10 +79,20 @@ bool TableScan::shouldStop(StopReason taskStopReason) const { RowVectorPtr TableScan::getOutput() { auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; }); + VELOX_CHECK(!blockingFuture_.valid()); + blockingReason_ = BlockingReason::kNotBlocked; + if (noMoreSplits_) { return nullptr; } + // Check if we need to wait for scale up. We expect only wait once on startup. + if (shouldWaitForScaleUp()) { + VELOX_CHECK(blockingFuture_.valid()); + VELOX_CHECK_EQ(blockingReason_, BlockingReason::kWaitForScanScaleUp); + return nullptr; + } + curStatus_ = "getOutput: enter"; const auto startTimeMs = getCurrentTimeMs(); for (;;) { @@ -259,6 +272,7 @@ RowVectorPtr TableScan::getOutput() { curStatus_ = "getOutput: updating stats_.rawInput"; lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); + RowVectorPtr data = std::move(dataOptional).value(); if (data != nullptr) { if (data->size() > 0) { @@ -274,6 +288,7 @@ RowVectorPtr TableScan::getOutput() { } } + uint64_t currNumRawInputRows{0}; { curStatus_ = "getOutput: updating stats_.preloadedSplits"; auto lockedStats = stats_.wlock(); @@ -287,12 +302,50 @@ RowVectorPtr TableScan::getOutput() { "readyPreloadedSplits", RuntimeCounter(numReadyPreloadedSplits_)); numReadyPreloadedSplits_ = 0; } + currNumRawInputRows = lockedStats->rawInputPositions; } + VELOX_CHECK_LE(rawInputRowsSinceLastSplit_, currNumRawInputRows); + const bool emptySplit = currNumRawInputRows == rawInputRowsSinceLastSplit_; + rawInputRowsSinceLastSplit_ = currNumRawInputRows; curStatus_ = "getOutput: task->splitFinished"; driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; + + // We only update scaled controller when we have finished a non-empty split. 
+ // Otherwise, it can lead to the wrong scale up decisions if the first few + // splits are empty. Then we only report the memory usage for the file + // footer read which is much smaller than the actual memory usage when reading + // from a non-empty split. This can cause query OOM as we run too many scan + // drivers, each using a non-trivial amount of memory. + if (!emptySplit) { + tryScaleUp(); + } + } +} + +bool TableScan::shouldWaitForScaleUp() { + if (scaledController_ == nullptr) { + return false; + } + + curStatus_ = "getOutput: shouldWaitForScaleUp"; + if (!scaledController_->shouldStop( + operatorCtx_->driverCtx()->driverId, &blockingFuture_)) { + VELOX_CHECK(!blockingFuture_.valid()); + return false; + } + blockingReason_ = BlockingReason::kWaitForScanScaleUp; + return true; +} + +void TableScan::tryScaleUp() { + if (scaledController_ == nullptr) { + return; } + + scaledController_->updateAndTryScale( + operatorCtx_->driverCtx()->driverId, pool()->peakBytes()); } void TableScan::preload( @@ -380,4 +433,23 @@ void TableScan::addDynamicFilter( stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } +void TableScan::close() { + Operator::close(); + + if (scaledController_ == nullptr) { + return; + } + + // Report the scaled controller stats by the first finished scan operator at + // which point all the splits have been dispatched. 
+ if (!scaledController_->close()) { + return; + } + + const auto scaledStats = scaledController_->stats(); + auto lockedStats = stats_.wlock(); + lockedStats->addRuntimeStat( + TableScan::kNumRunningScaleThreads, + RuntimeCounter(scaledStats.numRunningDrivers)); +} } // namespace facebook::velox::exec diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index deec109b02c1..548ae249ffb9 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -17,6 +17,7 @@ #include "velox/core/PlanNode.h" #include "velox/exec/Operator.h" +#include "velox/exec/ScaledScanController.h" namespace facebook::velox::exec { @@ -41,6 +42,8 @@ class TableScan : public SourceOperator { bool isFinished() override; + void close() override; + bool canAddDynamicFilter() const override { return connector_->canAddDynamicFilter(); } @@ -50,6 +53,18 @@ class TableScan : public SourceOperator { column_index_t outputChannel, const std::shared_ptr& filter) override; + /// The name of runtime stats specific to table scan. + /// The number of running table scan drivers. + /// + /// NOTE: we only report the number of running scan drivers at the point that + /// all the splits have been dispatched. + static inline const std::string kNumRunningScaleThreads{ + "numRunningScaleThreads"}; + + std::shared_ptr testingScaledController() const { + return scaledController_; + } + private: // Checks if this table scan operator needs to yield before processing the // next split. @@ -70,17 +85,37 @@ class TableScan : public SourceOperator { // done, it will be made when needed. void preload(const std::shared_ptr& split); + // Invoked by scan operator to check if it needs to stop to wait for scale up. + bool shouldWaitForScaleUp(); + + // Invoked after scan operator finishes processing a non-empty split to update + // the scan driver memory usage and check to see if we need to scale up scan + // processing or not. 
+ void tryScaleUp(); + const std::shared_ptr tableHandle_; const std:: unordered_map> columnHandles_; DriverCtx* const driverCtx_; + const int32_t maxSplitPreloadPerDriver_{0}; + const vector_size_t maxReadBatchSize_; memory::MemoryPool* const connectorPool_; + const std::shared_ptr connector_; + // Exits getOutput() method after this many milliseconds. Zero means 'no + // limit'. + const size_t getOutputTimeLimitMs_{0}; + + // If set, used for scan scale processing. It is shared by all the scan + // operators instantiated from the same table scan node. + const std::shared_ptr scaledController_; + + vector_size_t readBatchSize_; + ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; - std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSource_; bool noMoreSplits_ = false; @@ -90,8 +125,6 @@ class TableScan : public SourceOperator { int32_t maxPreloadedSplits_{0}; - const int32_t maxSplitPreloadPerDriver_{0}; - // Callback passed to getSplitOrFuture() for triggering async preload. The // callback's lifetime is the lifetime of 'this'. This callback can schedule // preloads on an executor. These preloads may outlive the Task and therefore @@ -105,13 +138,6 @@ class TableScan : public SourceOperator { // Count of splits that finished preloading before being read. int32_t numReadyPreloadedSplits_{0}; - vector_size_t readBatchSize_; - vector_size_t maxReadBatchSize_; - - // Exits getOutput() method after this many milliseconds. Zero means 'no - // limit'. - size_t getOutputTimeLimitMs_{0}; - double maxFilteringRatio_{0}; // String shown in ExceptionContext inside DataSource and LazyVector loading. @@ -120,5 +146,9 @@ class TableScan : public SourceOperator { // Holds the current status of the operator. Used when debugging to understand // what operator is doing. 
std::atomic curStatus_{""}; + + // The total number of raw input rows read up till the last finished split. + // This is used to detect if a finished split is empty or not. + uint64_t rawInputRowsSinceLastSplit_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index fe97ad27dc4f..587c571b76f9 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1122,6 +1122,14 @@ void Task::createSplitGroupStateLocked(uint32_t splitGroupId) { addNestedLoopJoinBridgesLocked( splitGroupId, factory->needsNestedLoopJoinBridges()); addCustomJoinBridgesLocked(splitGroupId, factory->planNodes); + + core::PlanNodeId tableScanNodeId; + if (queryCtx_->queryConfig().tableScanScaledProcessingEnabled() && + factory->needsTableScan(tableScanNodeId)) { + VELOX_CHECK(!tableScanNodeId.empty()); + addScaledScanControllerLocked( + splitGroupId, tableScanNodeId, factory->numDrivers); + } } } @@ -1632,6 +1640,37 @@ exec::Split Task::getSplitLocked( return split; } +std::shared_ptr Task::getScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId) { + auto& splitGroupState = splitGroupStates_[splitGroupId]; + auto it = splitGroupState.scaledScanControllers.find(planNodeId); + if (it == splitGroupState.scaledScanControllers.end()) { + VELOX_CHECK(!queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + return nullptr; + } + + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + VELOX_CHECK_NOT_NULL(it->second); + return it->second; +} + +void Task::addScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId, + uint32_t numDrivers) { + VELOX_CHECK(queryCtx_->queryConfig().tableScanScaledProcessingEnabled()); + + auto& splitGroupState = splitGroupStates_[splitGroupId]; + VELOX_CHECK_EQ(splitGroupState.scaledScanControllers.count(planNodeId), 0); + splitGroupState.scaledScanControllers.emplace( + planNodeId, + std::make_shared( + 
getOrAddNodePool(planNodeId), + numDrivers, + queryCtx_->queryConfig().tableScanScaleUpMemoryUsageRatio())); +} + void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 84680e90d420..e31b64435d59 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,7 @@ #include "velox/exec/MemoryReclaimer.h" #include "velox/exec/MergeSource.h" #include "velox/exec/Split.h" +#include "velox/exec/TableScan.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" #include "velox/exec/TaskTraceWriter.h" @@ -430,6 +431,12 @@ class Task : public std::enable_shared_from_this { int32_t maxPreloadSplits = 0, const ConnectorSplitPreloadFunc& preload = nullptr); + /// Returns the scaled scan controller for a given table scan node if the + /// query has configured. + std::shared_ptr getScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId); + void splitFinished(bool fromTableScan, int64_t splitWeight); void multipleSplitsFinished( @@ -779,6 +786,12 @@ class Task : public std::enable_shared_from_this { // Invoked to initialize the memory pool for this task on creation. void initTaskPool(); + // Creates a scaled scan controller for a given table scan node. + void addScaledScanControllerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId, + uint32_t numDrivers); + // Creates new instance of memory pool for a plan node, stores it in the task // to ensure lifetime and returns a raw pointer. memory::MemoryPool* getOrAddNodePool(const core::PlanNodeId& planNodeId); diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 7c193240689a..cf3a39703fd4 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -116,6 +116,10 @@ struct SplitGroupState { /// Map of local exchanges keyed on LocalPartition plan node ID. 
std::unordered_map localExchanges;
 
+    /// Map of scaled scan controllers keyed on TableScan plan node ID.
+    std::unordered_map<core::PlanNodeId, std::shared_ptr<ScaledScanController>>
+        scaledScanControllers;
+
     /// Drivers created and still running for this split group.
     /// The split group is finished when this numbers reaches zero.
     uint32_t numRunningDrivers{0};
diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt
index 3e8fb5094bca..9892ce4afab1 100644
--- a/velox/exec/tests/CMakeLists.txt
+++ b/velox/exec/tests/CMakeLists.txt
@@ -74,6 +74,7 @@ add_executable(
   RoundRobinPartitionFunctionTest.cpp
   RowContainerTest.cpp
   RowNumberTest.cpp
+  ScaledScanControllerTest.cpp
   ScaleWriterLocalPartitionTest.cpp
   SortBufferTest.cpp
   SpillerTest.cpp
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index b57a55e8fee9..3baff771ea9d 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -2481,6 +2481,129 @@ TEST_P(MultiFragmentTest, compression) {
   test("local://t2", 0.0000001, true);
 }
 
+// Verifies the scaled table scan processing in a multi-fragment query: the
+// 'kNumRunningScaleThreads' runtime stat is reported by the table scan node
+// only when the scaled processing is enabled, and stays within the number of
+// leaf drivers when scale up is expected.
+TEST_P(MultiFragmentTest, scaledTableScan) {
+  const int numSplits = 20;
+  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
+  std::vector<RowVectorPtr> splitVectors;
+  for (auto i = 0; i < numSplits; ++i) {
+    auto vectors = makeVectors(10, 1'000);
+    auto filePath = TempFilePath::create();
+    writeToFile(filePath->getPath(), vectors);
+    splitFiles.push_back(std::move(filePath));
+    splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end());
+  }
+
+  createDuckDbTable(splitVectors);
+
+  struct {
+    bool scaleEnabled;
+    double scaleUpMemoryUsageRatio;
+    bool expectScaleUp;
+
+    std::string debugString() const {
+      return fmt::format(
+          "scaleEnabled {}, scaleUpMemoryUsageRatio {}, expectScaleUp {}",
+          scaleEnabled,
+          scaleUpMemoryUsageRatio,
+          expectScaleUp);
+    }
+  } testSettings[] = {
+      {false, 0.9, false},
+      {true, 0.9, true},
+      {false, 1.0, false},
+      {true, 1.0, true},
+      {false, 0.00001, false},
+      {true, 0.00001, false},
+      {false, 0.0, false},
+      {true, 0.0, false}};
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+
+    core::PlanNodeId scanNodeId;
+    configSettings_[core::QueryConfig::kTableScanScaledProcessingEnabled] =
+        testData.scaleEnabled ? "true" : "false";
+    configSettings_[core::QueryConfig::kTableScanScaleUpMemoryUsageRatio] =
+        std::to_string(testData.scaleUpMemoryUsageRatio);
+
+    const auto leafPlan =
+        PlanBuilder()
+            .tableScan(rowType_)
+            .capturePlanNodeId(scanNodeId)
+            .partialAggregation(
+                {"c5"}, {"max(c0)", "sum(c1)", "sum(c2)", "sum(c3)", "sum(c4)"})
+            .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+            .planNode();
+
+    const auto leafTaskId = "local://leaf-0";
+    auto leafTask = makeTask(leafTaskId, leafPlan, 0, nullptr, 128ULL << 20);
+    const auto numLeafDrivers{4};
+    leafTask->start(numLeafDrivers);
+    addHiveSplits(leafTask, splitFiles);
+
+    const auto finalAggPlan =
+        PlanBuilder()
+            .exchange(leafPlan->outputType(), GetParam())
+            .finalAggregation(
+                {"c5"},
+                {"max(a0)", "sum(a1)", "sum(a2)", "sum(a3)", "sum(a4)"},
+                {{BIGINT()}, {INTEGER()}, {SMALLINT()}, {REAL()}, {DOUBLE()}})
+            .partitionedOutput({}, 1, /*outputLayout=*/{}, GetParam())
+            .planNode();
+
+    const auto finalAggTaskId = "local://final-agg-0";
+    auto finalAggTask = makeTask(finalAggTaskId, finalAggPlan, 0);
+    const auto numFinalAggrDrivers{1};
+    finalAggTask->start(numFinalAggrDrivers);
+    addRemoteSplits(finalAggTask, {leafTaskId});
+
+    const auto resultPlan =
+        PlanBuilder()
+            .exchange(finalAggPlan->outputType(), GetParam())
+            .planNode();
+
+    assertQuery(
+        resultPlan,
+        {finalAggTaskId},
+        "SELECT c5, max(c0), sum(c1), sum(c2), sum(c3), sum(c4) FROM tmp group by c5");
+
+    ASSERT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId();
+    ASSERT_TRUE(waitForTaskCompletion(finalAggTask.get()))
+        << finalAggTask->taskId();
+
+    auto planStats = toPlanStats(leafTask->taskStats());
+    if (testData.scaleEnabled) {
+      ASSERT_EQ(
+          planStats.at(scanNodeId)
+              .customStats.count(TableScan::kNumRunningScaleThreads),
+          1);
+      // Scale up is bounded by the number of leaf drivers. When no scale up is
+      // expected, only the presence of the stat (checked above) is verified.
+      if (testData.expectScaleUp) {
+        ASSERT_GE(
+            planStats.at(scanNodeId)
+                .customStats[TableScan::kNumRunningScaleThreads]
+                .sum,
+            1);
+        ASSERT_LE(
+            planStats.at(scanNodeId)
+                .customStats[TableScan::kNumRunningScaleThreads]
+                .sum,
+            numLeafDrivers);
+      }
+    } else {
+      ASSERT_EQ(
+          planStats.at(scanNodeId)
+              .customStats.count(TableScan::kNumRunningScaleThreads),
+          0);
+    }
+  }
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     MultiFragmentTest,
     MultiFragmentTest,
diff --git a/velox/exec/tests/ScaledScanControllerTest.cpp b/velox/exec/tests/ScaledScanControllerTest.cpp
new file mode 100644
index 000000000000..fbd68548ef37
--- /dev/null
+++ b/velox/exec/tests/ScaledScanControllerTest.cpp
@@ -0,0 +1,338 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/exec/ScaledScanController.h"
+
+#include "velox/common/base/tests/GTestUtils.h"
+#include "velox/core/PlanNode.h"
+#include "velox/exec/PlanNodeStats.h"
+
+#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+
+namespace facebook::velox::exec::test {
+class ScaledScanControllerTestHelper {
+ public:
+  explicit ScaledScanControllerTestHelper(ScaledScanController* controller)
+      : controller_(controller) {
+    VELOX_CHECK_NOT_NULL(controller_);
+  }
+
+  uint64_t estimatedDriverUsage() const {
+    return controller_->estimatedDriverUsage_;
+  }
+
+ private:
+  ScaledScanController* const controller_;
+};
+
+class ScaledScanControllerTest : public OperatorTestBase {
+ protected:
+  std::shared_ptr<memory::MemoryPool> rootPool(uint64_t maxCapacity) {
+    return memory::memoryManager()->addRootPool("", maxCapacity);
+  }
+};
+
+TEST_F(ScaledScanControllerTest, basic) {
+  struct {
+    // Specifies the query max capacity.
+    uint64_t queryCapacity;
+    // Specifies the scale up memory usage ratio.
+    double scaleUpMemoryUsageRatio;
+    // Specifies how much memory has been used by the query when we test for
+    // scale.
+    uint64_t queryMemoryUsage;
+    // Specifies how much memory has been used by the scan node when we test for
+    // scale.
+    uint64_t nodeMemoryUsage;
+    // Specifies the peak memory usage of the scan node when we test for scale.
+    uint64_t nodePeakMemoryUsage;
+    // Specifies the number of scan drivers in the query.
+    uint32_t numDrivers;
+    // Specifies the updates of the memory usage of the scan drivers. The test
+    // will invoke scale controller update API for each update in the order of
+    // the update vector list.
+    std::vector<std::pair<uint32_t, uint64_t>> driverMemoryUsageUpdates;
+    // Specifies the expected number of running scan drivers after the update or
+    // the end of the test.
+    uint32_t expectedNumRunningDrivers;
+
+    std::string debugString() const {
+      return fmt::format(
+          "queryCapacity {}, scaleUpMemoryUsageRatio {}, queryMemoryUsage {}, nodeMemoryUsage {}, nodePeakMemoryUsage {}, numDrivers {}, expectedNumRunningDrivers {}",
+          succinctBytes(queryCapacity),
+          scaleUpMemoryUsageRatio,
+          succinctBytes(queryMemoryUsage),
+          succinctBytes(nodeMemoryUsage),
+          succinctBytes(nodePeakMemoryUsage),
+          numDrivers,
+          expectedNumRunningDrivers);
+    }
+  } testSettings[] = {
+      // Test case that we can't scale up because of the query memory usage
+      // ratio which is set to 0.
+      // 1 scan driver in total.
+      {256 << 20, 0.0, 1 << 20, 1 << 20, 1 << 20, 1, {{0, 1 << 20}}, 1},
+      // 4 scan drivers in total.
+      {256 << 20,
+       0.0,
+       32 << 20,
+       16 << 20,
+       16 << 20,
+       4,
+       {{0, 1 << 20}, {0, 4 << 20}},
+       1},
+
+      // Test case that we can only scale up to two drivers as we only update
+      // stats from one driver so can't scale up beyond two.
+      {256 << 20,
+       0.9,
+       32 << 20,
+       32 << 20,
+       32 << 20,
+       4,
+       {{0, 4 << 20}, {0, 4 << 20}},
+       2},
+
+      // Test case that we can only scale up to three drivers as we only update
+      // stats from two drivers so can't scale up beyond three.
+      {256 << 20,
+       0.9,
+       32 << 20,
+       32 << 20,
+       32 << 20,
+       4,
+       {{0, 4 << 20}, {0, 4 << 20}, {1, 4 << 20}},
+       3},
+
+      // Test case that we can only scale up to six drivers as hit the query
+      // memory usage ratio limit.
+      {256 << 20,
+       0.5,
+       64 << 20,
+       32 << 20,
+       32 << 20,
+       8,
+       {{0, 16 << 20},
+        {1, 16 << 20},
+        {2, 16 << 20},
+        {3, 16 << 20},
+        {4, 16 << 20},
+        {5, 16 << 20}},
+       6},
+
+      // Test cases that we can't scale up because of the peak memory usage of
+      // scan node.
+      {256 << 20, 0.5, 64 << 20, 32 << 20, 128 << 20, 8, {{0, 16 << 20}}, 1},
+      {256 << 20, 0.5, 64 << 20, 32 << 20, 96 << 20, 8, {{0, 16 << 20}}, 1},
+
+      // Test cases that we can't scale up because of the sum of the estimated
+      // driver memory usage.
+      {256 << 20,
+       0.5,
+       64 << 20,
+       32 << 20,
+       80 << 20,
+       8,
+       {{0, 16 << 20},
+        {1, 16 << 20},
+        {2, 16 << 20},
+        {3, 16 << 20},
+        {4, 16 << 20},
+        {5, 16 << 20}},
+       6}};
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+    auto root = rootPool(testData.queryCapacity);
+    auto node = root->addAggregateChild("test");
+    auto pool = node->addLeafChild("test");
+    if (testData.nodePeakMemoryUsage > 0) {
+      void* tmpBuffer = pool->allocate(testData.nodePeakMemoryUsage);
+      pool->free(tmpBuffer, testData.nodePeakMemoryUsage);
+    }
+    void* buffer{nullptr};
+    if (testData.nodeMemoryUsage > 0) {
+      buffer = pool->allocate(testData.nodeMemoryUsage);
+    }
+    SCOPE_EXIT {
+      if (buffer != nullptr) {
+        pool->free(buffer, testData.nodeMemoryUsage);
+      }
+    };
+
+    auto otherNode = root->addAggregateChild("other");
+    auto otherPool = otherNode->addLeafChild("other");
+    void* otherBuffer{nullptr};
+    if (testData.queryMemoryUsage > testData.nodeMemoryUsage) {
+      otherBuffer = otherPool->allocate(
+          testData.queryMemoryUsage - testData.nodeMemoryUsage);
+    }
+    SCOPE_EXIT {
+      if (otherBuffer != nullptr) {
+        otherPool->free(
+            otherBuffer, testData.queryMemoryUsage - testData.nodeMemoryUsage);
+      }
+    };
+
+    auto controller = std::make_shared<ScaledScanController>(
+        node.get(), testData.numDrivers, testData.scaleUpMemoryUsageRatio);
+    for (auto& [driverIdx, memoryUsage] : testData.driverMemoryUsageUpdates) {
+      controller->updateAndTryScale(driverIdx, memoryUsage);
+    }
+
+    ASSERT_EQ(
+        testData.expectedNumRunningDrivers,
+        controller->stats().numRunningDrivers);
+
+    std::vector<ContinueFuture> futures;
+    futures.reserve(testData.numDrivers);
+    for (uint32_t i = 0; i < testData.numDrivers; ++i) {
+      ContinueFuture future{ContinueFuture::makeEmpty()};
+      if (i < testData.expectedNumRunningDrivers) {
+        ASSERT_FALSE(controller->shouldStop(i, &future));
+        ASSERT_FALSE(future.valid());
+      } else {
+        ASSERT_TRUE(controller->shouldStop(i, &future));
+        ASSERT_TRUE(future.valid());
+        futures.push_back(std::move(future));
+      }
+    }
+
+    for (const auto& future : futures) {
+      ASSERT_FALSE(future.isReady());
+    }
+
+    ASSERT_TRUE(controller->close());
+    ASSERT_FALSE(controller->close());
+    ASSERT_FALSE(controller->close());
+
+    for (const auto& future : futures) {
+      ASSERT_TRUE(future.isReady());
+    }
+  }
+}
+
+TEST_F(ScaledScanControllerTest, estimateDriverUsage) {
+  auto root = rootPool(256 << 20);
+  auto node = root->addAggregateChild("test");
+  auto pool = node->addLeafChild("test");
+  const int numDrivers{4};
+  auto controller =
+      std::make_shared<ScaledScanController>(node.get(), numDrivers, 0.5);
+  std::vector<ContinueFuture> futures;
+  futures.reserve(numDrivers);
+  for (auto i = 0; i < numDrivers; ++i) {
+    ContinueFuture future{ContinueFuture::makeEmpty()};
+    if (i == 0) {
+      ASSERT_FALSE(controller->shouldStop(i, &future));
+      ASSERT_FALSE(future.valid());
+    } else {
+      ASSERT_TRUE(controller->shouldStop(i, &future));
+      ASSERT_TRUE(future.valid());
+      futures.push_back(std::move(future));
+    }
+  }
+
+  ScaledScanControllerTestHelper helper(controller.get());
+  uint64_t expectedDriverMemoryUsage{0};
+  for (auto i = 0; i < numDrivers - 1; ++i) {
+    controller->updateAndTryScale(i, (i + 1) << 20);
+    if (i == 0) {
+      expectedDriverMemoryUsage = (i + 1) << 20;
+    } else {
+      expectedDriverMemoryUsage =
+          (expectedDriverMemoryUsage * 3 + ((i + 1) << 20)) / 4;
+    }
+    ASSERT_EQ(helper.estimatedDriverUsage(), expectedDriverMemoryUsage);
+  }
+
+  controller.reset();
+  for (auto& future : futures) {
+    ASSERT_TRUE(future.isReady());
+  }
+}
+
+TEST_F(ScaledScanControllerTest, error) {
+  auto root = rootPool(256 << 20);
+  auto node = root->addAggregateChild("test");
+
+  VELOX_ASSERT_THROW(
+      std::make_shared<ScaledScanController>(node.get(), 0, 0.5), "");
+  VELOX_ASSERT_THROW(
+      std::make_shared<ScaledScanController>(node.get(), 4, -1), "");
+  VELOX_ASSERT_THROW(
+      std::make_shared<ScaledScanController>(node.get(), 4, 2), "");
+  VELOX_ASSERT_THROW(
+      std::make_shared<ScaledScanController>(root.get(), 0, 2), "");
+}
+
+TEST_F(ScaledScanControllerTest, fuzzer) {
+  auto root = rootPool(256 << 20);
+  auto node = root->addAggregateChild("fuzzer");
+  const int numDrivers{4};
+  std::vector<std::shared_ptr<memory::MemoryPool>> pools;
+  for (int i = 0; i < numDrivers; ++i) {
+    pools.push_back(node->addLeafChild(fmt::format("fuzzer{}", i)));
+  }
+  auto controller =
+      std::make_shared<ScaledScanController>(node.get(), numDrivers, 0.7);
+  std::vector<ContinueFuture> futures{numDrivers};
+  for (int i = 0; i < numDrivers; ++i) {
+    controller->shouldStop(i, &futures[i]);
+  }
+
+  std::vector<std::thread> driverThreads;
+  std::atomic_int closeCount{0};
+  for (int i = 0; i < numDrivers; ++i) {
+    driverThreads.emplace_back([&, i]() {
+      auto rng = folly::Random::DefaultGenerator(i);
+      auto pool = pools[i];
+      if (futures[i].valid()) {
+        futures[i].wait();
+      }
+      for (int j = 0; j < 1'000; ++j) {
+        uint64_t allocationBytes{0};
+        switch (folly::Random::rand32(rng) % 3) {
+          case 0:
+            allocationBytes = 256 / numDrivers;
+            break;
+          case 1:
+            allocationBytes = 256 / numDrivers / 2;
+            break;
+          case 2:
+            allocationBytes = 256 / numDrivers / 4;
+            break;
+        }
+        void* buffer = pool->allocate(allocationBytes);
+        SCOPE_EXIT {
+          pool->free(buffer, allocationBytes);
+        };
+        controller->updateAndTryScale(i, pool->peakBytes());
+      }
+      if (controller->close()) {
+        ++closeCount;
+      }
+    });
+  }
+  for (auto& thread : driverThreads) {
+    thread.join();
+  }
+  ASSERT_EQ(closeCount.load(), 1);
+}
+} // namespace facebook::velox::exec::test
diff --git a/velox/tool/trace/TraceReplayRunner.h b/velox/tool/trace/TraceReplayRunner.h
index 65640357e533..75bc8ad05f8e 100644
--- a/velox/tool/trace/TraceReplayRunner.h
+++ b/velox/tool/trace/TraceReplayRunner.h
@@ -31,7 +31,6 @@ DECLARE_string(node_id);
 DECLARE_int32(driver_id);
 DECLARE_string(driver_ids);
 DECLARE_string(table_writer_output_dir);
-DECLARE_double(hiveConnectorExecutorHwMultiplier);
 DECLARE_int32(shuffle_serialization_format);
 
 namespace facebook::velox::tool::trace {