Skip to content

Commit

Permalink
Use zero copy between memif and libfabric
Browse files Browse the repository at this point in the history
Use memif's buffer to send data directly from them
and to receive data directly into them

Signed-off-by: Kasiewicz, Marek <[email protected]>
  • Loading branch information
Sakoram committed Oct 17, 2024
1 parent 765163d commit a9891c6
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 204 deletions.
7 changes: 4 additions & 3 deletions media-proxy/include/libfabric_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ extern "C" {

#include "libfabric_ep.h"

void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method,
void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method,
struct fid_wait *waitset);
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout);
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout,
struct fi_cq_err_entry *entries);

int rdma_get_cq_fd(struct fid_cq *cq, int *fd, enum cq_comp_method method);
int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout);
int timeout, struct fi_cq_err_entry *entry);
int rdma_cq_readerr(struct fid_cq *cq);

#ifdef __cplusplus
Expand Down
13 changes: 9 additions & 4 deletions media-proxy/include/libfabric_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ typedef struct {
fi_addr_t dest_av_entry;

struct fid_wait *waitset;
struct fi_context *recv_ctx;
struct fi_context *send_ctx;
uint64_t tx_cq_cntr;
uint64_t rx_cq_cntr;

Expand All @@ -51,9 +49,16 @@ typedef struct {
enum direction dir;
} ep_cfg_t;

int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size);
typedef struct {
ep_cfg_t *ep_cfg;
void *s_ctx;
} ep_thread_arg_t;

int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size);
// *buf has to point to registered memory
int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size);
int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx);
int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout);
int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout);
int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg);
int ep_destroy(ep_ctx_t **ep_ctx);

Expand Down
28 changes: 9 additions & 19 deletions media-proxy/include/rdma_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,22 @@ typedef struct {
} rdma_s_ops_t;

typedef struct {
libfabric_ctx st;
memif_buffer_t shm_buf;
bool used;
} shm_buf_info_t;

typedef struct {
int idx;
libfabric_ctx *rdma_ctx;
ep_ctx_t *ep_ctx;

int frame_done_cnt;
int packet_done_cnt;

volatile bool stop;
pthread_t ep_thread;
atomic_bool ep_ready;

int fb_send;
pthread_cond_t wake_cond;
pthread_mutex_t wake_mutex;

size_t transfer_size;
size_t pkt_len;


/* memif parameters */
memif_ops_t memif_ops;
Expand All @@ -58,7 +57,6 @@ typedef struct {
/* memif conenction handle */
memif_conn_handle_t memif_conn;

memif_buffer_t *shm_bufs;
uint16_t shm_buf_num;
atomic_bool shm_ready;

Expand All @@ -69,21 +67,18 @@ typedef struct {
} tx_rdma_session_context_t;

typedef struct {
libfabric_ctx st;
int idx;
libfabric_ctx *rdma_ctx;
ep_ctx_t *ep_ctx;

volatile bool stop;
pthread_t frame_thread;
pthread_t ep_thread;

int fb_recv;

pthread_t app_thread;

size_t transfer_size;
int pkt_len;


/* share memory arguments */
memif_socket_args_t memif_socket_args;
Expand All @@ -93,17 +88,12 @@ typedef struct {
memif_socket_handle_t memif_socket;
memif_conn_handle_t memif_conn;

memif_buffer_t *shm_bufs;
shm_buf_info_t *shm_bufs;
uint16_t shm_buf_num;
atomic_bool shm_ready;

char name[32];
pthread_t memif_event_thread;

/* stat */
int stat_frame_total_received;
uint64_t stat_frame_first_rx_time;
double expect_fps;
} rx_rdma_session_context_t;

/* TX: Create RDMA session */
Expand Down
1 change: 1 addition & 0 deletions media-proxy/include/shm_memif.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ int tx_rdma_on_receive(memif_conn_handle_t conn, void *priv_data, uint16_t qid);
int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid,
memif_buffer_t * bufs, uint16_t count, uint16_t * count_out,
uint32_t size, uint32_t timeout_ms);
int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region);

#ifdef __cplusplus
}
Expand Down
74 changes: 34 additions & 40 deletions media-proxy/src/libfabric_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,38 @@

#define CQ_TIMEOUT (-1)

