From 4baa4cc922db9adbbed914980936fc4032a5c0f3 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Mon, 7 Oct 2024 12:58:17 +0000 Subject: [PATCH 1/6] Use zero copy between memif and libfabric Use memif's buffer to send data directly from them and to receive data directly into them Signed-off-by: Kasiewicz, Marek --- media-proxy/include/libfabric_cq.h | 7 +- media-proxy/include/libfabric_ep.h | 13 +- media-proxy/include/rdma_session.h | 28 +- media-proxy/include/shm_memif.h | 1 + media-proxy/src/libfabric_cq.c | 74 +++-- media-proxy/src/libfabric_dev.c | 2 +- media-proxy/src/libfabric_ep.c | 42 ++- media-proxy/src/rdma_session.c | 326 ++++++++++++++------- media-proxy/src/shm_memif_common.c | 35 +++ media-proxy/src/shm_memif_rdma.c | 27 +- tests/single-node-sample-apps/test-rdma.sh | 11 +- 11 files changed, 362 insertions(+), 204 deletions(-) diff --git a/media-proxy/include/libfabric_cq.h b/media-proxy/include/libfabric_cq.h index e483352f..f63399af 100644 --- a/media-proxy/include/libfabric_cq.h +++ b/media-proxy/include/libfabric_cq.h @@ -13,13 +13,14 @@ extern "C" { #include "libfabric_ep.h" -void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method, +void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method, struct fid_wait *waitset); -int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout); +int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, + struct fi_cq_err_entry *entries); int rdma_get_cq_fd(struct fid_cq *cq, int *fd, enum cq_comp_method method); int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout); + int timeout, struct fi_cq_err_entry *entry); int rdma_cq_readerr(struct fid_cq *cq); #ifdef __cplusplus diff --git a/media-proxy/include/libfabric_ep.h b/media-proxy/include/libfabric_ep.h index d782e1e6..4b6c0be8 100644 --- a/media-proxy/include/libfabric_ep.h +++ b/media-proxy/include/libfabric_ep.h @@ -34,8 +34,6 @@ typedef struct { fi_addr_t dest_av_entry; struct fid_wait *waitset; - struct fi_context *recv_ctx; - struct fi_context *send_ctx; uint64_t tx_cq_cntr; uint64_t rx_cq_cntr; @@ -51,9 +49,16 @@ typedef struct { enum direction dir; } ep_cfg_t; -int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size); +typedef struct { + ep_cfg_t *ep_cfg; + void *s_ctx; +} ep_thread_arg_t; + +int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size); // *buf has to point to registered memory -int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size); +int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx); +int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout); +int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout); int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg); int ep_destroy(ep_ctx_t **ep_ctx); diff --git a/media-proxy/include/rdma_session.h b/media-proxy/include/rdma_session.h index a37e832a..aa8d6480 100644 --- a/media-proxy/include/rdma_session.h +++ b/media-proxy/include/rdma_session.h @@ -33,23 +33,22 @@ typedef struct { } rdma_s_ops_t; typedef struct { - libfabric_ctx st; + memif_buffer_t shm_buf; + bool used; +} shm_buf_info_t; + +typedef struct { int idx; libfabric_ctx *rdma_ctx; ep_ctx_t *ep_ctx; - int frame_done_cnt; - int packet_done_cnt; - volatile bool stop; + pthread_t ep_thread; + atomic_bool ep_ready; int fb_send; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t transfer_size; - size_t pkt_len; - /* memif parameters */ memif_ops_t memif_ops; @@ -58,7 +57,6 @@ typedef struct { /* memif conenction handle */ memif_conn_handle_t memif_conn; - memif_buffer_t *shm_bufs; uint16_t shm_buf_num; atomic_bool shm_ready; @@ -69,21 +67,18 @@ typedef struct { } tx_rdma_session_context_t; typedef struct { - libfabric_ctx st; int idx; libfabric_ctx *rdma_ctx; ep_ctx_t *ep_ctx; volatile bool stop; - pthread_t frame_thread; + pthread_t ep_thread; int fb_recv; pthread_t app_thread; size_t transfer_size; - int pkt_len; - /* share memory arguments */ memif_socket_args_t memif_socket_args; @@ -93,17 +88,12 @@ typedef struct { memif_socket_handle_t memif_socket; memif_conn_handle_t memif_conn; - memif_buffer_t *shm_bufs; + shm_buf_info_t *shm_bufs; uint16_t shm_buf_num; atomic_bool shm_ready; char name[32]; pthread_t memif_event_thread; - - /* stat */ - int stat_frame_total_received; - uint64_t stat_frame_first_rx_time; - double expect_fps; } rx_rdma_session_context_t; /* TX: Create RDMA session */ diff --git a/media-proxy/include/shm_memif.h b/media-proxy/include/shm_memif.h index 09a53c4d..58df151e 100644 --- a/media-proxy/include/shm_memif.h +++ b/media-proxy/include/shm_memif.h @@ -89,6 +89,7 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid); int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, memif_buffer_t * bufs, uint16_t count, uint16_t * count_out, uint32_t size, uint32_t timeout_ms); +int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region); #ifdef __cplusplus } diff --git a/media-proxy/src/libfabric_cq.c b/media-proxy/src/libfabric_cq.c index f29c0a46..0910541a 100644 --- a/media-proxy/src/libfabric_cq.c +++ b/media-proxy/src/libfabric_cq.c @@ -50,40 +50,38 @@ #define CQ_TIMEOUT (-1) -void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method, +void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method, struct fid_wait *waitset) { switch (method) { case RDMA_COMP_SREAD: - cq_attr.wait_obj = FI_WAIT_UNSPEC; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_UNSPEC; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; case RDMA_COMP_WAITSET: assert(waitset); - cq_attr.wait_obj = FI_WAIT_SET; - cq_attr.wait_cond = FI_CQ_COND_NONE; - cq_attr.wait_set = waitset; + cq_attr->wait_obj = FI_WAIT_SET; + cq_attr->wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_set = waitset; break; case RDMA_COMP_WAIT_FD: - cq_attr.wait_obj = FI_WAIT_FD; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_FD; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; case RDMA_COMP_YIELD: - cq_attr.wait_obj = FI_WAIT_YIELD; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_YIELD; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; default: - cq_attr.wait_obj = FI_WAIT_NONE; + cq_attr->wait_obj = FI_WAIT_NONE; break; } } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num = total - *cur; struct fi_cq_err_entry comp; struct timespec a, b; int ret; @@ -92,7 +90,7 @@ static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur clock_gettime(CLOCK_MONOTONIC, &a); do { - ret = fi_cq_read(cq, &comp, 1); + ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1); if (ret > 0) { if (timeout >= 0) clock_gettime(CLOCK_MONOTONIC, &a); @@ -119,23 +117,18 @@ static int rdma_poll_fd(int fd, int timeout) fds.fd = fd; fds.events = POLLIN; ret = poll(&fds, 1, timeout); - if (ret == -1) { + if (ret == !ret) { RDMA_PRINTERR("poll", -errno); ret = -errno; - } else if (!ret) { - ret = -EAGAIN; } else { - ret = 0; + return 0; } - return ret; } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num = total - *cur; struct fi_cq_err_entry comp; struct fid *fids[1]; int fd, ret; @@ -147,14 +140,14 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c ret = fi_trywait(ep_ctx->rdma_ctx->fabric, fids, 1); if (!ret) { ret = rdma_poll_fd(fd, timeout); - if (ret && ret != -FI_EAGAIN) + if (ret) return ret; } - ret = fi_cq_read(cq, &comp, 1); + ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1); if (ret > 0) { (*cur)++; - } else if (ret < 0 && ret != -FI_EAGAIN) { + } else if (ret < 0) { return ret; } } @@ -162,20 +155,18 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c return 0; } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num = total - *cur; struct fi_cq_err_entry comp; int ret; while (total != *cur) { - ret = fi_cq_sread(cq, &comp, 1, NULL, timeout); + ret = fi_cq_sread(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1, NULL, timeout); if (ret > 0) { (*cur)++; - } else if (ret < 0 && ret != -FI_EAGAIN) { + } else if (ret < 0) { return ret; } } @@ -184,11 +175,13 @@ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur } int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { int ret; - ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout); + assert(total > *cur); + + ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries); if (ret) { if (ret == -FI_EAVAIL) { @@ -201,18 +194,19 @@ int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_ return ret; } -int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout) +int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, + struct fi_cq_err_entry *entries) { switch (ep_ctx->rdma_ctx->comp_method) { case RDMA_COMP_SREAD: case RDMA_COMP_YIELD: - return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; case RDMA_COMP_WAIT_FD: - return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; default: - return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; } } diff --git a/media-proxy/src/libfabric_dev.c b/media-proxy/src/libfabric_dev.c index 0540cb40..b8930f13 100644 --- a/media-proxy/src/libfabric_dev.c +++ b/media-proxy/src/libfabric_dev.c @@ -101,7 +101,7 @@ int rdma_init(libfabric_ctx **ctx) return -ENOMEM; } - (*ctx)->comp_method = RDMA_COMP_SPIN; + (*ctx)->comp_method = RDMA_COMP_SREAD; hints = fi_allocinfo(); if (!hints) { diff --git a/media-proxy/src/libfabric_ep.c b/media-proxy/src/libfabric_ep.c index cd57f6f1..5714f6cc 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -121,7 +121,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf cq_attr.format = FI_CQ_FORMAT_CONTEXT; } - rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL); + rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL); if (tx_cq_size) cq_attr.size = tx_cq_size; else @@ -133,7 +133,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf return ret; } - rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL); + rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL); if (rx_cq_size) cq_attr.size = rx_cq_size; else @@ -172,35 +172,51 @@ static int ep_reg_mr(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_info * return ret; } -int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size) +int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size) { int ret; do { - ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, - ep_ctx->send_ctx); + ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, NULL); if (ret == -EAGAIN) (void)fi_cq_read(ep_ctx->txcq, NULL, 0); } while (ret == -EAGAIN); - ret = rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, -1); return ret; } -int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size) +int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx) { int ret; - double elapsed_time; - double fps = 0.0; do { ret = - fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, ep_ctx->recv_ctx); - if (ret == -EAGAIN) + fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, buf_ctx); + if (ret == -FI_EAGAIN) (void)fi_cq_read(ep_ctx->rxcq, NULL, 0); - } while (ret == -EAGAIN); + } while (ret == -FI_EAGAIN); + + return ret; +} - return rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1, -1); +int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout) +{ + struct fi_cq_err_entry entry; + int err; + + err = rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1, + timeout, &entry); + if (err) + return err; + *buf_ctx = entry.op_context; + return 0; +} + + +int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout) +{ + return rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, + timeout, NULL); } int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) diff --git a/media-proxy/src/rdma_session.c b/media-proxy/src/rdma_session.c index ac2df69d..48748739 100644 --- a/media-proxy/src/rdma_session.c +++ b/media-proxy/src/rdma_session.c @@ -79,16 +79,24 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) rx_ctx->memif_conn_args.socket = rx_ctx->memif_socket; rx_ctx->memif_conn_args.interface_id = memif_ops->interface_id; rx_ctx->memif_conn_args.buffer_size = (uint32_t)rx_ctx->transfer_size; - rx_ctx->memif_conn_args.log2_ring_size = 4; + rx_ctx->memif_conn_args.log2_ring_size = 2; memcpy((char *)rx_ctx->memif_conn_args.interface_name, memif_ops->interface_name, sizeof(rx_ctx->memif_conn_args.interface_name)); rx_ctx->memif_conn_args.is_master = memif_ops->is_master; + rx_ctx->shm_buf_num = 1 << rx_ctx->memif_conn_args.log2_ring_size; + rx_ctx->shm_bufs = (shm_buf_info_t *)calloc(rx_ctx->shm_buf_num, sizeof(shm_buf_info_t)); + if (!rx_ctx->shm_bufs) { + ERROR("%s Fail to malloc shared memory buffers", __func__); + return -ENOMEM; + } + INFO("create memif interface."); err = memif_create(&rx_ctx->memif_conn, &rx_ctx->memif_conn_args, rx_rdma_on_connect, rx_rdma_on_disconnect, rx_on_receive, rx_ctx); if (err != MEMIF_ERR_SUCCESS) { INFO("memif_create: %s", memif_strerror(err)); + free(rx_ctx->shm_bufs); return -1; } @@ -97,6 +105,7 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) rx_ctx->memif_conn_args.socket); if (err < 0) { printf("%s(%d), thread create fail\n", __func__, err); + free(rx_ctx->shm_bufs); return -1; } @@ -106,7 +115,6 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) { memif_ops_t default_memif_ops = { 0 }; - const uint16_t FRAME_COUNT = 1; struct stat st = { 0 }; int err; @@ -162,21 +170,16 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) tx_ctx->memif_conn_args.socket = tx_ctx->memif_socket; tx_ctx->memif_conn_args.interface_id = memif_ops->interface_id; tx_ctx->memif_conn_args.buffer_size = (uint32_t)tx_ctx->transfer_size; - tx_ctx->memif_conn_args.log2_ring_size = 4; + tx_ctx->memif_conn_args.log2_ring_size = 2; snprintf((char *)tx_ctx->memif_conn_args.interface_name, sizeof(tx_ctx->memif_conn_args.interface_name), "%s", memif_ops->interface_name); tx_ctx->memif_conn_args.is_master = memif_ops->is_master; - /* TX buffers */ - tx_ctx->shm_bufs = (memif_buffer_t *)malloc(sizeof(memif_buffer_t) * FRAME_COUNT); - tx_ctx->shm_buf_num = FRAME_COUNT; - INFO("Create memif interface."); err = memif_create(&tx_ctx->memif_conn, &tx_ctx->memif_conn_args, tx_rdma_on_connect, tx_rdma_on_disconnect, tx_rdma_on_receive, tx_ctx); if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_create: %s", memif_strerror(err)); - free(tx_ctx->shm_bufs); + ERROR("memif_create: %s", memif_strerror(err)); return -1; } @@ -184,8 +187,7 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) err = pthread_create(&tx_ctx->memif_event_thread, NULL, memif_event_loop, tx_ctx->memif_conn_args.socket); if (err < 0) { - printf("%s(%d), thread create fail\n", __func__, err); - free(tx_ctx->shm_bufs); + ERROR("%s(%d), thread create fail\n", __func__, err); return -1; } @@ -248,108 +250,225 @@ static int tx_shm_deinit(tx_rdma_session_context_t *tx_ctx) unlink(tx_ctx->memif_socket_args.path); } - if (tx_ctx->shm_bufs) { - free(tx_ctx->shm_bufs); - tx_ctx->shm_bufs = NULL; + return 0; +} + +static void handle_sent_buffers(tx_rdma_session_context_t *s) +{ + shm_buf_info_t *buf_info; + uint16_t bursted_buf_num; + int err; + + err = ep_txcq_read(s->ep_ctx, 1); + if (err) { + if (err != -EAGAIN) + INFO("%s ep_txcq_read: %s", __func__, strerror(err)); + return; } + s->fb_send++; - return 0; + err = memif_refill_queue(s->memif_conn, 0, 1, 0); + if (err != MEMIF_ERR_SUCCESS) + INFO("memif_refill_queue: %s", memif_strerror(err)); +} + +static void *tx_rdma_ep_thread(void *arg) +{ + ep_thread_arg_t *ep_thread_arg = (ep_thread_arg_t *)arg; + ep_cfg_t *ep_cfg = ep_thread_arg->ep_cfg; + tx_rdma_session_context_t *s_ctx = (tx_rdma_session_context_t *)ep_thread_arg->s_ctx; + memif_region_details_t region; + int err = 0; + + free(ep_thread_arg); + + while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) + usleep(1000); + + err = memif_get_buffs_region(s_ctx->memif_conn, ®ion); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. \n", __func__); + return NULL; + } + ep_cfg->data_buf_size = region.size; + ep_cfg->data_buf = region.addr; + + err = ep_init(&s_ctx->ep_ctx, ep_cfg); + free(ep_cfg); + if (err) { + ERROR("%s, fail to initialize libfabric's end point.\n", __func__); + return NULL; + } + atomic_store_explicit(&s_ctx->ep_ready, true, memory_order_release); + + + INFO("%s(%d), TX RDMA thread started\n", __func__, s_ctx->idx); + while (!s_ctx->stop) { + if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) + continue; + handle_sent_buffers(s_ctx); + } + + return NULL; } /* TX: Create RDMA session */ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdma_s_ops_t *opts, memif_ops_t *memif_ops) { + tx_rdma_session_context_t *tx_ctx = NULL; + ep_thread_arg_t *ep_th_arg = NULL; + ep_cfg_t *ep_cfg = NULL; int err; - tx_rdma_session_context_t *tx_ctx; + + ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); + if (!ep_th_arg) { + printf("%s, Endpoint thread arguments malloc fail\n", __func__); + goto exit_dealloc; + } tx_ctx = calloc(1, sizeof(tx_rdma_session_context_t)); if (tx_ctx == NULL) { printf("%s, TX session contex malloc fail\n", __func__); - return NULL; + goto exit_dealloc; + } + ep_cfg= calloc(1, sizeof(ep_cfg_t)); + if (!ep_cfg) { + printf("%s, RDMA endpoint config malloc fail\n", __func__); + goto exit_dealloc; } - tx_ctx->rdma_ctx = dev_handle; - tx_ctx->stop = false; tx_ctx->transfer_size = opts->transfer_size; + tx_ctx->rdma_ctx = dev_handle; + tx_ctx->stop = false; + tx_ctx->ep_ready = ATOMIC_VAR_INIT(false); err = tx_rdma_shm_init(tx_ctx, memif_ops); if (err < 0) { printf("%s, fail to initialize share memory.\n", __func__); - free(tx_ctx); - return NULL; + goto exit_dealloc; } - /* TODO: use memif buffer with correct size */ - ep_cfg_t ep_cfg = { - .rdma_ctx = tx_ctx->rdma_ctx, - .data_buf_size = tx_ctx->transfer_size, - .local_addr = opts->local_addr, - .remote_addr = opts->remote_addr, - .data_buf = malloc(tx_ctx->transfer_size), - .dir = opts->dir, - }; - if (!ep_cfg.data_buf) { - printf("%s, session data buffer malloc fail\n", __func__); - return NULL; - } - err = ep_init(&tx_ctx->ep_ctx, &ep_cfg); - if (err) { - printf("%s, fail to initialize libfabric's end point.\n", __func__); - free(tx_ctx); - return NULL; + ep_cfg->rdma_ctx = tx_ctx->rdma_ctx; + ep_cfg->local_addr = opts->local_addr; + ep_cfg->remote_addr = opts->remote_addr; + ep_cfg->dir = opts->dir; + + ep_th_arg->ep_cfg = ep_cfg; // maybe I should use rdma_s_ops_t directly? + ep_th_arg->s_ctx = tx_ctx; + err = pthread_create(&tx_ctx->ep_thread, NULL, tx_rdma_ep_thread, ep_th_arg); + if (err < 0) { + printf("%s(%d), thread create fail %d\n", __func__, err, tx_ctx->idx); + goto exit_deinit_shm; } + return tx_ctx; + +exit_deinit_shm: + tx_shm_deinit(tx_ctx); +exit_dealloc: + free(ep_cfg); + free(tx_ctx); + free(ep_th_arg); + return NULL; +} + +static shm_buf_info_t *get_free_shm_buf(rx_rdma_session_context_t *s) +{ + uint32_t i = 0; + for (; i < s->shm_buf_num; i++) { + if (!s->shm_bufs[i].used) { + return &s->shm_bufs[i]; + } + } + + return NULL; } -static void rx_rdma_consume_frame(rx_rdma_session_context_t *s, char *frame) + +static int pass_empty_buf_to_libfabric(rx_rdma_session_context_t *s) { int err; - uint16_t qid = 0; - mcm_buffer *rx_mcm_buff = NULL; - memif_buffer_t *rx_bufs = NULL; - uint16_t buf_num = 1; - memif_conn_handle_t conn; + shm_buf_info_t *buf_info = NULL; uint32_t buf_size = s->transfer_size; - uint16_t rx_buf_num = 0, rx = 0; + uint16_t rx_buf_num = 0; - rx_bufs = s->shm_bufs; + buf_info = get_free_shm_buf(s); + if (!buf_info) + return -ENOMEM; - /* allocate memory */ - err = memif_buffer_alloc_timeout(s->memif_conn, qid, rx_bufs, 1, &rx_buf_num, buf_size, 10); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_rdma_consume_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return; - } + err = memif_buffer_alloc(s->memif_conn, 0, &buf_info->shm_buf, 1, &rx_buf_num, buf_size); + if (err != MEMIF_ERR_SUCCESS) + return -ENOMEM; - memcpy(rx_bufs->data, frame, s->transfer_size); + buf_info->used = true; - /* Send to microservice application. */ - err = memif_tx_burst(s->memif_conn, qid, rx_bufs, rx_buf_num, &rx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_rdma_consume_frame memif_tx_burst: %s", memif_strerror(err)); + err = ep_recv_buf(s->ep_ctx, buf_info->shm_buf.data, buf_size, buf_info); + if (err) { + ERROR("%s ep_recv_buf failed with errno: %s", __func__, fi_strerror(err)); + return err; } + return 0; +} +static void handle_received_buffers(rx_rdma_session_context_t *s) +{ + shm_buf_info_t *buf_info; + int err; + uint16_t bursted_buf_num; + + err = ep_rxcq_read(s->ep_ctx, (void **)&buf_info, 1); + if (err) { + if (err != -EAGAIN) + INFO("%s ep_rxcq_read: %s", __func__, strerror(err)); + return; + } s->fb_recv++; + + err = memif_tx_burst(s->memif_conn, 0, &buf_info->shm_buf, 1, &bursted_buf_num); + if (err != MEMIF_ERR_SUCCESS && bursted_buf_num != 1) { + INFO("%s memif_tx_burst: %s", __func__, memif_strerror(err)); + return; + } + buf_info->used = false; + } -static void *rx_rdma_frame_thread(void *arg) +static void *rx_rdma_ep_thread(void *arg) { - rx_rdma_session_context_t *s_ctx = (rx_rdma_session_context_t *)arg; - libfabric_ctx *rdma_ctx = s_ctx->rdma_ctx; - ep_ctx_t *cp_ctx = s_ctx->ep_ctx; + ep_thread_arg_t *ep_thread_arg = (ep_thread_arg_t *)arg; + ep_cfg_t *ep_cfg = ep_thread_arg->ep_cfg; + rx_rdma_session_context_t *s_ctx = (rx_rdma_session_context_t *)ep_thread_arg->s_ctx; + memif_region_details_t region; + int err = 0; + + free(ep_thread_arg); while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) usleep(1000); - printf("%s(%d), RX RDMA thread started\n", __func__, s_ctx->idx); + err = memif_get_buffs_region(s_ctx->memif_conn, ®ion); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. \n", __func__); + return NULL; + } + ep_cfg->data_buf_size = region.size; + ep_cfg->data_buf = region.addr; + + err = ep_init(&s_ctx->ep_ctx, ep_cfg); + free(ep_cfg); + if (err) { + ERROR("%s, fail to initialize libfabric's end point.\n", __func__); + return NULL; + } + + INFO("%s(%d), RX RDMA thread started\n", __func__, s_ctx->idx); while (!s_ctx->stop) { - ep_recv_buf(cp_ctx, cp_ctx->data_buf, s_ctx->transfer_size); if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) continue; - - rx_rdma_consume_frame(s_ctx, cp_ctx->data_buf); + while(!pass_empty_buf_to_libfabric(s_ctx)); + handle_received_buffers(s_ctx); } return NULL; @@ -359,14 +478,26 @@ static void *rx_rdma_frame_thread(void *arg) rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdma_s_ops_t *opts, memif_ops_t *memif_ops) { - rx_rdma_session_context_t *rx_ctx; - ep_cfg_t ep_cfg = { 0 }; + rx_rdma_session_context_t *rx_ctx = NULL; + ep_thread_arg_t *ep_th_arg = NULL; + ep_cfg_t *ep_cfg = NULL; int err; + ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); + if (!ep_th_arg) { + printf("%s, Endpoint thread arguments malloc fail\n", __func__); + goto exit_dealloc; + } + rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); - if (rx_ctx == NULL) { + if (!rx_ctx) { printf("%s, TX session contex malloc fail\n", __func__); - return NULL; + goto exit_dealloc; + } + ep_cfg = calloc(1, sizeof(ep_cfg_t)); + if (!ep_cfg) { + printf("%s, RDMA endpoint config malloc fail\n", __func__); + goto exit_dealloc; } rx_ctx->rdma_ctx = dev_handle; rx_ctx->stop = false; @@ -374,40 +505,33 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm rx_ctx->transfer_size = opts->transfer_size; err = rx_rdma_shm_init(rx_ctx, memif_ops); - if (err < 0) { printf("%s, fail to initialize share memory.\n", __func__); - free(rx_ctx); - return NULL; + goto exit_dealloc; } - /* TODO: use memif buffer with correct size */ - ep_cfg.rdma_ctx = rx_ctx->rdma_ctx; - ep_cfg.data_buf_size = rx_ctx->transfer_size; - ep_cfg.local_addr = opts->local_addr; - ep_cfg.remote_addr = opts->remote_addr; - ep_cfg.dir = opts->dir; - ep_cfg.data_buf = malloc(rx_ctx->transfer_size); - if (!ep_cfg.data_buf) { - printf("%s, session data buffer malloc fail\n", __func__); - return NULL; - } + ep_cfg->rdma_ctx = rx_ctx->rdma_ctx; + ep_cfg->local_addr = opts->local_addr; + ep_cfg->remote_addr = opts->remote_addr; + ep_cfg->dir = opts->dir; - err = ep_init(&rx_ctx->ep_ctx, &ep_cfg); - if (err) { - printf("%s, fail to initialize libfabric's end point.\n", __func__); - free(rx_ctx); - return NULL; - } - - err = pthread_create(&rx_ctx->frame_thread, NULL, rx_rdma_frame_thread, rx_ctx); + ep_th_arg->ep_cfg = ep_cfg; + ep_th_arg->s_ctx = rx_ctx; + err = pthread_create(&rx_ctx->ep_thread, NULL, rx_rdma_ep_thread, ep_th_arg); if (err < 0) { printf("%s(%d), thread create fail %d\n", __func__, err, rx_ctx->idx); - free(rx_ctx); - return NULL; + goto exit_deinit_shm; } return rx_ctx; + +exit_deinit_shm: + rx_shm_deinit(rx_ctx); +exit_dealloc: + free(ep_cfg); + free(rx_ctx); + free(ep_th_arg); + return NULL; } void rdma_rx_session_stop(rx_rdma_session_context_t *rx_ctx) @@ -421,7 +545,7 @@ void rdma_rx_session_stop(rx_rdma_session_context_t *rx_ctx) rx_ctx->stop = true; - err = pthread_join(rx_ctx->frame_thread, NULL); + err = pthread_join(rx_ctx->ep_thread, NULL); if (err && err != ESRCH) { ERROR("%s: Error joining thread: %s\n", __func__, strerror(err)); } @@ -438,8 +562,6 @@ void rdma_rx_session_destroy(rx_rdma_session_context_t **p_rx_ctx) } rx_ctx = *p_rx_ctx; - /* TODO: Remove free when memif buf will be used */ - free(rx_ctx->ep_ctx->data_buf); err = ep_destroy(&rx_ctx->ep_ctx); if (err < 0) { printf("%s, ep free failed\n", __func__); @@ -461,7 +583,12 @@ void rdma_tx_session_stop(tx_rdma_session_context_t *tx_ctx) return; } - /* No thread to stop */ + tx_ctx->stop = true; + + err = pthread_join(tx_ctx->ep_thread, NULL); + if (err && err != ESRCH) { + ERROR("%s: Error joining thread: %s\n", __func__, strerror(err)); + } } void rdma_tx_session_destroy(tx_rdma_session_context_t **p_tx_ctx) @@ -475,13 +602,12 @@ void rdma_tx_session_destroy(tx_rdma_session_context_t **p_tx_ctx) } tx_ctx = *p_tx_ctx; - /* TODO: Remove free when memif buf will be used */ - free(tx_ctx->ep_ctx->data_buf); err = ep_destroy(&tx_ctx->ep_ctx); if (err < 0) { printf("%s, ep free failed\n", __func__); return; } + atomic_store_explicit(&tx_ctx->ep_ready, false, memory_order_relaxed); tx_shm_deinit(tx_ctx); diff --git a/media-proxy/src/shm_memif_common.c b/media-proxy/src/shm_memif_common.c index 74bef616..431ab5c3 100644 --- a/media-proxy/src/shm_memif_common.c +++ b/media-proxy/src/shm_memif_common.c @@ -217,3 +217,38 @@ int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, } return MEMIF_ERR_NOBUF_RING; } + +int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region) +{ + memif_details_t md = { 0 }; + ssize_t buflen = 2000; + char* buf = NULL; + int err = 0; + + if(!region || ! conn) + return -EINVAL; + + buf = (char*)calloc(buflen, 1); + if (buf == NULL) { + ERROR("Not Enough Memory."); + return -ENOMEM; + } + + err = memif_get_details(conn, &md, buf, buflen); + if (err != MEMIF_ERR_SUCCESS) { + ERROR("%s", memif_strerror(err)); + free(buf); + return -EINVAL; + } + /* Region number 1 holds data buffers */ + if (md.regions_num < 1){ + ERROR("Data buffers not found in memif regions"); + free(buf); + return -EINVAL; + } + + memcpy(region, &md.regions[1], sizeof(md.regions[1])); + free(buf); + + return 0; +} diff --git a/media-proxy/src/shm_memif_rdma.c b/media-proxy/src/shm_memif_rdma.c index fce6534a..1fe32491 100644 --- a/media-proxy/src/shm_memif_rdma.c +++ b/media-proxy/src/shm_memif_rdma.c @@ -22,14 +22,6 @@ int rx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) INFO("RX RDMA memif connected!"); - /* rx buffers */ - rx_ctx->shm_buf_num = 1; - rx_ctx->shm_bufs = (memif_buffer_t *)malloc(sizeof(memif_buffer_t) * rx_ctx->shm_buf_num); - if (!rx_ctx->shm_bufs) { - ERROR("Failed to allocate memory"); - return -ENOMEM; - } - err = memif_refill_queue(conn, 0, -1, 0); if (err != MEMIF_ERR_SUCCESS) { INFO("memif_refill_queue: %s", memif_strerror(err)); @@ -93,6 +85,9 @@ int tx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) atomic_store_explicit(&tx_ctx->shm_ready, true, memory_order_release); + while (!atomic_load_explicit(&tx_ctx->ep_ready, memory_order_acquire)) + usleep(1000); + print_memif_details(conn); return 0; @@ -149,19 +144,15 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid) /* receive packets from the shared memory */ err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); + ERROR("memif_rx_burst: %s", memif_strerror(err)); return err; } - /* TODO: Use memif buffer directly. It has to be registered by libfabric */ - memcpy(tx_ctx->ep_ctx->data_buf, shm_bufs.data, shm_bufs.len); - ep_send_buf(tx_ctx->ep_ctx, tx_ctx->ep_ctx->data_buf, shm_bufs.len); - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - tx_ctx->fb_send++; + err = ep_send_buf(tx_ctx->ep_ctx, shm_bufs.data, shm_bufs.len); + if (err) { + ERROR("ep_send_buf failed with: %d", err); + return err; + } return 0; } diff --git a/tests/single-node-sample-apps/test-rdma.sh b/tests/single-node-sample-apps/test-rdma.sh index a124a5e3..524e70cb 100755 --- a/tests/single-node-sample-apps/test-rdma.sh +++ b/tests/single-node-sample-apps/test-rdma.sh @@ -17,9 +17,8 @@ duration="${2:-10}" frames_number="${3:-300}" width="${4:-640}" height="${5:-360}" -fps="${6:-60}" -pixel_format="${7:-yuv422p10le}" -rdma_iface_ip="${8:-127.0.0.1}" +pixel_format="${6:-yuv422p10le}" +rdma_iface_ip="${7:-127.0.0.1}" # Test configuration (cont'd) wait_interval=$((duration + 5)) @@ -159,14 +158,14 @@ function run_test_rdma() { info "Starting recver_app" export MCM_MEDIA_PROXY_PORT=8003 - local recver_app_cmd="recver_app -r $rdma_iface_ip -t rdma -w $width -h $height -f $fps -x $pixel_format -b $output_file -o auto" + local recver_app_cmd="recver_app -r $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -b $output_file -o auto" run_in_background "$bin_dir/$recver_app_cmd" "$recver_app_out" recver_app_pid="$!" info "Starting sender_app" export MCM_MEDIA_PROXY_PORT=8002 - local sender_app_cmd="sender_app -s $rdma_iface_ip -t rdma -w $width -h $height -f $fps -x $pixel_format -b $input_file -n $frames_number -o auto" + local sender_app_cmd="sender_app -s $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -b $input_file -n $frames_number -o auto" run_in_background "$bin_dir/$sender_app_cmd" "$sender_app_out" sender_app_pid="$!" @@ -217,7 +216,7 @@ info " Binary directory: $(realpath $bin_dir)" info " Output directory: $(realpath $out_dir)" info " Input file path: $input_file" info " Input file size: $(stat -c%s $input_file) byte(s)" -info " Frame size: $width x $height, FPS: $fps" +info " Frame size: $width x $height" info " Pixel format: $pixel_format" info " Duration: ${duration}s" info " Frames number: $frames_number" From 75457573f81d3541797c052dc1f9f334f56f7d13 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Thu, 17 Oct 2024 10:11:55 +0000 Subject: [PATCH 2/6] Fix code style, comments and error handling Signed-off-by: Kasiewicz, Marek --- media-proxy/src/libfabric_cq.c | 44 +++++++++++++++++++++++++----- media-proxy/src/libfabric_ep.c | 4 +-- media-proxy/src/rdma_session.c | 40 ++++++++++++--------------- media-proxy/src/shm_memif_common.c | 12 ++++---- 4 files changed, 61 insertions(+), 39 deletions(-) diff --git a/media-proxy/src/libfabric_cq.c b/media-proxy/src/libfabric_cq.c index 0910541a..a827e06c 100644 --- a/media-proxy/src/libfabric_cq.c +++ b/media-proxy/src/libfabric_cq.c @@ -81,11 +81,19 @@ void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method metho static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, struct fi_cq_err_entry *entries) { - uint64_t entries_num = total - *cur; + uint64_t entries_num; struct fi_cq_err_entry comp; struct timespec a, b; int ret; + if (!cur) + return EINVAL; + + if (total < *cur) + return EINVAL; + + entries_num = total - *cur; + if (timeout >= 0) clock_gettime(CLOCK_MONOTONIC, &a); @@ -117,22 +125,34 @@ static int rdma_poll_fd(int fd, int timeout) fds.fd = fd; fds.events = POLLIN; ret = poll(&fds, 1, timeout); - if (ret == !ret) { + if (ret == -1) { RDMA_PRINTERR("poll", -errno); ret = -errno; + } else if (!ret) { + RDMA_WARN("poll timed out"); + ret = -EAGAIN; } else { - return 0; + ret = 0; } + return ret; } static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, struct fi_cq_err_entry *entries) { - uint64_t entries_num = total - *cur; + uint64_t entries_num; struct fi_cq_err_entry comp; struct fid *fids[1]; int fd, ret; + if (!cur) + return EINVAL; + + if (total < *cur) + return EINVAL; + + entries_num = total - *cur; + fd = cq == ep_ctx->txcq ? ep_ctx->tx_fd : ep_ctx->rx_fd; fids[0] = &cq->fid; @@ -158,12 +178,21 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, struct fi_cq_err_entry *entries) { - uint64_t entries_num = total - *cur; + uint64_t entries_num; struct fi_cq_err_entry comp; int ret; + if (!cur) + return EINVAL; + + if (total < *cur) + return EINVAL; + + entries_num = total - *cur; + while (total != *cur) { - ret = fi_cq_sread(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1, NULL, timeout); + ret = fi_cq_sread(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1, NULL, + timeout); if (ret > 0) { (*cur)++; } else if (ret < 0) { @@ -179,7 +208,8 @@ int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_ { int ret; - assert(total > *cur); + if (!cur) + return EINVAL; ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries); diff --git a/media-proxy/src/libfabric_ep.c b/media-proxy/src/libfabric_ep.c index 5714f6cc..f7be306b 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -190,8 +190,7 @@ int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx) int ret; do { - ret = - fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, buf_ctx); + ret = fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, buf_ctx); if (ret == -FI_EAGAIN) (void)fi_cq_read(ep_ctx->rxcq, NULL, 0); } while (ret == -FI_EAGAIN); @@ -212,7 +211,6 @@ int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout) return 0; } - int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout) { return rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, diff --git a/media-proxy/src/rdma_session.c b/media-proxy/src/rdma_session.c index 48748739..a9de161b 100644 --- a/media-proxy/src/rdma_session.c +++ b/media-proxy/src/rdma_session.c @@ -79,7 +79,7 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) rx_ctx->memif_conn_args.socket = rx_ctx->memif_socket; rx_ctx->memif_conn_args.interface_id = memif_ops->interface_id; rx_ctx->memif_conn_args.buffer_size = (uint32_t)rx_ctx->transfer_size; - rx_ctx->memif_conn_args.log2_ring_size = 2; + rx_ctx->memif_conn_args.log2_ring_size = 4; memcpy((char *)rx_ctx->memif_conn_args.interface_name, memif_ops->interface_name, sizeof(rx_ctx->memif_conn_args.interface_name)); rx_ctx->memif_conn_args.is_master = memif_ops->is_master; @@ -87,7 +87,7 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) rx_ctx->shm_buf_num = 1 << rx_ctx->memif_conn_args.log2_ring_size; rx_ctx->shm_bufs = (shm_buf_info_t *)calloc(rx_ctx->shm_buf_num, sizeof(shm_buf_info_t)); if (!rx_ctx->shm_bufs) { - ERROR("%s Fail to malloc shared memory buffers", __func__); + ERROR("%s Failed to allocate shared memory buffers", __func__); return -ENOMEM; } @@ -170,7 +170,7 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) tx_ctx->memif_conn_args.socket = tx_ctx->memif_socket; tx_ctx->memif_conn_args.interface_id = memif_ops->interface_id; tx_ctx->memif_conn_args.buffer_size = (uint32_t)tx_ctx->transfer_size; - tx_ctx->memif_conn_args.log2_ring_size = 2; + tx_ctx->memif_conn_args.log2_ring_size = 4; snprintf((char *)tx_ctx->memif_conn_args.interface_name, sizeof(tx_ctx->memif_conn_args.interface_name), "%s", memif_ops->interface_name); tx_ctx->memif_conn_args.is_master = memif_ops->is_master; @@ -187,7 +187,7 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) err = pthread_create(&tx_ctx->memif_event_thread, NULL, memif_event_loop, tx_ctx->memif_conn_args.socket); if (err < 0) { - ERROR("%s(%d), thread create fail\n", __func__, err); + ERROR("%s(%d), thread create failed\n", __func__, err); return -1; } @@ -296,12 +296,11 @@ static void *tx_rdma_ep_thread(void *arg) err = ep_init(&s_ctx->ep_ctx, ep_cfg); free(ep_cfg); if (err) { - ERROR("%s, fail to initialize libfabric's end point.\n", __func__); + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); return NULL; } atomic_store_explicit(&s_ctx->ep_ready, true, memory_order_release); - INFO("%s(%d), TX RDMA thread started\n", __func__, s_ctx->idx); while (!s_ctx->stop) { if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) @@ -323,17 +322,17 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); if (!ep_th_arg) { - printf("%s, Endpoint thread arguments malloc fail\n", __func__); + printf("%s, Endpoint thread arguments allocation failed\n", __func__); goto exit_dealloc; } tx_ctx = calloc(1, sizeof(tx_rdma_session_context_t)); if (tx_ctx == NULL) { - printf("%s, TX session contex malloc fail\n", __func__); + printf("%s, TX session contex allocation failed\n", __func__); goto exit_dealloc; } - ep_cfg= calloc(1, sizeof(ep_cfg_t)); + ep_cfg = calloc(1, sizeof(ep_cfg_t)); if (!ep_cfg) { - printf("%s, RDMA endpoint config malloc fail\n", __func__); + printf("%s, RDMA endpoint config allocation failed\n", __func__); goto exit_dealloc; } @@ -348,7 +347,6 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm goto exit_dealloc; } - ep_cfg->rdma_ctx = tx_ctx->rdma_ctx; ep_cfg->local_addr = opts->local_addr; ep_cfg->remote_addr = opts->remote_addr; @@ -362,7 +360,6 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm goto exit_deinit_shm; } - return tx_ctx; exit_deinit_shm: @@ -376,17 +373,15 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm static shm_buf_info_t *get_free_shm_buf(rx_rdma_session_context_t *s) { - uint32_t i = 0; - for (; i < s->shm_buf_num; i++) { + uint32_t i; + for (i = 0; i < s->shm_buf_num; i++) { if (!s->shm_bufs[i].used) { return &s->shm_bufs[i]; } } - return NULL; } - static int pass_empty_buf_to_libfabric(rx_rdma_session_context_t *s) { int err; @@ -428,11 +423,10 @@ static void handle_received_buffers(rx_rdma_session_context_t *s) err = memif_tx_burst(s->memif_conn, 0, &buf_info->shm_buf, 1, &bursted_buf_num); if (err != MEMIF_ERR_SUCCESS && bursted_buf_num != 1) { - INFO("%s memif_tx_burst: %s", __func__, memif_strerror(err)); + INFO("%s memif_tx_burst: %s", __func__, memif_strerror(err)); return; } buf_info->used = false; - } static void *rx_rdma_ep_thread(void *arg) @@ -459,7 +453,7 @@ static void *rx_rdma_ep_thread(void *arg) err = ep_init(&s_ctx->ep_ctx, ep_cfg); free(ep_cfg); if (err) { - ERROR("%s, fail to initialize libfabric's end point.\n", __func__); + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); return NULL; } @@ -467,7 +461,7 @@ static void *rx_rdma_ep_thread(void *arg) while (!s_ctx->stop) { if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) continue; - while(!pass_empty_buf_to_libfabric(s_ctx)); + while (!pass_empty_buf_to_libfabric(s_ctx)) {} handle_received_buffers(s_ctx); } @@ -485,18 +479,18 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); if (!ep_th_arg) { - printf("%s, Endpoint thread arguments malloc fail\n", __func__); + printf("%s, Endpoint thread arguments allocation failed\n", __func__); goto exit_dealloc; } rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); if (!rx_ctx) { - printf("%s, TX session contex malloc fail\n", __func__); + printf("%s, TX session contex allocation failed\n", __func__); goto exit_dealloc; } ep_cfg = calloc(1, sizeof(ep_cfg_t)); if (!ep_cfg) { - printf("%s, RDMA endpoint config malloc fail\n", __func__); + printf("%s, RDMA endpoint config allocation failed\n", __func__); goto exit_dealloc; } rx_ctx->rdma_ctx = dev_handle; diff --git a/media-proxy/src/shm_memif_common.c b/media-proxy/src/shm_memif_common.c index 431ab5c3..b65efe44 100644 --- a/media-proxy/src/shm_memif_common.c +++ b/media-proxy/src/shm_memif_common.c @@ -21,7 +21,7 @@ void print_memif_details(memif_conn_handle_t conn) int err = 0; buf = (char*)malloc(buflen); - if (buf == NULL) { + if (!buf) { INFO("Not Enough Memory."); return; } @@ -222,14 +222,14 @@ int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *reg { memif_details_t md = { 0 }; ssize_t buflen = 2000; - char* buf = NULL; + char *buf = NULL; int err = 0; - if(!region || ! conn) + if (!region || !conn) return -EINVAL; - buf = (char*)calloc(buflen, 1); - if (buf == NULL) { + buf = (char *)calloc(buflen, 1); + if (!buf) { ERROR("Not Enough Memory."); return -ENOMEM; } @@ -241,7 +241,7 @@ int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *reg return -EINVAL; } /* Region number 1 holds data buffers */ - if (md.regions_num < 1){ + if (md.regions_num < 1) { ERROR("Data buffers not found in memif regions"); free(buf); return -EINVAL; From 83d0adb16aa7d1ca70623318edc036d1c3af2047 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Thu, 17 Oct 2024 11:55:05 +0000 Subject: [PATCH 3/6] Check rdma_*_session_create input params for NULL Signed-off-by: Kasiewicz, Marek --- media-proxy/src/rdma_session.c | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/media-proxy/src/rdma_session.c b/media-proxy/src/rdma_session.c index a9de161b..68dfe0b4 100644 --- a/media-proxy/src/rdma_session.c +++ b/media-proxy/src/rdma_session.c @@ -320,19 +320,24 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm ep_cfg_t *ep_cfg = NULL; int err; + if (!dev_handle || !opts || !memif_ops) { + ERROR("%s, A input parameter is NULL", __func__); + return NULL; + } + ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); if (!ep_th_arg) { - printf("%s, Endpoint thread arguments allocation failed\n", __func__); + ERROR("%s, Endpoint thread arguments allocation failed\n", __func__); goto exit_dealloc; } tx_ctx = calloc(1, sizeof(tx_rdma_session_context_t)); if (tx_ctx == NULL) { - printf("%s, TX session contex allocation failed\n", __func__); + ERROR("%s, TX session contex allocation failed\n", __func__); goto exit_dealloc; } ep_cfg = calloc(1, sizeof(ep_cfg_t)); if (!ep_cfg) { - printf("%s, RDMA endpoint config allocation failed\n", __func__); + ERROR("%s, RDMA endpoint config allocation failed\n", __func__); goto exit_dealloc; } @@ -343,7 +348,7 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm err = tx_rdma_shm_init(tx_ctx, memif_ops); if (err < 0) { - printf("%s, fail to initialize share memory.\n", __func__); + ERROR("%s, failed to initialize share memory.\n", __func__); goto exit_dealloc; } @@ -356,7 +361,7 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm ep_th_arg->s_ctx = tx_ctx; err = pthread_create(&tx_ctx->ep_thread, NULL, tx_rdma_ep_thread, ep_th_arg); if (err < 0) { - printf("%s(%d), thread create fail %d\n", __func__, err, tx_ctx->idx); + ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, tx_ctx->idx, strerror(err)); goto exit_deinit_shm; } @@ -477,20 +482,25 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm ep_cfg_t *ep_cfg = NULL; int err; + if (!dev_handle || !opts || !memif_ops) { + ERROR("%s, A input parameter is NULL", __func__); + return NULL; + } + ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); if (!ep_th_arg) { - printf("%s, Endpoint thread arguments allocation failed\n", __func__); + ERROR("%s, Endpoint thread arguments allocation failed\n", __func__); goto exit_dealloc; } rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); if (!rx_ctx) { - printf("%s, TX session contex allocation failed\n", __func__); + ERROR("%s, TX session contex allocation failed\n", __func__); goto exit_dealloc; } ep_cfg = calloc(1, sizeof(ep_cfg_t)); if (!ep_cfg) { - printf("%s, RDMA endpoint config allocation failed\n", __func__); + ERROR("%s, RDMA endpoint config allocation failed\n", __func__); goto exit_dealloc; } rx_ctx->rdma_ctx = dev_handle; @@ -500,7 +510,7 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm err = rx_rdma_shm_init(rx_ctx, memif_ops); if (err < 0) { - printf("%s, fail to initialize share memory.\n", __func__); + ERROR("%s, Failed to initialize share memory.\n", __func__); goto exit_dealloc; } @@ -513,7 +523,7 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm ep_th_arg->s_ctx = rx_ctx; err = pthread_create(&rx_ctx->ep_thread, NULL, rx_rdma_ep_thread, ep_th_arg); if (err < 0) { - printf("%s(%d), thread create fail %d\n", __func__, err, rx_ctx->idx); + ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, rx_ctx->idx, strerror(err)); goto exit_deinit_shm; } From bf7edecc04419f22a79a8c8239d787582fa5845f Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Thu, 17 Oct 2024 14:51:53 +0000 Subject: [PATCH 4/6] Move ep_mr_reg from ep_init to *_rdma_on_connect Signed-off-by: Kasiewicz, Marek --- media-proxy/include/libfabric_ep.h | 11 +-- media-proxy/include/libfabric_mr.h | 6 +- media-proxy/include/rdma_session.h | 1 - media-proxy/src/libfabric_ep.c | 19 ++--- media-proxy/src/libfabric_mr.c | 6 +- media-proxy/src/rdma_session.c | 130 ++++++++--------------------- media-proxy/src/shm_memif_rdma.c | 31 ++++++- 7 files changed, 77 insertions(+), 127 deletions(-) diff --git a/media-proxy/include/libfabric_ep.h b/media-proxy/include/libfabric_ep.h index 4b6c0be8..1122faa5 100644 --- a/media-proxy/include/libfabric_ep.h +++ b/media-proxy/include/libfabric_ep.h @@ -24,12 +24,11 @@ typedef struct { typedef struct { struct fid_ep *ep; - char *data_buf; - size_t data_buf_size; int rx_fd, tx_fd; struct fid_cq *txcq, *rxcq; struct fid_av *av; struct fid_mr *data_mr; + uint64_t mr_access; void *data_desc; fi_addr_t dest_av_entry; @@ -42,23 +41,17 @@ typedef struct { typedef struct { libfabric_ctx *rdma_ctx; - char *data_buf; - size_t data_buf_size; rdma_addr remote_addr; rdma_addr local_addr; enum direction dir; } ep_cfg_t; -typedef struct { - ep_cfg_t *ep_cfg; - void *s_ctx; -} ep_thread_arg_t; - int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size); // *buf has to point to registered memory int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx); int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout); int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout); +int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size); int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg); int ep_destroy(ep_ctx_t **ep_ctx); diff --git a/media-proxy/include/libfabric_mr.h b/media-proxy/include/libfabric_mr.h index 6b75835c..85c07b8c 100644 --- a/media-proxy/include/libfabric_mr.h +++ b/media-proxy/include/libfabric_mr.h @@ -13,9 +13,9 @@ extern "C" { #include "libfabric_dev.h" -int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, struct fi_info *fi, void *buf, - size_t size, uint64_t access, uint64_t key, enum fi_hmem_iface iface, - uint64_t device, struct fid_mr **mr, void **desc); +int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, void *buf, size_t size, uint64_t access, + uint64_t key, enum fi_hmem_iface iface, uint64_t device, struct fid_mr **mr, + void **desc); uint64_t rdma_info_to_mr_access(struct fi_info *info); diff --git a/media-proxy/include/rdma_session.h b/media-proxy/include/rdma_session.h index aa8d6480..3d87d4e8 100644 --- a/media-proxy/include/rdma_session.h +++ b/media-proxy/include/rdma_session.h @@ -44,7 +44,6 @@ typedef struct { volatile bool stop; pthread_t ep_thread; - atomic_bool ep_ready; int fb_send; diff --git a/media-proxy/src/libfabric_ep.c b/media-proxy/src/libfabric_ep.c index f7be306b..77beb8a1 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -159,16 +159,15 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf } return 0; } - -static int ep_reg_mr(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_info *fi) +int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size) { int ret; /* TODO: I'm using address of ep_ctx as a key, * maybe there is more elegant solution */ - ret = rdma_reg_mr(rdma_ctx, ep_ctx->ep, fi, ep_ctx->data_buf, ep_ctx->data_buf_size, - rdma_info_to_mr_access(fi), (uint64_t)ep_ctx, FI_HMEM_SYSTEM, 0, - &ep_ctx->data_mr, &ep_ctx->data_desc); + ret = rdma_reg_mr(ep_ctx->rdma_ctx, ep_ctx->ep, data_buf, data_buf_size, + ep_ctx->mr_access, (uint64_t)ep_ctx, FI_HMEM_SYSTEM, 0, &ep_ctx->data_mr, + &ep_ctx->data_desc); return ret; } @@ -229,8 +228,6 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) return -ENOMEM; } (*ep_ctx)->rdma_ctx = cfg->rdma_ctx; - (*ep_ctx)->data_buf = cfg->data_buf; - (*ep_ctx)->data_buf_size = cfg->data_buf_size; hints = fi_dupinfo((*ep_ctx)->rdma_ctx->info); @@ -251,13 +248,6 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) return ret; } - ret = ep_reg_mr(*ep_ctx, (*ep_ctx)->rdma_ctx, fi); - if (ret) { - printf("%s, ep_reg_mr fail\n", __func__); - ep_destroy(ep_ctx); - return ret; - } - ret = ep_alloc_res(*ep_ctx, (*ep_ctx)->rdma_ctx, fi, 0, 0, 1); if (ret) { printf("%s, ep_alloc_res fail\n", __func__); @@ -278,6 +268,7 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) if (ret) return ret; } + (*ep_ctx)->mr_access = rdma_info_to_mr_access(fi); fi_freeinfo(fi); diff --git a/media-proxy/src/libfabric_mr.c b/media-proxy/src/libfabric_mr.c index 86417c5d..d25f128d 100644 --- a/media-proxy/src/libfabric_mr.c +++ b/media-proxy/src/libfabric_mr.c @@ -79,9 +79,9 @@ static void rdma_fill_mr_attr(struct iovec *iov, struct fi_mr_dmabuf *dmabuf, in } } -int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, struct fi_info *fi, void *buf, - size_t size, uint64_t access, uint64_t key, enum fi_hmem_iface iface, - uint64_t device, struct fid_mr **mr, void **desc) +int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, void *buf, size_t size, uint64_t access, + uint64_t key, enum fi_hmem_iface iface, uint64_t device, struct fid_mr **mr, + void **desc) { struct fi_mr_dmabuf dmabuf = { 0 }; struct fi_mr_attr attr = { 0 }; diff --git a/media-proxy/src/rdma_session.c b/media-proxy/src/rdma_session.c index 68dfe0b4..f24e746b 100644 --- a/media-proxy/src/rdma_session.c +++ b/media-proxy/src/rdma_session.c @@ -274,33 +274,13 @@ static void handle_sent_buffers(tx_rdma_session_context_t *s) static void *tx_rdma_ep_thread(void *arg) { - ep_thread_arg_t *ep_thread_arg = (ep_thread_arg_t *)arg; - ep_cfg_t *ep_cfg = ep_thread_arg->ep_cfg; - tx_rdma_session_context_t *s_ctx = (tx_rdma_session_context_t *)ep_thread_arg->s_ctx; + tx_rdma_session_context_t *s_ctx = (tx_rdma_session_context_t *)arg; memif_region_details_t region; int err = 0; - free(ep_thread_arg); - while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) usleep(1000); - err = memif_get_buffs_region(s_ctx->memif_conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return NULL; - } - ep_cfg->data_buf_size = region.size; - ep_cfg->data_buf = region.addr; - - err = ep_init(&s_ctx->ep_ctx, ep_cfg); - free(ep_cfg); - if (err) { - ERROR("%s, failed to initialize libfabric's end point.\n", __func__); - return NULL; - } - atomic_store_explicit(&s_ctx->ep_ready, true, memory_order_release); - INFO("%s(%d), TX RDMA thread started\n", __func__, s_ctx->idx); while (!s_ctx->stop) { if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) @@ -316,8 +296,7 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm memif_ops_t *memif_ops) { tx_rdma_session_context_t *tx_ctx = NULL; - ep_thread_arg_t *ep_th_arg = NULL; - ep_cfg_t *ep_cfg = NULL; + ep_cfg_t ep_cfg = { 0 }; int err; if (!dev_handle || !opts || !memif_ops) { @@ -325,41 +304,34 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm return NULL; } - ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); - if (!ep_th_arg) { - ERROR("%s, Endpoint thread arguments allocation failed\n", __func__); - goto exit_dealloc; - } tx_ctx = calloc(1, sizeof(tx_rdma_session_context_t)); if (tx_ctx == NULL) { ERROR("%s, TX session contex allocation failed\n", __func__); - goto exit_dealloc; - } - ep_cfg = calloc(1, sizeof(ep_cfg_t)); - if (!ep_cfg) { - ERROR("%s, RDMA endpoint config allocation failed\n", __func__); - goto exit_dealloc; + return NULL; } tx_ctx->transfer_size = opts->transfer_size; tx_ctx->rdma_ctx = dev_handle; tx_ctx->stop = false; - tx_ctx->ep_ready = ATOMIC_VAR_INIT(false); + + ep_cfg.rdma_ctx = tx_ctx->rdma_ctx; + ep_cfg.local_addr = opts->local_addr; + ep_cfg.remote_addr = opts->remote_addr; + ep_cfg.dir = opts->dir; + + err = ep_init(&tx_ctx->ep_ctx, &ep_cfg); + if (err) { + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); + goto exit_dealloc; + } err = tx_rdma_shm_init(tx_ctx, memif_ops); if (err < 0) { ERROR("%s, failed to initialize share memory.\n", __func__); - goto exit_dealloc; + goto exit_deinit_ep; } - ep_cfg->rdma_ctx = tx_ctx->rdma_ctx; - ep_cfg->local_addr = opts->local_addr; - ep_cfg->remote_addr = opts->remote_addr; - ep_cfg->dir = opts->dir; - - ep_th_arg->ep_cfg = ep_cfg; // maybe I should use rdma_s_ops_t directly? - ep_th_arg->s_ctx = tx_ctx; - err = pthread_create(&tx_ctx->ep_thread, NULL, tx_rdma_ep_thread, ep_th_arg); + err = pthread_create(&tx_ctx->ep_thread, NULL, tx_rdma_ep_thread, tx_ctx); if (err < 0) { ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, tx_ctx->idx, strerror(err)); goto exit_deinit_shm; @@ -369,10 +341,10 @@ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdm exit_deinit_shm: tx_shm_deinit(tx_ctx); +exit_deinit_ep: + ep_destroy(&tx_ctx->ep_ctx); exit_dealloc: - free(ep_cfg); free(tx_ctx); - free(ep_th_arg); return NULL; } @@ -436,32 +408,13 @@ static void handle_received_buffers(rx_rdma_session_context_t *s) static void *rx_rdma_ep_thread(void *arg) { - ep_thread_arg_t *ep_thread_arg = (ep_thread_arg_t *)arg; - ep_cfg_t *ep_cfg = ep_thread_arg->ep_cfg; - rx_rdma_session_context_t *s_ctx = (rx_rdma_session_context_t *)ep_thread_arg->s_ctx; + rx_rdma_session_context_t *s_ctx = (rx_rdma_session_context_t *)arg; memif_region_details_t region; int err = 0; - free(ep_thread_arg); - while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) usleep(1000); - err = memif_get_buffs_region(s_ctx->memif_conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return NULL; - } - ep_cfg->data_buf_size = region.size; - ep_cfg->data_buf = region.addr; - - err = ep_init(&s_ctx->ep_ctx, ep_cfg); - free(ep_cfg); - if (err) { - ERROR("%s, failed to initialize libfabric's end point.\n", __func__); - return NULL; - } - INFO("%s(%d), RX RDMA thread started\n", __func__, s_ctx->idx); while (!s_ctx->stop) { if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) @@ -478,8 +431,7 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm memif_ops_t *memif_ops) { rx_rdma_session_context_t *rx_ctx = NULL; - ep_thread_arg_t *ep_th_arg = NULL; - ep_cfg_t *ep_cfg = NULL; + ep_cfg_t ep_cfg = { 0 }; int err; if (!dev_handle || !opts || !memif_ops) { @@ -487,41 +439,34 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm return NULL; } - ep_th_arg = calloc(1, sizeof(ep_thread_arg_t)); - if (!ep_th_arg) { - ERROR("%s, Endpoint thread arguments allocation failed\n", __func__); - goto exit_dealloc; - } - rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); if (!rx_ctx) { ERROR("%s, TX session contex allocation failed\n", __func__); - goto exit_dealloc; - } - ep_cfg = calloc(1, sizeof(ep_cfg_t)); - if (!ep_cfg) { - ERROR("%s, RDMA endpoint config allocation failed\n", __func__); - goto exit_dealloc; + return NULL; } + + rx_ctx->transfer_size = opts->transfer_size; rx_ctx->rdma_ctx = dev_handle; rx_ctx->stop = false; - rx_ctx->transfer_size = opts->transfer_size; + ep_cfg.rdma_ctx = rx_ctx->rdma_ctx; + ep_cfg.local_addr = opts->local_addr; + ep_cfg.remote_addr = opts->remote_addr; + ep_cfg.dir = opts->dir; + + err = ep_init(&rx_ctx->ep_ctx, &ep_cfg); + if (err) { + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); + goto exit_dealloc; + } err = rx_rdma_shm_init(rx_ctx, memif_ops); if (err < 0) { ERROR("%s, Failed to initialize share memory.\n", __func__); - goto exit_dealloc; + goto exit_deinit_ep; } - ep_cfg->rdma_ctx = rx_ctx->rdma_ctx; - ep_cfg->local_addr = opts->local_addr; - ep_cfg->remote_addr = opts->remote_addr; - ep_cfg->dir = opts->dir; - - ep_th_arg->ep_cfg = ep_cfg; - ep_th_arg->s_ctx = rx_ctx; - err = pthread_create(&rx_ctx->ep_thread, NULL, rx_rdma_ep_thread, ep_th_arg); + err = pthread_create(&rx_ctx->ep_thread, NULL, rx_rdma_ep_thread, rx_ctx); if (err < 0) { ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, rx_ctx->idx, strerror(err)); goto exit_deinit_shm; @@ -531,10 +476,10 @@ rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdm exit_deinit_shm: rx_shm_deinit(rx_ctx); +exit_deinit_ep: + ep_destroy(&rx_ctx->ep_ctx); exit_dealloc: - free(ep_cfg); free(rx_ctx); - free(ep_th_arg); return NULL; } @@ -611,7 +556,6 @@ void rdma_tx_session_destroy(tx_rdma_session_context_t **p_tx_ctx) printf("%s, ep free failed\n", __func__); return; } - atomic_store_explicit(&tx_ctx->ep_ready, false, memory_order_relaxed); tx_shm_deinit(tx_ctx); diff --git a/media-proxy/src/shm_memif_rdma.c b/media-proxy/src/shm_memif_rdma.c index 1fe32491..8ff5b3c3 100644 --- a/media-proxy/src/shm_memif_rdma.c +++ b/media-proxy/src/shm_memif_rdma.c @@ -18,6 +18,7 @@ int rx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) { rx_rdma_session_context_t *rx_ctx = (rx_rdma_session_context_t *)priv_data; + memif_region_details_t region; int err; INFO("RX RDMA memif connected!"); @@ -28,6 +29,18 @@ int rx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) return err; } + err = memif_get_buffs_region(conn, ®ion); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. \n", __func__); + return err; + } + + err = ep_reg_mr(rx_ctx->ep_ctx, region.addr, region.size); + if (errno) { + ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(err)); + return err; + } + print_memif_details(conn); atomic_store_explicit(&rx_ctx->shm_ready, true, memory_order_release); @@ -73,6 +86,7 @@ int rx_rdma_on_disconnect(memif_conn_handle_t conn, void *priv_data) int tx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) { tx_rdma_session_context_t *tx_ctx = (tx_rdma_session_context_t *)priv_data; + memif_region_details_t region; int err = 0; INFO("TX RDMA memif connected!"); @@ -83,10 +97,19 @@ int tx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) return err; } - atomic_store_explicit(&tx_ctx->shm_ready, true, memory_order_release); + err = memif_get_buffs_region(conn, ®ion); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. \n", __func__); + return err; + } - while (!atomic_load_explicit(&tx_ctx->ep_ready, memory_order_acquire)) - usleep(1000); + err = ep_reg_mr(tx_ctx->ep_ctx, region.addr, region.size); + if (errno) { + ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(err)); + return err; + } + + atomic_store_explicit(&tx_ctx->shm_ready, true, memory_order_release); print_memif_details(conn); @@ -150,7 +173,7 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid) err = ep_send_buf(tx_ctx->ep_ctx, shm_bufs.data, shm_bufs.len); if (err) { - ERROR("ep_send_buf failed with: %d", err); + ERROR("ep_send_buf failed with: %s", fi_strerror(err)); return err; } From 2fc7123828f44b938edf31c43d2868e94f47e885 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Fri, 18 Oct 2024 07:27:23 +0000 Subject: [PATCH 5/6] Remove mr_access from ep_ctx_t mr_access can be calculated basing on from fi_info stored in libfabric_ctx Signed-off-by: Kasiewicz, Marek --- media-proxy/include/libfabric_ep.h | 1 - media-proxy/src/libfabric_ep.c | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/media-proxy/include/libfabric_ep.h b/media-proxy/include/libfabric_ep.h index 1122faa5..383f01bb 100644 --- a/media-proxy/include/libfabric_ep.h +++ b/media-proxy/include/libfabric_ep.h @@ -28,7 +28,6 @@ typedef struct { struct fid_cq *txcq, *rxcq; struct fid_av *av; struct fid_mr *data_mr; - uint64_t mr_access; void *data_desc; fi_addr_t dest_av_entry; diff --git a/media-proxy/src/libfabric_ep.c b/media-proxy/src/libfabric_ep.c index 77beb8a1..20423457 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -166,8 +166,8 @@ int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size) /* TODO: I'm using address of ep_ctx as a key, * maybe there is more elegant solution */ ret = rdma_reg_mr(ep_ctx->rdma_ctx, ep_ctx->ep, data_buf, data_buf_size, - ep_ctx->mr_access, (uint64_t)ep_ctx, FI_HMEM_SYSTEM, 0, &ep_ctx->data_mr, - &ep_ctx->data_desc); + rdma_info_to_mr_access(ep_ctx->rdma_ctx->info), (uint64_t)ep_ctx, + FI_HMEM_SYSTEM, 0, &ep_ctx->data_mr, &ep_ctx->data_desc); return ret; } @@ -268,7 +268,6 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) if (ret) return ret; } - (*ep_ctx)->mr_access = rdma_info_to_mr_access(fi); fi_freeinfo(fi); From 8904807c682bb0783ccf9f15515811dd2514f8f9 Mon Sep 17 00:00:00 2001 From: "Kasiewicz, Marek" Date: Fri, 18 Oct 2024 11:34:30 +0000 Subject: [PATCH 6/6] Fix error handling in libfabric_cq - In in rdma_get_cq_comp: do not print error if FI_EAVAIL is returned - Return always negative error number Signed-off-by: Kasiewicz, Marek --- media-proxy/src/libfabric_cq.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/media-proxy/src/libfabric_cq.c b/media-proxy/src/libfabric_cq.c index a827e06c..e1100c24 100644 --- a/media-proxy/src/libfabric_cq.c +++ b/media-proxy/src/libfabric_cq.c @@ -87,10 +87,10 @@ static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur int ret; if (!cur) - return EINVAL; + return -EINVAL; if (total < *cur) - return EINVAL; + return -EINVAL; entries_num = total - *cur; @@ -146,10 +146,10 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c int fd, ret; if (!cur) - return EINVAL; + return -EINVAL; if (total < *cur) - return EINVAL; + return -EINVAL; entries_num = total - *cur; @@ -183,10 +183,10 @@ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur int ret; if (!cur) - return EINVAL; + return -EINVAL; if (total < *cur) - return EINVAL; + return -EINVAL; entries_num = total - *cur; @@ -209,7 +209,7 @@ int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_ int ret; if (!cur) - return EINVAL; + return -EINVAL; ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries); @@ -217,7 +217,7 @@ int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_ if (ret == -FI_EAVAIL) { ret = rdma_cq_readerr(cq); (*cur)++; - } else { + } else if ( ret != -FI_EAGAIN) { RDMA_PRINTERR("rdma_get_cq_comp", ret); } }