Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
roccrtx committed Oct 24, 2018
1 parent bbe6f6c commit 8ae8352
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 180 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ simple pingpong application using one-sided RDMA primitive.

Server side
```c++
/**
* Note, RDMA usually uses some other communication method (e.g. TCP/IP) to exchange QP informations.
* RLib uses TCP for the pre-communication.
*/
int server_node_id = 1;
int tcp_port = 8888;

Expand Down
18 changes: 13 additions & 5 deletions common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace rdmaio {

// connection status
enum ConnStatus {
SUCC = 0,
TIMEOUT,
WRONG_ARG,
ERR,
NOT_READY
SUCC = 0,
TIMEOUT = 1,
WRONG_ARG = 2,
ERR = 3,
NOT_READY = 4,
UNKNOWN = 5
};

/**
Expand Down Expand Up @@ -64,6 +65,13 @@ struct ConnReply {
} payload;
};

inline constexpr struct timeval no_timeout() {
return timeval {
.tv_sec = 0,
.tv_usec = 0
};
}

inline int convert_mtu(ibv_mtu type) {
int mtu = 0;
switch(type) {
Expand Down
22 changes: 12 additions & 10 deletions logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ namespace rdmaio {
* Log everything
*/

#define LOG_NONE 7
#define LOG_FATAL 6
#define LOG_ERROR 5
#define LOG_WARNING 4
#define LOG_EMPH 3
#define LOG_INFO 2
#define LOG_DEBUG 1
#define LOG_EVERYTHING 0
enum loglevel {
LOG_NONE = 7,
LOG_FATAL = 6,
LOG_ERROR = 5,
LOG_WARNING = 4,
LOG_EMPH = 3,
LOG_INFO = 2,
LOG_DEBUG = 1,
LOG_EVERYTHING = 0
};

#define unlikely(x) __builtin_expect(!!(x), 0)

Expand All @@ -60,7 +62,7 @@ namespace rdmaio {

#define RDMA_ASSERT(condition) \
if(unlikely(!(condition))) \
::rdmaio::MessageLogger((char*)__FILE__, __LINE__, LOG_FATAL + 1).stream() << "Assertion! "
::rdmaio::MessageLogger((char*)__FILE__, __LINE__, ::rdmaio::LOG_FATAL + 1).stream() << "Assertion! "

#define RDMA_VERIFY(n,condition) RDMA_LOG_IF(n,(!(condition)))

Expand All @@ -77,7 +79,7 @@ class MessageLogger {
stream_ << "\n";
std::cout << "\033[" << RDMA_DEBUG_LEVEL_COLOR[std::min(level_,6)] << "m"
<< stream_.str() << EndcolorFlag();
if(level_ >= LOG_FATAL)
if(level_ >= ::rdmaio::LOG_FATAL)
abort();
}
}
Expand Down
16 changes: 8 additions & 8 deletions mr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ struct MemoryAttr {
uint32_t key;
};

struct Memory {

class Memory {
public:
/**
* The default protection flag of a memory region.
* In default, the memory can be read/write by local and remote RNIC operations.
*/
static const int DEFAULT_PROTECTION_FLAG = (IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | \
IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC);


Memory(char *addr,uint64_t len,ibv_pd *pd,int flag):
addr(addr),len(len){

mr = ibv_reg_mr(pd,addr,len,flag);
addr(addr),
len(len),
mr(ibv_reg_mr(pd,addr,len,flag))
{
if(mr == nullptr) {

RDMA_LOG(LOG_WARNING) << "failed to register mr, for addr " << addr << "; len " << len;
} else {
rattr.buf = (uintptr_t)addr;
rattr.key = mr->rkey;
Expand All @@ -47,7 +47,7 @@ struct Memory {
uint64_t len;

MemoryAttr rattr; // RDMA registered attr
ibv_mr *mr = NULL; // mr in the driver
ibv_mr *mr = nullptr; // mr in the driver
};


Expand Down
4 changes: 2 additions & 2 deletions msg_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class MsgAdapter {
}

virtual ConnStatus send_pending(int node_id,char *msg,int len) {
return send_to(node_id,msg,len);
RDMA_ASSERT(false); // not implemented
}

virtual ConnStatus send_pending(int node_id,int tid,char *msg,int len) {
return send_to(node_id,tid,msg,len);
return send_pending(node_id,msg,len);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion pre_connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace rdmaio {

constexpr struct timeval default_timeout = {0,2000};
constexpr struct timeval default_timeout = {0,8000};

inline __attribute__ ((always_inline)) // inline to avoid multiple-definiations
int64_t diff_time(struct timeval &end, struct timeval &start) {
Expand Down
35 changes: 23 additions & 12 deletions qp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ constexpr QPIdx create_ud_idx(int worker_id,int idx = 0) {
};
}

/**
* Wrappers over ibv_qp & ibv_cq
*/
class QP {
public:
QP(RNicHandler *rnic,QPIdx idx):
rnic_(rnic),
idx_(idx)
idx_(idx),
rnic_(rnic)
{
}

Expand Down Expand Up @@ -71,12 +74,12 @@ class QP {

QPAttr get_attr() {
QPAttr res = {
.addr = rnic_->query_addr(),
.lid = rnic_->lid,
.qpn = (qp_ != nullptr)?qp_->qp_num:0,
.psn = DEFAULT_PSN,
.node_id = 0, // a place holder
.port_id = rnic_->port_id
.addr = rnic_->query_addr(),
.lid = rnic_->lid,
.qpn = (qp_ != nullptr)?qp_->qp_num:0,
.psn = DEFAULT_PSN, // TODO! this may be filled later
.node_id = 0, // a place holder
.port_id = rnic_->port_id
};
return res;
}
Expand Down Expand Up @@ -147,23 +150,31 @@ class RRCQP : public QP {

ConnStatus connect(std::string ip,int port,QPIdx idx) {

ConnArg arg; ConnReply reply;
// first check whether QP is valid to connect
enum ibv_qp_state state;
if( (state = QPImpl::query_qp_status(qp_)) != IBV_QPS_INIT) {
RDMA_LOG(LOG_WARNING) << "qp not in a connect state to connect!";
return (state == IBV_QPS_RTS)?SUCC:UNKNOWN;
}
ConnArg arg = {} ; ConnReply reply = {};
arg.type = ConnArg::QP;
arg.payload.qp.from_node = idx.node_id;
arg.payload.qp.from_worker = idx.worker_id;
arg.payload.qp.qp_type = IBV_QPT_RC;
arg.payload.qp.qp_type = IBV_QPT_RC;

auto ret = QPImpl::get_remote_helper(&arg,&reply,ip,port);
if(ret == SUCC) {
// change QP status
if(!RCQPImpl::ready2rcv<F>(qp_,reply.payload.qp,rnic_)) {
RDMA_LOG(LOG_WARNING) << "change qp status to ready to receive error: " << strerror(errno);
ret = ERR; goto CONN_END;
ret = ERR;
goto CONN_END;
}

if(!RCQPImpl::ready2send<F>(qp_)) {
RDMA_LOG(LOG_WARNING) << "change qp status to ready to send error: " << strerror(errno);
ret = ERR; goto CONN_END;
ret = ERR;
goto CONN_END;
}
}
CONN_END:
Expand Down
33 changes: 23 additions & 10 deletions qp_impl.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <limits>

#include "pre_connector.hpp"

namespace rdmaio {
Expand Down Expand Up @@ -44,9 +46,19 @@ inline uint32_t decode_qp_index(uint32_t key) {

class QPImpl {
public:
QPImpl() = default;
QPImpl() = default;
~QPImpl() = default;

static enum ibv_qp_state query_qp_status(ibv_qp *qp) {
struct ibv_qp_attr attr;
struct ibv_qp_init_attr init_attr;

if (ibv_query_qp(qp, &attr,IBV_QP_STATE, &init_attr)) {
RDMA_ASSERT(false) << "query qp cannot cause error";
}
return attr.qp_state;
}

static ConnStatus get_remote_helper(ConnArg *arg, ConnReply *reply,std::string ip,int port) {

ConnStatus ret = SUCC;
Expand Down Expand Up @@ -98,18 +110,21 @@ class QPImpl {

struct timeval start_time; gettimeofday (&start_time, NULL);
int poll_result = 0; int64_t diff;
int64_t numeric_timeout = (timeout.tv_sec == 0 && timeout.tv_usec == 0) ? std::numeric_limits<int64_t>::max() :
timeout.tv_sec * 1000 + timeout.tv_usec;
do {
// RDMA_LOG(4) << "polled"; sleep(1);
asm volatile("" ::: "memory");
poll_result = ibv_poll_cq (cq, 1, &wc);

struct timeval cur_time; gettimeofday(&cur_time,NULL);
diff = diff_time(cur_time,start_time);

} while( (poll_result == 0) && (diff <= (timeout.tv_sec * 1000 + timeout.tv_usec)));
} while((poll_result == 0) && (diff <= numeric_timeout));

if(poll_result == 0) {
RDMA_ASSERT(false);
return TIMEOUT;
}

if(poll_result < 0) {
RDMA_ASSERT(false);
return ERR;
Expand Down Expand Up @@ -178,6 +193,7 @@ class RCQPImpl {
| IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
auto rc = ibv_modify_qp(qp, &qp_attr,flags);
return rc == 0;

}

template <RCConfig (*F)(void)>
Expand Down Expand Up @@ -264,8 +280,8 @@ class UDQPImpl {
qp_init_attr.recv_cq = recv_cq;
qp_init_attr.qp_type = IBV_QPT_UD;

qp_init_attr.cap.max_send_wr = config.max_send_size;
qp_init_attr.cap.max_recv_wr = config.max_recv_size;
qp_init_attr.cap.max_send_wr = config.max_send_size;
qp_init_attr.cap.max_recv_wr = config.max_recv_size;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = MAX_INLINE_SIZE;
Expand All @@ -277,16 +293,13 @@ class UDQPImpl {

// change QP status
ready2init(qp, rnic,config); // shall always succeed
#if 1

if(!ready2rcv(qp,rnic)) {
RDMA_LOG(LOG_WARNING) << "change ud qp to ready to recv error: " << strerror(errno);
}
#endif
#if 2
if(!ready2send(qp,config)) {
RDMA_LOG(LOG_WARNING) << "change ud qp to ready to send error: " << strerror(errno);
}
#endif
}

/**
Expand Down
Loading

0 comments on commit 8ae8352

Please sign in to comment.