Skip to content

Commit

Permalink
Add query memory per node session override (#6931)
Browse files Browse the repository at this point in the history
Summary:
Query memory per node is currently only configured through system config properties. Queries should be able to override the property by passing session properties. This PR allows this behavior.

Pull Request resolved: #6931

Reviewed By: xiaoxmeng

Differential Revision: D50052157

Pulled By: tanjialiang

fbshipit-source-id: cd57fc8ce1e54ee277d3d05c58e3a10a3f6a75ef
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Oct 10, 2023
1 parent 9ae3b23 commit 0b8b842
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 29 deletions.
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
: MemoryReclaimer::create());
auto queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(),
std::unordered_map<std::string, std::string>{},
core::QueryConfig({}),
configs,
cache::AsyncDataCache::getInstance(),
std::move(pool));
Expand Down
85 changes: 85 additions & 0 deletions velox/core/QueryConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,95 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <re2/re2.h>

#include "velox/core/QueryConfig.h"

namespace facebook::velox::core {

double toBytesPerCapacityUnit(CapacityUnit unit) {
switch (unit) {
case CapacityUnit::BYTE:
return 1;
case CapacityUnit::KILOBYTE:
return exp2(10);
case CapacityUnit::MEGABYTE:
return exp2(20);
case CapacityUnit::GIGABYTE:
return exp2(30);
case CapacityUnit::TERABYTE:
return exp2(40);
case CapacityUnit::PETABYTE:
return exp2(50);
default:
VELOX_USER_FAIL("Invalid capacity unit '{}'", (int)unit);
}
}

CapacityUnit valueOfCapacityUnit(const std::string& unitStr) {
if (unitStr == "B") {
return CapacityUnit::BYTE;
}
if (unitStr == "kB") {
return CapacityUnit::KILOBYTE;
}
if (unitStr == "MB") {
return CapacityUnit::MEGABYTE;
}
if (unitStr == "GB") {
return CapacityUnit::GIGABYTE;
}
if (unitStr == "TB") {
return CapacityUnit::TERABYTE;
}
if (unitStr == "PB") {
return CapacityUnit::PETABYTE;
}
VELOX_USER_FAIL("Invalid capacity unit '{}'", unitStr);
}

// Convert capacity string with unit to the capacity number in the specified
// units
uint64_t toCapacity(const std::string& from, CapacityUnit to) {
static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$)");
double value;
std::string unit;
if (!RE2::FullMatch(from, kPattern, &value, &unit)) {
VELOX_USER_FAIL("Invalid capacity string '{}'", from);
}

return value *
(toBytesPerCapacityUnit(valueOfCapacityUnit(unit)) /
toBytesPerCapacityUnit(to));
}

std::chrono::duration<double> toDuration(const std::string& str) {
static const RE2 kPattern(R"(^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*)");

double value;
std::string unit;
if (!RE2::FullMatch(str, kPattern, &value, &unit)) {
VELOX_USER_FAIL("Invalid duration {}", str);
}
if (unit == "ns") {
return std::chrono::duration<double, std::nano>(value);
} else if (unit == "us") {
return std::chrono::duration<double, std::micro>(value);
} else if (unit == "ms") {
return std::chrono::duration<double, std::milli>(value);
} else if (unit == "s") {
return std::chrono::duration<double>(value);
} else if (unit == "m") {
return std::chrono::duration<double, std::ratio<60>>(value);
} else if (unit == "h") {
return std::chrono::duration<double, std::ratio<60 * 60>>(value);
} else if (unit == "d") {
return std::chrono::duration<double, std::ratio<60 * 60 * 24>>(value);
}
VELOX_USER_FAIL("Invalid duration {}", str);
}

QueryConfig::QueryConfig(
const std::unordered_map<std::string, std::string>& values)
: config_{std::make_unique<MemConfig>(values)} {}
Expand Down
64 changes: 46 additions & 18 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@
#include "velox/core/Config.h"

