Skip to content

Commit

Permalink
Merge pull request #73 from ericqzhao/master
Browse files Browse the repository at this point in the history
pfs: add restful api to collect all spdk thread stats
  • Loading branch information
qiyuanzhi authored Feb 20, 2024
2 parents dc073a7 + 4f80f47 commit db9f558
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 54 deletions.
4 changes: 2 additions & 2 deletions common/include/pf_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ struct BufferDescriptor
class BufferPool
{
public:
BufferPool(){dma_buffer_used = 0;}
BufferPool() dma_buffer_used(false) {}
size_t buf_size;
int buf_count;
int dma_buffer_used;
bool dma_buffer_used;
struct ibv_mr* mrs[4];
int init(size_t buffer_size, int count);
inline BufferDescriptor* alloc() { return free_bds.dequeue(); }
Expand Down
1 change: 1 addition & 0 deletions common/include/pf_event_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum S5EventType : int
EVT_CONN_CLOSED,
EVT_SAVEMD,
EVT_WAIT_OWNER_LOCK,
EVT_GET_STAT,
EVT_FORCE_RELEASE_CONN,
};
const char* EventTypeToStr(S5EventType t);
Expand Down
20 changes: 20 additions & 0 deletions common/include/pf_event_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,24 @@ class PfEventThread
int sync_invoke(std::function<int(void)> _f);
virtual int commit_batch(){return 0;};
};

typedef void (*pf_event_fn)(void *ctx);

struct thread_stat {
std::string name;
pthread_t tid;
pf_thread_stats stats;
};

struct get_stats_ctx {
pf_event_fn fn;
int next_thread_id;
int num_threads;
std::vector<pfqueue*> threads;
std::vector<thread_stat> thread_stats;
uint64_t now;
};

