diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 7186d8c9705ea4..95a583b9b01490 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -126,6 +126,8 @@ DEFINE_mString(process_full_gc_size, "20%");
 // If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
 DEFINE_mBool(enable_query_memory_overcommit, "true");
 
+DEFINE_mBool(disable_memory_gc, "false");
+
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6230e4caf2250e..656d955c4bb8b3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -106,8 +106,7 @@ DECLARE_Int32(brpc_num_threads);
 // If no ip match this rule, will choose one randomly.
 DECLARE_String(priority_networks);
 
-// memory mode
-// performance or compact
+// memory mode: performance, moderate or compact; only takes effect when compiled with tcmalloc
 DECLARE_String(memory_mode);
 
 // process memory limit specified as number of bytes
@@ -166,6 +165,10 @@ DECLARE_mString(process_full_gc_size);
 // If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
 DECLARE_mBool(enable_query_memory_overcommit);
 
+// GC will release cache and cancel tasks, and tasks will wait for GC to release memory.
+// The default GC strategy is conservative; set this to true to disable GC and rule out its interference.
+DECLARE_mBool(disable_memory_gc);
+
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 DECLARE_mInt32(thread_wait_gc_max_milliseconds);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index cfb002a4c45c17..cfff81088c0095 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -225,10 +225,12 @@ void Daemon::memory_gc_thread() {
     int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
     int32_t memory_minor_gc_sleep_time_ms = 0;
     int32_t memory_full_gc_sleep_time_ms = 0;
+    int32_t memory_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
     while (!_stop_background_threads_latch.wait_for(
                    std::chrono::milliseconds(interval_milliseconds)) &&
            !k_doris_exit) {
-        if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) {
+        if (config::disable_memory_gc || !MemInfo::initialized() ||
+            !ExecEnv::GetInstance()->initialized()) {
             continue;
         }
         auto sys_mem_available = doris::MemInfo::sys_mem_available();
@@ -243,9 +245,11 @@ void Daemon::memory_gc_thread() {
             (sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
              proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
             // No longer full gc and minor gc during sleep.
-            memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
-            memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
-            doris::MemTrackerLimiter::print_log_process_usage("Start Full GC", false);
+            memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
+            memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
+            LOG(INFO) << fmt::format("Start Full GC, {}.",
+                                     MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+            doris::MemTrackerLimiter::print_log_process_usage();
             if (doris::MemInfo::process_full_gc()) {
                 // If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
@@ -254,8 +258,10 @@
             (sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
              proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
             // No minor gc during sleep, but full gc is possible.
-            memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
-            doris::MemTrackerLimiter::print_log_process_usage("Start Minor GC", false);
+            memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
+            LOG(INFO) << fmt::format("Start Minor GC, {}.",
+                                     MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
+            doris::MemTrackerLimiter::print_log_process_usage();
             if (doris::MemInfo::process_minor_gc()) {
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 683971ecac4765..e6c09d51c1e932 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -35,7 +35,6 @@
 #include "util/mem_info.h"
 #include "util/perf_counters.h"
 #include "util/pretty_printer.h"
-#include "util/stack_util.h"
 
 namespace doris {
 
@@ -45,7 +44,6 @@ namespace doris {
 static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM);
 
 std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
-bool MemTrackerLimiter::_oom_avoidance {true};
 
 MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit,
                                      RuntimeProfile* profile,
@@ -207,7 +205,6 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
     _enable_print_log_usage = false;
     std::string detail = msg;
     detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str();
-    detail += "\nAlloc Stacktrace:\n" + get_stack_trace();
     detail += "\nMemory Tracker Summary: " + log_usage();
     std::string child_trackers_usage;
     std::vector<MemTracker::Snapshot> snapshots;
@@ -221,10 +218,9 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
     }
 }
 
-std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, bool with_stacktrace) {
-    std::string detail = msg;
+std::string MemTrackerLimiter::log_process_usage_str() {
+    std::string detail;
     detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str();
-    if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace();
     std::vector<MemTracker::Snapshot> snapshots;
     MemTrackerLimiter::make_process_snapshots(&snapshots);
     MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL);
@@ -246,18 +242,15 @@ std::string MemTrackerLimiter::log_process_usage_str(const std::string& msg, boo
     return detail;
 }
 
-void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool with_stacktrace) {
+void MemTrackerLimiter::print_log_process_usage() {
     // The default interval between two prints is 100ms (config::memory_maintenance_sleep_time_ms).
     if (MemTrackerLimiter::_enable_print_log_process_usage) {
         MemTrackerLimiter::_enable_print_log_process_usage = false;
-        LOG(WARNING) << log_process_usage_str(msg, with_stacktrace);
+        LOG(WARNING) << log_process_usage_str();
     }
 }
 
 bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
-    if (!_oom_avoidance) {
-        return false;
-    }
     // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
     // This is independent of the consumption value of the mem tracker, which counts the virtual memory
     // of the process malloc.
@@ -268,8 +261,6 @@ bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
     // but it may not actually alloc physical memory, which is not expected in mem hook fail.
     if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
         MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) {
-        print_log_process_usage(
-                fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes));
         return true;
     }
     return false;
@@ -297,6 +288,15 @@ std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
             PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
 }
 
+std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() {
+    return fmt::format(
+            "process memory used {} exceed soft limit {} or sys mem available {} less than warning "
+            "water mark {}.",
+            PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
+            MemInfo::sys_mem_available_str(),
+            PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES));
+}
+
 std::string MemTrackerLimiter::query_tracker_limit_exceeded_str(
         const std::string& tracker_limit_exceeded, const std::string& last_consumer_tracker,
         const std::string& executing_msg) {
@@ -352,8 +352,9 @@ int64_t MemTrackerLimiter::free_top_memory_query(
     MemTrackerMinQueue min_pq;
     // After greater than min_free_mem, will not be modified.
     int64_t prepare_free_mem = 0;
+    std::vector<std::string> canceling_task;
 
-    auto cancel_top_query = [&cancel_msg, type](auto& min_pq) -> int64_t {
+    auto cancel_top_query = [&cancel_msg, type](auto& min_pq, auto& canceling_task) -> int64_t {
         std::vector<std::string> usage_strings;
         int64_t freed_mem = 0;
         while (!min_pq.empty()) {
@@ -371,10 +372,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(
                         min_pq.top().first));
             min_pq.pop();
         }
-        if (!usage_strings.empty()) {
-            LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
-                      << join(usage_strings, ",");
-        }
+
+        LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
+                  << join(usage_strings, ",")
+                  << ". Previous canceling task: " << join(canceling_task, ",");
         return freed_mem;
     };
 
@@ -383,12 +384,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(
         for (auto tracker : tracker_groups[i].trackers) {
             if (tracker->type() == type) {
                 if (tracker->is_query_cancelled()) {
+                    canceling_task.push_back(
+                            fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
                     continue;
                 }
                 if (tracker->consumption() > min_free_mem) {
                     MemTrackerMinQueue min_pq_single;
                     min_pq_single.emplace(tracker->consumption(), tracker->label());
-                    return cancel_top_query(min_pq_single);
+                    return cancel_top_query(min_pq_single, canceling_task);
                 } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
                     min_pq.emplace(tracker->consumption(), tracker->label());
                     prepare_free_mem += tracker->consumption();
@@ -403,7 +406,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
             }
         }
     }
-    return cancel_top_query(min_pq);
+    return cancel_top_query(min_pq, canceling_task);
 }
 
 int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
@@ -433,6 +436,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
         const std::function& cancel_msg) {
     std::priority_queue<std::pair<int64_t, std::string>> max_pq;
     std::unordered_map<std::string, int64_t> query_consumption;
+    std::vector<std::string> canceling_task;
 
     for (unsigned i = 1; i < tracker_groups.size(); ++i) {
         std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
@@ -444,6 +448,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
                     continue;
                 }
                 if (tracker->is_query_cancelled()) {
+                    canceling_task.push_back(
+                            fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
                     continue;
                 }
                 int64_t overcommit_ratio =
@@ -480,10 +486,10 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
         }
         max_pq.pop();
     }
-    if (!usage_strings.empty()) {
-        LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
-                  << join(usage_strings, ",");
-    }
+
+    LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
+              << join(usage_strings, ",")
+              << ". Previous canceling task: " << join(canceling_task, ",");
     return freed_mem;
 }
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index cb939518bc4bb1..6e3dd3d51efe26 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -142,8 +142,6 @@ class MemTrackerLimiter final : public MemTracker {
 
     void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
 
-    static void disable_oom_avoidance() { _oom_avoidance = false; }
-
 public:
     // If need to consume the tracker frequently, use it
     void cache_consume(int64_t bytes);
@@ -168,9 +166,10 @@ class MemTrackerLimiter final : public MemTracker {
     static std::string type_detail_usage(const std::string& msg, Type type);
     void print_log_usage(const std::string& msg);
    void enable_print_log_usage() { _enable_print_log_usage = true; }
+    // Re-enabled when process memory changes by more than 256M, or when a GC ends.
     static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
-    static std::string log_process_usage_str(const std::string& msg, bool with_stacktrace = true);
-    static void print_log_process_usage(const std::string& msg, bool with_stacktrace = true);
+    static std::string log_process_usage_str();
+    static void print_log_process_usage();
 
     // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
     // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
@@ -221,6 +220,7 @@ class MemTrackerLimiter final : public MemTracker {
     static std::string process_mem_log_str();
     static std::string process_limit_exceeded_errmsg_str();
+    static std::string process_soft_limit_exceeded_errmsg_str();
     // Log the memory usage when memory limit is exceeded.
     std::string query_tracker_limit_exceeded_str(const std::string& tracker_limit_exceeded,
                                                  const std::string& last_consumer_tracker,
@@ -264,7 +264,6 @@ class MemTrackerLimiter final : public MemTracker {
     // Avoid frequent printing.
     bool _enable_print_log_usage = false;
     static std::atomic<bool> _enable_print_log_process_usage;
-    static bool _oom_avoidance;
 
     // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
     std::list<MemTrackerLimiter*>::iterator _tracker_limiter_group_it;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 0cd42648ae632e..2a46e22a18a4da 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -99,6 +99,7 @@ class ThreadContext;
 class MemTracker;
 class RuntimeState;
 
+extern bool k_doris_exit;
 extern bthread_key_t btls_key;
 
 // Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
@@ -230,7 +231,7 @@ class SwitchBthreadLocal {
             // The brpc server should respond as quickly as possible.
             bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
             // set the data so that next time bthread_getspecific in the thread returns the data.
-            CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+            CHECK((0 == bthread_setspecific(btls_key, bthread_context)) || doris::k_doris_exit);
             thread_context_ptr.init = true;
         }
         bthread_id = bthread_self();
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 246032ef9ac953..8f65f19c18fd27 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -345,10 +345,6 @@ int main(int argc, char** argv) {
     }
 #endif
 
-    if (doris::config::memory_mode == std::string("performance")) {
-        doris::MemTrackerLimiter::disable_oom_avoidance();
-    }
-
     std::vector<doris::StorePath> paths;
     auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
     if (!olap_res) {
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 200d346dede34a..97125d0942eeaa 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -392,16 +392,13 @@ void MemInfo::init() {
     // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
     // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
     //
-    // available_low_water_mark = p1 - p2
-    // p1: upper sys_mem_available_low_water_mark, avoid wasting too much memory.
-    // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%,
-    //     in order to avoid wasting too much memory, available_low_water_mark minus 1% at most.
-    int64_t p1 = std::min(
-            std::min(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1),
-            config::max_sys_mem_available_low_water_mark_bytes);
-    int64_t p2 = std::max(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0);
-    _s_sys_mem_available_low_water_mark = std::max(p1 - p2, 0);
-    _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1;
+    // upper sys_mem_available_low_water_mark, avoid wasting too much memory.
+    _s_sys_mem_available_low_water_mark = std::max(
+            std::min(
+                    std::min(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1),
+                    config::max_sys_mem_available_low_water_mark_bytes),
+            0);
+    _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark * 2;
 }
 
 // Expect vm overcommit memory value to be 1, system will no longer throw bad_alloc, memory alloc are always accepted,
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 366fc48ce29ee1..d15552824e7669 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -33,6 +33,7 @@
 #include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "util/mem_info.h"
+#include "util/stack_util.h"
 #include "util/uid_util.h"
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap>
@@ -48,6 +49,9 @@ void Allocator::sys_memory_check(size_t size,
                 doris::thread_context()->thread_mem_tracker()->label(),
                 doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
                 doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+        if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc) { // 1G
+            err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
+        }
 
         // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr.
         if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
@@ -57,12 +61,15 @@ void Allocator::sys_memory_check(size_t
             }
             return;
         }
-    if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
+    if (!doris::config::disable_memory_gc &&
+        doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
         doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
         int64_t wait_milliseconds = 0;
-        LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum {}ms, {}.",
-                                 print_id(doris::thread_context()->task_id()),
-                                 doris::config::thread_wait_gc_max_milliseconds, err_msg);
+        LOG(INFO) << fmt::format(
+                "Query:{} waiting for enough memory in thread id:{}, maximum {}ms, {}.",
+                print_id(doris::thread_context()->task_id()),
+                doris::thread_context()->get_thread_id(),
+                doris::config::thread_wait_gc_max_milliseconds, err_msg);
         while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) {
             std::this_thread::sleep_for(std::chrono::milliseconds(100));
             if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
@@ -81,7 +88,7 @@ void Allocator::sys_memory_check(size_t
         if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) {
             // Make sure to completely wait thread_wait_gc_max_milliseconds only once.
             doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
-            doris::MemTrackerLimiter::print_log_process_usage(err_msg);
+            doris::MemTrackerLimiter::print_log_process_usage();
             // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel.
             if (!doris::enable_thread_catch_bad_alloc) {
                 LOG(INFO) << fmt::format(
@@ -100,7 +107,7 @@ void Allocator::sys_memory_check(size_t
     // else, enough memory is available, the query continues execute.
     } else if (doris::enable_thread_catch_bad_alloc) {
         LOG(INFO) << fmt::format("throw exception, {}.", err_msg);
-        doris::MemTrackerLimiter::print_log_process_usage(err_msg);
+        doris::MemTrackerLimiter::print_log_process_usage();
         throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
     }
 }
@@ -155,8 +162,15 @@ void Allocator::release_memory(size_t si
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap>
 void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
         const std::string& err) const {
-    LOG(WARNING) << err;
-    doris::MemTrackerLimiter::print_log_process_usage(err);
+    LOG(WARNING) << err
+                 << fmt::format(
+                            " OS physical memory {}. Process memory usage {}, Sys available memory "
+                            "{}, Stacktrace: {}",
+                            doris::PrettyPrinter::print(doris::MemInfo::physical_mem(),
+                                                        doris::TUnit::BYTES),
+                            doris::PerfCounters::get_vm_rss_str(),
+                            doris::MemInfo::sys_mem_available_str(), doris::get_stack_trace());
+    doris::MemTrackerLimiter::print_log_process_usage();
     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
 }