Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add auto table scan scaling based on memory usage #11879

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,24 @@ class QueryConfig {
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
"scaled_writer_min_processed_bytes_rebalance_threshold";

/// If true, enables the scaled table scan processing. For each table scan
/// plan node, a scan controller is used to control the number of running scan
/// threads based on the query memory usage. It keeps increasing the number of
/// running threads until the query memory usage exceeds the threshold defined
/// by 'table_scan_scale_up_memory_usage_ratio'.
static constexpr const char* kTableScanScaledProcessingEnabled =
"table_scan_scaled_processing_enabled";

/// The query memory usage ratio used by scan controller to decide if it can
/// increase the number of running scan threads. When the query memory usage
/// is below this ratio, the scan controller keeps increasing the running scan
/// thread for scale up, and stop once exceeds this ratio. The value is in the
/// range of [0, 1].
///
/// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
"table_scan_scale_up_memory_usage_ratio";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
Expand Down Expand Up @@ -880,6 +898,14 @@ class QueryConfig {
kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
}

bool tableScanScaledProcessingEnabled() const {
return get<bool>(kTableScanScaledProcessingEnabled, false);
}

double tableScanScaleUpMemoryUsageRatio() const {
return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.7);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
37 changes: 27 additions & 10 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,23 @@ Table Scan
- integer
- 2
- Maximum number of splits to preload per driver. Set to 0 to disable preloading.
* - table_scan_scaled_processing_enabled
- bool
- false
- If true, enables the scaled table scan processing. For each table scan
plan node, a scan controller is used to control the number of running scan
threads based on the query memory usage. It keeps increasing the number of
running threads until the query memory usage exceeds the threshold defined
by 'table_scan_scale_up_memory_usage_ratio'.
* - table_scan_scale_up_memory_usage_ratio
- double
- 0.5
- The query memory usage ratio used by scan controller to decide if it can
increase the number of running scan threads. When the query memory usage
is below this ratio, the scan controller scale up the scan processing by
increasing the number of running scan threads, and stop once exceeds this
ratio. The value is in the range of [0, 1]. This only applies if
'table_scan_scaled_processing_enabled' is true.

Table Writer
------------
Expand All @@ -414,26 +431,26 @@ Table Writer
- double
- 0.7
- The max ratio of a query used memory to its max capacity, and the scale
- writer exchange stops scaling writer processing if the query's current
- memory usage exceeds this ratio. The value is in the range of (0, 1].
writer exchange stops scaling writer processing if the query's current
memory usage exceeds this ratio. The value is in the range of (0, 1].
* - scaled_writer_max_partitions_per_writer
- integer
- 128
- The max number of logical table partitions that can be assigned to a
- single table writer thread. The logical table partition is used by local
- exchange writer for writer scaling, and multiple physical table
- partitions can be mapped to the same logical table partition based on the
- hash value of calculated partitioned ids.
single table writer thread. The logical table partition is used by local
exchange writer for writer scaling, and multiple physical table
partitions can be mapped to the same logical table partition based on the
hash value of calculated partitioned ids.
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- integer
- 128MB
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- Minimum amount of data processed by a logical table partition to trigger
- writer scaling if it is detected as overloaded by scale wrirer exchange.
writer scaling if it is detected as overloaded by scale wrirer exchange.
* - scaled_writer_min_processed_bytes_rebalance_threshold
- Minimum amount of data processed by all the logical table partitions to
- trigger skewed partition rebalancing by scale writer exchange.
- integer
- 256MB
- Minimum amount of data processed by all the logical table partitions to
trigger skewed partition rebalancing by scale writer exchange.

Hive Connector
--------------
Expand Down
15 changes: 15 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ These stats are reported only by HashBuild and HashAggregation operators.
- Time spent on building the hash table from rows collected by all the
hash build operators. This stat is only reported by the HashBuild operator.

TableScan
---------
These stats are reported only by TableScan operator

.. list-table::
:widths: 50 25 50
:header-rows: 1

* - Stats
- Unit
- Description
* - numRunningScanThreads
-
- The number of running table scan drivers.

TableWriter
-----------
These stats are reported only by TableWriter operator
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ velox_add_library(
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
ScaledScanController.cpp
ScaleWriterLocalPartition.cpp
SortBuffer.cpp
SortedAggregations.cpp
Expand Down
9 changes: 5 additions & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,10 @@ StopReason Driver::runInternal(
nextOp,
curOperatorId_ + 1,
kOpMethodAddInput);

// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
});
// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
continue;
} else {
stop = task()->shouldStop();
Expand Down Expand Up @@ -1129,6 +1128,8 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
case BlockingReason::kWaitForScanScaleUp:
return "kWaitForScanScaleUp";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
Expand Down
16 changes: 16 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ enum class BlockingReason {
/// Operator is blocked waiting for its associated query memory arbitration to
/// finish.
kWaitForArbitration,
/// For a table scan operator, it is blocked waiting for the scan controller
/// to increase the number of table scan processing threads to start
/// processing.
kWaitForScanScaleUp,
};

std::string blockingReasonToString(BlockingReason reason);
Expand Down Expand Up @@ -702,6 +706,18 @@ struct DriverFactory {
return false;
}

/// Returns true if the pipeline gets data from a table scan. The function
/// sets plan node id in 'planNodeId'.
bool needsTableScan(core::PlanNodeId& planNodeId) const {
VELOX_CHECK(!planNodes.empty());
if (auto scanNode = std::dynamic_pointer_cast<const core::TableScanNode>(
planNodes.front())) {
planNodeId = scanNode->id();
return true;
}
return false;
}

/// Returns plan node IDs for which Hash Join Bridges must be created based
/// on this pipeline.
std::vector<core::PlanNodeId> needsHashJoinBridges() const;
Expand Down
190 changes: 190 additions & 0 deletions velox/exec/ScaledScanController.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/ScaledScanController.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {

ScaledScanController::ScaledScanController(
memory::MemoryPool* nodePool,
uint32_t numDrivers,
double scaleUpMemoryUsageRatio)
: queryPool_(nodePool->root()),
nodePool_(nodePool),
numDrivers_(numDrivers),
scaleUpMemoryUsageRatio_(scaleUpMemoryUsageRatio),
driverPromises_(numDrivers_) {
VELOX_CHECK_NOT_NULL(queryPool_);
VELOX_CHECK_NOT_NULL(nodePool_);
VELOX_CHECK_GT(numDrivers_, 0);
VELOX_CHECK_GE(scaleUpMemoryUsageRatio_, 0.0);
VELOX_CHECK_LE(scaleUpMemoryUsageRatio_, 1.0);
}

bool ScaledScanController::shouldStop(
uint32_t driverIdx,
facebook::velox::ContinueFuture* future) {
VELOX_CHECK_LT(driverIdx, numDrivers_);

std::lock_guard<std::mutex> l(lock_);
if (closed_) {
return false;
}
return shouldStopLocked(driverIdx, future);
}

bool ScaledScanController::shouldStopLocked(
uint32_t driverIdx,
facebook::velox::ContinueFuture* future) {
VELOX_CHECK(!closed_);
if (driverIdx < numRunningDrivers_) {
return false;
}

VELOX_CHECK(!driverPromises_[driverIdx].has_value());
auto [driverPromise, driverFuture] = makeVeloxContinuePromiseContract(
fmt::format("Table scan driver {} scale promise", driverIdx));
driverPromises_[driverIdx] = std::move(driverPromise);
*future = std::move(driverFuture);
return true;
}

void ScaledScanController::updateAndTryScale(
uint32_t driverIdx,
uint64_t memoryUsage) {
VELOX_CHECK_LT(driverIdx, numDrivers_);

std::optional<ContinuePromise> driverPromise;
SCOPE_EXIT {
if (driverPromise.has_value()) {
driverPromise->setValue();
}
};
{
std::lock_guard<std::mutex> l(lock_);
VELOX_CHECK_LT(driverIdx, numRunningDrivers_);

if (closed_) {
return;
}

updateDriverScanUsageLocked(driverIdx, memoryUsage);

tryScaleLocked(driverPromise);
}
}

void ScaledScanController::updateDriverScanUsageLocked(
uint32_t driverIdx,
uint64_t memoryUsage) {
if (estimatedDriverUsage_ == 0) {
estimatedDriverUsage_ = memoryUsage;
} else {
estimatedDriverUsage_ = (estimatedDriverUsage_ * 3 + memoryUsage) / 4;
}

if (numDriverReportedUsage_ == numRunningDrivers_) {
return;
}
VELOX_CHECK_EQ(numDriverReportedUsage_ + 1, numRunningDrivers_);

if (driverIdx + 1 < numRunningDrivers_) {
return;
}
VELOX_CHECK_EQ(driverIdx, numRunningDrivers_ - 1);
++numDriverReportedUsage_;
}

void ScaledScanController::tryScaleLocked(
std::optional<ContinuePromise>& driverPromise) {
VELOX_CHECK_LE(numDriverReportedUsage_, numRunningDrivers_);

if (numRunningDrivers_ == numDrivers_) {
return;
}
if (numDriverReportedUsage_ < numRunningDrivers_) {
// We shall only make the next scale up decision until we have received
// the memory usage updates from all the running scan drivers.
return;
}

const uint64_t peakNodeUsage = nodePool_->peakBytes();
const uint64_t estimatedPeakNodeUsageAfterScale = std::max(
estimatedDriverUsage_ * (numRunningDrivers_ + 1),
peakNodeUsage + estimatedDriverUsage_);

const uint64_t currNodeUsage = nodePool_->reservedBytes();
const uint64_t currQueryUsage = queryPool_->reservedBytes();
const uint64_t currOtherUsage =
currQueryUsage > currNodeUsage ? currQueryUsage - currNodeUsage : 0;

const uint64_t estimatedQueryUsageAfterScale = std::max(
currQueryUsage + estimatedDriverUsage_,
currOtherUsage + estimatedPeakNodeUsageAfterScale);

const uint64_t maxQueryCapacity = queryPool_->maxCapacity();
if (estimatedQueryUsageAfterScale >
maxQueryCapacity * scaleUpMemoryUsageRatio_) {
return;
}

scaleUpLocked(driverPromise);
}

void ScaledScanController::scaleUpLocked(
std::optional<ContinuePromise>& driverPromise) {
VELOX_CHECK_LT(numRunningDrivers_, numDrivers_);

++numRunningDrivers_;
if (driverPromises_[numRunningDrivers_ - 1].has_value()) {
driverPromise = std::move(driverPromises_[numRunningDrivers_ - 1]);
driverPromises_[numRunningDrivers_ - 1].reset();
}
}

ScaledScanController::~ScaledScanController() {
close();
}

bool ScaledScanController::close() {
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(lock_);
if (closed_) {
return false;
}

promises.reserve(driverPromises_.size());
for (auto& promise : driverPromises_) {
if (promise.has_value()) {
promises.emplace_back(std::move(promise.value()));
promise.reset();
}
}
closed_ = true;
}

for (auto& promise : promises) {
promise.setValue();
}
return true;
}

std::string ScaledScanController::Stats::toString() const {
return fmt::format("numRunningDrivers: {}", numRunningDrivers);
}
} // namespace facebook::velox::exec
Loading
Loading