Skip to content

Commit

Permalink
[Feature](multi-catalog) Add memory tracker for orc reader/writer and…
Browse files Browse the repository at this point in the history
… arrow parquet writer. (apache#37234)

## Proposed changes

[Feature] (multi-catalog) Add memory trackers for the ORC reader/writer and
the Arrow Parquet writer.

## Future work

- Since the Parquet reader is written by ourselves and does not use the
Arrow third-party library, its memory usage still needs to be added to the
memory tracker.
- Add read and write operator-level memory trackers to the profile.
  • Loading branch information
kaka11chen authored Jul 30, 2024
1 parent a15b00f commit c5a1998
Show file tree
Hide file tree
Showing 14 changed files with 591 additions and 61 deletions.
13 changes: 13 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header
#include "util/threadpool.h"

// Forward declarations of the ORC and Arrow memory pool types. ExecEnv below only stores
// and returns pointers to them (orc::MemoryPool*, arrow::MemoryPool*), so the incomplete
// types are sufficient in this header.
namespace orc {
class MemoryPool;
}
namespace arrow {
class MemoryPool;
}

namespace doris {
namespace vectorized {
class VDataStreamMgr;
Expand Down Expand Up @@ -305,6 +312,9 @@ class ExecEnv {
segment_v2::TmpFileDirs* get_tmp_file_dirs() { return _tmp_file_dirs.get(); }
io::FDCache* file_cache_open_fd_cache() const { return _file_cache_open_fd_cache.get(); }

// Returns the pointer stored in _orc_memory_pool. The member defaults to nullptr, so callers
// get nullptr until ExecEnv::_init_mem_env() has assigned the pool.
orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
// Returns the pointer stored in _arrow_memory_pool. The member defaults to nullptr, so callers
// get nullptr until ExecEnv::_init_mem_env() has assigned the pool.
arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }

private:
ExecEnv();

Expand Down Expand Up @@ -435,6 +445,9 @@ class ExecEnv {
std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr;

orc::MemoryPool* _orc_memory_pool = nullptr;
arrow::MemoryPool* _arrow_memory_pool = nullptr;
};

template <>
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/timezone_utils.h"
#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exec/format/parquet/arrow_memory_pool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
Expand Down Expand Up @@ -573,6 +575,10 @@ Status ExecEnv::_init_mem_env() {
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::inverted_index_query_cache_limit;

// init orc and arrow memory pools
_orc_memory_pool = new doris::vectorized::ORCMemoryPool();
_arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();

return Status::OK();
}

Expand Down Expand Up @@ -751,6 +757,9 @@ void ExecEnv::destroy() {
// We should free task scheduler finally because task queue / scheduler maybe used by pipelineX.
SAFE_DELETE(_without_group_task_scheduler);

SAFE_DELETE(_arrow_memory_pool);
SAFE_DELETE(_orc_memory_pool);

// dns cache is a global instance and need to be released at last
SAFE_DELETE(_dns_cache);

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/faststring.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace doris {
// common use cases (in particular, resize() will fill with uninitialized data
// instead of memsetting to \0)
// only build() can transfer data to the outside.
class faststring : private Allocator<false, false, false> {
class faststring : private Allocator<false, false, false, DefaultMemoryAllocator> {
public:
enum { kInitialCapacity = 32 };

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ struct SliceMap {
//
// only receive the memory allocated by Allocator and disables mmap,
// otherwise the memory may not be freed correctly, currently only be constructed by faststring.
class OwnedSlice : private Allocator<false, false, false> {
class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator> {
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}

Expand Down
90 changes: 59 additions & 31 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@
#include "util/stack_util.h"
#include "util/uid_util.h"

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size) const {
// Out-of-class definitions of RecordSizeMemoryAllocator's static data members: a map keyed by
// pointer with a size_t value, and a std::mutex.
// NOTE(review): presumably the map records the size of each live allocation and is guarded by
// _mutex — confirm against the class declaration, which is not visible here.
std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes;
std::mutex RecordSizeMemoryAllocator::_mutex;

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_memory_check(
size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand Down Expand Up @@ -155,8 +159,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_tracker_check(
size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand Down Expand Up @@ -191,24 +196,27 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
}
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_check(size_t size) const {
// Runs both memory checks for a request of `size` bytes: sys_memory_check() first, then
// memory_tracker_check().
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_check(
size_t size) const {
sys_memory_check(size);
memory_tracker_check(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::consume_memory(size_t size) const {
// Reports `size` bytes as consumed by invoking the CONSUME_THREAD_MEM_TRACKER macro.
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) const {
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::release_memory(size_t size) const {
// Reports `size` bytes as released by invoking the RELEASE_THREAD_MEM_TRACKER macro.
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
RELEASE_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
<< fmt::format("{}, Stacktrace: {}",
Expand All @@ -219,9 +227,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
}

#ifndef NDEBUG
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(void* buf,
size_t size) const {
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
Expand All @@ -230,8 +238,8 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::add_address_sanitizers(v
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizers(
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
Expand All @@ -242,23 +250,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::remove_address_sanitizer
}
#endif

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) {
// Allocates `size` bytes with the requested `alignment` by forwarding both arguments
// unchanged to alloc_impl() and returning its result.
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size,
size_t alignment) {
return alloc_impl(size, alignment);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, size_t old_size,
size_t new_size,
size_t alignment) {
// Resizes the buffer `buf` from `old_size` to `new_size` bytes with the requested `alignment`
// by forwarding all arguments unchanged to realloc_impl() and returning its result.
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::realloc(
void* buf, size_t old_size, size_t new_size, size_t alignment) {
return realloc_impl(buf, old_size, new_size, alignment);
}

template class Allocator<true, true, true>;
template class Allocator<true, true, false>;
template class Allocator<true, false, true>;
template class Allocator<true, false, false>;
template class Allocator<false, true, true>;
template class Allocator<false, true, false>;
template class Allocator<false, false, true>;
template class Allocator<false, false, false>;
// Explicit instantiations of Allocator with DefaultMemoryAllocator for all eight combinations
// of the three boolean flags (clear_memory_, mmap_populate, use_mmap).
template class Allocator<true, true, true, DefaultMemoryAllocator>;
template class Allocator<true, true, false, DefaultMemoryAllocator>;
template class Allocator<true, false, true, DefaultMemoryAllocator>;
template class Allocator<true, false, false, DefaultMemoryAllocator>;
template class Allocator<false, true, true, DefaultMemoryAllocator>;
template class Allocator<false, true, false, DefaultMemoryAllocator>;
template class Allocator<false, false, true, DefaultMemoryAllocator>;
template class Allocator<false, false, false, DefaultMemoryAllocator>;

/** Explicit instantiations for the non-default memory allocators.
 *
 * It would be better to put these next to their users, such as the orc memory pool and the
 * arrow memory pool. But the Allocator member functions are defined in this .cpp instead of
 * in a header, so every combination that is used anywhere has to be instantiated here.
 *
 * All eight combinations of (clear_memory_, mmap_populate, use_mmap) are instantiated for
 * ORCMemoryAllocator, the same set as for the other allocators in this file, so that no
 * combination is left without a definition at link time.
 */
template class Allocator<true, true, true, ORCMemoryAllocator>;
template class Allocator<true, true, false, ORCMemoryAllocator>;
template class Allocator<true, false, true, ORCMemoryAllocator>;
template class Allocator<true, false, false, ORCMemoryAllocator>;
template class Allocator<false, true, true, ORCMemoryAllocator>;
template class Allocator<false, true, false, ORCMemoryAllocator>;
template class Allocator<false, false, true, ORCMemoryAllocator>;
template class Allocator<false, false, false, ORCMemoryAllocator>;

// Explicit instantiations of Allocator with RecordSizeMemoryAllocator for all eight
// combinations of the three boolean flags (clear_memory_, mmap_populate, use_mmap).
template class Allocator<true, true, true, RecordSizeMemoryAllocator>;
template class Allocator<true, true, false, RecordSizeMemoryAllocator>;
template class Allocator<true, false, true, RecordSizeMemoryAllocator>;
template class Allocator<true, false, false, RecordSizeMemoryAllocator>;
template class Allocator<false, true, true, RecordSizeMemoryAllocator>;
template class Allocator<false, true, false, RecordSizeMemoryAllocator>;
template class Allocator<false, false, true, RecordSizeMemoryAllocator>;
template class Allocator<false, false, false, RecordSizeMemoryAllocator>;
Loading

0 comments on commit c5a1998

Please sign in to comment.