void rdma_cq_set_wait_attr(struct fi_cq_attr cq_attr, enum cq_comp_method method,
void rdma_cq_set_wait_attr(struct fi_cq_attr *cq_attr, enum cq_comp_method method,
struct fid_wait *waitset)
{
switch (method) {
case RDMA_COMP_SREAD:
cq_attr.wait_obj = FI_WAIT_UNSPEC;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_UNSPEC;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
case RDMA_COMP_WAITSET:
assert(waitset);
cq_attr.wait_obj = FI_WAIT_SET;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr.wait_set = waitset;
cq_attr->wait_obj = FI_WAIT_SET;
cq_attr->wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_set = waitset;
break;
case RDMA_COMP_WAIT_FD:
cq_attr.wait_obj = FI_WAIT_FD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_FD;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
case RDMA_COMP_YIELD:
cq_attr.wait_obj = FI_WAIT_YIELD;
cq_attr.wait_cond = FI_CQ_COND_NONE;
cq_attr->wait_obj = FI_WAIT_YIELD;
cq_attr->wait_cond = FI_CQ_COND_NONE;
break;
default:
cq_attr.wait_obj = FI_WAIT_NONE;
cq_attr->wait_obj = FI_WAIT_NONE;
break;
}
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num = total - *cur;
struct fi_cq_err_entry comp;
struct timespec a, b;
int ret;
Expand All @@ -92,7 +90,7 @@ static int rdma_spin_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur
clock_gettime(CLOCK_MONOTONIC, &a);

do {
ret = fi_cq_read(cq, &comp, 1);
ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1);
if (ret > 0) {
if (timeout >= 0)
clock_gettime(CLOCK_MONOTONIC, &a);
Expand All @@ -119,23 +117,18 @@ static int rdma_poll_fd(int fd, int timeout)
fds.fd = fd;
fds.events = POLLIN;
ret = poll(&fds, 1, timeout);
if (ret == -1) {
if (ret == !ret) {
RDMA_PRINTERR("poll", -errno);
ret = -errno;
} else if (!ret) {
ret = -EAGAIN;
} else {
ret = 0;
return 0;
}
return ret;
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num = total - *cur;
struct fi_cq_err_entry comp;
struct fid *fids[1];
int fd, ret;
Expand All @@ -147,35 +140,33 @@ static int rdma_fdwait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *c
ret = fi_trywait(ep_ctx->rdma_ctx->fabric, fids, 1);
if (!ret) {
ret = rdma_poll_fd(fd, timeout);
if (ret && ret != -FI_EAGAIN)
if (ret)
return ret;
}

ret = fi_cq_read(cq, &comp, 1);
ret = fi_cq_read(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1);
if (ret > 0) {
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
} else if (ret < 0) {
return ret;
}
}

return 0;
}

/*
* fi_cq_err_entry can be cast to any CQ entry format.
*/
static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
uint64_t entries_num = total - *cur;
struct fi_cq_err_entry comp;
int ret;

while (total != *cur) {
ret = fi_cq_sread(cq, &comp, 1, NULL, timeout);
ret = fi_cq_sread(cq, entries ? &entries[entries_num - (total - *cur)] : &comp, 1, NULL, timeout);
if (ret > 0) {
(*cur)++;
} else if (ret < 0 && ret != -FI_EAGAIN) {
} else if (ret < 0) {
return ret;
}
}
Expand All @@ -184,11 +175,13 @@ static int rdma_wait_for_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur
}

int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total,
int timeout)
int timeout, struct fi_cq_err_entry *entries)
{
int ret;

ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout);
assert(total > *cur);

ret = rdma_read_cq(ep_ctx, cq, cur, total, timeout, entries);

if (ret) {
if (ret == -FI_EAVAIL) {
Expand All @@ -201,18 +194,19 @@ int rdma_get_cq_comp(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_
return ret;
}

int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout)
int rdma_read_cq(ep_ctx_t *ep_ctx, struct fid_cq *cq, uint64_t *cur, uint64_t total, int timeout,
struct fi_cq_err_entry *entries)
{
switch (ep_ctx->rdma_ctx->comp_method) {
case RDMA_COMP_SREAD:
case RDMA_COMP_YIELD:
return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_wait_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
case RDMA_COMP_WAIT_FD:
return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_fdwait_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
default:
return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout);
return rdma_spin_for_comp(ep_ctx, cq, cur, total, timeout, entries);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion media-proxy/src/libfabric_dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ int rdma_init(libfabric_ctx **ctx)
return -ENOMEM;
}

(*ctx)->comp_method = RDMA_COMP_SPIN;
(*ctx)->comp_method = RDMA_COMP_SREAD;

hints = fi_allocinfo();
if (!hints) {
Expand Down
42 changes: 29 additions & 13 deletions media-proxy/src/libfabric_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf
cq_attr.format = FI_CQ_FORMAT_CONTEXT;
}

rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL);
rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL);
if (tx_cq_size)
cq_attr.size = tx_cq_size;
else
Expand All @@ -133,7 +133,7 @@ static int ep_alloc_res(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_inf
return ret;
}

rdma_cq_set_wait_attr(cq_attr, rdma_ctx->comp_method, NULL);
rdma_cq_set_wait_attr(&cq_attr, rdma_ctx->comp_method, NULL);
if (rx_cq_size)
cq_attr.size = rx_cq_size;
else
Expand Down Expand Up @@ -172,35 +172,51 @@ static int ep_reg_mr(ep_ctx_t *ep_ctx, libfabric_ctx *rdma_ctx, struct fi_info *
return ret;
}

int ep_send_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size)
int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size)
{
int ret;

do {
ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry,
ep_ctx->send_ctx);
ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, NULL);
if (ret == -EAGAIN)
(void)fi_cq_read(ep_ctx->txcq, NULL, 0);
} while (ret == -EAGAIN);

ret = rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1, -1);
return ret;
}

int ep_recv_buf(ep_ctx_t *ep_ctx, char *buf, size_t buf_size)
int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx)
{
int ret;
double elapsed_time;
double fps = 0.0;

do {
ret =
fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, ep_ctx->recv_ctx);
if (ret == -EAGAIN)
fi_recv(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, FI_ADDR_UNSPEC, buf_ctx);
if (ret == -FI_EAGAIN)
(void)fi_cq_read(ep_ctx->rxcq, NULL, 0);
} while (ret == -EAGAIN);
} while (ret == -FI_EAGAIN);

return ret;
}

return rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1, -1);
int ep_rxcq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout)
{
struct fi_cq_err_entry entry;
int err;

err = rdma_get_cq_comp(ep_ctx, ep_ctx->rxcq, &ep_ctx->rx_cq_cntr, ep_ctx->rx_cq_cntr + 1,
timeout, &entry);
if (err)
return err;
*buf_ctx = entry.op_context;
return 0;
}


int ep_txcq_read(ep_ctx_t *ep_ctx, int timeout)
{
return rdma_get_cq_comp(ep_ctx, ep_ctx->txcq, &ep_ctx->tx_cq_cntr, ep_ctx->tx_cq_cntr + 1,
timeout, NULL);
}

int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg)
Expand Down
Loading

0 comments on commit a9891c6

Please sign in to comment.