From cf6b166427890909ff93b3c0fb170fad60a2d31b Mon Sep 17 00:00:00 2001 From: ericqzhao <125110732+ericqzhao@users.noreply.github.com> Date: Wed, 31 Jan 2024 18:39:17 +0800 Subject: [PATCH 1/2] pfs: add restful api to collect all spdk thread stats --- common/include/pf_buffer.h | 4 +- common/include/pf_event_queue.h | 3 + common/include/pf_event_thread.h | 20 +++++++ common/src/pf_buffer.cpp | 22 ++++---- common/src/pf_event_queue.cpp | 5 ++ common/src/pf_event_thread.cpp | 37 ++++++++++-- common/src/pf_ioengine.cpp | 15 ++--- pfs/include/pf_dispatcher.h | 5 +- pfs/include/pf_restful_api.h | 29 +++++++++- pfs/src/pf_dispatcher.cpp | 23 ++++---- pfs/src/pf_flash_store.cpp | 23 +++----- pfs/src/pf_restful_api.cpp | 97 ++++++++++++++++++++++++++++++++ pfs/src/pf_restful_server.cpp | 2 + 13 files changed, 232 insertions(+), 53 deletions(-) diff --git a/common/include/pf_buffer.h b/common/include/pf_buffer.h index b6ce5c9..a47f17c 100644 --- a/common/include/pf_buffer.h +++ b/common/include/pf_buffer.h @@ -63,10 +63,10 @@ struct BufferDescriptor class BufferPool { public: - BufferPool(){dma_buffer_used = 0;} + BufferPool() dma_buffer_used(false) {} size_t buf_size; int buf_count; - int dma_buffer_used; + bool dma_buffer_used; struct ibv_mr* mrs[4]; int init(size_t buffer_size, int count); inline BufferDescriptor* alloc() { return free_bds.dequeue(); } diff --git a/common/include/pf_event_queue.h b/common/include/pf_event_queue.h index e79ee61..2faf2a0 100644 --- a/common/include/pf_event_queue.h +++ b/common/include/pf_event_queue.h @@ -46,6 +46,7 @@ enum S5EventType : int EVT_CONN_CLOSED, EVT_SAVEMD, EVT_WAIT_OWNER_LOCK, + EVT_GET_STAT, }; const char* EventTypeToStr(S5EventType t); @@ -56,6 +57,7 @@ class pfqueue int event_fd; virtual int post_event(int type, int arg_i, void* arg_p, void* arg_q = NULL) = 0; + virtual int post_event_locked(int type, int arg_i, void* arg_p) = 0; virtual void destroy() = 0; virtual int sync_invoke(std::function f) = 0; }; @@ -75,6 +77,7 @@ class PfEventQueue : public pfqueue int init(const char* name, int size, BOOL semaphore_mode); void destroy(); int post_event(int type, int arg_i, void* arg_p, void* arg_q=NULL); + int post_event_locked(int type, int arg_i, void* arg_p); int get_events(PfFixedSizeQueue** /*out*/ q); int get_event(S5Event* /*out*/ evt); inline bool is_empty() { return current_queue->is_empty();} diff --git a/common/include/pf_event_thread.h b/common/include/pf_event_thread.h index c28994c..47e3c77 100644 --- a/common/include/pf_event_thread.h +++ b/common/include/pf_event_thread.h @@ -43,4 +43,24 @@ class PfEventThread int sync_invoke(std::function _f); virtual int commit_batch(){return 0;}; }; + +typedef void (*pf_event_fn)(void *ctx); + +struct thread_stat { + std::string name; + pthread_t tid; + pf_thread_stats stats; +}; + +struct get_stats_ctx { + pf_event_fn fn; + int next_thread_id; + int num_threads; + std::vector threads; + std::vector thread_stats; + uint64_t now; +}; + +int get_thread_stats(pf_thread_stats *stats); +PfEventThread * get_current_thread(); #endif // pf_event_thread_h__ diff --git a/common/src/pf_buffer.cpp b/common/src/pf_buffer.cpp index 3e8fc28..86c764d 100644 --- a/common/src/pf_buffer.cpp +++ b/common/src/pf_buffer.cpp @@ -18,24 +18,23 @@ int BufferPool::init(size_t buffer_size, int count) if(rc != 0) throw std::runtime_error(format_string("init memory pool failed, rc:%d", rc)); clean.push_back([this](){free_bds.destroy(); }); - if (dma_buffer_used == 0) { - data_buf = memalign(4096, buffer_size*count); + if 
(dma_buffer_used) { + data_buf = spdk_dma_zmalloc(buffer_size*count, 4096, NULL); if(data_buf == NULL) throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", buffer_size*count)); - clean.push_back([this](){ ::free(data_buf); }); - } - else { - data_buf = spdk_dma_zmalloc(buffer_size*count, 4096, NULL); + clean.push_back([this](){ ::spdk_dma_free(data_buf); }); + } else { + data_buf = memalign(4096, buffer_size*count); if(data_buf == NULL) throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", buffer_size*count)); - clean.push_back([this](){ ::spdk_dma_free(data_buf); }); + clean.push_back([this](){ ::free(data_buf); }); } data_bds = (BufferDescriptor*)calloc(count, sizeof(BufferDescriptor)); if(data_bds == NULL) throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", count * sizeof(BufferDescriptor))); clean.push_back([this](){ ::free(data_bds); }); - for(int i=0;itail = (current_queue->tail + 1) % current_queue->queue_depth; } +int PfEventQueue::post_event_locked(int type, int arg_i, void* arg_p) +{ + return 0; +} + /** * get events in queue. events are fetched via _@param q_. q may be empty if there * are no events. diff --git a/common/src/pf_event_thread.cpp b/common/src/pf_event_thread.cpp index a455029..d8870fc 100644 --- a/common/src/pf_event_thread.cpp +++ b/common/src/pf_event_thread.cpp @@ -7,9 +7,12 @@ void* thread_proc_spdkr(void* arg); void* thread_proc_eventq(void* arg); +static __thread PfEventThread *cur_thread = NULL; + PfEventThread::PfEventThread() { inited = false; } + int PfEventThread::init(const char* name, int qd) { int rc; @@ -101,7 +104,7 @@ void PfEventThread::stop() if(rc) { S5LOG_ERROR("Failed call pthread_join on thread:%s, rc:%d", name, rc); } - tid=0; + tid = 0; } @@ -140,6 +143,23 @@ void* thread_proc_eventq(void* arg) return NULL; } +int get_thread_stats(pf_thread_stats *stats) { + if (cur_thread == NULL) { + S5LOG_ERROR("thread is not exist"); + return -1; + } + *stats = cur_thread->stats; + return 0; +} + +PfEventThread * get_current_thread() { + if (cur_thread == NULL) { + S5LOG_ERROR("thread is not exist"); + assert(0); + } + return cur_thread; +} + static void thread_update_stats(PfEventThread *thread, uint64_t end, uint64_t start, int rc) { @@ -175,6 +195,12 @@ static int event_queue_run_batch(PfEventThread *thread) { thread->exiting = true; break; } + case EVT_GET_STAT: + { + auto ctx = (get_stats_ctx*)event->event.arg_p; + ctx->fn(ctx); + break; + } default: { thread->process_event(event->event.type, event->event.arg_i, event->event.arg_p, event->event.arg_q); @@ -196,10 +222,10 @@ static int func_run(PfEventThread *thread) { } static int thread_poll(PfEventThread *thread) { - int rc = 0; - rc = event_queue_run_batch(thread); - rc = func_run(thread); - return rc; + if (event_queue_run_batch(thread) || func_run(thread)) { + return PF_POLLER_BUSY; + } + return PF_POLLER_IDLE; } void* thread_proc_spdkr(void* arg) @@ -209,6 +235,7 @@ void* thread_proc_spdkr(void* arg) pThis->tsc_last = spdk_get_ticks(); PfSpdkQueue *eq = (PfSpdkQueue *)pThis->event_queue; eq->set_thread_queue(); + cur_thread = pThis; while(!pThis->exiting) { int rc = 0; rc = thread_poll(pThis); diff --git a/common/src/pf_ioengine.cpp b/common/src/pf_ioengine.cpp index 087935a..0f6d52e 100644 --- a/common/src/pf_ioengine.cpp +++ b/common/src/pf_ioengine.cpp @@ -85,6 +85,7 @@ int PfAioEngine::init() aio_poller = std::thread(&PfAioEngine::polling_proc, this); return 0; } + PfAioEngine::~PfAioEngine() { 
pthread_cancel(aio_poller.native_handle()); @@ -100,12 +101,12 @@ int PfAioEngine::submit_io(struct IoSubTask* io, int64_t media_offset, int64_t m #else BufferDescriptor* data_bd = io->parent_iocb->data_bd; //below is the most possible case - if(IS_READ_OP(io->opcode)) + if (IS_READ_OP(io->opcode)) io_prep_pread(&io->aio_cb, fd, data_bd->buf, media_len, media_offset); else io_prep_pwrite(&io->aio_cb, fd, data_bd->buf, media_len, media_offset); batch_iocb[batch_io_cnt++] = &io->aio_cb; - if(batch_io_cnt >= (BATCH_IO_CNT>>1)){ + if (batch_io_cnt >= (BATCH_IO_CNT>>1)) { if (batch_io_cnt >= BATCH_IO_CNT) { S5LOG_FATAL("Too many fails to submit IO on ssd:%s", disk_name.c_str()); } @@ -114,6 +115,7 @@ int PfAioEngine::submit_io(struct IoSubTask* io, int64_t media_offset, int64_t m #endif return 0; } + int PfAioEngine::submit_cow_io(struct CowTask* io, int64_t media_offset, int64_t media_len) { //S5LOG_DEBUG("Begin cow_io %d", io->opcode); @@ -124,6 +126,7 @@ int PfAioEngine::submit_cow_io(struct CowTask* io, int64_t media_offset, int64_t struct iocb* ios[1] = { &io->aio_cb }; return io_submit(aio_ctx, 1, ios); } + int PfAioEngine::submit_batch() { if(batch_io_cnt == 0) @@ -137,6 +140,7 @@ int PfAioEngine::submit_batch() batch_io_cnt = 0; return 0; } + #if 0 int PfAioEngine::poll_io(int *completions) { @@ -154,12 +158,9 @@ void PfAioEngine::polling_proc() int rc = 0; while (1) { rc = io_getevents(aio_ctx, 1, MAX_EVT_CNT, evts, NULL); - if (rc < 0) - { + if (rc < 0) { continue; - } - else - { + } else { for (int i = 0; i < rc; i++) { struct iocb* aiocb = (struct iocb*)evts[i].obj; diff --git a/pfs/include/pf_dispatcher.h b/pfs/include/pf_dispatcher.h index 8343e66..d8f04ed 100644 --- a/pfs/include/pf_dispatcher.h +++ b/pfs/include/pf_dispatcher.h @@ -68,14 +68,15 @@ struct PfServerIocb : public PfIocb if (s->replicas[i] == NULL) { continue; } - if(s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) { - subtasks[i]->complete_status=PfMessageStatus::MSG_STATUS_SUCCESS; + if (s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) { + subtasks[i]->complete_status = PfMessageStatus::MSG_STATUS_SUCCESS; subtasks[i]->opcode = opcode; //subtask opcode will be OP_WRITE or OP_REPLICATE_WRITE task_mask |= subtasks[i]->task_mask; add_ref(); } } } + void inline setup_one_subtask(PfShard* s, int rep_index, PfOpCode opcode) { subtasks[rep_index]->complete_status=PfMessageStatus::MSG_STATUS_SUCCESS; diff --git a/pfs/include/pf_restful_api.h b/pfs/include/pf_restful_api.h index e0e48b6..f90483a 100644 --- a/pfs/include/pf_restful_api.h +++ b/pfs/include/pf_restful_api.h @@ -10,9 +10,11 @@ #include #include #include +#include //for sem_t #include #include #include "pf_volume_type.h" +#include "pf_event_thread.h" class ReplicaArg { @@ -42,6 +44,12 @@ class DeleteVolumeArg uint64_t volume_id; }; +class GetThreadStatsArg +{ +public: + std::string op; +}; + class PrepareVolumeArg { public: @@ -78,6 +86,13 @@ class RestfulReply }; void from_json(const nlohmann::json& j, RestfulReply& p); +class GetThreadStatsReply : public RestfulReply { +public: + std::vector thread_stats; +}; + +void from_json(const nlohmann::json& j, GetThreadStatsReply& p); + class GetSnapListReply : public RestfulReply { public: std::vector snap_list; @@ -92,11 +107,13 @@ class BackgroudTaskReply : public RestfulReply { BackgroudTaskReply():task_id(0), progress(0){ } }; + class ErrorReportReply : public RestfulReply { public: PfMessageStatus action_code; uint16_t meta_ver; }; + class CalcMd5Reply : 
public RestfulReply { public: CalcMd5Reply() : RestfulReply("calculate_replica_md5_reply"){ @@ -122,6 +139,13 @@ class ObjectMd5Reply : public RestfulReply { uint64_t offset_in_vol=0; std::list< SnapshotMd5> snap_md5; }; + +struct restful_get_stats_ctx { + struct get_stats_ctx ctx; + struct mg_connection *nc; + sem_t sem; +}; + class PerfReply : public RestfulReply { public: PerfReply() : RestfulReply("perf_reply") { @@ -134,7 +158,9 @@ void from_json(const nlohmann::json& j, ReplicaArg& p); void from_json(const nlohmann::json& j, ShardArg& p); void from_json(const nlohmann::json& j, PrepareVolumeArg& p); void from_json(const nlohmann::json& j, DeleteVolumeArg& p); -void from_json(const nlohmann::json& j, GetSnapListReply& p) ; +void from_json(const nlohmann::json& j, GetThreadStatsArg& p); +void from_json(const nlohmann::json& j, GetSnapListReply& p); +void from_json(const nlohmann::json& j, GetThreadStatsReply& p); void to_json(nlohmann::json& j, const RestfulReply& r); void to_json(nlohmann::json& j, const BackgroudTaskReply& r); void from_json(const nlohmann::json& j, ErrorReportReply& p); @@ -150,6 +176,7 @@ int64_t get_http_param_as_int64(const struct mg_str *http_content, const char *n void handle_prepare_volume(struct mg_connection *nc, struct http_message * hm); void handle_delete_volume(struct mg_connection *nc, struct http_message * hm); +void handle_get_thread_stats(struct mg_connection *nc, struct http_message * hm); void handle_set_snap_seq(struct mg_connection *nc, struct http_message * hm); void handle_set_meta_ver(struct mg_connection *nc, struct http_message * hm); void handle_delete_snapshot(struct mg_connection *nc, struct http_message * hm); diff --git a/pfs/src/pf_dispatcher.cpp b/pfs/src/pf_dispatcher.cpp index 8dcfbd7..152932a 100644 --- a/pfs/src/pf_dispatcher.cpp +++ b/pfs/src/pf_dispatcher.cpp @@ -132,16 +132,16 @@ int PfDispatcher::dispatch_io(PfServerIocb *iocb) PfMessageHead* cmd = iocb->cmd_bd->cmd_bd; auto pos = opened_volumes.find(cmd->vol_id); - if(unlikely(pos == opened_volumes.end())){ + if (unlikely(pos == opened_volumes.end())) { S5LOG_ERROR("Cannot dispatch_io, op:%s, volume:0x%x not opened", PfOpCode2Str(cmd->opcode), cmd->vol_id); iocb->complete_status = PfMessageStatus::MSG_STATUS_REOPEN | PfMessageStatus::MSG_STATUS_INVALID_STATE; iocb->complete_meta_ver = -1; reply_io_to_client(iocb); return 0; } + PfVolume* vol = pos->second; - if(unlikely(cmd->meta_ver != vol->meta_ver)) - { + if (unlikely(cmd->meta_ver != vol->meta_ver)) { S5LOG_ERROR("Cannot dispatch_io, op:%s(%d), volume:0x%x meta_ver:%d diff than client:%d", PfOpCode2Str(cmd->opcode), cmd->opcode, cmd->vol_id, vol->meta_ver, cmd->meta_ver); iocb->complete_status = PfMessageStatus::MSG_STATUS_REOPEN ; @@ -149,6 +149,7 @@ int PfDispatcher::dispatch_io(PfServerIocb *iocb) reply_io_to_client(iocb); return 0; } + //S5LOG_DEBUG("dispatch_io, op:%s, volume:%s ", PfOpCode2Str(cmd->opcode), vol->name); if(unlikely(cmd->snap_seq == SNAP_SEQ_HEAD)) cmd->snap_seq = vol->snap_seq; @@ -201,7 +202,7 @@ int PfDispatcher::dispatch_write(PfServerIocb* iocb, PfVolume* vol, PfShard * s) { PfMessageHead* cmd = iocb->cmd_bd->cmd_bd; iocb->task_mask = 0; - if(unlikely(!s->is_primary_node || s->replicas[s->duty_rep_index]->status != HS_OK)){ + if (unlikely(!s->is_primary_node || s->replicas[s->duty_rep_index]->status != HS_OK)) { S5LOG_ERROR("Write on non-primary node, vol:0x%llx, %s, shard_index:%d, current replica_index:%d", vol->id, vol->name, s->id, s->duty_rep_index); iocb->complete_status = 
PfMessageStatus::MSG_STATUS_REOPEN; @@ -210,12 +211,12 @@ int PfDispatcher::dispatch_write(PfServerIocb* iocb, PfVolume* vol, PfShard * s) } iocb->setup_subtask(s, cmd->opcode); for (int i = 0; i < vol->rep_count; i++) { - if(s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) { + if (s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) { int rc = 0; iocb->subtasks[i]->rep_id = s->replicas[i]->id; iocb->subtasks[i]->store_id = s->replicas[i]->store_id; rc = s->replicas[i]->submit_io(&iocb->io_subtasks[i]); - if(rc) { + if (rc) { S5LOG_ERROR("submit_io, rc:%d", rc); } } else { @@ -308,7 +309,7 @@ int PfDispatcher::init_mempools(int disp_index) goto release1; S5LOG_INFO("Allocate data_pool with max IO size:%d, depth:%d", PF_MAX_IO_SIZE, pool_size * 2); if (spdk_engine_used()) - mem_pool.data_pool.dma_buffer_used = 1; + mem_pool.data_pool.dma_buffer_used = true; rc = mem_pool.data_pool.init(PF_MAX_IO_SIZE, pool_size * 2); if (rc) goto release2; @@ -316,9 +317,9 @@ int PfDispatcher::init_mempools(int disp_index) if (rc) goto release3; rc = iocb_pool.init(pool_size * 2); - if(rc) + if (rc) goto release4; - for(int i=0;icmd_bd = mem_pool.cmd_pool.alloc(); @@ -331,9 +332,9 @@ int PfDispatcher::init_mempools(int disp_index) cb->reply_bd = mem_pool.reply_pool.alloc(); cb->reply_bd->data_len = sizeof(PfMessageReply); cb->reply_bd->server_iocb = cb; - for(int i=0;i<3;i++) { + for (int i = 0; i < 3; i++) { cb->subtasks[i] = &cb->io_subtasks[i]; - cb->subtasks[i]->rep_index =i; + cb->subtasks[i]->rep_index = i; cb->subtasks[i]->task_mask = 1 << i; cb->subtasks[i]->parent_iocb = cb; cb->subtasks[i]->ops = &_server_task_complete_ops; diff --git a/pfs/src/pf_flash_store.cpp b/pfs/src/pf_flash_store.cpp index 174d5d7..b5054f8 100644 --- a/pfs/src/pf_flash_store.cpp +++ b/pfs/src/pf_flash_store.cpp @@ -504,8 +504,7 @@ int PfFlashStore::do_write(IoSubTask* io) auto block_pos = obj_lmt.find(key); lmt_entry *entry = NULL; - if (unlikely(block_pos == obj_lmt.end())) - { + if (unlikely(block_pos == obj_lmt.end())) { //S5LOG_DEBUG("Alloc object for rep:0x%llx slba:0x%llx cmd offset:0x%llx ", io->rep_id, key.slba, cmd->offset); if (free_obj_queue.is_empty()) { S5LOG_ERROR("Disk:%s is full!", tray_name); @@ -522,16 +521,13 @@ int PfFlashStore::do_write(IoSubTask* io) }; obj_lmt[key] = entry; int rc = redolog->log_allocation(&key, entry, free_obj_queue.head); - if (rc) - { + if (rc) { app_context.error_handler->submit_error(io, MSG_STATUS_LOGFAILED); S5LOG_ERROR("log_allocation error, rc:%d", rc); return 0; } - } - else - { + } else { //static int dirty_bit=0; entry = block_pos->second; if(unlikely(entry->status == EntryStatus::RECOVERYING)) { @@ -553,6 +549,7 @@ int PfFlashStore::do_write(IoSubTask* io) io->ops->complete(io, PfMessageStatus::MSG_STATUS_SUCCESS); return 0; } + if(likely(cmd->snap_seq == entry->snap_seq)) { if (unlikely(entry->status != EntryStatus::NORMAL)) { @@ -571,9 +568,8 @@ int PfFlashStore::do_write(IoSubTask* io) cmd->vol_id, cmd->snap_seq , entry->snap_seq); io->ops->complete(io, MSG_STATUS_READONLY); return 0; - } else if(unlikely(cmd->snap_seq > entry->snap_seq)) { - if (free_obj_queue.is_empty()) - { + } else if (unlikely(cmd->snap_seq > entry->snap_seq)) { + if (free_obj_queue.is_empty()) { app_context.error_handler->submit_error(io, MSG_STATUS_NOSPACE); return -ENOSPC; } @@ -587,8 +583,7 @@ int PfFlashStore::do_write(IoSubTask* io) }; obj_lmt[key] = cow_entry; int rc = redolog->log_allocation(&key, cow_entry, free_obj_queue.head); - if 
(rc) - { + if (rc) { app_context.error_handler->submit_error(io, MSG_STATUS_LOGFAILED); S5LOG_ERROR("log_allocation error, rc:%d", rc); return -EIO; @@ -1279,7 +1274,7 @@ int PfFlashStore::process_event(int event_type, int arg_i, void* arg_p, void*) snprintf(zk_node_name, sizeof(zk_node_name), "shared_disks/%s/owner_store", uuid_str); switch (event_type) { case EVT_WAIT_OWNER_LOCK: //this will be the first event received for shared disk - do{ + do { rc = app_context.zk_client.wait_lock(zk_node_name, store_id_str); //we will not process any event before get lock pthread_testcancel(); if(rc){ @@ -1294,7 +1289,7 @@ int PfFlashStore::process_event(int event_type, int arg_i, void* arg_p, void*) } } - }while(rc); + } while(rc); break; case EVT_IO_REQ: diff --git a/pfs/src/pf_restful_api.cpp b/pfs/src/pf_restful_api.cpp index d80330c..77b9e59 100644 --- a/pfs/src/pf_restful_api.cpp +++ b/pfs/src/pf_restful_api.cpp @@ -16,6 +16,7 @@ #include "pf_replica.h" #include "pf_scrub.h" #include "pf_stat.h" +#include "pf_event_queue.h" using nlohmann::json; using namespace std; @@ -69,6 +70,24 @@ void from_json(const json& j, DeleteVolumeArg& p) { j.at("volume_id").get_to(p.volume_id); } +void from_json(const json& j, GetThreadStatsArg& p) { + j.at("op").get_to(p.op); +} + +void from_json(const nlohmann::json& j, GetThreadStatsReply& p) { + from_json(j, *((RestfulReply*)&p)); + for (int i = 0; i < p.thread_stats.size(); i++) { + j.at("name").get_to(p.thread_stats[i].name); + j.at("tid").get_to(p.thread_stats[i].tid); + j.at("busy").get_to(p.thread_stats[i].stats.busy_tsc); + j.at("idle").get_to(p.thread_stats[i].stats.idle_tsc); + } +} + +void to_json(json& j, thread_stat& r) { + j = json{{ "name", r.name },{ "tid", r.tid },{ "busy", r.stats.busy_tsc }, {"idle", r.stats.idle_tsc}}; +} + void from_json(const json& j, GetSnapListReply& p) { from_json(j, *((RestfulReply*)&p)); j.at("snap_list").get_to(p.snap_list); @@ -335,6 +354,84 @@ void handle_delete_volume(struct mg_connection *nc, struct http_message * hm) send_reply_to_client(r, nc); } +static void _thread_get_stats(void *arg) +{ + struct restful_get_stats_ctx *ctx = (struct restful_get_stats_ctx*)arg; + struct PfEventThread *thread = get_current_thread(); + struct pf_thread_stats stats; + + if (0 == get_thread_stats(&stats)) { + ctx->ctx.thread_stats.push_back( {thread->name, thread->tid, stats} ); + } else { + sem_post(&ctx->sem); + return; + } + ctx->ctx.next_thread_id++; + + // collect done + if (ctx->ctx.next_thread_id == ctx->ctx.num_threads) { + for (int i = 0; i < ctx->ctx.thread_stats.size(); i++) { + S5LOG_INFO("thread stats: name: %s, tid: %llu, busy: %llu, idle: %llu", + ctx->ctx.thread_stats[i].name.c_str(), ctx->ctx.thread_stats[i].tid, + ctx->ctx.thread_stats[i].stats.busy_tsc, + ctx->ctx.thread_stats[i].stats.idle_tsc); + } + sem_post(&ctx->sem); + } else { + // continue to get next thread stat + ctx->ctx.threads[ctx->ctx.next_thread_id]->post_event_locked(EVT_GET_STAT, 0, ctx); + } +} + +void handle_get_thread_stats(struct mg_connection *nc, struct http_message * hm) +{ + S5LOG_INFO("Receive get thread stats req===========\n%.*s\n============", (int)hm->body.len, hm->body.p); + auto j = json::parse(hm->body.p, hm->body.p + hm->body.len); + GetThreadStatsReply reply; + GetThreadStatsArg arg = j.get(); + auto ctx = new restful_get_stats_ctx(); + ctx->ctx.next_thread_id = 0; + ctx->ctx.fn = _thread_get_stats; + ctx->nc = nc; + sem_init(&ctx->sem, 0, 0); + for (int i = 0; i < app_context.disps.size(); i++) { + 
ctx->ctx.threads.push_back(app_context.disps[i]->event_queue); + ctx->ctx.num_threads++; + } + + for (int i = 0; i < app_context.replicators.size(); i++) { + ctx->ctx.threads.push_back(app_context.replicators[i]->event_queue); + ctx->ctx.num_threads++; + } + + for (int i = 0; i < app_context.trays.size(); i++) { + ctx->ctx.threads.push_back(app_context.trays[i]->event_queue); + ctx->ctx.num_threads++; + } + + if (ctx->ctx.num_threads > 0) { + ctx->ctx.threads[ctx->ctx.next_thread_id]->post_event_locked(EVT_GET_STAT, 0, ctx); + } + sem_wait(&ctx->sem); + sem_destroy(&ctx->sem); + reply.thread_stats = ctx->ctx.thread_stats; + S5LOG_INFO("Succeeded get thread stats"); + reply.op = "get_thread_stats_reply"; + reply.ret_code = 0; + //send_reply_to_client(reply, nc); + auto jarray = nlohmann::json::array(); + for (int i = 0; i < reply.thread_stats.size(); i++) { + nlohmann::json j; + to_json(j, reply.thread_stats[i]) + jarray.emplace_back(j); + } + string jstr = jarray.dump(); + const char* cstr = jstr.c_str(); + mg_send_head(nc, reply.ret_code == 0 ? 200 : 400, strlen(cstr), "Content-Type: text/plain"); + mg_printf(nc, "%s", cstr); + free(ctx); +} + void handle_set_snap_seq(struct mg_connection *nc, struct http_message * hm) { int64_t vol_id = get_http_param_as_int64(&hm->query_string, "volume_id", 0, true); int snap_seq = (int)get_http_param_as_int64(&hm->query_string, "snap_seq", 0, true); diff --git a/pfs/src/pf_restful_server.cpp b/pfs/src/pf_restful_server.cpp index 3626aa2..9fa1231 100644 --- a/pfs/src/pf_restful_server.cpp +++ b/pfs/src/pf_restful_server.cpp @@ -59,6 +59,8 @@ static void handle_api(struct mg_connection *nc, int ev, void *p) { handle_cal_object_md5(nc, hm); else if (strcmp(opcode, "prepare_shards") == 0) handle_prepare_shards(nc, hm); + else if (strcmp(opcode, "get_thread_stats") == 0) + handle_get_thread_stats(nc, hm); else { S5LOG_ERROR("Unknown op:%s", opcode); string cstr = format_string("Unknown op:%s", opcode); From 4f80f47d207bc9405477cff757cb5c35240c8b22 Mon Sep 17 00:00:00 2001 From: ericqzhao <125110732+ericqzhao@users.noreply.github.com> Date: Wed, 31 Jan 2024 19:03:37 +0800 Subject: [PATCH 2/2] pfs: add restful api to collect all spdk thread stats --- common/include/pf_event_queue.h | 2 -- common/include/pf_event_thread.h | 2 +- common/src/pf_event_queue.cpp | 5 ----- pfs/src/pf_restful_api.cpp | 8 ++++---- 4 files changed, 5 insertions(+), 12 deletions(-) diff --git a/common/include/pf_event_queue.h b/common/include/pf_event_queue.h index a1c229e..f98f5b3 100644 --- a/common/include/pf_event_queue.h +++ b/common/include/pf_event_queue.h @@ -58,7 +58,6 @@ class pfqueue int event_fd; virtual int post_event(int type, int arg_i, void* arg_p, void* arg_q = NULL) = 0; - virtual int post_event_locked(int type, int arg_i, void* arg_p) = 0; virtual void destroy() = 0; virtual int sync_invoke(std::function f) = 0; }; @@ -78,7 +77,6 @@ class PfEventQueue : public pfqueue int init(const char* name, int size, BOOL semaphore_mode); void destroy(); int post_event(int type, int arg_i, void* arg_p, void* arg_q=NULL); - int post_event_locked(int type, int arg_i, void* arg_p); int get_events(PfFixedSizeQueue** /*out*/ q); int get_event(S5Event* /*out*/ evt); inline bool is_empty() { return current_queue->is_empty();} diff --git a/common/include/pf_event_thread.h b/common/include/pf_event_thread.h index 47e3c77..ee90329 100644 --- a/common/include/pf_event_thread.h +++ b/common/include/pf_event_thread.h @@ -57,7 +57,7 @@ struct get_stats_ctx { int next_thread_id; int 
num_threads;
 	std::vector threads;
-	std::vector thread_stats;
+	std::vector thread_stats;
 	uint64_t now;
 };
diff --git a/common/src/pf_event_queue.cpp b/common/src/pf_event_queue.cpp
index c2f1948..940937d 100644
--- a/common/src/pf_event_queue.cpp
+++ b/common/src/pf_event_queue.cpp
@@ -75,11 +75,6 @@ int PfEventQueue::post_event(int type, int arg_i, void* arg_p, void* arg_q)
 	//current_queue->tail = (current_queue->tail + 1) % current_queue->queue_depth;
 }
 
-int PfEventQueue::post_event_locked(int type, int arg_i, void* arg_p)
-{
-	return 0;
-}
-
 /**
  * get events in queue. events are fetched via _@param q_. q may be empty if there
  * are no events.
diff --git a/pfs/src/pf_restful_api.cpp b/pfs/src/pf_restful_api.cpp
index dfa44b1..0ad6959 100644
--- a/pfs/src/pf_restful_api.cpp
+++ b/pfs/src/pf_restful_api.cpp
@@ -187,7 +187,7 @@ static PfVolume* convert_argument_to_volume(const PrepareVolumeArg& arg)
 		{
 			if (app_context.shard_to_replicator) {
 				// case1: primary shard is asigned to this store, alloc PfLocalReplica and PfSyncRemoteReplica
-				// case2£ºprimary shard is not asigned to this store but slave shard is asigned to this store,
+				// case2: primary shard is not asigned to this store but slave shard is asigned to this store,
 				//  only alloc PfLocalReplica
 				// case3: no shard is asigned to this store, do noting
 				if (app_context.store_id != arg.shards[i].replicas[shard->primary_replica_index].store_id &&
@@ -228,7 +228,7 @@ static PfVolume* convert_argument_to_volume(const PrepareVolumeArg& arg)
 				if (app_context.shard_to_replicator) {
 					rp = app_context.get_replicator();
 				} else {
-					rp = app_context.replicators[(vol->i24)%app_context.replicators.size()];
+					rp = app_context.replicators[(vol->id>>24)%app_context.replicators.size()];
 				}
 				((PfSyncRemoteReplica*)r)->replicator = rp;
 
@@ -380,7 +380,7 @@ static void _thread_get_stats(void *arg)
 		sem_post(&ctx->sem);
 	} else {
 		// continue to get next thread stat
-		ctx->ctx.threads[ctx->ctx.next_thread_id]->post_event_locked(EVT_GET_STAT, 0, ctx);
+		((PfSpdkQueue *)ctx->ctx.threads[ctx->ctx.next_thread_id])->post_event_locked(EVT_GET_STAT, 0, ctx);
 	}
 }
 
@@ -411,7 +411,7 @@ void handle_get_thread_stats(struct mg_connection *nc, struct http_message * hm)
 	}
 
 	if (ctx->ctx.num_threads > 0) {
-		ctx->ctx.threads[ctx->ctx.next_thread_id]->post_event_locked(EVT_GET_STAT, 0, ctx);
+		((PfSpdkQueue *)ctx->ctx.threads[ctx->ctx.next_thread_id])->post_event_locked(EVT_GET_STAT, 0, ctx);
 	}
 	sem_wait(&ctx->sem);
 	sem_destroy(&ctx->sem);
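
For reference, a rough usage sketch of the endpoint added by this series. The REST server dispatches on op=get_thread_stats (see pf_restful_server.cpp), and handle_get_thread_stats() parses the JSON body and replies with an array built by to_json(json&, thread_stat&). The host, port and URL path below are placeholders, and the thread names, tids and tick counts are made-up sample values rather than output from a real store:

    curl -s -X POST 'http://<store-ip>:<rest-port>/api?op=get_thread_stats' \
         -d '{"op":"get_thread_stats"}'

    [
      {"name":"disp_0","tid":139998877665280,"busy":98765432,"idle":12345678},
      {"name":"repl_0","tid":139998877661184,"busy":1234567,"idle":87654321}
    ]

Here "busy" and "idle" are the raw busy_tsc/idle_tsc counters accumulated by thread_update_stats() for each event thread; the dispatcher, replicator and tray threads are polled in that order.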
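Since the reply carries raw TSC tick counts rather than percentages, a consumer has to derive utilization itself. A minimal C++ sketch of that conversion, assuming only that busy and idle are the 64-bit counters returned by the API (the helper name and rounding choice are illustrative, not part of the patch):

    #include <cstdint>

    // Convert the raw busy/idle tick counters from get_thread_stats into a
    // busy percentage for one polling thread. Returns 0 when no ticks have
    // been accounted yet, to avoid dividing by zero.
    static double thread_busy_percent(uint64_t busy_tsc, uint64_t idle_tsc)
    {
        uint64_t total_tsc = busy_tsc + idle_tsc;
        if (total_tsc == 0)
            return 0.0;
        return 100.0 * static_cast<double>(busy_tsc) / static_cast<double>(total_tsc);
    }

Sampling the endpoint twice and diffing the counters before applying this formula gives utilization over an interval instead of since thread start.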