diff --git a/media-proxy/include/libfabric_cq.h b/media-proxy/include/libfabric_cq.h index e483352f..f63399af 100644 --- a/media-proxy/include/libfabric_cq.h +++ b/media-proxy/include/libfabric_cq.h @@ -13,13 +13,14 @@ extern "C" { #include "libfabric_ep.h" -void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method, +void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method, struct fid_wait *waitset); -int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout); +int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, + struct fi_cq_err_entry *entries); int rdma_get_cq_fd(struct fid_cq *cq, int *fd, enum cq_comp_method method); int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout); + int timeout, struct fi_cq_err_entry *entry); int rdma_cq_readerr(struct fid_cq *cq); #ifdef __cplusplus diff --git a/media-proxy/include/libfabric_ep.h b/media-proxy/include/libfabric_ep.h index d782e1e6..383f01bb 100644 --- a/media-proxy/include/libfabric_ep.h +++ b/media-proxy/include/libfabric_ep.h @@ -24,8 +24,6 @@ typedef struct { typedef struct { struct fid_ep *ep; - char *data_buf; - size_t data_buf_size; int rx_fd, tx_fd; struct fid_cq *txcq, *rxcq; struct fid_av *av; @@ -34,8 +32,6 @@ typedef struct { fi_addr_t dest_av_entry; struct fid_wait *waitset; - struct fi_context *recv_ctx; - struct fi_context *send_ctx; uint64_t tx_cq_cntr; uint64_t rx_cq_cntr; @@ -44,16 +40,17 @@ typedef struct { typedef struct { libfabric_ctx *rdma_ctx; - char *data_buf; - size_t data_buf_size; rdma_addr remote_addr; rdma_addr local_addr; enum direction dir; } ep_cfg_t; -int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size); +int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size); // *buf has to point to registered memory -int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size); +int ep_recv_buf(ep_ctx_t 
*ep_ctx, void *buf, size_t buf_size, void *buf_ctx); +int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout); +int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout); +int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size); int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg); int ep_destroy(ep_ctx_t **ep_ctx); diff --git a/media-proxy/include/libfabric_mr.h b/media-proxy/include/libfabric_mr.h index 6b75835c..85c07b8c 100644 --- a/media-proxy/include/libfabric_mr.h +++ b/media-proxy/include/libfabric_mr.h @@ -13,9 +13,9 @@ extern "C" { #include "libfabric_dev.h" -int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, struct fi_info *fi, void *buf, - size_t size, uint64_t access, uint64_t key, enum fi_hmem_iface iface, - uint64_t device, struct fid_mr **mr, void **desc); +int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, void *buf, size_t size, uint64_t access, + uint64_t key, enum fi_hmem_iface iface, uint64_t device, struct fid_mr **mr, + void **desc); uint64_t rdma_info_to_mr_access(struct fi_info *info); diff --git a/media-proxy/include/rdma_session.h b/media-proxy/include/rdma_session.h index a37e832a..3d87d4e8 100644 --- a/media-proxy/include/rdma_session.h +++ b/media-proxy/include/rdma_session.h @@ -33,23 +33,21 @@ typedef struct { } rdma_s_ops_t; typedef struct { - libfabric_ctx st; + memif_buffer_t shm_buf; + bool used; +} shm_buf_info_t; + +typedef struct { int idx; libfabric_ctx *rdma_ctx; ep_ctx_t *ep_ctx; - int frame_done_cnt; - int packet_done_cnt; - volatile bool stop; + pthread_t ep_thread; int fb_send; - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; size_t transfer_size; - size_t pkt_len; - /* memif parameters */ memif_ops_t memif_ops; @@ -58,7 +56,6 @@ typedef struct { /* memif conenction handle */ memif_conn_handle_t memif_conn; - memif_buffer_t *shm_bufs; uint16_t shm_buf_num; atomic_bool shm_ready; @@ -69,21 +66,18 @@ typedef struct { } tx_rdma_session_context_t; typedef struct { - libfabric_ctx st; int idx; 
libfabric_ctx *rdma_ctx; ep_ctx_t *ep_ctx; volatile bool stop; - pthread_t frame_thread; + pthread_t ep_thread; int fb_recv; pthread_t app_thread; size_t transfer_size; - int pkt_len; - /* share memory arguments */ memif_socket_args_t memif_socket_args; @@ -93,17 +87,12 @@ typedef struct { memif_socket_handle_t memif_socket; memif_conn_handle_t memif_conn; - memif_buffer_t *shm_bufs; + shm_buf_info_t *shm_bufs; uint16_t shm_buf_num; atomic_bool shm_ready; char name[32]; pthread_t memif_event_thread; - - /* stat */ - int stat_frame_total_received; - uint64_t stat_frame_first_rx_time; - double expect_fps; } rx_rdma_session_context_t; /* TX: Create RDMA session */ diff --git a/media-proxy/include/shm_memif.h b/media-proxy/include/shm_memif.h index 09a53c4d..58df151e 100644 --- a/media-proxy/include/shm_memif.h +++ b/media-proxy/include/shm_memif.h @@ -89,6 +89,7 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid); int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, memif_buffer_t * bufs, uint16_t count, uint16_t * count_out, uint32_t size, uint32_t timeout_ms); +int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region); #ifdef __cplusplus } diff --git a/media-proxy/src/libfabric_cq.c b/media-proxy/src/libfabric_cq.c index f29c0a46..e1100c24 100644 --- a/media-proxy/src/libfabric_cq.c +++ b/media-proxy/src/libfabric_cq.c @@ -50,49 +50,55 @@ #define CQ_TIMEOUT (-1) -void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method, +void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method, struct fid_wait *waitset) { switch (method) { case RDMA_COMP_SREAD: - cq_attr.wait_obj = FI_WAIT_UNSPEC; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_UNSPEC; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; case RDMA_COMP_WAITSET: assert(waitset); - cq_attr.wait_obj = FI_WAIT_SET; - cq_attr.wait_cond = FI_CQ_COND_NONE; - cq_attr.wait_set = waitset; 
+ cq_attr->wait_obj = FI_WAIT_SET; + cq_attr->wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_set = waitset; break; case RDMA_COMP_WAIT_FD: - cq_attr.wait_obj = FI_WAIT_FD; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_FD; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; case RDMA_COMP_YIELD: - cq_attr.wait_obj = FI_WAIT_YIELD; - cq_attr.wait_cond = FI_CQ_COND_NONE; + cq_attr->wait_obj = FI_WAIT_YIELD; + cq_attr->wait_cond = FI_CQ_COND_NONE; break; default: - cq_attr.wait_obj = FI_WAIT_NONE; + cq_attr->wait_obj = FI_WAIT_NONE; break; } } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num; struct fi_cq_err_entry comp; struct timespec a, b; int ret; + if (!cur) + return -EINVAL; + + if (total < *cur) + return -EINVAL; + + entries_num = total - *cur; + if (timeout >= 0) clock_gettime(CLOCK_MONOTONIC, &a); do { - ret = fi_cq_read(cq, &comp, 1); + ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1); if (ret > 0) { if (timeout >= 0) clock_gettime(CLOCK_MONOTONIC, &a); @@ -123,6 +129,7 @@ static int rdma_poll_fd(int fd, int timeout) RDMA_PRINTERR("poll", -errno); ret = -errno; } else if (!ret) { + RDMA_WARN("poll timed out"); ret = -EAGAIN; } else { ret = 0; @@ -130,16 +137,22 @@ static int rdma_poll_fd(int fd, int timeout) return ret; } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num; struct fi_cq_err_entry comp; struct fid *fids[1]; int fd, ret; + if (!cur) + return -EINVAL; + + if (total < *cur) + return -EINVAL; + + entries_num = total - *cur; + fd = cq == ep_ctx->txcq ? 
ep_ctx->tx_fd : ep_ctx->rx_fd; fids[0] = &cq->fid; @@ -147,14 +160,14 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c ret = fi_trywait(ep_ctx->rdma_ctx->fabric, fids, 1); if (!ret) { ret = rdma_poll_fd(fd, timeout); - if (ret && ret != -FI_EAGAIN) + if (ret) return ret; } - ret = fi_cq_read(cq, &comp, 1); + ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1); if (ret > 0) { (*cur)++; - } else if (ret < 0 && ret != -FI_EAGAIN) { + } else if (ret < 0) { return ret; } } @@ -162,20 +175,27 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c return 0; } -/* - * fi_cq_err_entry can be cast to any CQ entry format. - */ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { + uint64_t entries_num; struct fi_cq_err_entry comp; int ret; + if (!cur) + return -EINVAL; + + if (total < *cur) + return -EINVAL; + + entries_num = total - *cur; + while (total != *cur) { - ret = fi_cq_sread(cq, &comp, 1, NULL, timeout); + ret = fi_cq_sread(cq, entries ? 
&entries[entries_num - (total - *cur)] : &comp, 1, NULL, + timeout); if (ret > 0) { (*cur)++; - } else if (ret < 0 && ret != -FI_EAGAIN) { + } else if (ret < 0) { return ret; } } @@ -184,35 +204,39 @@ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur } int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, - int timeout) + int timeout, struct fi_cq_err_entry *entries) { int ret; - ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout); + if (!cur) + return -EINVAL; + + ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries); if (ret) { if (ret == -FI_EAVAIL) { ret = rdma_cq_readerr(cq); (*cur)++; - } else { + } else if ( ret != -FI_EAGAIN) { RDMA_PRINTERR("rdma_get_cq_comp", ret); } } return ret; } -int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout) +int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout, + struct fi_cq_err_entry *entries) { switch (ep_ctx->rdma_ctx->comp_method) { case RDMA_COMP_SREAD: case RDMA_COMP_YIELD: - return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; case RDMA_COMP_WAIT_FD: - return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; default: - return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout); + return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout, entries); break; } } diff --git a/media-proxy/src/libfabric_dev.c b/media-proxy/src/libfabric_dev.c index 0540cb40..b8930f13 100644 --- a/media-proxy/src/libfabric_dev.c +++ b/media-proxy/src/libfabric_dev.c @@ -101,7 +101,7 @@ int rdma_init(libfabric_ctx **ctx) return -ENOMEM; } - (*ctx)->comp_method = RDMA_COMP_SPIN; + (*ctx)->comp_method = RDMA_COMP_SREAD; hints = fi_allocinfo(); if (!hints) { diff --git a/media-proxy/src/libfabric_ep.c 
b/media-proxy/src/libfabric_ep.c index cd57f6f1..20423457 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -121,7 +121,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf cq_attr.format = FI_CQ_FORMAT_CONTEXT; } - rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL); + rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL); if (tx_cq_size) cq_attr.size = tx_cq_size; else @@ -133,7 +133,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf return ret; } - rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL); + rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL); if (rx_cq_size) cq_attr.size = rx_cq_size; else @@ -159,48 +159,61 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf } return 0; } - -static int ep_reg_mr(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_info *fi) +int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size) { int ret; /* TODO: I'm using address of ep_ctx as a key, * maybe there is more elegant solution */ - ret = rdma_reg_mr(rdma_ctx, ep_ctx->ep, fi, ep_ctx->data_buf, ep_ctx->data_buf_size, - rdma_info_to_mr_access(fi), (uint64_t)ep_ctx, FI_HMEM_SYSTEM, 0, - &ep_ctx->data_mr, &ep_ctx->data_desc); + ret = rdma_reg_mr(ep_ctx->rdma_ctx, ep_ctx->ep, data_buf, data_buf_size, + rdma_info_to_mr_access(ep_ctx->rdma_ctx->info), (uint64_t)ep_ctx, + FI_HMEM_SYSTEM, 0, &ep_ctx->data_mr, &ep_ctx->data_desc); return ret; } -int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size) +int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size) { int ret; do { - ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, - ep_ctx->send_ctx); + ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, NULL); if (ret == -EAGAIN) (void)fi_cq_read(ep_ctx->txcq, NULL, 0); } while (ret == -EAGAIN); - ret = rdma_get_cq_comp(ep_ctx, 
ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, -1); return ret; } -int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size) +int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx) { int ret; - double elapsed_time; - double fps = 0.0; do { - ret = - fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, ep_ctx->recv_ctx); - if (ret == -EAGAIN) + ret = fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, buf_ctx); + if (ret == -FI_EAGAIN) (void)fi_cq_read(ep_ctx->rxcq, NULL, 0); - } while (ret == -EAGAIN); + } while (ret == -FI_EAGAIN); + + return ret; +} + +int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout) +{ + struct fi_cq_err_entry entry; + int err; + + err = rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1, + timeout, &entry); + if (err) + return err; + *buf_ctx = entry.op_context; + return 0; +} - return rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1, -1); +int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout) +{ + return rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, + timeout, NULL); } int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) @@ -215,8 +228,6 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) return -ENOMEM; } (*ep_ctx)->rdma_ctx = cfg->rdma_ctx; - (*ep_ctx)->data_buf = cfg->data_buf; - (*ep_ctx)->data_buf_size = cfg->data_buf_size; hints = fi_dupinfo((*ep_ctx)->rdma_ctx->info); @@ -237,13 +248,6 @@ int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) return ret; } - ret = ep_reg_mr(*ep_ctx, (*ep_ctx)->rdma_ctx, fi); - if (ret) { - printf("%s, ep_reg_mr fail\n", __func__); - ep_destroy(ep_ctx); - return ret; - } - ret = ep_alloc_res(*ep_ctx, (*ep_ctx)->rdma_ctx, fi, 0, 0, 1); if (ret) { printf("%s, ep_alloc_res fail\n", __func__); diff --git a/media-proxy/src/libfabric_mr.c b/media-proxy/src/libfabric_mr.c index 86417c5d..d25f128d 100644 --- a/media-proxy/src/libfabric_mr.c +++ 
b/media-proxy/src/libfabric_mr.c @@ -79,9 +79,9 @@ static void rdma_fill_mr_attr(struct iovec *iov, struct fi_mr_dmabuf *dmabuf, in } } -int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, struct fi_info *fi, void *buf, - size_t size, uint64_t access, uint64_t key, enum fi_hmem_iface iface, - uint64_t device, struct fid_mr **mr, void **desc) +int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, void *buf, size_t size, uint64_t access, + uint64_t key, enum fi_hmem_iface iface, uint64_t device, struct fid_mr **mr, + void **desc) { struct fi_mr_dmabuf dmabuf = { 0 }; struct fi_mr_attr attr = { 0 }; diff --git a/media-proxy/src/rdma_session.c b/media-proxy/src/rdma_session.c index ac2df69d..f24e746b 100644 --- a/media-proxy/src/rdma_session.c +++ b/media-proxy/src/rdma_session.c @@ -84,11 +84,19 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) sizeof(rx_ctx->memif_conn_args.interface_name)); rx_ctx->memif_conn_args.is_master = memif_ops->is_master; + rx_ctx->shm_buf_num = 1 << rx_ctx->memif_conn_args.log2_ring_size; + rx_ctx->shm_bufs = (shm_buf_info_t *)calloc(rx_ctx->shm_buf_num, sizeof(shm_buf_info_t)); + if (!rx_ctx->shm_bufs) { + ERROR("%s Failed to allocate shared memory buffers", __func__); + return -ENOMEM; + } + INFO("create memif interface."); err = memif_create(&rx_ctx->memif_conn, &rx_ctx->memif_conn_args, rx_rdma_on_connect, rx_rdma_on_disconnect, rx_on_receive, rx_ctx); if (err != MEMIF_ERR_SUCCESS) { INFO("memif_create: %s", memif_strerror(err)); + free(rx_ctx->shm_bufs); return -1; } @@ -97,6 +105,7 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) rx_ctx->memif_conn_args.socket); if (err < 0) { printf("%s(%d), thread create fail\n", __func__, err); + free(rx_ctx->shm_bufs); return -1; } @@ -106,7 +115,6 @@ int rx_rdma_shm_init(rx_rdma_session_context_t *rx_ctx, memif_ops_t *memif_ops) int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) { 
memif_ops_t default_memif_ops = { 0 }; - const uint16_t FRAME_COUNT = 1; struct stat st = { 0 }; int err; @@ -167,16 +175,11 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) sizeof(tx_ctx->memif_conn_args.interface_name), "%s", memif_ops->interface_name); tx_ctx->memif_conn_args.is_master = memif_ops->is_master; - /* TX buffers */ - tx_ctx->shm_bufs = (memif_buffer_t *)malloc(sizeof(memif_buffer_t) * FRAME_COUNT); - tx_ctx->shm_buf_num = FRAME_COUNT; - INFO("Create memif interface."); err = memif_create(&tx_ctx->memif_conn, &tx_ctx->memif_conn_args, tx_rdma_on_connect, tx_rdma_on_disconnect, tx_rdma_on_receive, tx_ctx); if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_create: %s", memif_strerror(err)); - free(tx_ctx->shm_bufs); + ERROR("memif_create: %s", memif_strerror(err)); return -1; } @@ -184,8 +187,7 @@ int tx_rdma_shm_init(tx_rdma_session_context_t *tx_ctx, memif_ops_t *memif_ops) err = pthread_create(&tx_ctx->memif_event_thread, NULL, memif_event_loop, tx_ctx->memif_conn_args.socket); if (err < 0) { - printf("%s(%d), thread create fail\n", __func__, err); - free(tx_ctx->shm_bufs); + ERROR("%s(%d), thread create failed\n", __func__, err); return -1; } @@ -248,108 +250,177 @@ static int tx_shm_deinit(tx_rdma_session_context_t *tx_ctx) unlink(tx_ctx->memif_socket_args.path); } - if (tx_ctx->shm_bufs) { - free(tx_ctx->shm_bufs); - tx_ctx->shm_bufs = NULL; + return 0; +} + +static void handle_sent_buffers(tx_rdma_session_context_t *s) +{ + shm_buf_info_t *buf_info; + uint16_t bursted_buf_num; + int err; + + err = ep_txcq_read(s->ep_ctx, 1); + if (err) { + if (err != -EAGAIN) + INFO("%s ep_txcq_read: %s", __func__, strerror(err)); + return; } + s->fb_send++; - return 0; + err = memif_refill_queue(s->memif_conn, 0, 1, 0); + if (err != MEMIF_ERR_SUCCESS) + INFO("memif_refill_queue: %s", memif_strerror(err)); +} + +static void *tx_rdma_ep_thread(void *arg) +{ + tx_rdma_session_context_t *s_ctx = (tx_rdma_session_context_t *)arg; + 
memif_region_details_t region; + int err = 0; + + while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) + usleep(1000); + + INFO("%s(%d), TX RDMA thread started\n", __func__, s_ctx->idx); + while (!s_ctx->stop) { + if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) + continue; + handle_sent_buffers(s_ctx); + } + + return NULL; } /* TX: Create RDMA session */ tx_rdma_session_context_t *rdma_tx_session_create(libfabric_ctx *dev_handle, rdma_s_ops_t *opts, memif_ops_t *memif_ops) { + tx_rdma_session_context_t *tx_ctx = NULL; + ep_cfg_t ep_cfg = { 0 }; int err; - tx_rdma_session_context_t *tx_ctx; + + if (!dev_handle || !opts || !memif_ops) { + ERROR("%s, A input parameter is NULL", __func__); + return NULL; + } + tx_ctx = calloc(1, sizeof(tx_rdma_session_context_t)); if (tx_ctx == NULL) { - printf("%s, TX session contex malloc fail\n", __func__); + ERROR("%s, TX session contex allocation failed\n", __func__); return NULL; } + + tx_ctx->transfer_size = opts->transfer_size; tx_ctx->rdma_ctx = dev_handle; tx_ctx->stop = false; - tx_ctx->transfer_size = opts->transfer_size; + ep_cfg.rdma_ctx = tx_ctx->rdma_ctx; + ep_cfg.local_addr = opts->local_addr; + ep_cfg.remote_addr = opts->remote_addr; + ep_cfg.dir = opts->dir; - err = tx_rdma_shm_init(tx_ctx, memif_ops); - if (err < 0) { - printf("%s, fail to initialize share memory.\n", __func__); - free(tx_ctx); - return NULL; + err = ep_init(&tx_ctx->ep_ctx, &ep_cfg); + if (err) { + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); + goto exit_dealloc; } - /* TODO: use memif buffer with correct size */ - ep_cfg_t ep_cfg = { - .rdma_ctx = tx_ctx->rdma_ctx, - .data_buf_size = tx_ctx->transfer_size, - .local_addr = opts->local_addr, - .remote_addr = opts->remote_addr, - .data_buf = malloc(tx_ctx->transfer_size), - .dir = opts->dir, - }; - if (!ep_cfg.data_buf) { - printf("%s, session data buffer malloc fail\n", __func__); - return NULL; + err = 
tx_rdma_shm_init(tx_ctx, memif_ops); + if (err < 0) { + ERROR("%s, failed to initialize share memory.\n", __func__); + goto exit_deinit_ep; } - err = ep_init(&tx_ctx->ep_ctx, &ep_cfg); - if (err) { - printf("%s, fail to initialize libfabric's end point.\n", __func__); - free(tx_ctx); - return NULL; + err = pthread_create(&tx_ctx->ep_thread, NULL, tx_rdma_ep_thread, tx_ctx); + if (err < 0) { + ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, tx_ctx->idx, strerror(err)); + goto exit_deinit_shm; } return tx_ctx; + +exit_deinit_shm: + tx_shm_deinit(tx_ctx); +exit_deinit_ep: + ep_destroy(&tx_ctx->ep_ctx); +exit_dealloc: + free(tx_ctx); + return NULL; +} + +static shm_buf_info_t *get_free_shm_buf(rx_rdma_session_context_t *s) +{ + uint32_t i; + for (i = 0; i < s->shm_buf_num; i++) { + if (!s->shm_bufs[i].used) { + return &s->shm_bufs[i]; + } + } + return NULL; } -static void rx_rdma_consume_frame(rx_rdma_session_context_t *s, char *frame) +static int pass_empty_buf_to_libfabric(rx_rdma_session_context_t *s) { int err; - uint16_t qid = 0; - mcm_buffer *rx_mcm_buff = NULL; - memif_buffer_t *rx_bufs = NULL; - uint16_t buf_num = 1; - memif_conn_handle_t conn; + shm_buf_info_t *buf_info = NULL; uint32_t buf_size = s->transfer_size; - uint16_t rx_buf_num = 0, rx = 0; + uint16_t rx_buf_num = 0; - rx_bufs = s->shm_bufs; + buf_info = get_free_shm_buf(s); + if (!buf_info) + return -ENOMEM; - /* allocate memory */ - err = memif_buffer_alloc_timeout(s->memif_conn, qid, rx_bufs, 1, &rx_buf_num, buf_size, 10); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_rdma_consume_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return; - } + err = memif_buffer_alloc(s->memif_conn, 0, &buf_info->shm_buf, 1, &rx_buf_num, buf_size); + if (err != MEMIF_ERR_SUCCESS) + return -ENOMEM; - memcpy(rx_bufs->data, frame, s->transfer_size); + buf_info->used = true; - /* Send to microservice application. 
*/ - err = memif_tx_burst(s->memif_conn, qid, rx_bufs, rx_buf_num, &rx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_rdma_consume_frame memif_tx_burst: %s", memif_strerror(err)); + err = ep_recv_buf(s->ep_ctx, buf_info->shm_buf.data, buf_size, buf_info); + if (err) { + ERROR("%s ep_recv_buf failed with errno: %s", __func__, fi_strerror(err)); + return err; } + return 0; +} +static void handle_received_buffers(rx_rdma_session_context_t *s) +{ + shm_buf_info_t *buf_info; + int err; + uint16_t bursted_buf_num; + + err = ep_rxcq_read(s->ep_ctx, (void **)&buf_info, 1); + if (err) { + if (err != -EAGAIN) + INFO("%s ep_rxcq_read: %s", __func__, strerror(err)); + return; + } s->fb_recv++; + + err = memif_tx_burst(s->memif_conn, 0, &buf_info->shm_buf, 1, &bursted_buf_num); + if (err != MEMIF_ERR_SUCCESS && bursted_buf_num != 1) { + INFO("%s memif_tx_burst: %s", __func__, memif_strerror(err)); + return; + } + buf_info->used = false; } -static void *rx_rdma_frame_thread(void *arg) +static void *rx_rdma_ep_thread(void *arg) { rx_rdma_session_context_t *s_ctx = (rx_rdma_session_context_t *)arg; - libfabric_ctx *rdma_ctx = s_ctx->rdma_ctx; - ep_ctx_t *cp_ctx = s_ctx->ep_ctx; + memif_region_details_t region; + int err = 0; while (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire) && !s_ctx->stop) usleep(1000); - printf("%s(%d), RX RDMA thread started\n", __func__, s_ctx->idx); + INFO("%s(%d), RX RDMA thread started\n", __func__, s_ctx->idx); while (!s_ctx->stop) { - ep_recv_buf(cp_ctx, cp_ctx->data_buf, s_ctx->transfer_size); if (!atomic_load_explicit(&s_ctx->shm_ready, memory_order_acquire)) continue; - - rx_rdma_consume_frame(s_ctx, cp_ctx->data_buf); + while (!pass_empty_buf_to_libfabric(s_ctx)) {} + handle_received_buffers(s_ctx); } return NULL; @@ -359,55 +430,57 @@ static void *rx_rdma_frame_thread(void *arg) rx_rdma_session_context_t *rdma_rx_session_create(libfabric_ctx *dev_handle, rdma_s_ops_t *opts, memif_ops_t *memif_ops) { - rx_rdma_session_context_t 
*rx_ctx; + rx_rdma_session_context_t *rx_ctx = NULL; ep_cfg_t ep_cfg = { 0 }; int err; - rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); - if (rx_ctx == NULL) { - printf("%s, TX session contex malloc fail\n", __func__); + if (!dev_handle || !opts || !memif_ops) { + ERROR("%s, A input parameter is NULL", __func__); return NULL; } - rx_ctx->rdma_ctx = dev_handle; - rx_ctx->stop = false; - - rx_ctx->transfer_size = opts->transfer_size; - err = rx_rdma_shm_init(rx_ctx, memif_ops); - - if (err < 0) { - printf("%s, fail to initialize share memory.\n", __func__); - free(rx_ctx); + rx_ctx = calloc(1, sizeof(rx_rdma_session_context_t)); + if (!rx_ctx) { + ERROR("%s, TX session contex allocation failed\n", __func__); return NULL; } - /* TODO: use memif buffer with correct size */ + rx_ctx->transfer_size = opts->transfer_size; + rx_ctx->rdma_ctx = dev_handle; + rx_ctx->stop = false; + ep_cfg.rdma_ctx = rx_ctx->rdma_ctx; - ep_cfg.data_buf_size = rx_ctx->transfer_size; ep_cfg.local_addr = opts->local_addr; ep_cfg.remote_addr = opts->remote_addr; ep_cfg.dir = opts->dir; - ep_cfg.data_buf = malloc(rx_ctx->transfer_size); - if (!ep_cfg.data_buf) { - printf("%s, session data buffer malloc fail\n", __func__); - return NULL; - } err = ep_init(&rx_ctx->ep_ctx, &ep_cfg); if (err) { - printf("%s, fail to initialize libfabric's end point.\n", __func__); - free(rx_ctx); - return NULL; + ERROR("%s, failed to initialize libfabric's end point.\n", __func__); + goto exit_dealloc; } - err = pthread_create(&rx_ctx->frame_thread, NULL, rx_rdma_frame_thread, rx_ctx); + err = rx_rdma_shm_init(rx_ctx, memif_ops); if (err < 0) { - printf("%s(%d), thread create fail %d\n", __func__, err, rx_ctx->idx); - free(rx_ctx); - return NULL; + ERROR("%s, Failed to initialize share memory.\n", __func__); + goto exit_deinit_ep; + } + + err = pthread_create(&rx_ctx->ep_thread, NULL, rx_rdma_ep_thread, rx_ctx); + if (err < 0) { + ERROR("%s, Endpoint thread %d create failed: %s\n", __func__, rx_ctx->idx, 
strerror(err)); + goto exit_deinit_shm; } return rx_ctx; + +exit_deinit_shm: + rx_shm_deinit(rx_ctx); +exit_deinit_ep: + ep_destroy(&rx_ctx->ep_ctx); +exit_dealloc: + free(rx_ctx); + return NULL; } void rdma_rx_session_stop(rx_rdma_session_context_t *rx_ctx) @@ -421,7 +494,7 @@ void rdma_rx_session_stop(rx_rdma_session_context_t *rx_ctx) rx_ctx->stop = true; - err = pthread_join(rx_ctx->frame_thread, NULL); + err = pthread_join(rx_ctx->ep_thread, NULL); if (err && err != ESRCH) { ERROR("%s: Error joining thread: %s\n", __func__, strerror(err)); } @@ -438,8 +511,6 @@ void rdma_rx_session_destroy(rx_rdma_session_context_t **p_rx_ctx) } rx_ctx = *p_rx_ctx; - /* TODO: Remove free when memif buf will be used */ - free(rx_ctx->ep_ctx->data_buf); err = ep_destroy(&rx_ctx->ep_ctx); if (err < 0) { printf("%s, ep free failed\n", __func__); @@ -461,7 +532,12 @@ void rdma_tx_session_stop(tx_rdma_session_context_t *tx_ctx) return; } - /* No thread to stop */ + tx_ctx->stop = true; + + err = pthread_join(tx_ctx->ep_thread, NULL); + if (err && err != ESRCH) { + ERROR("%s: Error joining thread: %s\n", __func__, strerror(err)); + } } void rdma_tx_session_destroy(tx_rdma_session_context_t **p_tx_ctx) @@ -475,8 +551,6 @@ void rdma_tx_session_destroy(tx_rdma_session_context_t **p_tx_ctx) } tx_ctx = *p_tx_ctx; - /* TODO: Remove free when memif buf will be used */ - free(tx_ctx->ep_ctx->data_buf); err = ep_destroy(&tx_ctx->ep_ctx); if (err < 0) { printf("%s, ep free failed\n", __func__); diff --git a/media-proxy/src/shm_memif_common.c b/media-proxy/src/shm_memif_common.c index 74bef616..b65efe44 100644 --- a/media-proxy/src/shm_memif_common.c +++ b/media-proxy/src/shm_memif_common.c @@ -21,7 +21,7 @@ void print_memif_details(memif_conn_handle_t conn) int err = 0; buf = (char*)malloc(buflen); - if (buf == NULL) { + if (!buf) { INFO("Not Enough Memory."); return; } @@ -217,3 +217,38 @@ int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, } return MEMIF_ERR_NOBUF_RING; } 
+ +int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region) +{ + memif_details_t md = { 0 }; + ssize_t buflen = 2000; + char *buf = NULL; + int err = 0; + + if (!region || !conn) + return -EINVAL; + + buf = (char *)calloc(buflen, 1); + if (!buf) { + ERROR("Not Enough Memory."); + return -ENOMEM; + } + + err = memif_get_details(conn, &md, buf, buflen); + if (err != MEMIF_ERR_SUCCESS) { + ERROR("%s", memif_strerror(err)); + free(buf); + return -EINVAL; + } + /* Region number 1 holds data buffers */ + if (md.regions_num < 1) { + ERROR("Data buffers not found in memif regions"); + free(buf); + return -EINVAL; + } + + memcpy(region, &md.regions[1], sizeof(md.regions[1])); + free(buf); + + return 0; +} diff --git a/media-proxy/src/shm_memif_rdma.c b/media-proxy/src/shm_memif_rdma.c index fce6534a..8ff5b3c3 100644 --- a/media-proxy/src/shm_memif_rdma.c +++ b/media-proxy/src/shm_memif_rdma.c @@ -18,24 +18,29 @@ int rx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) { rx_rdma_session_context_t *rx_ctx = (rx_rdma_session_context_t *)priv_data; + memif_region_details_t region; int err; INFO("RX RDMA memif connected!"); - /* rx buffers */ - rx_ctx->shm_buf_num = 1; - rx_ctx->shm_bufs = (memif_buffer_t *)malloc(sizeof(memif_buffer_t) * rx_ctx->shm_buf_num); - if (!rx_ctx->shm_bufs) { - ERROR("Failed to allocate memory"); - return -ENOMEM; - } - err = memif_refill_queue(conn, 0, -1, 0); if (err != MEMIF_ERR_SUCCESS) { INFO("memif_refill_queue: %s", memif_strerror(err)); return err; } + err = memif_get_buffs_region(conn, &region); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. 
\n", __func__); + return err; + } + + err = ep_reg_mr(rx_ctx->ep_ctx, region.addr, region.size); + if (errno) { + ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(err)); + return err; + } + print_memif_details(conn); atomic_store_explicit(&rx_ctx->shm_ready, true, memory_order_release); @@ -81,6 +86,7 @@ int rx_rdma_on_disconnect(memif_conn_handle_t conn, void *priv_data) int tx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) { tx_rdma_session_context_t *tx_ctx = (tx_rdma_session_context_t *)priv_data; + memif_region_details_t region; int err = 0; INFO("TX RDMA memif connected!"); @@ -91,6 +97,18 @@ int tx_rdma_on_connect(memif_conn_handle_t conn, void *priv_data) return err; } + err = memif_get_buffs_region(conn, &region); + if (err) { + ERROR("%s, Getting memory buffers from memif failed. \n", __func__); + return err; + } + + err = ep_reg_mr(tx_ctx->ep_ctx, region.addr, region.size); + if (errno) { + ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(err)); + return err; + } + atomic_store_explicit(&tx_ctx->shm_ready, true, memory_order_release); print_memif_details(conn); @@ -149,19 +167,15 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid) /* receive packets from the shared memory */ err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); + ERROR("memif_rx_burst: %s", memif_strerror(err)); return err; } - /* TODO: Use memif buffer directly. 
It has to be registered by libfabric */ - memcpy(tx_ctx->ep_ctx->data_buf, shm_bufs.data, shm_bufs.len); - ep_send_buf(tx_ctx->ep_ctx, tx_ctx->ep_ctx->data_buf, shm_bufs.len); - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - tx_ctx->fb_send++; + err = ep_send_buf(tx_ctx->ep_ctx, shm_bufs.data, shm_bufs.len); + if (err) { + ERROR("ep_send_buf failed with: %s", fi_strerror(err)); + return err; + } return 0; } diff --git a/tests/single-node-sample-apps/test-rdma.sh b/tests/single-node-sample-apps/test-rdma.sh index a124a5e3..524e70cb 100755 --- a/tests/single-node-sample-apps/test-rdma.sh +++ b/tests/single-node-sample-apps/test-rdma.sh @@ -17,9 +17,8 @@ duration="${2:-10}" frames_number="${3:-300}" width="${4:-640}" height="${5:-360}" -fps="${6:-60}" -pixel_format="${7:-yuv422p10le}" -rdma_iface_ip="${8:-127.0.0.1}" +pixel_format="${6:-yuv422p10le}" +rdma_iface_ip="${7:-127.0.0.1}" # Test configuration (cont'd) wait_interval=$((duration + 5)) @@ -159,14 +158,14 @@ function run_test_rdma() { info "Starting recver_app" export MCM_MEDIA_PROXY_PORT=8003 - local recver_app_cmd="recver_app -r $rdma_iface_ip -t rdma -w $width -h $height -f $fps -x $pixel_format -b $output_file -o auto" + local recver_app_cmd="recver_app -r $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -b $output_file -o auto" run_in_background "$bin_dir/$recver_app_cmd" "$recver_app_out" recver_app_pid="$!" info "Starting sender_app" export MCM_MEDIA_PROXY_PORT=8002 - local sender_app_cmd="sender_app -s $rdma_iface_ip -t rdma -w $width -h $height -f $fps -x $pixel_format -b $input_file -n $frames_number -o auto" + local sender_app_cmd="sender_app -s $rdma_iface_ip -t rdma -w $width -h $height -x $pixel_format -b $input_file -n $frames_number -o auto" run_in_background "$bin_dir/$sender_app_cmd" "$sender_app_out" sender_app_pid="$!" 
@@ -217,7 +216,7 @@ info " Binary directory: $(realpath $bin_dir)" info " Output directory: $(realpath $out_dir)" info " Input file path: $input_file" info " Input file size: $(stat -c%s $input_file) byte(s)" -info " Frame size: $width x $height, FPS: $fps" +info " Frame size: $width x $height" info " Pixel format: $pixel_format" info " Duration: ${duration}s" info " Frames number: $frames_number"