Skip to content

Commit

Permalink
feat: Add auto table scan scaling based on memory usage (facebookincu…
Browse files Browse the repository at this point in the history
…bator#11879)

Summary:
Pull Request resolved: facebookincubator#11879

Adds auto table scan scaling support to solve the query OOM caused by high concurrent memory intensive
table scan operations. Instead of running all the table scan threads at the start of the query, we start from running
one single table scan thread and gradually scheduling more table scan thread when there is sufficient free available
memory capacity for the query (measured as the current used memory versus the query max capacity). When the
query is approaching its max limit, we stop scheduling more table scan threads to prevent OOM caused by table scan.

The scale decision happens when a table scan operator finishes process a non-empty split. A scale
controller is added to each table scan plan node for this coordinated control and two memory ratios are defined for
scale up decision, and it estimate the per-scan driver memory usage based on the memory usage report when
a table scan thread finishes a non-empty split. The Meta internal test shows that a query failed with OOM right after 1 min
execution with 10 leaf threads, and finished in 2 hrs if reduced leaf thread count to 5. Java took 1 hour to finish with
persistent shuffle (LBM). With auto scale, the query finish on Prestissimo in 30 mins. We don't expect this
feature to be enabled by default as adhoc small query needs to run all the scan threads in parallel at the start. This will
only be used for some large pipeline that could be enabled by query config in velox and session properties in Prestissimo

Differential Revision: D67114511
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 17, 2024
1 parent 9b64b94 commit af24860
Show file tree
Hide file tree
Showing 17 changed files with 995 additions and 31 deletions.
26 changes: 26 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,24 @@ class QueryConfig {
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
"scaled_writer_min_processed_bytes_rebalance_threshold";

/// If true, enables the scaled table scan processing. For each table scan
/// plan node, a scan controller is used to control the number of running scan
/// threads based on the query memory usage. It keeps increasing the number of
/// running threads until the query memory usage exceeds the threshold defined
/// by 'table_scan_scale_up_memory_usage_ratio'.
static constexpr const char* kTableScanScaledProcessingEnabled =
"table_scan_scaled_processing_enabled";

/// The query memory usage ratio used by scan controller to decide if it can
/// increase the number of running scan threads. When the query memory usage
/// is below this ratio, the scan controller keeps increasing the running scan
/// thread for scale up, and stop once exceeds this ratio. The value is in the
/// range of [0, 1].
///
/// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
"table_scan_scale_up_memory_usage_ratio";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
Expand Down Expand Up @@ -880,6 +898,14 @@ class QueryConfig {
kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
}

bool tableScanScaledProcessingEnabled() const {
return get<bool>(kTableScanScaledProcessingEnabled, false);
}

double tableScanScaleUpMemoryUsageRatio() const {
return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
37 changes: 27 additions & 10 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,23 @@ Table Scan
- integer
- 2
- Maximum number of splits to preload per driver. Set to 0 to disable preloading.
* - table_scan_scaled_processing_enabled
- bool
- false
- If true, enables the scaled table scan processing. For each table scan
plan node, a scan controller is used to control the number of running scan
threads based on the query memory usage. It keeps increasing the number of
running threads until the query memory usage exceeds the threshold defined
by 'table_scan_scale_up_memory_usage_ratio'.
* - table_scan_scale_up_memory_usage_ratio
- double
- 0.5
- The query memory usage ratio used by scan controller to decide if it can
increase the number of running scan threads. When the query memory usage
is below this ratio, the scan controller scale up the scan processing by
increasing the number of running scan threads, and stop once exceeds this
ratio. The value is in the range of [0, 1]. This only applies if
'table_scan_scaled_processing_enabled' is true.

Table Writer
------------
Expand All @@ -414,26 +431,26 @@ Table Writer
- double
- 0.7
- The max ratio of a query used memory to its max capacity, and the scale
- writer exchange stops scaling writer processing if the query's current
- memory usage exceeds this ratio. The value is in the range of (0, 1].
writer exchange stops scaling writer processing if the query's current
memory usage exceeds this ratio. The value is in the range of (0, 1].
* - scaled_writer_max_partitions_per_writer
- integer
- 128
- The max number of logical table partitions that can be assigned to a
- single table writer thread. The logical table partition is used by local
- exchange writer for writer scaling, and multiple physical table
- partitions can be mapped to the same logical table partition based on the
- hash value of calculated partitioned ids.
single table writer thread. The logical table partition is used by local
exchange writer for writer scaling, and multiple physical table
partitions can be mapped to the same logical table partition based on the
hash value of calculated partitioned ids.
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- integer
- 128MB
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- Minimum amount of data processed by a logical table partition to trigger
- writer scaling if it is detected as overloaded by scale wrirer exchange.
writer scaling if it is detected as overloaded by scale wrirer exchange.
* - scaled_writer_min_processed_bytes_rebalance_threshold
- Minimum amount of data processed by all the logical table partitions to
- trigger skewed partition rebalancing by scale writer exchange.
- integer
- 256MB
- Minimum amount of data processed by all the logical table partitions to
trigger skewed partition rebalancing by scale writer exchange.

Hive Connector
--------------
Expand Down
15 changes: 15 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators.
- Time spent on building the hash table from rows collected by all the
hash build operators. This stat is only reported by the HashBuild operator.

TableScan
---------
These stats are reported only by TableScan operator

.. list-table::
:widths: 50 25 50
:header-rows: 1

* - Stats
- Unit
- Description
* - numRunningScanThreads
-
- The number of running table scan drivers.

TableWriter
-----------
These stats are reported only by TableWriter operator
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ velox_add_library(
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
ScaledScanController.cpp
ScaleWriterLocalPartition.cpp
SortBuffer.cpp
SortedAggregations.cpp
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,10 @@ StopReason Driver::runInternal(
nextOp,
curOperatorId_ + 1,
kOpMethodAddInput);

// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
});
// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
continue;
} else {
stop = task()->shouldStop();
Expand Down Expand Up @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
case BlockingReason::kWaitForScanScaleUp:
return "kWaitForScanScaleUp";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
Expand Down
16 changes: 16 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ enum class BlockingReason {
/// Operator is blocked waiting for its associated query memory arbitration to
/// finish.
kWaitForArbitration,
/// For a table scan operator, it is blocked waiting for the scan controller
/// to increase the number of table scan processing threads to start
/// processing.
kWaitForScanScaleUp,
};

std::string blockingReasonToString(BlockingReason reason);
Expand Down Expand Up @@ -702,6 +706,18 @@ struct DriverFactory {
return false;
}

/// Returns true if the pipeline gets data from a table scan. The function
/// sets plan node id in 'planNodeId'.
bool needsTableScan(core::PlanNodeId& planNodeId) const {
VELOX_CHECK(!planNodes.empty());
if (auto scanNode = std::dynamic_pointer_cast<const core::TableScanNode>(
planNodes.front())) {
planNodeId = scanNode->id();
return true;
}
return false;
}

/// Returns plan node IDs for which Hash Join Bridges must be created based
/// on this pipeline.
std::vector<core::PlanNodeId> needsHashJoinBridges() const;
Expand Down
190 changes: 190 additions & 0 deletions velox/exec/ScaledScanController.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/ScaledScanController.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {

ScaledScanController::ScaledScanController(
memory::MemoryPool* nodePool,
uint32_t numDrivers,
double scaleUpMemoryUsageRatio)
: queryPool_(nodePool->root()),
nodePool_(nodePool),
numDrivers_(numDrivers),
scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio),
driverPromises_(numDrivers_) {
VELOX_CHECK_NOT_NULL(queryPool_);
VELOX_CHECK_NOT_NULL(nodePool_);
VELOX_CHECK_GT(numDrivers_, 0);
VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0);
VELOX_CHECK_LE(scaleUpMemoryUsageRatio_, 1.0);
}

bool ScaledScanController::shouldStop(
uint32_t driverIdx,
facebook::velox::ContinueFuture* future) {
VELOX_CHECK_LT(driverIdx, numDrivers_);

std::lock_guard<std::mutex> l(lock_);
if (closed_) {
return false;
}
return shouldStopLocked(driverIdx, future);
}

bool ScaledScanController::shouldStopLocked(
uint32_t driverIdx,
facebook::velox::ContinueFuture* future) {
VELOX_CHECK(!closed_);
if (driverIdx < numRunningDrivers_) {
return false;
}

VELOX_CHECK(!driverPromises_[driverIdx].has_value());
auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract(
fmt::format("Table scan driver {} scale promise", driverIdx));
driverPromises_[driverIdx] = std::move(driverPromise);
*future = std::move(driverFuture);
return true;
}

void ScaledScanController::updateAndTryScale(
uint32_t driverIdx,
uint64_t memoryUsage) {
VELOX_CHECK_LT(driverIdx, numDrivers_);

std::optional<ContinuePromise> driverPromise;
SCOPE_EXIT {
if (driverPromise.has_value()) {
driverPromise->setValue();
}
};
{
std::lock_guard<std::mutex> l(lock_);
VELOX_CHECK_LT(driverIdx, numRunningDrivers_);

if (closed_) {
return;
}

updateDriverScanUsageLocked(driverIdx, memoryUsage);

tryScaleLocked(driverPromise);
}
}

void ScaledScanController::updateDriverScanUsageLocked(
uint32_t driverIdx,
uint64_t memoryUsage) {
if (estimatedDriverUsage_ == 0) {
estimatedDriverUsage_ = memoryUsage;
} else {
estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4;
}

if (numDriverUsageReports_ == numRunningDrivers_) {
return;
}
VELOX_CHECK_EQ(numDriverUsageReports_ + 1, numRunningDrivers_);

if (driverIdx + 1 < numRunningDrivers_) {
return;
}
VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1);
++numDriverUsageReports_;
}

