Skip to content

Commit

Permalink
[FEAT MERGE] dynamic_server_spec phase1
Browse files Browse the repository at this point in the history
  • Loading branch information
obdev authored and ob-robot committed Dec 10, 2023
1 parent ff0ec7e commit 75f0ce0
Show file tree
Hide file tree
Showing 29 changed files with 3,379 additions and 2,676 deletions.
7 changes: 7 additions & 0 deletions deps/oblib/src/lib/alloc/alloc_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ namespace oceanbase
namespace lib
{
class ObTenantCtxAllocator;
struct AChunk;
struct ABlock;
struct ObMemAttr;
class IChunkMgr
{
public:
virtual AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) = 0;
virtual void free_chunk(AChunk *chunk, const ObMemAttr &attr) = 0;
}; // end of class IChunkMgr

class IBlockMgr
{
Expand Down
63 changes: 16 additions & 47 deletions deps/oblib/src/lib/alloc/block_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,14 @@
using namespace oceanbase;
using namespace oceanbase::lib;

void BlockSet::Lock::lock()
{
int64_t tid = common::get_itid() + 1;
while (!ATOMIC_BCAS(&tid_, 0, tid)) {
sched_yield();
}
}

BlockSet::BlockSet()
: mutex_(common::ObLatchIds::ALLOC_BLOCK_LOCK), clist_(NULL),
: tallocator_(NULL),
locker_(NULL),
chunk_mgr_(NULL),
clist_(NULL),
avail_bm_(BLOCKS_PER_CHUNK+1, avail_bm_buf_),
total_hold_(0), total_payload_(0), total_used_(0), tallocator_(NULL),
chunk_free_list_(false/*with_mutex*/), locker_(nullptr)
total_hold_(0), total_payload_(0), total_used_(0)
{
chunk_free_list_.set_max_chunk_cache_size(0);
}

BlockSet::~BlockSet()
Expand All @@ -56,17 +49,6 @@ void BlockSet::reset()
//MEMSET(block_list_, 0, sizeof(block_list_));
clist_ = nullptr;
avail_bm_.clear();
LockGuard lock(cache_shared_lock_);
for (AChunk *chunk = nullptr; (chunk = chunk_free_list_.pop()) != nullptr;) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, -chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
tallocator_->free_chunk(chunk, attr_);
}
cache_shared_lock_.reset();
}