int get_thread_stats(pf_thread_stats *stats);
PfEventThread * get_current_thread();
#endif // pf_event_thread_h__
22 changes: 11 additions & 11 deletions common/src/pf_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@ int BufferPool::init(size_t buffer_size, int count)
if(rc != 0)
throw std::runtime_error(format_string("init memory pool failed, rc:%d", rc));
clean.push_back([this](){free_bds.destroy(); });
if (dma_buffer_used == 0) {
data_buf = memalign(4096, buffer_size*count);
if (dma_buffer_used) {
data_buf = spdk_dma_zmalloc(buffer_size*count, 4096, NULL);
if(data_buf == NULL)
throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", buffer_size*count));
clean.push_back([this](){ ::free(data_buf); });
}
else {
data_buf = spdk_dma_zmalloc(buffer_size*count, 4096, NULL);
clean.push_back([this](){ ::spdk_dma_free(data_buf); });
} else {
data_buf = memalign(4096, buffer_size*count);
if(data_buf == NULL)
throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", buffer_size*count));
clean.push_back([this](){ ::spdk_dma_free(data_buf); });
clean.push_back([this](){ ::free(data_buf); });
}

data_bds = (BufferDescriptor*)calloc(count, sizeof(BufferDescriptor));
if(data_bds == NULL)
throw std::runtime_error(format_string("Failed to alloc memory of:%d bytes", count * sizeof(BufferDescriptor)));
clean.push_back([this](){ ::free(data_bds); });
for(int i=0;i<count;i++)
for(int i = 0; i < count; i++)
{
data_bds[i].buf = (char*)data_buf + buffer_size * i;
data_bds[i].buf_capacity = (int)buffer_size;
Expand All @@ -50,10 +49,11 @@ int BufferPool::init(size_t buffer_size, int count)
void BufferPool::destroy()
{
::free(data_bds);
if (dma_buffer_used == 0)
::free(data_buf);
else
if (dma_buffer_used) {
spdk_dma_free(data_buf);
} else {
::free(data_buf);
}

free_bds.destroy();
}
Expand Down
37 changes: 32 additions & 5 deletions common/src/pf_event_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
void* thread_proc_spdkr(void* arg);
void* thread_proc_eventq(void* arg);

static __thread PfEventThread *cur_thread = NULL;

PfEventThread::PfEventThread() {
inited = false;
}

int PfEventThread::init(const char* name, int qd)
{
int rc;
Expand Down Expand Up @@ -101,7 +104,7 @@ void PfEventThread::stop()
if(rc) {
S5LOG_ERROR("Failed call pthread_join on thread:%s, rc:%d", name, rc);
}
tid=0;
tid = 0;

}

Expand Down Expand Up @@ -140,6 +143,23 @@ void* thread_proc_eventq(void* arg)
return NULL;
}

int get_thread_stats(pf_thread_stats *stats) {
if (cur_thread == NULL) {
S5LOG_ERROR("thread is not exist");
return -1;
}
*stats = cur_thread->stats;
return 0;
}

PfEventThread * get_current_thread() {
if (cur_thread == NULL) {
S5LOG_ERROR("thread is not exist");
assert(0);
}
return cur_thread;
}

static void thread_update_stats(PfEventThread *thread, uint64_t end,
uint64_t start, int rc)
{
Expand Down Expand Up @@ -175,6 +195,12 @@ static int event_queue_run_batch(PfEventThread *thread) {
thread->exiting = true;
break;
}
case EVT_GET_STAT:
{
auto ctx = (get_stats_ctx*)event->event.arg_p;
ctx->fn(ctx);
break;
}
default:
{
thread->process_event(event->event.type, event->event.arg_i, event->event.arg_p, event->event.arg_q);
Expand All @@ -196,10 +222,10 @@ static int func_run(PfEventThread *thread) {
}

static int thread_poll(PfEventThread *thread) {
int rc = 0;
rc = event_queue_run_batch(thread);
rc = func_run(thread);
return rc;
if (event_queue_run_batch(thread) || func_run(thread)) {
return PF_POLLER_BUSY;
}
return PF_POLLER_IDLE;
}

void* thread_proc_spdkr(void* arg)
Expand All @@ -209,6 +235,7 @@ void* thread_proc_spdkr(void* arg)
pThis->tsc_last = spdk_get_ticks();
PfSpdkQueue *eq = (PfSpdkQueue *)pThis->event_queue;
eq->set_thread_queue();
cur_thread = pThis;
while(!pThis->exiting) {
int rc = 0;
rc = thread_poll(pThis);
Expand Down
15 changes: 8 additions & 7 deletions common/src/pf_ioengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ int PfAioEngine::init()
aio_poller = std::thread(&PfAioEngine::polling_proc, this);
return 0;
}

PfAioEngine::~PfAioEngine()
{
pthread_cancel(aio_poller.native_handle());
Expand All @@ -100,12 +101,12 @@ int PfAioEngine::submit_io(struct IoSubTask* io, int64_t media_offset, int64_t m
#else
BufferDescriptor* data_bd = io->parent_iocb->data_bd;
//below is the most possible case
if(IS_READ_OP(io->opcode))
if (IS_READ_OP(io->opcode))
io_prep_pread(&io->aio_cb, fd, data_bd->buf, media_len, media_offset);
else
io_prep_pwrite(&io->aio_cb, fd, data_bd->buf, media_len, media_offset);
batch_iocb[batch_io_cnt++] = &io->aio_cb;
if(batch_io_cnt >= (BATCH_IO_CNT>>1)){
if (batch_io_cnt >= (BATCH_IO_CNT>>1)) {
if (batch_io_cnt >= BATCH_IO_CNT) {
S5LOG_FATAL("Too many fails to submit IO on ssd:%s", disk_name.c_str());
}
Expand All @@ -114,6 +115,7 @@ int PfAioEngine::submit_io(struct IoSubTask* io, int64_t media_offset, int64_t m
#endif
return 0;
}

int PfAioEngine::submit_cow_io(struct CowTask* io, int64_t media_offset, int64_t media_len)
{
//S5LOG_DEBUG("Begin cow_io %d", io->opcode);
Expand All @@ -124,6 +126,7 @@ int PfAioEngine::submit_cow_io(struct CowTask* io, int64_t media_offset, int64_t
struct iocb* ios[1] = { &io->aio_cb };
return io_submit(aio_ctx, 1, ios);
}

int PfAioEngine::submit_batch()
{
if(batch_io_cnt == 0)
Expand All @@ -137,6 +140,7 @@ int PfAioEngine::submit_batch()
batch_io_cnt = 0;
return 0;
}

#if 0
int PfAioEngine::poll_io(int *completions)
{
Expand All @@ -154,12 +158,9 @@ void PfAioEngine::polling_proc()
int rc = 0;
while (1) {
rc = io_getevents(aio_ctx, 1, MAX_EVT_CNT, evts, NULL);
if (rc < 0)
{
if (rc < 0) {
continue;
}
else
{
} else {
for (int i = 0; i < rc; i++)
{
struct iocb* aiocb = (struct iocb*)evts[i].obj;
Expand Down
5 changes: 3 additions & 2 deletions pfs/include/pf_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ struct PfServerIocb : public PfIocb
if (s->replicas[i] == NULL) {
continue;
}
if(s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) {
subtasks[i]->complete_status=PfMessageStatus::MSG_STATUS_SUCCESS;
if (s->replicas[i]->status == HS_OK || s->replicas[i]->status == HS_RECOVERYING) {
subtasks[i]->complete_status = PfMessageStatus::MSG_STATUS_SUCCESS;
subtasks[i]->opcode = opcode; //subtask opcode will be OP_WRITE or OP_REPLICATE_WRITE
task_mask |= subtasks[i]->task_mask;
add_ref();
}
}
}

void inline setup_one_subtask(PfShard* s, int rep_index, PfOpCode opcode)
{
subtasks[rep_index]->complete_status=PfMessageStatus::MSG_STATUS_SUCCESS;
Expand Down
29 changes: 28 additions & 1 deletion pfs/include/pf_restful_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include <vector>
#include <list>
#include <stdint.h>
#include <semaphore.h> //for sem_t
#include <nlohmann/json.hpp>
#include <pf_message.h>
#include "pf_volume_type.h"
#include "pf_event_thread.h"

class ReplicaArg
{
Expand Down Expand Up @@ -42,6 +44,12 @@ class DeleteVolumeArg
uint64_t volume_id;
};

class GetThreadStatsArg
{
public:
std::string op;
};

class PrepareVolumeArg
{
public:
Expand Down Expand Up @@ -78,6 +86,13 @@ class RestfulReply
};
void from_json(const nlohmann::json& j, RestfulReply& p);

class GetThreadStatsReply : public RestfulReply {
public:
std::vector<thread_stat> thread_stats;
};

void from_json(const nlohmann::json& j, GetThreadStatsReply& p);

class GetSnapListReply : public RestfulReply {
public:
std::vector<int> snap_list;
Expand All @@ -92,11 +107,13 @@ class BackgroudTaskReply : public RestfulReply {

BackgroudTaskReply():task_id(0), progress(0){ }
};

class ErrorReportReply : public RestfulReply {
public:
PfMessageStatus action_code;
uint16_t meta_ver;
};

class CalcMd5Reply : public RestfulReply {
public:
CalcMd5Reply() : RestfulReply("calculate_replica_md5_reply"){
Expand All @@ -122,6 +139,13 @@ class ObjectMd5Reply : public RestfulReply {
uint64_t offset_in_vol=0;
std::list< SnapshotMd5> snap_md5;
};

struct restful_get_stats_ctx {
struct get_stats_ctx ctx;
struct mg_connection *nc;
sem_t sem;
};

class PerfReply : public RestfulReply {
public:
PerfReply() : RestfulReply("perf_reply") {
Expand All @@ -134,7 +158,9 @@ void from_json(const nlohmann::json& j, ReplicaArg& p);
void from_json(const nlohmann::json& j, ShardArg& p);
void from_json(const nlohmann::json& j, PrepareVolumeArg& p);
void from_json(const nlohmann::json& j, DeleteVolumeArg& p);
void from_json(const nlohmann::json& j, GetSnapListReply& p) ;
void from_json(const nlohmann::json& j, GetThreadStatsArg& p);
void from_json(const nlohmann::json& j, GetSnapListReply& p);
void from_json(const nlohmann::json& j, GetThreadStatsReply& p);
void to_json(nlohmann::json& j, const RestfulReply& r);
void to_json(nlohmann::json& j, const BackgroudTaskReply& r);
void from_json(const nlohmann::json& j, ErrorReportReply& p);
Expand All @@ -150,6 +176,7 @@ int64_t get_http_param_as_int64(const struct mg_str *http_content, const char *n

void handle_prepare_volume(struct mg_connection *nc, struct http_message * hm);
void handle_delete_volume(struct mg_connection *nc, struct http_message * hm);
void handle_get_thread_stats(struct mg_connection *nc, struct http_message * hm);
void handle_set_snap_seq(struct mg_connection *nc, struct http_message * hm);
void handle_set_meta_ver(struct mg_connection *nc, struct http_message * hm);
void handle_delete_snapshot(struct mg_connection *nc, struct http_message * hm);
Expand Down
Loading

0 comments on commit db9f558

Please sign in to comment.