Introduce optimize_for_low_memory query config (#6898)
Summary:

Some Velox optimizations cache vectors for possible later reuse to reduce runtime overhead. The
expression evaluator also has a code path, evalWithMemo, that caches the base vector of a
dictionary input to avoid unnecessary recomputation later. These caches are cleared only when
Tasks are destroyed. An internal streaming use case, however, observed large memory usage by
these caches when the streaming pipeline takes large input vectors of nested complex types, has
a large number of operators, and runs for a very long time without Task destruction.

This diff introduces an optimize_for_low_memory query config that trades performance for memory.
When the flag is set to true, the VectorPool and Expr::evalWithMemo optimizations are disabled.
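
For illustration, a minimal sketch of how a caller could turn the flag on (not part of this diff;
it mirrors the new QueryConfigTest case added below, runWithLowMemory and its pool argument are
placeholders, and the QueryCtx/ExecCtx constructor calls follow the ones used in that test):

#include <memory>
#include <string>
#include <unordered_map>

#include "velox/core/QueryCtx.h"

using namespace facebook::velox;

// Hypothetical caller-side setup: run a query in low-memory mode.
void runWithLowMemory(std::shared_ptr<memory::MemoryPool> pool) {
  std::unordered_map<std::string, std::string> configData{
      {core::QueryConfig::kOptimizeForLowMemory, "true"}};
  auto queryCtx =
      std::make_shared<core::QueryCtx>(nullptr, std::move(configData));

  // With the flag on, ExecCtx holds no VectorPool: getVector() falls back to
  // BaseVector::create(), releaseVector()/releaseVectors() become no-ops, and
  // expression evaluation skips the evalWithMemo caching path.
  core::ExecCtx execCtx(pool.get(), queryCtx.get());
  // execCtx.optimizeForLowMemory() == true
  // execCtx.vectorPool() == nullptr
}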

Reviewed By: bikramSingh91

Differential Revision: D49922027
kagamiori authored and facebook-github-bot committed Oct 5, 2023
1 parent 0de536f commit e3fc961
Showing 10 changed files with 214 additions and 117 deletions.
velox/core/QueryConfig.h (9 additions, 0 deletions)
@@ -277,6 +277,11 @@ class QueryConfig {
   static constexpr const char* kValidateOutputFromOperators =
       "debug.validate_output_from_operators";
 
+  /// If true, trade performance for memory. Optimizations including VectorPool
+  /// and Expr::evalWithMemo are disabled.
+  static constexpr const char* kOptimizeForLowMemory =
+      "optimize_for_low_memory";
+
   uint64_t maxPartialAggregationMemoryUsage() const {
     static constexpr uint64_t kDefault = 1L << 24;
     return get<uint64_t>(kMaxPartialAggregationMemory, kDefault);
@@ -567,6 +572,10 @@ class QueryConfig {
     return get<bool>(kValidateOutputFromOperators, false);
   }
 
+  bool optimizeForLowMemory() const {
+    return get<bool>(kOptimizeForLowMemory, false);
+  }
+
   template <typename T>
   T get(const std::string& key, const T& defaultValue) const {
     return config_->get<T>(key, defaultValue);
velox/core/QueryCtx.h (29 additions, 7 deletions)
@@ -145,7 +145,13 @@ class QueryCtx {
 class ExecCtx {
  public:
   ExecCtx(memory::MemoryPool* pool, QueryCtx* queryCtx)
-      : pool_(pool), queryCtx_(queryCtx), vectorPool_{pool} {}
+      : pool_(pool),
+        queryCtx_(queryCtx),
+        optimizeForLowMemory_(
+            queryCtx && queryCtx->queryConfig().optimizeForLowMemory()) {
+    vectorPool_ =
+        optimizeForLowMemory_ ? nullptr : std::make_unique<VectorPool>(pool);
+  }
 
   velox::memory::MemoryPool* pool() const {
     return pool_;
@@ -200,38 +206,54 @@ class ExecCtx {
     decodedVectorPool_.push_back(std::move(vector));
   }
 
-  VectorPool& vectorPool() {
-    return vectorPool_;
+  VectorPool* vectorPool() {
+    return vectorPool_.get();
   }
 
   /// Gets a possibly recycled vector of 'type and 'size'. Allocates from
   /// 'pool_' if no pre-allocated vector.
   VectorPtr getVector(const TypePtr& type, vector_size_t size) {
-    return vectorPool_.get(type, size);
+    if (vectorPool_) {
+      return vectorPool_->get(type, size);
+    } else {
+      return BaseVector::create(type, size, pool_);
+    }
   }
 
   /// Moves 'vector' to the pool if it is reusable, else leaves it in
   /// place. Returns true if the vector was moved into the pool.
   bool releaseVector(VectorPtr& vector) {
-    return vectorPool_.release(vector);
+    if (vectorPool_) {
+      return vectorPool_->release(vector);
+    }
+    return false;
   }
 
   /// Moves elements of 'vectors' to the pool if reusable, else leaves them
   /// in place. Returns number of vectors that were moved into the pool.
   size_t releaseVectors(std::vector<VectorPtr>& vectors) {
-    return vectorPool_.release(vectors);
+    if (vectorPool_) {
+      return vectorPool_->release(vectors);
+    }
+    return 0;
   }
 
+  bool optimizeForLowMemory() const {
+    return optimizeForLowMemory_;
+  }
+
  private:
   // Pool for all Buffers for this thread.
   memory::MemoryPool* pool_;
   QueryCtx* queryCtx_;
+
+  bool optimizeForLowMemory_;
   // A pool of preallocated DecodedVectors for use by expressions and operators.
   std::vector<std::unique_ptr<DecodedVector>> decodedVectorPool_;
   // A pool of preallocated SelectivityVectors for use by expressions
   // and operators.
   std::vector<std::unique_ptr<SelectivityVector>> selectivityVectorPool_;
-  VectorPool vectorPool_;
+  std::unique_ptr<VectorPool> vectorPool_;
 };
 
 } // namespace facebook::velox::core
velox/core/tests/QueryConfigTest.cpp (27 additions, 0 deletions)
@@ -17,6 +17,7 @@
 
 #include "velox/common/base/tests/GTestUtils.h"
 #include "velox/core/QueryCtx.h"
+#include "velox/expression/EvalCtx.h"
 
 namespace facebook::velox::core::test {
 
@@ -134,4 +135,30 @@ TEST(TestQueryConfig, taskWriterCountConfig) {
   }
 }
 
+TEST(TestQueryConfig, optimizeForLowMemoryConfig) {
+  std::shared_ptr<memory::MemoryPool> rootPool_{
+      memory::defaultMemoryManager().addRootPool()};
+  std::shared_ptr<memory::MemoryPool> pool_{rootPool_->addLeafChild("leaf")};
+
+  auto testConfig = [&](bool optimizeForLowMemory) {
+    std::unordered_map<std::string, std::string> configData(
+        {{core::QueryConfig::kOptimizeForLowMemory,
+          optimizeForLowMemory ? "true" : "false"}});
+    auto queryCtx =
+        std::make_shared<core::QueryCtx>(nullptr, std::move(configData));
+    const core::QueryConfig& config = queryCtx->queryConfig();
+    ASSERT_EQ(config.optimizeForLowMemory(), optimizeForLowMemory);
+
+    auto execCtx = std::make_shared<core::ExecCtx>(pool_.get(), queryCtx.get());
+    ASSERT_EQ(execCtx->optimizeForLowMemory(), optimizeForLowMemory);
+    ASSERT_EQ(execCtx->vectorPool() == nullptr, optimizeForLowMemory);
+
+    auto evalCtx = std::make_shared<exec::EvalCtx>(execCtx.get());
+    ASSERT_EQ(evalCtx->optimizeForLowMemory(), optimizeForLowMemory);
+  };
+
+  testConfig(true);
+  testConfig(false);
+}
+
 } // namespace facebook::velox::core::test
velox/docs/configs.rst (5 additions, 0 deletions)
@@ -87,6 +87,11 @@ Generic Configuration
      - If set to true, then during execution of tasks, the output vectors of every operator are validated for consistency.
        This is an expensive check so should only be used for debugging. It can help debug issues where malformed vector
        cause failures or crashes by helping identify which operator is generating them.
+   * - optimize_for_low_memory
+     - bool
+     - false
+     - Whether to trade performance for memory. If set to true, optimizations including VectorPool and evalWithMemo in
+       FilterProject are disabled.
 
 .. _expression-evaluation-conf:
 
velox/expression/EvalCtx.cpp (4 additions, 0 deletions)
@@ -36,6 +36,8 @@ EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
   VELOX_CHECK_NOT_NULL(exprSet);
   VELOX_CHECK_NOT_NULL(row);
 
+  optimizeForLowMemory_ = execCtx->optimizeForLowMemory();
+
   inputFlatNoNulls_ = true;
   for (const auto& child : row->children()) {
     VELOX_CHECK_NOT_NULL(child);
@@ -49,6 +51,8 @@ EvalCtx::EvalCtx(core::ExecCtx* execCtx)
 EvalCtx::EvalCtx(core::ExecCtx* execCtx)
     : execCtx_(execCtx), exprSet_(nullptr), row_(nullptr) {
   VELOX_CHECK_NOT_NULL(execCtx);
+
+  optimizeForLowMemory_ = execCtx->optimizeForLowMemory();
 }
 
 void EvalCtx::saveAndReset(
velox/expression/EvalCtx.h (8 additions, 2 deletions)
@@ -282,7 +282,7 @@ class EvalCtx {
       const TypePtr& type,
       VectorPtr& result);
 
-  VectorPool& vectorPool() const {
+  VectorPool* vectorPool() const {
     return execCtx_->vectorPool();
   }
 
@@ -309,7 +309,7 @@
       const TypePtr& type,
       VectorPtr& result) {
     BaseVector::ensureWritable(
-        rows, type, execCtx_->pool(), result, &execCtx_->vectorPool());
+        rows, type, execCtx_->pool(), result, execCtx_->vectorPool());
   }
 
   /// Make sure the vector is addressable up to index `size`-1. Initialize all
@@ -319,6 +319,10 @@
     return peeledEncoding_.get();
   }
 
+  bool optimizeForLowMemory() const {
+    return optimizeForLowMemory_;
+  }
+
  private:
   core::ExecCtx* const FOLLY_NONNULL execCtx_;
   ExprSet* FOLLY_NULLABLE const exprSet_;
@@ -342,6 +346,8 @@
   // True if the current set of rows will not grow, e.g. not under and IF or OR.
   bool isFinalSelection_{true};
 
+  bool optimizeForLowMemory_;
+
   // If isFinalSelection_ is false, the set of rows for the upper-most IF or
   // OR. Used to determine the set of rows for loading lazy vectors.
   const SelectivityVector* FOLLY_NULLABLE finalSelection_;
velox/expression/Expr.cpp (6 additions, 3 deletions)
@@ -999,9 +999,12 @@ Expr::PeelEncodingsResult Expr::peelEncodings(
   }
 
   // If the expression depends on one dictionary, results are cacheable.
-  bool mayCache = distinctFields_.size() == 1 &&
-      VectorEncoding::isDictionary(context.wrapEncoding()) &&
-      !peeledVectors[0]->memoDisabled();
+  bool mayCache = false;
+  if (!context.optimizeForLowMemory()) {
+    mayCache = distinctFields_.size() == 1 &&
+        VectorEncoding::isDictionary(context.wrapEncoding()) &&
+        !peeledVectors[0]->memoDisabled();
+  }
 
   common::testutil::TestValue::adjust(
       "facebook::velox::exec::Expr::peelEncodings::mayCache", &mayCache);