void BlockSet::set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator)
Expand Down Expand Up @@ -293,17 +275,11 @@ AChunk *BlockSet::alloc_chunk(const uint64_t size, const ObMemAttr &attr)
AChunk *chunk = NULL;
if (OB_NOT_NULL(tallocator_)) {
const uint64_t all_size = AChunkMgr::aligned(size);
if (INTACT_ACHUNK_SIZE == all_size && chunk_free_list_.count() > 0) {
LockGuard lock(cache_shared_lock_);
chunk = chunk_free_list_.pop();
}
if (nullptr == chunk) {
chunk = tallocator_->alloc_chunk(static_cast<int64_t>(size), attr);
if (chunk != nullptr) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, payload));
}
chunk = chunk_mgr_->alloc_chunk(static_cast<int64_t>(size), attr);
if (chunk != nullptr) {
uint64_t payload = 0;
UNUSED(ATOMIC_FAA(&total_hold_, chunk->hold(&payload)));
UNUSED(ATOMIC_FAA(&total_payload_, payload));
}
if (NULL != chunk) {
if (NULL != clist_) {
Expand Down Expand Up @@ -351,20 +327,13 @@ void BlockSet::free_chunk(AChunk *const chunk)
}
uint64_t payload = 0;
const uint64_t hold = chunk->hold(&payload);
bool freed = false;
if (INTACT_ACHUNK_SIZE == hold) {
LockGuard lock(cache_shared_lock_);
freed = chunk_free_list_.push(chunk);
}
if (!freed) {
if (OB_NOT_NULL(tallocator_)) {
UNUSED(ATOMIC_FAA(&total_hold_, -hold));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
tallocator_->free_chunk(chunk, attr_);
if (OB_NOT_NULL(tallocator_)) {
UNUSED(ATOMIC_FAA(&total_hold_, -hold));
UNUSED(ATOMIC_FAA(&total_payload_, -payload));
if (chunk->washed_size_ != 0) {
tallocator_->update_wash_stat(-1, -chunk->washed_blks_, -chunk->washed_size_);
}
chunk_mgr_->free_chunk(chunk, attr_);
}
}

Expand Down
37 changes: 5 additions & 32 deletions deps/oblib/src/lib/alloc/block_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,14 @@

namespace oceanbase
{
namespace common
{
class ObPageManagerCenter;
}
namespace lib
{

class ObTenantCtxAllocator;
class ISetLocker;
class BlockSet
{
friend class common::ObPageManagerCenter;
friend class ObTenantCtxAllocator;
class Lock
{
public:
Lock() : tid_(0) {}
~Lock() { reset(); }
void reset() { ATOMIC_STORE(&tid_, 0); }
void lock();
void unlock() { ATOMIC_STORE(&tid_, 0); }
private:
int64_t tid_;
};
class LockGuard
{
public:
LockGuard(Lock &lock) : lock_(lock) { lock_.lock(); }
~LockGuard() { lock_.unlock(); }
private:
Lock &lock_;
};
public:
BlockSet();
~BlockSet();
Expand All @@ -68,13 +44,12 @@ class BlockSet
inline uint64_t get_total_used() const;

void set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator);
void set_max_chunk_cache_size(const int64_t max_cache_size)
{ chunk_free_list_.set_max_chunk_cache_size(max_cache_size); }
void reset();
void set_locker(ISetLocker *locker) { locker_ = locker; }
int64_t sync_wash(int64_t wash_size=INT64_MAX);
bool check_has_unfree();
ObTenantCtxAllocator *get_tenant_ctx_allocator() const { return tallocator_; }
void set_chunk_mgr(IChunkMgr *chunk_mgr) { chunk_mgr_ = chunk_mgr; }

private:
DISALLOW_COPY_AND_ASSIGN(BlockSet);
Expand All @@ -87,7 +62,10 @@ class BlockSet
void free_chunk(AChunk *const chunk);

private:
lib::ObMutex mutex_;
ObTenantCtxAllocator *tallocator_;
ObMemAttr attr_;
ISetLocker *locker_;
IChunkMgr *chunk_mgr_;
// block_list_ can not be initialized, the state is maintained by avail_bm_
union {
ABlock *block_list_[BLOCKS_PER_CHUNK+1];
Expand All @@ -98,11 +76,6 @@ class BlockSet
uint64_t total_hold_;
uint64_t total_payload_;
uint64_t total_used_;
ObTenantCtxAllocator *tallocator_;
ObMemAttr attr_;
lib::AChunkList chunk_free_list_;
ISetLocker *locker_;
Lock cache_shared_lock_;
}; // end of class BlockSet

void BlockSet::lock()
Expand Down
4 changes: 1 addition & 3 deletions deps/oblib/src/lib/alloc/ob_malloc_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,6 @@ void ObMallocAllocator::print_tenant_memory_usage(uint64_t tenant_id) const
get_global_ctx_info().get_ctx_name(i), ctx_hold_bytes[i], limit);
}
}
if (OB_SUCC(ret)) {
ObPageManagerCenter::get_instance().print_tenant_stat(tenant_id, buf, BUFLEN, ctx_pos);
}
buf[std::min(ctx_pos, BUFLEN - 1)] = '\0';
allow_next_syslog();
_LOG_INFO("[MEMORY] tenant: %lu, limit: %'lu hold: %'lu rpc_hold: %'lu cache_hold: %'lu "
Expand Down Expand Up @@ -679,6 +676,7 @@ int ObMallocAllocator::recycle_tenant_allocator(uint64_t tenant_id)
// wash idle chunks
for (int64_t ctx_id = 0; ctx_id < ObCtxIds::MAX_CTX_ID; ctx_id++) {
ta[ctx_id].set_idle(0);
ta[ctx_id].reset_req_chunk_mgr();
}

ObTenantCtxAllocator *tas[ObCtxIds::MAX_CTX_ID] = {NULL};
Expand Down
10 changes: 4 additions & 6 deletions deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ void ObTenantCtxAllocator::print_usage() const
allow_next_syslog();
_LOG_INFO("\n[MEMORY] tenant_id=%5ld ctx_id=%25s hold=% '15ld used=% '15ld limit=% '15ld"
"\n[MEMORY] idle_size=% '10ld free_size=% '10ld"
"\n[MEMORY] wash_related_chunks=% '10ld washed_blocks=% '10ld washed_size=% '10ld\n%s",
"\n[MEMORY] wash_related_chunks=% '10ld washed_blocks=% '10ld washed_size=% '10ld"
"\n[MEMORY] request_cached_chunk_cnt=% '5ld\n%s",
tenant_id_,
get_global_ctx_info().get_ctx_name(ctx_id_),
ctx_hold_bytes,
Expand All @@ -173,6 +174,7 @@ void ObTenantCtxAllocator::print_usage() const
ATOMIC_LOAD(&wash_related_chunks_),
ATOMIC_LOAD(&washed_blocks_),
ATOMIC_LOAD(&washed_size_),
req_chunk_mgr_.n_chunks(),
buf);
}
}
Expand Down Expand Up @@ -220,11 +222,7 @@ AChunk *ObTenantCtxAllocator::alloc_chunk(const int64_t size, const ObMemAttr &a
}
}

