Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use zero copy between memif and libfabric #237

Merged
merged 6 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions media-proxy/include/libfabric_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ extern "C" {

#include "libfabric_ep.h"

void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method,
void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method,
struct fid_wait *waitset);
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout);
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout,
struct fi_cq_err_entry *entries);

int rdma_get_cq_fd(struct fid_cq *cq, int *fd, enum cq_comp_method method);
int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout);
int timeout, struct fi_cq_err_entry *entry);
int rdma_cq_readerr(struct fid_cq *cq);

#ifdef __cplusplus
Expand Down
13 changes: 5 additions & 8 deletions media-proxy/include/libfabric_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ typedef struct {

typedef struct {
struct fid_ep *ep;
char *data_buf;
size_t data_buf_size;
int rx_fd, tx_fd;
struct fid_cq *txcq, *rxcq;
struct fid_av *av;
Expand All @@ -34,8 +32,6 @@ typedef struct {
fi_addr_t dest_av_entry;

struct fid_wait *waitset;
struct fi_context *recv_ctx;
struct fi_context *send_ctx;
uint64_t tx_cq_cntr;
uint64_t rx_cq_cntr;

Expand All @@ -44,16 +40,17 @@ typedef struct {

typedef struct {
libfabric_ctx *rdma_ctx;
char *data_buf;
size_t data_buf_size;
rdma_addr remote_addr;
rdma_addr local_addr;
enum direction dir;
} ep_cfg_t;

int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size);
int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size);
// *buf has to point to registered memory
int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size);
int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx);
int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout);
int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout);
int ep_reg_mr(ep_ctx_t *ep_ctx, void *data_buf, size_t data_buf_size);
int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg);
int ep_destroy(ep_ctx_t **ep_ctx);

Expand Down
6 changes: 3 additions & 3 deletions media-proxy/include/libfabric_mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ extern "C" {

#include "libfabric_dev.h"

int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, struct fi_info *fi, void *buf,
size_t size, uint64_t access, uint64_t key, enum fi_hmem_iface iface,
uint64_t device, struct fid_mr **mr, void **desc);
int rdma_reg_mr(libfabric_ctx *rdma_ctx, struct fid_ep *ep, void *buf, size_t size, uint64_t access,
uint64_t key, enum fi_hmem_iface iface, uint64_t device, struct fid_mr **mr,
void **desc);

uint64_t rdma_info_to_mr_access(struct fi_info *info);

Expand Down
27 changes: 8 additions & 19 deletions media-proxy/include/rdma_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,21 @@ typedef struct {
} rdma_s_ops_t;

typedef struct {
libfabric_ctx st;
memif_buffer_t shm_buf;
bool used;
} shm_buf_info_t;

typedef struct {
int idx;
libfabric_ctx *rdma_ctx;
ep_ctx_t *ep_ctx;

int frame_done_cnt;
int packet_done_cnt;

volatile bool stop;
pthread_t ep_thread;

int fb_send;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t transfer_size;
size_t pkt_len;


/* memif parameters */
memif_ops_t memif_ops;
Expand All @@ -58,7 +56,6 @@ typedef struct {
/* memif connection handle */
memif_conn_handle_t memif_conn;

memif_buffer_t *shm_bufs;
uint16_t shm_buf_num;
atomic_bool shm_ready;

Expand All @@ -69,21 +66,18 @@ typedef struct {
} tx_rdma_session_context_t;

typedef struct {
libfabric_ctx st;
int idx;
libfabric_ctx *rdma_ctx;
ep_ctx_t *ep_ctx;

volatile bool stop;
pthread_t frame_thread;
pthread_t ep_thread;

int fb_recv;

pthread_t app_thread;

size_t transfer_size;
int pkt_len;


/* shared memory arguments */
memif_socket_args_t memif_socket_args;
Expand All @@ -93,17 +87,12 @@ typedef struct {
memif_socket_handle_t memif_socket;
memif_conn_handle_t memif_conn;

memif_buffer_t *shm_bufs;
shm_buf_info_t *shm_bufs;
uint16_t shm_buf_num;
atomic_bool shm_ready;

char name[32];
pthread_t memif_event_thread;

/* stat */
int stat_frame_total_received;
uint64_t stat_frame_first_rx_time;
double expect_fps;
} rx_rdma_session_context_t;

/* TX: Create RDMA session */
Expand Down
1 change: 1 addition & 0 deletions media-proxy/include/shm_memif.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid);
int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid,
memif_buffer_t * bufs, uint16_t count, uint16_t * count_out,
uint32_t size, uint32_t timeout_ms);
int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region);

