Skip to content

Commit

Permalink
Add query configs to turn off expression evaluation optimizations (fa…
Browse files Browse the repository at this point in the history
…cebookincubator#10902)

Summary:
Pull Request resolved: facebookincubator#10902

This change adds query configs to individually turn off expression
evaluation optimizations like dictionary peeling, dictionary
memoization, reusing shared subexpression results and deferring
lazy vector loading.
The goal is to streamline debugging in production and enable prompt
mitigation of bugs or regressions caused by the optimization or
surfaced due to it.

Note: When peeling is turned off, we still ensure that single-arg
functions receive a flat input.

Reviewed By: mbasmanova

Differential Revision: D61943875
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Sep 9, 2024
1 parent cc10dc2 commit c052d6a
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 58 deletions.
37 changes: 37 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,43 @@ class QueryConfig {
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

/// Disable optimization in expression evaluation to peel common dictionary
/// layer from inputs. Should only be used for debugging.
static constexpr const char* kDebugDisableExpressionWithPeeling =
"debug_disable_expression_with_peeling";

/// Disable optimization in expression evaluation to re-use cached results for
/// common sub-expressions. Should only be used for debugging.
static constexpr const char* kDebugDisableCommonSubExpressions =
"debug_disable_common_sub_expressions";

/// Disable optimization in expression evaluation to re-use cached results
/// between subsequent input batches that are dictionary encoded and have the
/// same alphabet (underlying flat vector). Should only be used for debugging.
static constexpr const char* kDebugDisableExpressionWithMemoization =
"debug_disable_expression_with_memoization";

/// Disable optimization in expression evaluation to delay loading of lazy
/// inputs unless required. Should only be used for debugging.
static constexpr const char* kDebugDisableExpressionWithLazyInputs =
"debug_disable_expression_with_lazy_inputs";

/// Returns the value of 'debug_disable_expression_with_peeling', i.e. whether
/// peeling of common dictionary layers is turned off. Defaults to false.
bool debugDisableExpressionsWithPeeling() const {
  const bool peelingDisabled =
      get<bool>(kDebugDisableExpressionWithPeeling, false);
  return peelingDisabled;
}

/// Returns the value of 'debug_disable_common_sub_expressions', i.e. whether
/// re-use of cached common sub-expression results is turned off. Defaults to
/// false.
bool debugDisableCommonSubExpressions() const {
  const bool reuseDisabled =
      get<bool>(kDebugDisableCommonSubExpressions, false);
  return reuseDisabled;
}

/// Returns the value of 'debug_disable_expression_with_memoization', i.e.
/// whether dictionary memoization is turned off. Defaults to false.
bool debugDisableExpressionsWithMemoization() const {
  const bool memoizationDisabled =
      get<bool>(kDebugDisableExpressionWithMemoization, false);
  return memoizationDisabled;
}

/// Returns the value of 'debug_disable_expression_with_lazy_inputs', i.e.
/// whether delayed loading of lazy inputs is turned off. Defaults to false.
bool debugDisableExpressionsWithLazyInputs() const {
  const bool lazyInputsDisabled =
      get<bool>(kDebugDisableExpressionWithLazyInputs, false);
  return lazyInputsDisabled;
}

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down
75 changes: 61 additions & 14 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,53 @@ class ExecCtx {
ExecCtx(memory::MemoryPool* pool, QueryCtx* queryCtx)
: pool_(pool),
queryCtx_(queryCtx),
exprEvalCacheEnabled_(
!queryCtx ||
queryCtx->queryConfig().isExpressionEvaluationCacheEnabled()),
optimizationParams_(queryCtx),
vectorPool_(
exprEvalCacheEnabled_ ? std::make_unique<VectorPool>(pool)
: nullptr) {}
optimizationParams_.exprEvalCacheEnabled
? std::make_unique<VectorPool>(pool)
: nullptr) {}

/// Switches controlling the expression evaluation optimizations. All values
/// are read from the query config in the constructor.
struct OptimizationParams {
  /// Populates every field from the config of 'queryCtx'. When 'queryCtx' is
  /// null, a QueryConfig built from an empty map is used instead.
  explicit OptimizationParams(QueryCtx* queryCtx) {
    const core::QueryConfig fallbackConfig = core::QueryConfig({});
    const core::QueryConfig& config =
        queryCtx ? queryCtx->queryConfig() : fallbackConfig;

    exprEvalCacheEnabled = config.isExpressionEvaluationCacheEnabled();

    // Memoization needs both its own switch and the evaluation cache.
    const bool memoizationDisabled =
        config.debugDisableExpressionsWithMemoization();
    dictionaryMemoizationEnabled =
        !memoizationDisabled && exprEvalCacheEnabled;

    const bool peelingDisabled = config.debugDisableExpressionsWithPeeling();
    peelingEnabled = !peelingDisabled;

    const bool sharedSubExprDisabled =
        config.debugDisableCommonSubExpressions();
    sharedSubExpressionReuseEnabled = !sharedSubExprDisabled;

    const bool lazyInputsDisabled =
        config.debugDisableExpressionsWithLazyInputs();
    deferredLazyLoadingEnabled = !lazyInputsDisabled;

    maxSharedSubexprResultsCached = config.maxSharedSubexprResultsCached();
  }

  /// True if caches in expression evaluation used for performance are
  /// enabled, including VectorPool, DecodedVectorPool, SelectivityVectorPool
  /// and dictionary memoization.
  bool exprEvalCacheEnabled;

  /// True if the dictionary memoization optimization is enabled during
  /// expression evaluation, which allows the reuse of results between
  /// consecutive input batches if they are dictionary encoded and have the
  /// same alphabet (underlying flat vector).
  bool dictionaryMemoizationEnabled;

  /// True if peeling is enabled during expression evaluation.
  bool peelingEnabled;

  /// True if shared subexpression reuse is enabled during expression
  /// evaluation.
  bool sharedSubExpressionReuseEnabled;

  /// True if loading of lazy inputs is deferred until they need to be
  /// accessed during expression evaluation.
  bool deferredLazyLoadingEnabled;

  /// The maximum number of distinct inputs to cache results for in a given
  /// shared subexpression during expression evaluation.
  uint32_t maxSharedSubexprResultsCached;
};

velox::memory::MemoryPool* pool() const {
return pool_;
Expand All @@ -251,7 +292,9 @@ class ExecCtx {
/// Prefer using LocalSelectivityVector which takes care of returning the
/// vector to the pool on destruction.
std::unique_ptr<SelectivityVector> getSelectivityVector(int32_t size) {
VELOX_CHECK(exprEvalCacheEnabled_ || selectivityVectorPool_.empty());
VELOX_CHECK(
optimizationParams_.exprEvalCacheEnabled ||
selectivityVectorPool_.empty());
if (selectivityVectorPool_.empty()) {
return std::make_unique<SelectivityVector>(size);
}
Expand All @@ -265,7 +308,9 @@ class ExecCtx {
// content. The caller is responsible for setting the size and
// assigning the contents.
std::unique_ptr<SelectivityVector> getSelectivityVector() {
VELOX_CHECK(exprEvalCacheEnabled_ || selectivityVectorPool_.empty());
VELOX_CHECK(
optimizationParams_.exprEvalCacheEnabled ||
selectivityVectorPool_.empty());
if (selectivityVectorPool_.empty()) {
return std::make_unique<SelectivityVector>();
}
Expand All @@ -276,15 +321,16 @@ class ExecCtx {

// Returns true if the vector was moved into the pool.
bool releaseSelectivityVector(std::unique_ptr<SelectivityVector>&& vector) {
if (exprEvalCacheEnabled_) {
if (optimizationParams_.exprEvalCacheEnabled) {
selectivityVectorPool_.push_back(std::move(vector));
return true;
}
return false;
}

std::unique_ptr<DecodedVector> getDecodedVector() {
VELOX_CHECK(exprEvalCacheEnabled_ || decodedVectorPool_.empty());
VELOX_CHECK(
optimizationParams_.exprEvalCacheEnabled || decodedVectorPool_.empty());
if (decodedVectorPool_.empty()) {
return std::make_unique<DecodedVector>();
}
Expand All @@ -295,7 +341,7 @@ class ExecCtx {

// Returns true if the vector was moved into the pool.
bool releaseDecodedVector(std::unique_ptr<DecodedVector>&& vector) {
if (exprEvalCacheEnabled_) {
if (optimizationParams_.exprEvalCacheEnabled) {
decodedVectorPool_.push_back(std::move(vector));
return true;
}
Expand Down Expand Up @@ -334,17 +380,18 @@ class ExecCtx {
return 0;
}

bool exprEvalCacheEnabled() const {
return exprEvalCacheEnabled_;
const OptimizationParams& optimizationParams() const {
return optimizationParams_;
}

private:
// Pool for all Buffers for this thread.
memory::MemoryPool* const pool_;
QueryCtx* const queryCtx_;

const bool exprEvalCacheEnabled_;
// A pool of preallocated DecodedVectors for use by expressions and operators.
const OptimizationParams optimizationParams_;
// A pool of preallocated DecodedVectors for use by expressions and
// operators.
std::vector<std::unique_ptr<DecodedVector>> decodedVectorPool_;
// A pool of preallocated SelectivityVectors for use by expressions
// and operators.
Expand Down
62 changes: 60 additions & 2 deletions velox/core/tests/QueryConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ TEST_F(QueryConfigTest, enableExpressionEvaluationCacheConfig) {
enableExpressionEvaluationCache);

auto execCtx = std::make_shared<core::ExecCtx>(pool.get(), queryCtx.get());
ASSERT_EQ(execCtx->exprEvalCacheEnabled(), enableExpressionEvaluationCache);
ASSERT_EQ(
execCtx->optimizationParams().exprEvalCacheEnabled,
enableExpressionEvaluationCache);
ASSERT_EQ(
execCtx->vectorPool() != nullptr, enableExpressionEvaluationCache);

auto evalCtx = std::make_shared<exec::EvalCtx>(execCtx.get());
ASSERT_EQ(evalCtx->cacheEnabled(), enableExpressionEvaluationCache);
ASSERT_EQ(
evalCtx->dictionaryMemoizationEnabled(),
enableExpressionEvaluationCache);

// Test ExecCtx::selectivityVectorPool_.
auto rows = execCtx->getSelectivityVector(100);
Expand All @@ -144,4 +148,58 @@ TEST_F(QueryConfigTest, enableExpressionEvaluationCacheConfig) {
testConfig(false);
}

TEST_F(QueryConfigTest, expressionEvaluationRelatedConfigs) {
  // Verify that the expression evaluation related configs are propagated
  // correctly to EvalCtx, which is used during expression evaluation. Each
  // config is set and verified individually, and then all of them together.
  std::shared_ptr<memory::MemoryPool> rootPool{
      memory::memoryManager()->addRootPool()};
  std::shared_ptr<memory::MemoryPool> pool{rootPool->addLeafChild("leaf")};

  // Builds QueryCtx -> ExecCtx -> EvalCtx from 'configData' and checks that
  // every EvalCtx flag mirrors the corresponding query config value.
  auto testConfig =
      [&](std::unordered_map<std::string, std::string> configData) {
        auto queryCtx =
            core::QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
        const auto& queryConfig = queryCtx->queryConfig();
        auto execCtx =
            std::make_shared<core::ExecCtx>(pool.get(), queryCtx.get());
        auto evalCtx = std::make_shared<exec::EvalCtx>(execCtx.get());

        ASSERT_EQ(
            evalCtx->peelingEnabled(),
            !queryConfig.debugDisableExpressionsWithPeeling());
        ASSERT_EQ(
            evalCtx->sharedSubExpressionReuseEnabled(),
            !queryConfig.debugDisableCommonSubExpressions());
        // Dictionary memoization is only on when the expression evaluation
        // cache is enabled as well, so the expectation includes both
        // conditions instead of relying on the cache's default value.
        ASSERT_EQ(
            evalCtx->dictionaryMemoizationEnabled(),
            !queryConfig.debugDisableExpressionsWithMemoization() &&
                queryConfig.isExpressionEvaluationCacheEnabled());
        ASSERT_EQ(
            evalCtx->deferredLazyLoadingEnabled(),
            !queryConfig.debugDisableExpressionsWithLazyInputs());
      };

  // Produces a config map that sets all four debug flags explicitly.
  auto createConfig = [](bool debugDisableExpressionsWithPeeling,
                         bool debugDisableCommonSubExpressions,
                         bool debugDisableExpressionsWithMemoization,
                         bool debugDisableExpressionsWithLazyInputs) {
    std::unordered_map<std::string, std::string> configData(
        {{core::QueryConfig::kDebugDisableExpressionWithPeeling,
          std::to_string(debugDisableExpressionsWithPeeling)},
         {core::QueryConfig::kDebugDisableCommonSubExpressions,
          std::to_string(debugDisableCommonSubExpressions)},
         {core::QueryConfig::kDebugDisableExpressionWithMemoization,
          std::to_string(debugDisableExpressionsWithMemoization)},
         {core::QueryConfig::kDebugDisableExpressionWithLazyInputs,
          std::to_string(debugDisableExpressionsWithLazyInputs)}});
    return configData;
  };

  testConfig({}); // Verify default config.
  testConfig(createConfig(true, false, false, false));
  testConfig(createConfig(false, true, false, false));
  testConfig(createConfig(false, false, true, false));
  testConfig(createConfig(false, false, false, true));
  // All optimizations disabled at once.
  testConfig(createConfig(true, true, true, true));
}

} // namespace facebook::velox::core::test
16 changes: 16 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ Expression Evaluation Configuration
- bool
- false
- This flag makes the Row conversion to be applied in a way that the casting row fields are matched by name instead of position.
* - debug_disable_expression_with_peeling
- bool
- false
- Disable optimization in expression evaluation to peel common dictionary layer from inputs. Should only be used for debugging.
* - debug_disable_common_sub_expressions
- bool
- false
- Disable optimization in expression evaluation to re-use cached results for common sub-expressions. Should only be used for debugging.
* - debug_disable_expression_with_memoization
- bool
- false
- Disable optimization in expression evaluation to re-use cached results between subsequent input batches that are dictionary encoded and have the same alphabet (underlying flat vector). Should only be used for debugging.
* - debug_disable_expression_with_lazy_inputs
- bool
- false
- Disable optimization in expression evaluation to delay loading of lazy inputs unless required. Should only be used for debugging.

Memory Management
-----------------
Expand Down
22 changes: 2 additions & 20 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,7 @@ using facebook::velox::common::testutil::TestValue;
namespace facebook::velox::exec {

EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
: execCtx_(execCtx),
exprSet_(exprSet),
row_(row),
cacheEnabled_(execCtx->exprEvalCacheEnabled()),
maxSharedSubexprResultsCached_(
execCtx->queryCtx()
? execCtx->queryCtx()
->queryConfig()
.maxSharedSubexprResultsCached()
: core::QueryConfig({}).maxSharedSubexprResultsCached()) {
: execCtx_(execCtx), exprSet_(exprSet), row_(row) {
// TODO Change the API to replace raw pointers with non-const references.
// Sanity check inputs to prevent crashes.
VELOX_CHECK_NOT_NULL(execCtx);
Expand All @@ -53,16 +44,7 @@ EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
}

EvalCtx::EvalCtx(core::ExecCtx* execCtx)
: execCtx_(execCtx),
exprSet_(nullptr),
row_(nullptr),
cacheEnabled_(execCtx->exprEvalCacheEnabled()),
maxSharedSubexprResultsCached_(
execCtx->queryCtx()
? execCtx->queryCtx()
->queryConfig()
.maxSharedSubexprResultsCached()
: core::QueryConfig({}).maxSharedSubexprResultsCached()) {
: execCtx_(execCtx), exprSet_(nullptr), row_(nullptr) {
VELOX_CHECK_NOT_NULL(execCtx);
}

Expand Down
29 changes: 22 additions & 7 deletions velox/expression/EvalCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,33 @@ class EvalCtx {
return peeledEncoding_.get();
}

/// Returns true if caching in expression evaluation is enabled, such as
/// Expr::evalWithMemo.
bool cacheEnabled() const {
return cacheEnabled_;
/// Returns true if the dictionary memoization optimization is enabled, which
/// allows the reuse of results between consecutive input batches if they are
/// dictionary encoded and have the same alphabet (underlying flat vector).
bool dictionaryMemoizationEnabled() const {
  const auto& params = execCtx_->optimizationParams();
  return params.dictionaryMemoizationEnabled;
}

/// Returns the maximum number of distinct inputs to cache results for in a
/// given shared subexpression.
uint32_t maxSharedSubexprResultsCached() const {
return maxSharedSubexprResultsCached_;
return execCtx_->optimizationParams().maxSharedSubexprResultsCached;
}

/// Returns true if the peeling optimization is enabled for expression
/// evaluation.
bool peelingEnabled() const {
  const auto& params = execCtx_->optimizationParams();
  return params.peelingEnabled;
}

/// Returns true if results of shared subexpressions may be reused.
bool sharedSubExpressionReuseEnabled() const {
  const auto& params = execCtx_->optimizationParams();
  return params.sharedSubExpressionReuseEnabled;
}

/// Returns true if loading of lazy inputs is deferred until they need to be
/// accessed.
bool deferredLazyLoadingEnabled() const {
  const auto& params = execCtx_->optimizationParams();
  return params.deferredLazyLoadingEnabled;
}

private:
Expand All @@ -550,8 +567,6 @@ class EvalCtx {
core::ExecCtx* const execCtx_;
ExprSet* const exprSet_;
const RowVector* row_;
const bool cacheEnabled_;
const uint32_t maxSharedSubexprResultsCached_;
bool inputFlatNoNulls_;

// Corresponds 1:1 to children of 'row_'. Set to an inner vector
Expand Down
Loading

0 comments on commit c052d6a

Please sign in to comment.