if (OB_ISNULL(chunk)) {
if (INTACT_ACHUNK_SIZE == AChunkMgr::hold(size) && get_ctx_id() != ObCtxIds::CO_STACK) {
chunk = ObPageManagerCenter::get_instance().alloc_from_thread_local_cache(tenant_id_, ctx_id_);
}
} else {
if (OB_NOT_NULL(chunk)) {
ObDisableDiagnoseGuard disable_diagnose_guard;
lib::ObMutexGuard guard(using_list_mutex_);
chunk->prev2_ = &using_list_head_;
Expand Down
102 changes: 97 additions & 5 deletions deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,102 @@ class ObTenantCtxAllocator
friend class ObTenantCtxAllocatorGuard;
friend class ObMallocAllocator;
using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;

class ChunkMgr : public IChunkMgr
{
public:
ChunkMgr(ObTenantCtxAllocator &ta) : ta_(ta) {}
AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) override
{
AChunk *chunk = ta_.alloc_chunk(size, attr);
if (OB_ISNULL(chunk)) {
ta_.req_chunk_mgr_.reclaim_chunks();
chunk = ta_.alloc_chunk(size, attr);
}
return chunk;
}
void free_chunk(AChunk *chunk, const ObMemAttr &attr) override
{
ta_.free_chunk(chunk, attr);
}
private:
ObTenantCtxAllocator &ta_;
};

class ReqChunkMgr : public IChunkMgr
{
public:
ReqChunkMgr(ObTenantCtxAllocator &ta)
: ta_(ta), parallel_(CTX_ATTR(ta_.get_ctx_id()).parallel_)
{
abort_unless(parallel_ <= ARRAYSIZEOF(chunks_));
MEMSET(chunks_, 0, sizeof(chunks_));
}
AChunk *alloc_chunk(const uint64_t size, const ObMemAttr &attr) override
{
AChunk *chunk = NULL;
if (INTACT_ACHUNK_SIZE == AChunk::calc_hold(size)) {
const uint64_t idx = common::get_itid() % parallel_;
chunk = ATOMIC_TAS(&chunks_[idx], NULL);
}
if (OB_ISNULL(chunk)) {
chunk = ta_.alloc_chunk(size, attr);
}
return chunk;
}
void free_chunk(AChunk *chunk, const ObMemAttr &attr) override
{
bool freed = false;
if (INTACT_ACHUNK_SIZE == chunk->hold()) {
const uint64_t idx = common::get_itid() % parallel_;
freed = ATOMIC_BCAS(&chunks_[idx], NULL, chunk);
}
if (!freed) {
ta_.free_chunk(chunk, attr);
}
}
void reclaim_chunks()
{
for (int i = 0; i < parallel_; i++) {
AChunk *chunk = ATOMIC_TAS(&chunks_[i], NULL);
if (chunk != NULL) {
ta_.free_chunk(chunk,
ObMemAttr(ta_.get_tenant_id(), "unused", ta_.get_ctx_id()));
}
}
}
int64_t n_chunks() const
{
int64_t n = 0;
for (int i = 0; i < parallel_; i++) {
AChunk *chunk = ATOMIC_LOAD(&chunks_[i]);
if (chunk != NULL) {
n++;
}
}
return n;
}
private:
ObTenantCtxAllocator &ta_;
const int parallel_;
AChunk *chunks_[32];
};

public:
explicit ObTenantCtxAllocator(uint64_t tenant_id, uint64_t ctx_id = 0)
: resource_handle_(), ref_cnt_(0), tenant_id_(tenant_id),
ctx_id_(ctx_id), deleted_(false),
obj_mgr_(*this, tenant_id_, ctx_id_, INTACT_NORMAL_AOBJECT_SIZE,
obj_mgr_(*this,
CTX_ATTR(ctx_id).enable_no_log_,
INTACT_NORMAL_AOBJECT_SIZE,
CTX_ATTR(ctx_id).parallel_,
CTX_ATTR(ctx_id).enable_dirty_list_,
NULL),
idle_size_(0), head_chunk_(), chunk_cnt_(0),
chunk_freelist_mutex_(common::ObLatchIds::CHUNK_FREE_LIST_LOCK),
using_list_mutex_(common::ObLatchIds::CHUNK_USING_LIST_LOCK),
using_list_head_(), wash_related_chunks_(0), washed_blocks_(0), washed_size_(0)
using_list_head_(), wash_related_chunks_(0), washed_blocks_(0), washed_size_(0),
chunk_mgr_(*this), req_chunk_mgr_(*this)
{
MEMSET(&head_chunk_, 0, sizeof(AChunk));
using_list_head_.prev2_ = &using_list_head_;
Expand All @@ -60,8 +144,12 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
chunk_freelist_mutex_.enable_record_stat(false);
using_list_mutex_.enable_record_stat(false);
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
new (obj_mgrs_ + i) ObjectMgr(*this, tenant_id_, ctx_id_, INTACT_MIDDLE_AOBJECT_SIZE,
4/*parallel*/, false/*enable_dirty_list*/, &obj_mgr_);
new (obj_mgrs_ + i) ObjectMgr(*this,
CTX_ATTR(ctx_id).enable_no_log_,
INTACT_MIDDLE_AOBJECT_SIZE,
4/*parallel*/,
false/*enable_dirty_list*/,
&obj_mgr_);
}
}
virtual ~ObTenantCtxAllocator()
Expand Down Expand Up @@ -172,6 +260,8 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
bool update_hold(const int64_t size);
int set_idle(const int64_t size, const bool reserve = false);
IBlockMgr &get_block_mgr() { return obj_mgr_; }
IChunkMgr &get_chunk_mgr() { return chunk_mgr_; }
IChunkMgr &get_req_chunk_mgr() { return req_chunk_mgr_; }
void get_chunks(AChunk **chunks, int cap, int &cnt);
using VisitFunc = std::function<int(ObLabel &label,
common::LabelItem *l_item)>;
Expand All @@ -190,6 +280,7 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
return has_unfree;
}
void update_wash_stat(int64_t related_chunks, int64_t blocks, int64_t size);
void reset_req_chunk_mgr() { req_chunk_mgr_.reclaim_chunks(); }
private:
int64_t inc_ref_cnt(int64_t cnt) { return ATOMIC_FAA(&ref_cnt_, cnt); }
int64_t get_ref_cnt() const { return ATOMIC_LOAD(&ref_cnt_); }
Expand All @@ -207,7 +298,6 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
}
return ret;
}

public:
template <typename T>
static void* common_alloc(const int64_t size, const ObMemAttr &attr,
Expand Down Expand Up @@ -240,6 +330,8 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
union {
ObjectMgr obj_mgrs_[ObSubCtxIds::MAX_SUB_CTX_ID];
};
ChunkMgr chunk_mgr_;
ReqChunkMgr req_chunk_mgr_;
}; // end of class ObTenantCtxAllocator

} // end of namespace lib
Expand Down
Loading

0 comments on commit 75f0ce0

Please sign in to comment.