namespace facebook::velox::core {
enum class CapacityUnit {
BYTE,
KILOBYTE,
MEGABYTE,
GIGABYTE,
TERABYTE,
PETABYTE
};

double toBytesPerCapacityUnit(CapacityUnit unit);

CapacityUnit valueOfCapacityUnit(const std::string& unitStr);

/// Convert capacity string with unit to the capacity number in the specified
/// units
uint64_t toCapacity(const std::string& from, CapacityUnit to);

std::chrono::duration<double> toDuration(const std::string& str);

/// A simple wrapper around velox::Config. Defines constants for query
/// config properties and accessor methods.
Expand All @@ -32,40 +50,45 @@ class QueryConfig {

static constexpr const char* kCodegenEnabled = "codegen.enabled";

/// Maximum memory that a query can use on a single host.
static constexpr const char* kQueryMaxMemoryPerNode =
"query_max_memory_per_node";

static constexpr const char* kCodegenConfigurationFilePath =
"codegen.configuration_file_path";

static constexpr const char* kCodegenLazyLoading = "codegen.lazy_loading";

// User provided session timezone. Stores a string with the actual timezone
// name, e.g: "America/Los_Angeles".
/// User provided session timezone. Stores a string with the actual timezone
/// name, e.g: "America/Los_Angeles".
static constexpr const char* kSessionTimezone = "session_timezone";

// If true, timezone-less timestamp conversions (e.g. string to timestamp,
// when the string does not specify a timezone) will be adjusted to the user
// provided session timezone (if any).
//
// For instance:
//
// if this option is true and user supplied "America/Los_Angeles",
// "1970-01-01" will be converted to -28800 instead of 0.
//
// False by default.
/// If true, timezone-less timestamp conversions (e.g. string to timestamp,
/// when the string does not specify a timezone) will be adjusted to the user
/// provided session timezone (if any).
///
/// For instance:
///
/// if this option is true and user supplied "America/Los_Angeles",
/// "1970-01-01" will be converted to -28800 instead of 0.
///
/// False by default.
static constexpr const char* kAdjustTimestampToTimezone =
"adjust_timestamp_to_session_timezone";

// Whether to use the simplified expression evaluation path. False by default.
/// Whether to use the simplified expression evaluation path. False by
/// default.
static constexpr const char* kExprEvalSimplified =
"expression.eval_simplified";

// Whether to track CPU usage for individual expressions (supported by call
// and cast expressions). False by default. Can be expensive when processing
// small batches, e.g. < 10K rows.
/// Whether to track CPU usage for individual expressions (supported by call
/// and cast expressions). False by default. Can be expensive when processing
/// small batches, e.g. < 10K rows.
static constexpr const char* kExprTrackCpuUsage =
"expression.track_cpu_usage";

// Whether to track CPU usage for stages of individual operators. True by
// default. Can be expensive when processing small batches, e.g. < 10K rows.
/// Whether to track CPU usage for stages of individual operators. True by
/// default. Can be expensive when processing small batches, e.g. < 10K rows.
static constexpr const char* kOperatorTrackCpuUsage =
"track_operator_cpu_usage";

Expand Down Expand Up @@ -285,6 +308,11 @@ class QueryConfig {
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
}

uint64_t maxPartialAggregationMemoryUsage() const {
static constexpr uint64_t kDefault = 1L << 24;
return get<uint64_t>(kMaxPartialAggregationMemory, kDefault);
Expand Down
18 changes: 18 additions & 0 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ QueryCtx::QueryCtx(
initPool(queryId);
}

QueryCtx::QueryCtx(
folly::Executor* executor,
QueryConfig&& queryConfig,
std::unordered_map<std::string, std::shared_ptr<Config>> connectorConfigs,
cache::AsyncDataCache* cache,
std::shared_ptr<memory::MemoryPool> pool,
std::shared_ptr<folly::Executor> spillExecutor,
const std::string& queryId)
: queryId_(queryId),
connectorConfigs_(connectorConfigs),
cache_(cache),
pool_(std::move(pool)),
executor_(executor),
queryConfig_{std::move(queryConfig)},
spillExecutor_(std::move(spillExecutor)) {
initPool(queryId);
}

QueryCtx::QueryCtx(
folly::Executor::KeepAlive<> executorKeepalive,
std::unordered_map<std::string, std::string> queryConfigValues,
Expand Down
14 changes: 13 additions & 1 deletion velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,21 @@ class QueryCtx {
/// mode, executor is not needed. Hence, we don't require executor to always
/// be passed in here, but instead, ensure that executor exists when actually
/// being used.
// TODO(jtan6): Deprecate this constructor after external dependencies are
// migrated
QueryCtx(
folly::Executor* executor,
std::unordered_map<std::string, std::string> queryConfigValues,
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
std::shared_ptr<memory::MemoryPool> pool = nullptr,
std::shared_ptr<folly::Executor> spillExecutor = nullptr,
const std::string& queryId = "");

QueryCtx(
folly::Executor* executor = nullptr,
std::unordered_map<std::string, std::string> queryConfigValues = {},
QueryConfig&& queryConfig = QueryConfig{{}},
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
Expand Down
59 changes: 59 additions & 0 deletions velox/core/tests/QueryConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,63 @@ TEST(TestQueryConfig, enableExpressionEvaluationCacheConfig) {
testConfig(false);
}

TEST(TestQueryConfig, capacityConversion) {
folly::Random::DefaultGenerator rng;
rng.seed(1);

std::unordered_map<CapacityUnit, std::string> unitStrLookup{
{CapacityUnit::BYTE, "B"},
{CapacityUnit::KILOBYTE, "kB"},
{CapacityUnit::MEGABYTE, "MB"},
{CapacityUnit::GIGABYTE, "GB"},
{CapacityUnit::TERABYTE, "TB"},
{CapacityUnit::PETABYTE, "PB"}};

std::vector<std::pair<CapacityUnit, double>> units{
{CapacityUnit::BYTE, 1},
{CapacityUnit::KILOBYTE, 1024},
{CapacityUnit::MEGABYTE, 1024 * 1024},
{CapacityUnit::GIGABYTE, 1024 * 1024 * 1024},
{CapacityUnit::TERABYTE, 1024ll * 1024 * 1024 * 1024},
{CapacityUnit::PETABYTE, 1024ll * 1024 * 1024 * 1024 * 1024}};
for (int32_t i = 0; i < units.size(); i++) {
for (int32_t j = 0; j < units.size(); j++) {
// We use this diffRatio to prevent float conversion overflow when
// converting from one unit to another.
uint64_t diffRatio = i < j ? units[j].second / units[i].second
: units[i].second / units[j].second;
uint64_t randNumber = folly::Random::rand64(rng);
uint64_t testNumber = i > j ? randNumber / diffRatio : randNumber;
ASSERT_EQ(
toCapacity(
std::string(
std::to_string(testNumber) + unitStrLookup[units[i].first]),
units[j].first),
(uint64_t)(testNumber * (units[i].second / units[j].second)));
}
}
}

TEST(TestQueryConfig, durationConversion) {
folly::Random::DefaultGenerator rng;
rng.seed(1);

std::vector<std::pair<std::string, uint64_t>> units{
{"ns", 1},
{"us", 1000},
{"ms", 1000 * 1000},
{"s", 1000ll * 1000 * 1000},
{"m", 1000ll * 1000 * 1000 * 60},
{"h", 1000ll * 1000 * 1000 * 60 * 60},
{"d", 1000ll * 1000 * 1000 * 60 * 60 * 24}};
for (uint32_t i = 0; i < units.size(); i++) {
auto testNumber = folly::Random::rand32(rng) % 10000;
auto duration =
toDuration(std::string(std::to_string(testNumber) + units[i].first));
ASSERT_EQ(
testNumber * units[i].second,
std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count());
}
}

} // namespace facebook::velox::core::test
2 changes: 1 addition & 1 deletion velox/exec/benchmarks/ExchangeBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class ExchangeBenchmark : public VectorTestBase {
int64_t maxMemory = kMaxMemory) {
auto configCopy = configSettings_;
auto queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(), std::move(configCopy));
executor_.get(), core::QueryConfig(std::move(configCopy)));
queryCtx->testingOverrideMemoryPool(
memory::defaultMemoryManager().addRootPool(
queryCtx->queryId(), maxMemory));
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ TEST_F(DriverTest, pause) {
// Make sure CPU usage tracking is enabled.
std::unordered_map<std::string, std::string> queryConfig{
{core::QueryConfig::kOperatorTrackCpuUsage, "true"}};
params.queryCtx =
std::make_shared<core::QueryCtx>(executor_.get(), std::move(queryConfig));
params.queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(), core::QueryConfig(std::move(queryConfig)));
int32_t numRead = 0;
readResults(params, ResultOperation::kPause, 370'000'000, &numRead);
// Each thread will fully read the 1M rows in values.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MultiFragmentTest : public HiveConnectorTestBase {
int64_t maxMemory = memory::kMaxMemory) {
auto configCopy = configSettings_;
auto queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(), std::move(configCopy));
executor_.get(), core::QueryConfig(std::move(configCopy)));
queryCtx->testingOverrideMemoryPool(
memory::defaultMemoryManager().addRootPool(
queryCtx->queryId(), maxMemory));
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/PartitionedOutputBufferManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class PartitionedOutputBufferManagerTest : public testing::Test {
std::to_string(maxPartitionedOutputBufferSize);
}
auto queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(), std::move(configSettings));
executor_.get(), core::QueryConfig(std::move(configSettings)));

