feat: Add auto table scan scaling based on memory usage
Summary:
Adds auto table scan scaling support to address query OOMs caused by highly concurrent, memory-intensive
table scan operations. Instead of running all the table scan threads at the start of the query, we start by running
a single table scan thread and gradually schedule more table scan threads when there is sufficient free memory
capacity for the query (measured as the current used memory versus the query max capacity). When the query is
approaching its max limit, we stop scheduling more table scan threads and even stop currently running scan
threads to free up memory and prevent OOM.

The scale up/down decision happens when a table scan operator finishes processing a non-empty split. A scale
controller is added to each table scan plan node for this coordinated control, and two memory usage ratios are
defined for the scale up/down decision. The controller estimates the per-scan-driver memory usage from the memory
usage reported when a table scan thread finishes a non-empty split. A Meta internal test shows that a query failed
with OOM after about 1 min of execution with 10 leaf threads, and finished in 2 hrs when the leaf thread count was
reduced to 5. Java took 1 hour to finish with persistent shuffle (LBM). With auto scaling, the query finishes on
Prestissimo in 30 mins. We don't expect this feature to be enabled by default, as small ad hoc queries need to run
all the scan threads in parallel from the start. It is intended for large pipelines and can be enabled through a
query config in Velox and session properties in Prestissimo.
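
The decision rule above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical
names (decideScanScaling, QueryMemoryStats); it is not the actual scale controller API added by this commit, only a
sketch of how the two configured ratios gate scale up/down against the query's memory usage:

```cpp
#include <cstdint>

// Hypothetical illustration of the per-split scale decision; the real logic
// lives in the scan controller attached to each table scan plan node.
enum class ScanScaleDecision { kScaleUp, kHold, kScaleDown };

struct QueryMemoryStats {
  int64_t currentUsedBytes; // Memory currently used by the query.
  int64_t maxCapacityBytes; // Query max memory capacity.
};

// Invoked when a table scan operator finishes processing a non-empty split.
ScanScaleDecision decideScanScaling(
    const QueryMemoryStats& stats,
    double scaleUpRatio,   // e.g. table_scan_scale_up_memory_usage_ratio = 0.5
    double scaleDownRatio) // e.g. table_scan_scale_down_memory_usage_ratio = 0.7
{
  const double usageRatio =
      static_cast<double>(stats.currentUsedBytes) / stats.maxCapacityBytes;
  if (usageRatio < scaleUpRatio) {
    // Plenty of free capacity: schedule one more scan thread.
    return ScanScaleDecision::kScaleUp;
  }
  if (usageRatio > scaleDownRatio) {
    // Approaching the limit: stop a running scan thread to free up memory.
    return ScanScaleDecision::kScaleDown;
  }
  // Between the two ratios: keep the current number of scan threads. Requiring
  // the scale-down ratio to be higher than the scale-up ratio avoids flapping
  // around a single threshold.
  return ScanScaleDecision::kHold;
}
```

For example, with a 40 GB query capacity and the default ratios, the controller keeps adding scan threads while usage
stays below 20 GB, holds the current thread count between 20 GB and 28 GB, and starts scaling down above 28 GB.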

Differential Revision: D67114511
xiaoxmeng authored and facebook-github-bot committed Dec 16, 2024
1 parent 2d31862 commit 975bf22
Showing 18 changed files with 984 additions and 54 deletions.
41 changes: 41 additions & 0 deletions velox/core/QueryConfig.h
@@ -472,6 +472,35 @@ class QueryConfig {
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
"scaled_writer_min_processed_bytes_rebalance_threshold";

/// If true, enables scaled table scan processing. For each table scan
/// plan node, a scan controller is used to dynamically adjust the number of
/// running scan threads in response to query memory usage changes. It
/// increases the number of running threads when the query has sufficient
/// free memory available, and reduces the number when the query is running
/// low on available memory.
static constexpr const char* kTableScanScaledProcessingEnabled =
"table_scan_scaled_processing_enabled";

/// The query memory usage ratio defined for scaled scan processing. When
/// the query memory usage is below this ratio, the scan controller keeps
/// scaling up the scan processing. Once the usage exceeds this ratio, it
/// stops scaling up. The value is in the range of [0, 1].
///
/// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
"table_scan_scale_up_memory_usage_ratio";

/// The query memory usage ratio defined to control scan scaling. When the
/// query memory usage is above this ratio, we start to scale down table scan
/// processing to free up memory. The value is in the range of (0, 1].
///
/// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
/// 'table_scan_scale_down_memory_usage_ratio' needs to be set to a value
/// higher than 'table_scan_scale_up_memory_usage_ratio' to avoid scan scaling
/// fluctuation around a certain query memory usage ratio.
static constexpr const char* kTableScanScaleDownMemoryUsageRatio =
"table_scan_scale_down_memory_usage_ratio";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}
@@ -880,6 +909,18 @@ class QueryConfig {
kScaleWriterMinProcessedBytesRebalanceThreshold, 256 << 20);
}

bool tableScanScaledProcessingEnabled() const {
return get<bool>(kTableScanScaledProcessingEnabled, false);
}

double tableScanScaleUpMemoryUsageRatio() const {
return get<double>(kTableScanScaleUpMemoryUsageRatio, 0.5);
}

double tableScanScaleDownMemoryUsageRatio() const {
return get<double>(kTableScanScaleDownMemoryUsageRatio, 0.7);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
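
A minimal usage sketch for the new config entries, assuming QueryConfig is constructed from a string map as in the
rest of Velox; the surrounding setup is hypothetical and only shows how the keys and getters added above fit together:

```cpp
#include <string>
#include <unordered_map>
#include <utility>

#include "velox/core/QueryConfig.h"

using facebook::velox::core::QueryConfig;

int main() {
  // Enable scaled table scan processing and set the two memory usage ratios.
  // The keys match the constants added above; values are strings like any
  // other Velox query config entry.
  std::unordered_map<std::string, std::string> values{
      {"table_scan_scaled_processing_enabled", "true"},
      {"table_scan_scale_up_memory_usage_ratio", "0.5"},
      {"table_scan_scale_down_memory_usage_ratio", "0.7"},
  };
  QueryConfig config(std::move(values));

  // The getters added in this change expose the parsed values, falling back
  // to the defaults (disabled, 0.5, 0.7) when a key is not set.
  const bool enabled = config.tableScanScaledProcessingEnabled();
  const double scaleUpRatio = config.tableScanScaleUpMemoryUsageRatio();
  const double scaleDownRatio = config.tableScanScaleDownMemoryUsageRatio();
  return (enabled && scaleUpRatio < scaleDownRatio) ? 0 : 1;
}
```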
47 changes: 37 additions & 10 deletions velox/docs/configs.rst
@@ -390,6 +390,33 @@ Table Scan
- integer
- 2
- Maximum number of splits to preload per driver. Set to 0 to disable preloading.
* - table_scan_scaled_processing_enabled
- bool
- false
- If true, enables scaled table scan processing. For each table scan
plan node, a scan controller is used to dynamically adjust the number of
running scan threads in response to query memory usage changes. It
increases the number of running threads when the query has sufficient
free memory available, and reduces the number when the query is running
low on available memory.
* - table_scan_scale_up_memory_usage_ratio
- double
- 0.5
- The query memory usage ratio defined for scaled scan processing. When
the query memory usage is below this ratio, the scan controller keeps
scaling up the scan processing. Once the usage exceeds this ratio, it
stops scaling up. The value is in the range of [0, 1]. This only applies
if 'table_scan_scaled_processing_enabled' is set to true.
* - table_scan_scale_down_memory_usage_ratio
- double
- 0.7
- The query memory usage ratio defined to control scan scale-down. When
the query memory usage is above this ratio, we start to scale down table
scan processing to free up memory. The value is in the range of (0, 1].
This only applies if 'table_scan_scaled_processing_enabled' is set to
true. 'table_scan_scale_down_memory_usage_ratio' needs to be set to a
value higher than 'table_scan_scale_up_memory_usage_ratio' to avoid scan
scaling fluctuation around a certain query memory usage ratio.

Table Writer
------------
@@ -414,26 +441,26 @@ Table Writer
- double
- 0.7
- The max ratio of a query used memory to its max capacity, and the scale
writer exchange stops scaling writer processing if the query's current
memory usage exceeds this ratio. The value is in the range of (0, 1].
* - scaled_writer_max_partitions_per_writer
- integer
- 128
- The max number of logical table partitions that can be assigned to a
single table writer thread. The logical table partition is used by local
exchange writer for writer scaling, and multiple physical table
partitions can be mapped to the same logical table partition based on the
hash value of calculated partitioned ids.
* - scaled_writer_min_partition_processed_bytes_rebalance_threshold
- integer
- 128MB
- Minimum amount of data processed by a logical table partition to trigger
writer scaling if it is detected as overloaded by scale writer exchange.
* - scaled_writer_min_processed_bytes_rebalance_threshold
- integer
- 256MB
- Minimum amount of data processed by all the logical table partitions to
trigger skewed partition rebalancing by scale writer exchange.

Hive Connector
--------------
21 changes: 21 additions & 0 deletions velox/docs/monitoring/stats.rst
@@ -90,6 +90,27 @@ These stats are reported only by HashBuild and HashAggregation operators.
- Time spent on building the hash table from rows collected by all the
hash build operators. This stat is only reported by the HashBuild operator.

TableScan
---------
These stats are reported only by TableScan operator

.. list-table::
:widths: 50 25 50
:header-rows: 1

* - Stats
- Unit
- Description
* - numScaleUp
-
- Number of times the scaled scan controller triggers scale up processing.
* - numScaleDown
-
- Number of times the scaled scan controller triggers scale down processing.
* - numRunningScaleThreads
-
- The number of running table scan drivers.
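
After a query finishes, these counters can be read back from the task's operator stats. The sketch below assumes
the usual TaskStats/OperatorStats layout (pipelineStats, operatorStats, runtimeStats); treat the field names as an
assumption rather than part of this change:

```cpp
#include <iostream>

#include "velox/exec/Task.h"

using namespace facebook::velox;

// Walks a finished task's stats and prints the scaled scan counters reported
// by TableScan operators.
void printScanScalingStats(const exec::TaskStats& taskStats) {
  for (const auto& pipeline : taskStats.pipelineStats) {
    for (const auto& op : pipeline.operatorStats) {
      if (op.operatorType != "TableScan") {
        continue;
      }
      for (const char* name :
           {"numScaleUp", "numScaleDown", "numRunningScaleThreads"}) {
        const auto it = op.runtimeStats.find(name);
        if (it != op.runtimeStats.end()) {
          std::cout << name << ": sum=" << it->second.sum
                    << ", count=" << it->second.count << "\n";
        }
      }
    }
  }
}
```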

TableWriter
-----------
These stats are reported only by TableWriter operator
30 changes: 26 additions & 4 deletions velox/exec/Driver.cpp
@@ -631,11 +631,10 @@ StopReason Driver::runInternal(
nextOp,
curOperatorId_ + 1,
kOpMethodAddInput);
});
// The next iteration will see if operators_[i + 1] has
// output now that it got input.
i += 2;
continue;
} else {
stop = task()->shouldStop();
@@ -934,6 +933,17 @@ std::unordered_set<column_index_t> Driver::canPushdownFilters(
return supportedChannels;
}

bool Driver::needsFlush(FlushReason reason) const {
// Starts checking from the upstream operators first.
for (auto i = 0; i < operators_.size(); ++i) {
auto op = operators_[i].get();
if (op->needsFlush(reason)) {
return true;
}
}
return false;
}

Operator* Driver::findOperator(std::string_view planNodeId) const {
for (auto& op : operators_) {
if (op->planNodeId() == planNodeId) {
@@ -1129,12 +1139,24 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
case BlockingReason::kWaitForScanScaleUp:
return "kWaitForScanScaleUp";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
}
}

std::string flushReasonToString(FlushReason reason) {
switch (reason) {
case FlushReason::kScanScaleDown:
return "kScanScaleDown";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown flush reason {}", static_cast<int>(reason)));
}
}

DriverThreadContext* driverThreadContext() {
return driverThreadCtx;
}
30 changes: 30 additions & 0 deletions velox/exec/Driver.h
@@ -210,6 +210,14 @@ enum class BlockingReason {
/// Operator is blocked waiting for its associated query memory arbitration to
/// finish.
kWaitForArbitration,
/// A table scan operator is blocked waiting for the scan controller to
/// increase the number of table scan processing threads so that it can start
/// or resume processing. This only applies when scaled table scan processing
/// is enabled. The scan controller dynamically adjusts the number of running
/// scan threads in response to query memory usage changes. It increases the
/// number of running threads when the query has sufficient free memory
/// available, and reduces the number when the query is running low on
/// available memory.
kWaitForScanScaleUp,
};

std::string blockingReasonToString(BlockingReason reason);
@@ -258,6 +266,16 @@ class BlockingState {
static std::atomic_uint64_t numBlockedDrivers_;
};

/// Defines the reason for a driver to be flushed.
enum class FlushReason : uint8_t {
/// Triggered by a table scan operator to stop its processing in response to
/// a scan scale-down. Correspondingly, all the operators in the driver
/// pipeline need to flush out their buffered data to stop processing.
kScanScaleDown,
};

std::string flushReasonToString(FlushReason reason);

/// Special group id to reflect the ungrouped execution.
constexpr uint32_t kUngroupedGroupId{std::numeric_limits<uint32_t>::max()};

@@ -420,6 +438,9 @@ class Driver : public std::enable_shared_from_this<Driver> {
const Operator* filterSource,
const std::vector<column_index_t>& channels) const;

/// Returns true if any operator in the pipeline has pending data to flush.
bool needsFlush(FlushReason reason) const;

/// Returns the Operator with 'planNodeId' or nullptr if not found. For
/// example, hash join probe accesses the corresponding build by id.
Operator* findOperator(std::string_view planNodeId) const;
@@ -759,6 +780,15 @@ struct fmt::formatter<facebook::velox::exec::BlockingReason>
}
};

template <>
struct fmt::formatter<facebook::velox::exec::FlushReason>
: formatter<std::string> {
auto format(facebook::velox::exec::FlushReason f, format_context& ctx) const {
return formatter<std::string>::format(
facebook::velox::exec::flushReasonToString(f), ctx);
}
};

template <>
struct fmt::formatter<facebook::velox::exec::StopReason>
: formatter<std::string> {
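
To tie the flush pieces together, here is a self-contained sketch of how a scan scale-down could drive a pipeline
flush. The operator classes and the scale-down trigger are simplified stand-ins (the actual TableScan and scan
controller changes are not shown in this excerpt); only the shape of Driver::needsFlush and the operator-level
needsFlush hook mirrors the code added here:

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for the FlushReason added in this change.
enum class FlushReason { kScanScaleDown };

struct Operator {
  virtual ~Operator() = default;
  // Mirrors Operator::needsFlush(FlushReason): an operator reports true if it
  // holds buffered data that must be produced before the driver can pause.
  virtual bool needsFlush(FlushReason /*reason*/) { return false; }
  virtual std::string name() const = 0;
};

struct TableScan : Operator {
  std::string name() const override { return "TableScan"; }
};

struct PartialAggregation : Operator {
  int64_t bufferedRows{100};
  bool needsFlush(FlushReason reason) override {
    // Like HashAggregation::needsFlush: only a partial aggregation that has
    // accumulated rows asks to flush on a scan scale-down.
    return reason == FlushReason::kScanScaleDown && bufferedRows > 0;
  }
  std::string name() const override { return "PartialAggregation"; }
};

// Mirrors Driver::needsFlush: ask every operator in the pipeline, starting
// from the upstream end, whether it has pending data to flush.
bool driverNeedsFlush(
    const std::vector<std::unique_ptr<Operator>>& pipeline,
    FlushReason reason) {
  for (const auto& op : pipeline) {
    if (op->needsFlush(reason)) {
      return true;
    }
  }
  return false;
}

int main() {
  std::vector<std::unique_ptr<Operator>> pipeline;
  pipeline.push_back(std::make_unique<TableScan>());
  pipeline.push_back(std::make_unique<PartialAggregation>());

  // Hypothetical scale-down path: once the controller decides to reduce the
  // number of running scan drivers, the driver flushes buffered data first so
  // no partial results are stranded while this scan driver is paused.
  if (driverNeedsFlush(pipeline, FlushReason::kScanScaleDown)) {
    std::cout << "flush buffered output, then pause this scan driver\n";
  }
  return 0;
}
```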
4 changes: 2 additions & 2 deletions velox/exec/FilterProject.cpp
@@ -133,8 +133,8 @@ RowVectorPtr FilterProject::getOutput() {
rows->setAll();
EvalCtx evalCtx(operatorCtx_->execCtx(), exprs_.get(), input_.get());

// Pre-load lazy vectors which are referenced by both expressions and
// identity projections.
for (auto fieldIdx : multiplyReferencedFieldIndices_) {
evalCtx.ensureFieldLoaded(fieldIdx, *rows);
}
31 changes: 25 additions & 6 deletions velox/exec/HashAggregation.cpp
@@ -166,6 +166,23 @@ void HashAggregation::setupGroupingKeyChannelProjections(
}
}

bool HashAggregation::needsFlush(FlushReason reason) {
VELOX_CHECK_EQ(reason, FlushReason::kScanScaleDown);
if (needsFlush_) {
return true;
}

// We only support flush on a partial aggregation for now.
if (!isPartialOutput_) {
return false;
}
if (groupingSet_->numRows() == 0) {
return false;
}
needsFlush_ = true;
return true;
}

bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
VELOX_CHECK(isPartialOutput_ && !isGlobal_);
return numInputRows_ > abandonPartialAggregationMinRows_ &&
@@ -256,7 +273,7 @@ void HashAggregation::prepareOutput(vector_size_t size) {
}

void HashAggregation::resetPartialOutputIfNeed() {
if (!partialFull_ && !needsFlush_) {
return;
}
VELOX_DCHECK(!isGlobal_);
@@ -272,9 +289,10 @@ void HashAggregation::resetPartialOutputIfNeed() {
}
groupingSet_->resetTable(/*freeTable=*/false);
partialFull_ = false;
if (!finished_ && !needsFlush_) {
maybeIncreasePartialAggregationMemoryUsage(aggregationPct);
}
needsFlush_ = false;
numOutputRows_ = 0;
numInputRows_ = 0;
}
@@ -339,16 +357,19 @@ RowVectorPtr HashAggregation::getOutput() {
// Produce results if one of the following is true:
// - received no-more-input message;
// - partial aggregation reached memory limit;
// - received flush request;
// - distinct aggregation has new keys;
// - running in partial streaming mode and have some output ready.
if (!noMoreInput_ && !partialFull_ && !needsFlush_ && !newDistincts_ &&
!groupingSet_->hasOutput()) {
input_ = nullptr;
return nullptr;
}

if (isDistinct_) {
auto output = getDistinctOutput();
resetPartialOutputIfNeed();
return output;
}

const auto& queryConfig = operatorCtx_->driverCtx()->queryConfig();
@@ -393,8 +414,6 @@ RowVectorPtr HashAggregation::getDistinctOutput() {
// Drop reference to input_ to make it singly-referenced at the producer and
// allow for memory reuse.
input_ = nullptr;
return output;
}
VELOX_CHECK(!newDistincts_);
17 changes: 10 additions & 7 deletions velox/exec/HashAggregation.h
@@ -33,6 +33,8 @@ class HashAggregation : public Operator {

RowVectorPtr getOutput() override;

bool needsFlush(FlushReason reason) override;

bool needsInput() const override {
return !noMoreInput_ && !partialFull_;
}
@@ -105,22 +107,23 @@ class HashAggregation : public Operator {
// 'groupingSet_->estimateRowSize()' across all accumulated data set.
std::optional<int64_t> estimatedOutputRowSize_;

bool partialFull_{false};
bool needsFlush_{false};
bool newDistincts_{false};
bool finished_{false};
// True if partial aggregation has been found to be non-reducing.
bool abandonedPartialAggregation_{false};

RowContainerIterator resultIterator_;
bool pushdownChecked_{false};
bool mayPushdown_{false};

// Count the number of input rows. It is reset on partial aggregation output
// flush.
int64_t numInputRows_{0};
// Count the number of output rows. It is reset on partial aggregation output
// flush.
int64_t numOutputRows_{0};

// Possibly reusable output vector.
RowVectorPtr output_;
Expand Down