#ifdef __cplusplus
}
Expand Down
96 changes: 60 additions & 36 deletions media-proxy/src/libfabric_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,55 @@

#define CQ_TIMEOUT (-1)

void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method,
void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method,
struct fid_wait *waitset)
{
switch (method) {
case RDMA_COMP_SREAD:
cq_attr.wait_obj = FI_WAIT_UNSPEC;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_UNSPEC;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
case RDMA_COMP_WAITSET:
assert(waitset);
cq_attr.wait_obj = FI_WAIT_SET;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr.wait_set = waitset;
cq_attr->wait_obj = FI_WAIT_SET;
cq_attr->wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_set = waitset;
break;
case RDMA_COMP_WAIT_FD:
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_FD;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
case RDMA_COMP_YIELD:
cq_attr.wait_obj = FI_WAIT_YIELD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_YIELD;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
default:
cq_attr.wait_obj = FI_WAIT_NONE;
cq_attr->wait_obj = FI_WAIT_NONE;
break;
}
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num;
struct fi_cq_err_entry comp;
struct timespec a, b;
int ret;

if (!cur)
return -EINVAL;

if (total < *cur)
return -EINVAL;

entries_num = total - *cur;

if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);

do {
ret = fi_cq_read(cq, &comp, 1);
ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1);
if (ret > 0) {
if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);
Expand Down Expand Up @@ -123,59 +129,73 @@ static int rdma_poll_fd(int fd, int timeout)
RDMA_PRINTERR("poll", -errno);
ret = -errno;
} else if (!ret) {
RDMA_WARN("poll timed out");
ret = -EAGAIN;
} else {
ret = 0;
}
return ret;
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num;
struct fi_cq_err_entry comp;
struct fid *fids[1];
int fd, ret;

if (!cur)
return -EINVAL;

if (total < *cur)
return -EINVAL;

entries_num = total - *cur;

fd = cq == ep_ctx->txcq ? ep_ctx->tx_fd : ep_ctx->rx_fd;
fids[0] = &cq->fid;

while (total != *cur) {
ret = fi_trywait(ep_ctx->rdma_ctx->fabric, fids, 1);
if (!ret) {
ret = rdma_poll_fd(fd, timeout);
if (ret && ret != -FI_EAGAIN)
if (ret)
return ret;
}

ret = fi_cq_read(cq, &comp, 1);
ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1);
if (ret > 0) {
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
} else if (ret < 0) {
return ret;
}
}

return 0;
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num;
struct fi_cq_err_entry comp;
int ret;

if (!cur)
return -EINVAL;

if (total < *cur)
return -EINVAL;

entries_num = total - *cur;

while (total != *cur) {
ret = fi_cq_sread(cq, &comp, 1, NULL, timeout);
ret = fi_cq_sread(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1, NULL,
timeout);
if (ret > 0) {
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
} else if (ret < 0) {
return ret;
}
}
Expand All @@ -184,35 +204,39 @@ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur
}

int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
int ret;

ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout);
if (!cur)
return -EINVAL;

ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries);

if (ret) {
if (ret == -FI_EAVAIL) {
ret = rdma_cq_readerr(cq);
(*cur)++;
} else {
} else if ( ret != -FI_EAGAIN) {
RDMA_PRINTERR("rdma_get_cq_comp", ret);
}
}
return ret;
}

int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout)
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout,
struct fi_cq_err_entry *entries)
{
switch (ep_ctx->rdma_ctx->comp_method) {
case RDMA_COMP_SREAD:
case RDMA_COMP_YIELD:
return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
case RDMA_COMP_WAIT_FD:
return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
default:
return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion media-proxy/src/libfabric_dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ int rdma_init(libfabric_ctx **ctx)
return -ENOMEM;
}

(*ctx)->comp_method = RDMA_COMP_SPIN;
(*ctx)->comp_method = RDMA_COMP_SREAD;

hints = fi_allocinfo();
if (!hints) {
Expand Down
Loading