Skip to content

Commit

Permalink
Introduce enable_expression_evaluation_cache query config (#6898)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #6898

Some Velox optimizations cache vectors for possible reuse later to reduce runtime overhead.
The expression evaluator also has a code path, evalWithMemo, that caches the base vector of
dictionary input to avoid unnecessary computation later. These caches are cleared when
Tasks are destroyed. An internal streaming use case, however, observed large memory usage
by these caches when the streaming pipeline takes large nested-complex-typed input vectors,
has a large number of operators, and runs for a very long time without Task destruction.

This diff introduces an enable_expression_evaluation_cache query config. When this flag is set to false,
optimizations including VectorPool, DecodedVector pool, SelectivityVector pool, and Expr::evalWithMemo are disabled.

Reviewed By: xiaoxmeng, bikramSingh91

Differential Revision: D49922027

fbshipit-source-id: dce4bf6f1a896c7b05a504dd60ce9c2480759434
  • Loading branch information
kagamiori authored and facebook-github-bot committed Oct 9, 2023
1 parent f558c2e commit 36f9621
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 129 deletions.
12 changes: 12 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ class QueryConfig {
static constexpr const char* kValidateOutputFromOperators =
"debug.validate_output_from_operators";

/// If true, enable caches in expression evaluation for performance, including
/// ExecCtx::vectorPool_, ExecCtx::decodedVectorPool_,
/// ExecCtx::selectivityVectorPool_, Expr::baseDictionary_,
/// Expr::dictionaryCache_, and Expr::cachedDictionaryIndices_. Otherwise,
/// disable the caches.
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";

uint64_t maxPartialAggregationMemoryUsage() const {
static constexpr uint64_t kDefault = 1L << 24;
return get<uint64_t>(kMaxPartialAggregationMemory, kDefault);
Expand Down Expand Up @@ -567,6 +575,10 @@ class QueryConfig {
return get<bool>(kValidateOutputFromOperators, false);
}

/// Returns whether caches in expression evaluation are enabled. Falls back to
/// true when 'enable_expression_evaluation_cache' is not set.
bool isExpressionEvaluationCacheEnabled() const {
  static constexpr bool kDefault = true;
  return get<bool>(kEnableExpressionEvaluationCache, kDefault);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
68 changes: 52 additions & 16 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,15 @@ class QueryCtx {
class ExecCtx {
public:
ExecCtx(memory::MemoryPool* pool, QueryCtx* queryCtx)
: pool_(pool), queryCtx_(queryCtx), vectorPool_{pool} {}
: pool_(pool),
queryCtx_(queryCtx),
isExpressionEvaluationCacheEnabled_(
!queryCtx ||
queryCtx->queryConfig().isExpressionEvaluationCacheEnabled()),
vectorPool_(
isExpressionEvaluationCacheEnabled_
? std::make_unique<VectorPool>(pool)
: nullptr) {}

velox::memory::MemoryPool* pool() const {
return pool_;
Expand All @@ -162,7 +170,8 @@ class ExecCtx {
/// Prefer using LocalSelectivityVector which takes care of returning the
/// vector to the pool on destruction.
std::unique_ptr<SelectivityVector> getSelectivityVector(int32_t size) {
if (selectivityVectorPool_.empty()) {
if (!isExpressionEvaluationCacheEnabled_ ||
selectivityVectorPool_.empty()) {
return std::make_unique<SelectivityVector>(size);
}
auto vector = std::move(selectivityVectorPool_.back());
Expand All @@ -175,63 +184,90 @@ class ExecCtx {
// content. The caller is responsible for setting the size and
// assigning the contents.
std::unique_ptr<SelectivityVector> getSelectivityVector() {
if (selectivityVectorPool_.empty()) {
if (!isExpressionEvaluationCacheEnabled_ ||
selectivityVectorPool_.empty()) {
return std::make_unique<SelectivityVector>();
}
auto vector = std::move(selectivityVectorPool_.back());
selectivityVectorPool_.pop_back();
return vector;
}

void releaseSelectivityVector(std::unique_ptr<SelectivityVector>&& vector) {
selectivityVectorPool_.push_back(std::move(vector));
// Offers 'vector' to the SelectivityVector pool for later reuse. When the
// expression evaluation cache is disabled, 'vector' is not moved from and
// stays with the caller. Returns true if the vector was moved into the pool.
bool releaseSelectivityVector(std::unique_ptr<SelectivityVector>&& vector) {
  if (!isExpressionEvaluationCacheEnabled_) {
    return false;
  }
  selectivityVectorPool_.push_back(std::move(vector));
  return true;
}

std::unique_ptr<DecodedVector> getDecodedVector() {
if (decodedVectorPool_.empty()) {
if (!isExpressionEvaluationCacheEnabled_ || decodedVectorPool_.empty()) {
return std::make_unique<DecodedVector>();
}
auto vector = std::move(decodedVectorPool_.back());
decodedVectorPool_.pop_back();
return vector;
}

void releaseDecodedVector(std::unique_ptr<DecodedVector>&& vector) {
decodedVectorPool_.push_back(std::move(vector));
// Offers 'vector' to the DecodedVector pool for later reuse. When the
// expression evaluation cache is disabled, 'vector' is not moved from and
// stays with the caller. Returns true if the vector was moved into the pool.
bool releaseDecodedVector(std::unique_ptr<DecodedVector>&& vector) {
  if (!isExpressionEvaluationCacheEnabled_) {
    return false;
  }
  decodedVectorPool_.push_back(std::move(vector));
  return true;
}

VectorPool& vectorPool() {
return vectorPool_;
VectorPool* vectorPool() {
return vectorPool_.get();
}

/// Gets a possibly recycled vector of 'type' and 'size'. Allocates from
/// 'pool_' if no pre-allocated vector.
VectorPtr getVector(const TypePtr& type, vector_size_t size) {
return vectorPool_.get(type, size);
if (vectorPool_) {
return vectorPool_->get(type, size);
} else {
return BaseVector::create(type, size, pool_);
}
}

/// Moves 'vector' to the pool if it is reusable, else leaves it in
/// place. Returns true if the vector was moved into the pool.
bool releaseVector(VectorPtr& vector) {
return vectorPool_.release(vector);
if (vectorPool_) {
return vectorPool_->release(vector);
}
return false;
}

/// Moves elements of 'vectors' to the pool if reusable, else leaves them
/// in place. Returns number of vectors that were moved into the pool.
size_t releaseVectors(std::vector<VectorPtr>& vectors) {
return vectorPool_.release(vectors);
if (vectorPool_) {
return vectorPool_->release(vectors);
}
return 0;
}

// Returns whether the expression evaluation caches of this ExecCtx are
// enabled. The flag is fixed at construction: it is true when no QueryCtx was
// supplied, otherwise it is taken from the QueryCtx's query config.
bool isExpressionEvaluationCacheEnabled() const {
return isExpressionEvaluationCacheEnabled_;
}

private:
// Pool for all Buffers for this thread.
memory::MemoryPool* pool_;
QueryCtx* queryCtx_;
memory::MemoryPool* const pool_;
QueryCtx* const queryCtx_;

const bool isExpressionEvaluationCacheEnabled_;
// A pool of preallocated DecodedVectors for use by expressions and operators.
std::vector<std::unique_ptr<DecodedVector>> decodedVectorPool_;
// A pool of preallocated SelectivityVectors for use by expressions
// and operators.
std::vector<std::unique_ptr<SelectivityVector>> selectivityVectorPool_;
VectorPool vectorPool_;
std::unique_ptr<VectorPool> vectorPool_;
};

} // namespace facebook::velox::core
46 changes: 46 additions & 0 deletions velox/core/tests/QueryConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/core/QueryCtx.h"
#include "velox/expression/EvalCtx.h"

namespace facebook::velox::core::test {

Expand Down Expand Up @@ -134,4 +135,49 @@ TEST(TestQueryConfig, taskWriterCountConfig) {
}
}

// Verifies that kEnableExpressionEvaluationCache is visible through
// QueryConfig, ExecCtx and EvalCtx, and that the ExecCtx pools accept released
// vectors only when the cache is enabled.
TEST(TestQueryConfig, enableExpressionEvaluationCacheConfig) {
  std::shared_ptr<memory::MemoryPool> rootPool{
      memory::defaultMemoryManager().addRootPool()};
  std::shared_ptr<memory::MemoryPool> leafPool{rootPool->addLeafChild("leaf")};

  auto verifyCacheSetting = [&](bool cacheEnabled) {
    std::unordered_map<std::string, std::string> settings;
    settings.emplace(
        core::QueryConfig::kEnableExpressionEvaluationCache,
        cacheEnabled ? "true" : "false");
    auto queryCtx =
        std::make_shared<core::QueryCtx>(nullptr, std::move(settings));

    // The flag must be readable from the query config itself.
    const core::QueryConfig& queryConfig = queryCtx->queryConfig();
    ASSERT_EQ(queryConfig.isExpressionEvaluationCacheEnabled(), cacheEnabled);

    // ExecCtx mirrors the flag and only owns a VectorPool when it is set.
    core::ExecCtx execCtx(leafPool.get(), queryCtx.get());
    ASSERT_EQ(execCtx.isExpressionEvaluationCacheEnabled(), cacheEnabled);
    ASSERT_EQ(execCtx.vectorPool() != nullptr, cacheEnabled);

    // EvalCtx picks the flag up from its ExecCtx.
    exec::EvalCtx evalCtx(&execCtx);
    ASSERT_EQ(evalCtx.isCacheEnabled(), cacheEnabled);

    // Test ExecCtx::selectivityVectorPool_.
    auto selectedRows = execCtx.getSelectivityVector(100);
    ASSERT_NE(selectedRows, nullptr);
    ASSERT_EQ(
        execCtx.releaseSelectivityVector(std::move(selectedRows)),
        cacheEnabled);

    // Test ExecCtx::decodedVectorPool_.
    auto decodedVector = execCtx.getDecodedVector();
    ASSERT_NE(decodedVector, nullptr);
    ASSERT_EQ(
        execCtx.releaseDecodedVector(std::move(decodedVector)), cacheEnabled);
  };

  verifyCacheSetting(true);
  verifyCacheSetting(false);
}

} // namespace facebook::velox::core::test
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ Generic Configuration
- If set to true, then during execution of tasks, the output vectors of every operator are validated for consistency.
This is an expensive check so should only be used for debugging. It can help debug issues where malformed vectors
cause failures or crashes by helping identify which operator is generating them.
* - enable_expression_evaluation_cache
- bool
- true
- Whether to enable caches in expression evaluation. If set to true, optimizations including vector pools and
evalWithMemo are enabled.

.. _expression-evaluation-conf:

Expand Down
10 changes: 8 additions & 2 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ ScopedContextSaver::~ScopedContextSaver() {
}

EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
: execCtx_(execCtx), exprSet_(exprSet), row_(row) {
: execCtx_(execCtx),
exprSet_(exprSet),
row_(row),
isCacheEnabled_(execCtx->isExpressionEvaluationCacheEnabled()) {
// TODO Change the API to replace raw pointers with non-const references.
// Sanity check inputs to prevent crashes.
VELOX_CHECK_NOT_NULL(execCtx);
Expand All @@ -47,7 +50,10 @@ EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
}

// Constructs an EvalCtx that is not bound to an ExprSet or an input row.
// 'execCtx' must not be null; a null value fails VELOX_CHECK_NOT_NULL below.
EvalCtx::EvalCtx(core::ExecCtx* execCtx)
    : execCtx_(execCtx),
      exprSet_(nullptr),
      row_(nullptr),
      // Member initializers run before the constructor body, so 'execCtx' must
      // not be dereferenced unguarded here: doing so would crash on a null
      // argument before the sanity check below gets a chance to report it.
      isCacheEnabled_(
          execCtx != nullptr &&
          execCtx->isExpressionEvaluationCacheEnabled()) {
  VELOX_CHECK_NOT_NULL(execCtx);
}

Expand Down
11 changes: 9 additions & 2 deletions velox/expression/EvalCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class EvalCtx {
const TypePtr& type,
VectorPtr& result);

VectorPool& vectorPool() const {
VectorPool* vectorPool() const {
return execCtx_->vectorPool();
}

Expand All @@ -309,7 +309,7 @@ class EvalCtx {
const TypePtr& type,
VectorPtr& result) {
BaseVector::ensureWritable(
rows, type, execCtx_->pool(), result, &execCtx_->vectorPool());
rows, type, execCtx_->pool(), result, execCtx_->vectorPool());
}

/// Make sure the vector is addressable up to index `size`-1. Initialize all
Expand All @@ -319,10 +319,17 @@ class EvalCtx {
return peeledEncoding_.get();
}

/// Returns true if caching in expression evaluation is enabled, for example
/// the memoization done by Expr::evalWithMemo. The value is read from the
/// ExecCtx when this EvalCtx is constructed and does not change afterwards.
bool isCacheEnabled() const {
return isCacheEnabled_;
}

private:
core::ExecCtx* const FOLLY_NONNULL execCtx_;
ExprSet* FOLLY_NULLABLE const exprSet_;
const RowVector* FOLLY_NULLABLE row_;
const bool isCacheEnabled_;
bool inputFlatNoNulls_;

// Corresponds 1:1 to children of 'row_'. Set to an inner vector
Expand Down
9 changes: 6 additions & 3 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,9 +999,12 @@ Expr::PeelEncodingsResult Expr::peelEncodings(
}

// If the expression depends on one dictionary, results are cacheable.
bool mayCache = distinctFields_.size() == 1 &&
VectorEncoding::isDictionary(context.wrapEncoding()) &&
!peeledVectors[0]->memoDisabled();
bool mayCache = false;
if (context.isCacheEnabled()) {
mayCache = distinctFields_.size() == 1 &&
VectorEncoding::isDictionary(context.wrapEncoding()) &&
!peeledVectors[0]->memoDisabled();
}

common::testutil::TestValue::adjust(
"facebook::velox::exec::Expr::peelEncodings::mayCache", &mayCache);
Expand Down
Loading

0 comments on commit 36f9621

Please sign in to comment.