void ScaledScanController::tryScaleLocked(
std::optional<ContinuePromise>& driverPromise) {
VELOX_CHECK_LE(numDriverUsageReports_, numRunningDrivers_);

if (numRunningDrivers_ == numDrivers_) {
return;
}
if (numDriverUsageReports_ < numRunningDrivers_) {
// We shall only make the next scale up decision until we have received
// the memory usage updates from all the running scan drivers.
return;
}

const uint64_t peakNodeUsage = nodePool_->peakBytes();
const uint64_t estimatedPeakNodeUsageAfterScale = std::max(
estimatedDriverUsage_ * (numRunningDrivers_ + 1),
peakNodeUsage + estimatedDriverUsage_);

const uint64_t currNodeUsage = nodePool_->reservedBytes();
const uint64_t currQueryUsage = queryPool_->reservedBytes();
const uint64_t currOtherUsage =
currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0;

const uint64_t estimatedQueryUsageAfterScale = std::max(
currQueryUsage + estimatedDriverUsage_,
currOtherUsage + estimatedPeakNodeUsageAfterScale);

const uint64_t maxQueryCapacity = queryPool_->maxCapacity();
if (estimatedQueryUsageAfterScale >
maxQueryCapacity * scaleUpMemoryUsageRatio_) {
return;
}

scaleUpLocked(driverPromise);
}

void ScaledScanController::scaleUpLocked(
std::optional<ContinuePromise>& driverPromise) {
VELOX_CHECK_LT(numRunningDrivers_, numDrivers_);

++numRunningDrivers_;
if (driverPromises_[numRunningDrivers_ - 1].has_value()) {
driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]);
driverPromises_[numRunningDrivers_ - 1].reset();
}
}

ScaledScanController::~ScaledScanController() {
close();
}

bool ScaledScanController::close() {
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(lock_);
if (closed_) {
return false;
}

promises.reserve(driverPromises_.size());
for (auto& promise : driverPromises_) {
if (promise.has_value()) {
promises.emplace_back(std::move(promise.value()));
promise.reset();
}
}
closed_ = true;
}

for (auto& promise : promises) {
promise.setValue();
}
return true;
}

std::string ScaledScanController::Stats::toString() const {
return fmt::format("numRunningDrivers: {}", numRunningDrivers);
}
} // namespace facebook::velox::exec
Loading

0 comments on commit af24860

Please sign in to comment.