Skip to content

Commit

Permalink
[improve](memory) more grace logging for memory exceed limit (apache#…
Browse files Browse the repository at this point in the history
…21311)

more grace logging for Allocator and MemTracker when memory exceed limit
fix bthread grace exit.
  • Loading branch information
xinyiZzz authored Jul 5, 2023
1 parent f9bc433 commit 38c8657
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 60 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ DEFINE_mString(process_full_gc_size, "20%");
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DEFINE_mBool(enable_query_memory_overcommit, "true");

DEFINE_mBool(disable_memory_gc, "false");

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");

Expand Down
7 changes: 5 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ DECLARE_Int32(brpc_num_threads);
// If no ip match this rule, will choose one randomly.
DECLARE_String(priority_networks);

// memory mode
// performance or compact
// performance moderate or or compact, only tcmalloc compile
DECLARE_String(memory_mode);

// process memory limit specified as number of bytes
Expand Down Expand Up @@ -166,6 +165,10 @@ DECLARE_mString(process_full_gc_size);
// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
DECLARE_mBool(enable_query_memory_overcommit);

// gc will release cache, cancel task, and task will wait for gc to release memory,
// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true
DECLARE_mBool(disable_memory_gc);

// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DECLARE_mInt32(thread_wait_gc_max_milliseconds);

Expand Down
18 changes: 12 additions & 6 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,12 @@ void Daemon::memory_gc_thread() {
int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
int32_t memory_minor_gc_sleep_time_ms = 0;
int32_t memory_full_gc_sleep_time_ms = 0;
int32_t memory_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(interval_milliseconds)) &&
!k_doris_exit) {
if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) {
if (config::disable_memory_gc || !MemInfo::initialized() ||
!ExecEnv::GetInstance()->initialized()) {
continue;
}
auto sys_mem_available = doris::MemInfo::sys_mem_available();
Expand All @@ -243,9 +245,11 @@ void Daemon::memory_gc_thread() {
(sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
doris::MemTrackerLimiter::print_log_process_usage("Start Full GC", false);
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("Start Full GC, {}.",
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
Expand All @@ -254,8 +258,10 @@ void Daemon::memory_gc_thread() {
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
doris::MemTrackerLimiter::print_log_process_usage("Start Minor GC", false);
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("Start Minor GC, {}.",
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
Expand Down
54 changes: 30 additions & 24 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/stack_util.h"

namespace doris {

Expand All @@ -45,7 +44,6 @@ namespace doris {
static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM);

std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
bool MemTrackerLimiter::_oom_avoidance {true};

MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit,
RuntimeProfile* profile,
Expand Down Expand Up @@ -207,7 +205,6 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
_enable_print_log_usage = false;
std::string detail = msg;
detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str();
detail += "\nAlloc Stacktrace:\n" + get_stack_trace();
detail += "\nMemory Tracker Summary: " + log_usage();
std::string child_trackers_usage;
std::vector<MemTracker::Snapshot> snapshots;
Expand All @@ -221,10 +218,9 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
}
}

std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, bool with_stacktrace) {
std::string detail = msg;
std::string MemTrackerLimiter::log_process_usage_str() {
std::string detail;
detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str();
if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace();
std::vector<MemTracker::Snapshot> snapshots;
MemTrackerLimiter::make_process_snapshots(&snapshots);
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
Expand All @@ -246,18 +242,15 @@ std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, boo
return detail;
}

void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool with_stacktrace) {
void MemTrackerLimiter::print_log_process_usage() {
// The default interval between two prints is 100ms (config::memory_maintenance_sleep_time_ms).
if (MemTrackerLimiter::_enable_print_log_process_usage) {
MemTrackerLimiter::_enable_print_log_process_usage = false;
LOG(WARNING) << log_process_usage_str(msg, with_stacktrace);
LOG(WARNING) << log_process_usage_str();
}
}

bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
if (!_oom_avoidance) {
return false;
}
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
Expand All @@ -268,8 +261,6 @@ bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) {
print_log_process_usage(
fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes));
return true;
}
return false;
Expand Down Expand Up @@ -297,6 +288,15 @@ std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
}

std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() {
return fmt::format(
"process memory used {} exceed soft limit {} or sys mem available {} less than warning "
"water mark {}.",
PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
MemInfo::sys_mem_available_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES));
}

std::string MemTrackerLimiter::query_tracker_limit_exceeded_str(
const std::string& tracker_limit_exceeded, const std::string& last_consumer_tracker,
const std::string& executing_msg) {
Expand Down Expand Up @@ -352,8 +352,9 @@ int64_t MemTrackerLimiter::free_top_memory_query(
MemTrackerMinQueue min_pq;
// After greater than min_free_mem, will not be modified.
int64_t prepare_free_mem = 0;
std::vector<std::string> canceling_task;

auto cancel_top_query = [&cancel_msg, type](auto& min_pq) -> int64_t {
auto cancel_top_query = [&cancel_msg, type](auto& min_pq, auto& canceling_task) -> int64_t {
std::vector<std::string> usage_strings;
int64_t freed_mem = 0;
while (!min_pq.empty()) {
Expand All @@ -371,10 +372,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(
min_pq.top().first));
min_pq.pop();
}
if (!usage_strings.empty()) {
LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
<< join(usage_strings, ",");
}

LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
return freed_mem;
};

Expand All @@ -383,12 +384,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(
for (auto tracker : tracker_groups[i].trackers) {
if (tracker->type() == type) {
if (tracker->is_query_cancelled()) {
canceling_task.push_back(
fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
continue;
}
if (tracker->consumption() > min_free_mem) {
MemTrackerMinQueue min_pq_single;
min_pq_single.emplace(tracker->consumption(), tracker->label());
return cancel_top_query(min_pq_single);
return cancel_top_query(min_pq_single, canceling_task);
} else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
min_pq.emplace(tracker->consumption(), tracker->label());
prepare_free_mem += tracker->consumption();
Expand All @@ -403,7 +406,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
}
}
}
return cancel_top_query(min_pq);
return cancel_top_query(min_pq, canceling_task);
}

int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
Expand Down Expand Up @@ -433,6 +436,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
const std::function<std::string(int64_t, const std::string&)>& cancel_msg) {
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
std::unordered_map<std::string, int64_t> query_consumption;
std::vector<std::string> canceling_task;

for (unsigned i = 1; i < tracker_groups.size(); ++i) {
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
Expand All @@ -444,6 +448,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
continue;
}
if (tracker->is_query_cancelled()) {
canceling_task.push_back(
fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
continue;
}
int64_t overcommit_ratio =
Expand Down Expand Up @@ -480,10 +486,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}
max_pq.pop();
}
if (!usage_strings.empty()) {
LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
<< join(usage_strings, ",");
}

LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
return freed_mem;
}

Expand Down
9 changes: 4 additions & 5 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ class MemTrackerLimiter final : public MemTracker {

void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }

static void disable_oom_avoidance() { _oom_avoidance = false; }

public:
// If need to consume the tracker frequently, use it
void cache_consume(int64_t bytes);
Expand All @@ -168,9 +166,10 @@ class MemTrackerLimiter final : public MemTracker {
static std::string type_detail_usage(const std::string& msg, Type type);
void print_log_usage(const std::string& msg);
void enable_print_log_usage() { _enable_print_log_usage = true; }
// process memory changes more than 256M, or the GC ends
static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true);
static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true);
static std::string log_process_usage_str();
static void print_log_process_usage();

// Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
// vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
Expand Down Expand Up @@ -221,6 +220,7 @@ class MemTrackerLimiter final : public MemTracker {

static std::string process_mem_log_str();
static std::string process_limit_exceeded_errmsg_str();
static std::string process_soft_limit_exceeded_errmsg_str();
// Log the memory usage when memory limit is exceeded.
std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded,
const std::string& last_consumer_tracker,
Expand Down Expand Up @@ -264,7 +264,6 @@ class MemTrackerLimiter final : public MemTracker {
// Avoid frequent printing.
bool _enable_print_log_usage = false;
static std::atomic<bool> _enable_print_log_process_usage;
static bool _oom_avoidance;

// Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
std::list<MemTrackerLimiter*>::iterator _tracker_limiter_group_it;
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class ThreadContext;
class MemTracker;
class RuntimeState;

extern bool k_doris_exit;
extern bthread_key_t btls_key;

// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
Expand Down Expand Up @@ -230,7 +231,7 @@ class SwitchBthreadLocal {
// The brpc server should respond as quickly as possible.
bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
CHECK((0 == bthread_setspecific(btls_key, bthread_context)) || doris::k_doris_exit);
thread_context_ptr.init = true;
}
bthread_id = bthread_self();
Expand Down
4 changes: 0 additions & 4 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,6 @@ int main(int argc, char** argv) {
}
#endif

if (doris::config::memory_mode == std::string("performance")) {
doris::MemTrackerLimiter::disable_oom_avoidance();
}

std::vector<doris::StorePath> paths;
auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
if (!olap_res) {
Expand Down
17 changes: 7 additions & 10 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,16 +392,13 @@ void MemInfo::init() {
// https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
//
// available_low_water_mark = p1 - p2
// p1: upper sys_mem_available_low_water_mark, avoid wasting too much memory.
// p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%,
// in order to avoid wasting too much memory, available_low_water_mark minus 1% at most.
int64_t p1 = std::min<int64_t>(
std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1),
config::max_sys_mem_available_low_water_mark_bytes);
int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0);
_s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
_s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1;
// upper sys_mem_available_low_water_mark, avoid wasting too much memory.
_s_sys_mem_available_low_water_mark = std::max<int64_t>(
std::min<int64_t>(
std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1),
config::max_sys_mem_available_low_water_mark_bytes),
0);
_s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark * 2;
}

// Expect vm overcommit memory value to be 1, system will no longer throw bad_alloc, memory alloc are always accepted,
Expand Down
Loading

0 comments on commit 38c8657

Please sign in to comment.