auto task =
Task::create(taskId, std::move(planFragment), 0, std::move(queryCtx));
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/AssertQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ AssertQueryBuilder::readCursor() {
static std::atomic<uint64_t> cursorQueryId{0};
params_.queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(),
std::unordered_map<std::string, std::string>{},
core::QueryConfig({}),
std::unordered_map<std::string, std::shared_ptr<Config>>{},
cache::AsyncDataCache::getInstance(),
nullptr,
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/utils/Cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ TaskCursor::TaskCursor(const CursorParameters& params)
static std::atomic<uint64_t> cursorQueryId{0};
queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(),
std::unordered_map<std::string, std::string>{},
core::QueryConfig({}),
std::unordered_map<std::string, std::shared_ptr<Config>>{},
cache::AsyncDataCache::getInstance(),
nullptr,
Expand Down
4 changes: 2 additions & 2 deletions velox/expression/tests/ExprTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ class ParameterizedExprTest : public ExprTest,
std::unordered_map<std::string, std::string> configData(
{{core::QueryConfig::kEnableExpressionEvaluationCache,
GetParam() ? "true" : "false"}});
queryCtx_ =
std::make_shared<core::QueryCtx>(nullptr, std::move(configData));
queryCtx_ = std::make_shared<core::QueryCtx>(
nullptr, core::QueryConfig(std::move(configData)));
execCtx_ = std::make_unique<core::ExecCtx>(pool_.get(), queryCtx_.get());
}
};
Expand Down

0 comments on commit 0b8b842

Please sign in to comment.