diff --git a/README.md b/README.md index d46cdfe..fce29b0 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,10 @@ simple pingpong application using one-sided RDMA primitive. Server side ```c++ +/** + * Note, RDMA usually uses some other communication method (e.g. TCP/IP) to exchange QP informations. + * RLib uses TCP for the pre-communication. + */ int server_node_id = 1; int tcp_port = 8888; diff --git a/common.hpp b/common.hpp index 78042b6..268fd37 100644 --- a/common.hpp +++ b/common.hpp @@ -10,11 +10,12 @@ namespace rdmaio { // connection status enum ConnStatus { - SUCC = 0, - TIMEOUT, - WRONG_ARG, - ERR, - NOT_READY + SUCC = 0, + TIMEOUT = 1, + WRONG_ARG = 2, + ERR = 3, + NOT_READY = 4, + UNKNOWN = 5 }; /** @@ -64,6 +65,13 @@ struct ConnReply { } payload; }; +inline constexpr struct timeval no_timeout() { + return timeval { + .tv_sec = 0, + .tv_usec = 0 + }; +} + inline int convert_mtu(ibv_mtu type) { int mtu = 0; switch(type) { diff --git a/logging.hpp b/logging.hpp index ba2601b..a21a41b 100644 --- a/logging.hpp +++ b/logging.hpp @@ -27,14 +27,16 @@ namespace rdmaio { * Log everything */ -#define LOG_NONE 7 -#define LOG_FATAL 6 -#define LOG_ERROR 5 -#define LOG_WARNING 4 -#define LOG_EMPH 3 -#define LOG_INFO 2 -#define LOG_DEBUG 1 -#define LOG_EVERYTHING 0 +enum loglevel { + LOG_NONE = 7, + LOG_FATAL = 6, + LOG_ERROR = 5, + LOG_WARNING = 4, + LOG_EMPH = 3, + LOG_INFO = 2, + LOG_DEBUG = 1, + LOG_EVERYTHING = 0 +}; #define unlikely(x) __builtin_expect(!!(x), 0) @@ -60,7 +62,7 @@ namespace rdmaio { #define RDMA_ASSERT(condition) \ if(unlikely(!(condition))) \ - ::rdmaio::MessageLogger((char*)__FILE__, __LINE__, LOG_FATAL + 1).stream() << "Assertion! " + ::rdmaio::MessageLogger((char*)__FILE__, __LINE__, ::rdmaio::LOG_FATAL + 1).stream() << "Assertion! " #define RDMA_VERIFY(n,condition) RDMA_LOG_IF(n,(!(condition))) @@ -77,7 +79,7 @@ class MessageLogger { stream_ << "\n"; std::cout << "\033[" << RDMA_DEBUG_LEVEL_COLOR[std::min(level_,6)] << "m" << stream_.str() << EndcolorFlag(); - if(level_ >= LOG_FATAL) + if(level_ >= ::rdmaio::LOG_FATAL) abort(); } } diff --git a/mr.hpp b/mr.hpp index 1be58d1..3586681 100644 --- a/mr.hpp +++ b/mr.hpp @@ -10,8 +10,8 @@ struct MemoryAttr { uint32_t key; }; -struct Memory { - +class Memory { + public: /** * The default protection flag of a memory region. * In default, the memory can be read/write by local and remote RNIC operations. @@ -19,13 +19,13 @@ struct Memory { static const int DEFAULT_PROTECTION_FLAG = (IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | \ IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC); - Memory(char *addr,uint64_t len,ibv_pd *pd,int flag): - addr(addr),len(len){ - - mr = ibv_reg_mr(pd,addr,len,flag); + addr(addr), + len(len), + mr(ibv_reg_mr(pd,addr,len,flag)) + { if(mr == nullptr) { - + RDMA_LOG(LOG_WARNING) << "failed to register mr, for addr " << addr << "; len " << len; } else { rattr.buf = (uintptr_t)addr; rattr.key = mr->rkey; @@ -47,7 +47,7 @@ struct Memory { uint64_t len; MemoryAttr rattr; // RDMA registered attr - ibv_mr *mr = NULL; // mr in the driver + ibv_mr *mr = nullptr; // mr in the driver }; diff --git a/msg_interface.hpp b/msg_interface.hpp index e89f19f..aefe7c2 100644 --- a/msg_interface.hpp +++ b/msg_interface.hpp @@ -38,11 +38,11 @@ class MsgAdapter { } virtual ConnStatus send_pending(int node_id,char *msg,int len) { - return send_to(node_id,msg,len); + RDMA_ASSERT(false); // not implemented } virtual ConnStatus send_pending(int node_id,int tid,char *msg,int len) { - return send_to(node_id,tid,msg,len); + return send_pending(node_id,msg,len); } /** diff --git a/pre_connector.hpp b/pre_connector.hpp index 752c412..a527683 100644 --- a/pre_connector.hpp +++ b/pre_connector.hpp @@ -14,7 +14,7 @@ namespace rdmaio { -constexpr struct timeval default_timeout = {0,2000}; +constexpr struct timeval default_timeout = {0,8000}; inline __attribute__ ((always_inline)) // inline to avoid multiple-definiations int64_t diff_time(struct timeval &end, struct timeval &start) { diff --git a/qp.hpp b/qp.hpp index 21cab76..51f1e5c 100644 --- a/qp.hpp +++ b/qp.hpp @@ -28,11 +28,14 @@ constexpr QPIdx create_ud_idx(int worker_id,int idx = 0) { }; } +/** + * Wrappers over ibv_qp & ibv_cq + */ class QP { public: QP(RNicHandler *rnic,QPIdx idx): - rnic_(rnic), - idx_(idx) + idx_(idx), + rnic_(rnic) { } @@ -71,12 +74,12 @@ class QP { QPAttr get_attr() { QPAttr res = { - .addr = rnic_->query_addr(), - .lid = rnic_->lid, - .qpn = (qp_ != nullptr)?qp_->qp_num:0, - .psn = DEFAULT_PSN, - .node_id = 0, // a place holder - .port_id = rnic_->port_id + .addr = rnic_->query_addr(), + .lid = rnic_->lid, + .qpn = (qp_ != nullptr)?qp_->qp_num:0, + .psn = DEFAULT_PSN, // TODO! this may be filled later + .node_id = 0, // a place holder + .port_id = rnic_->port_id }; return res; } @@ -147,23 +150,31 @@ class RRCQP : public QP { ConnStatus connect(std::string ip,int port,QPIdx idx) { - ConnArg arg; ConnReply reply; + // first check whether QP is valid to connect + enum ibv_qp_state state; + if( (state = QPImpl::query_qp_status(qp_)) != IBV_QPS_INIT) { + RDMA_LOG(LOG_WARNING) << "qp not in a connect state to connect!"; + return (state == IBV_QPS_RTS)?SUCC:UNKNOWN; + } + ConnArg arg = {} ; ConnReply reply = {}; arg.type = ConnArg::QP; arg.payload.qp.from_node = idx.node_id; arg.payload.qp.from_worker = idx.worker_id; - arg.payload.qp.qp_type = IBV_QPT_RC; + arg.payload.qp.qp_type = IBV_QPT_RC; auto ret = QPImpl::get_remote_helper(&arg,&reply,ip,port); if(ret == SUCC) { // change QP status if(!RCQPImpl::ready2rcv(qp_,reply.payload.qp,rnic_)) { RDMA_LOG(LOG_WARNING) << "change qp status to ready to receive error: " << strerror(errno); - ret = ERR; goto CONN_END; + ret = ERR; + goto CONN_END; } if(!RCQPImpl::ready2send(qp_)) { RDMA_LOG(LOG_WARNING) << "change qp status to ready to send error: " << strerror(errno); - ret = ERR; goto CONN_END; + ret = ERR; + goto CONN_END; } } CONN_END: diff --git a/qp_impl.hpp b/qp_impl.hpp index 7c6d6c4..be9956f 100644 --- a/qp_impl.hpp +++ b/qp_impl.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include "pre_connector.hpp" namespace rdmaio { @@ -44,9 +46,19 @@ inline uint32_t decode_qp_index(uint32_t key) { class QPImpl { public: - QPImpl() = default; + QPImpl() = default; ~QPImpl() = default; + static enum ibv_qp_state query_qp_status(ibv_qp *qp) { + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr; + + if (ibv_query_qp(qp, &attr,IBV_QP_STATE, &init_attr)) { + RDMA_ASSERT(false) << "query qp cannot cause error"; + } + return attr.qp_state; + } + static ConnStatus get_remote_helper(ConnArg *arg, ConnReply *reply,std::string ip,int port) { ConnStatus ret = SUCC; @@ -98,18 +110,21 @@ class QPImpl { struct timeval start_time; gettimeofday (&start_time, NULL); int poll_result = 0; int64_t diff; + int64_t numeric_timeout = (timeout.tv_sec == 0 && timeout.tv_usec == 0) ? std::numeric_limits::max() : + timeout.tv_sec * 1000 + timeout.tv_usec; do { + // RDMA_LOG(4) << "polled"; sleep(1); + asm volatile("" ::: "memory"); poll_result = ibv_poll_cq (cq, 1, &wc); struct timeval cur_time; gettimeofday(&cur_time,NULL); diff = diff_time(cur_time,start_time); - - } while( (poll_result == 0) && (diff <= (timeout.tv_sec * 1000 + timeout.tv_usec))); + } while((poll_result == 0) && (diff <= numeric_timeout)); if(poll_result == 0) { - RDMA_ASSERT(false); return TIMEOUT; } + if(poll_result < 0) { RDMA_ASSERT(false); return ERR; @@ -178,6 +193,7 @@ class RCQPImpl { | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER; auto rc = ibv_modify_qp(qp, &qp_attr,flags); return rc == 0; + } template @@ -264,8 +280,8 @@ class UDQPImpl { qp_init_attr.recv_cq = recv_cq; qp_init_attr.qp_type = IBV_QPT_UD; - qp_init_attr.cap.max_send_wr = config.max_send_size; - qp_init_attr.cap.max_recv_wr = config.max_recv_size; + qp_init_attr.cap.max_send_wr = config.max_send_size; + qp_init_attr.cap.max_recv_wr = config.max_recv_size; qp_init_attr.cap.max_send_sge = 1; qp_init_attr.cap.max_recv_sge = 1; qp_init_attr.cap.max_inline_data = MAX_INLINE_SIZE; @@ -277,16 +293,13 @@ class UDQPImpl { // change QP status ready2init(qp, rnic,config); // shall always succeed -#if 1 + if(!ready2rcv(qp,rnic)) { RDMA_LOG(LOG_WARNING) << "change ud qp to ready to recv error: " << strerror(errno); } -#endif -#if 2 if(!ready2send(qp,config)) { RDMA_LOG(LOG_WARNING) << "change ud qp to ready to send error: " << strerror(errno); } -#endif } /** diff --git a/rdma_ctrl_impl.hpp b/rdma_ctrl_impl.hpp index 76e7929..1719f8c 100644 --- a/rdma_ctrl_impl.hpp +++ b/rdma_ctrl_impl.hpp @@ -4,12 +4,47 @@ namespace rdmaio { +/** + * Simple critical section + * It uses a single global block to guard RdmaCtrl. + * This is acceptable, since RdmaCtrl is only the control plane. + */ +class SCS { + public: + SCS() { + get_lock().lock(); + } + + ~SCS() { + get_lock().unlock(); + } + + private: + static std::mutex &get_lock() { + static std::mutex lock; + return lock; + } +}; + +/** + * convert qp idx(node,worker,idx) -> key + */ +inline uint32_t get_rc_key (const QPIdx idx) { + return ::rdmaio::encode_qp_id(idx.node_id,RC_ID_BASE + idx.worker_id * 64 + idx.index); +} + +inline uint32_t get_ud_key(const QPIdx idx) { + return ::rdmaio::encode_qp_id(idx.worker_id,UD_ID_BASE + idx.index); +} + +/** + * Control plane of RLib + */ class RdmaCtrl::RdmaCtrlImpl { public: RdmaCtrlImpl(int node_id, int tcp_base_port,std::string local_ip): - lock_(), - tcp_base_port_(tcp_base_port), node_id_(node_id), + tcp_base_port_(tcp_base_port), local_ip_(local_ip) { // start the background thread to handle QP connection request @@ -79,69 +114,72 @@ class RdmaCtrl::RdmaCtrlImpl { } RCQP *get_rc_qp(QPIdx idx) { - lock_.lock(); - uint64_t qid = get_rc_key(idx); - if(qps_.find(qid) == qps_.end()) { - lock_.unlock(); - return nullptr; - } - QP *res = qps_[qid]; - lock_.unlock(); - return dynamic_cast(res); + RCQP *res = nullptr; + { + SCS s; + res = get_qp(idx); + }; + return res; } UDQP *get_ud_qp(QPIdx idx) { - lock_.lock(); - uint64_t qid = get_ud_key(idx); - if(qps_.find(qid) == qps_.end()) { - lock_.unlock(); + + UDQP *res = nullptr; + { + SCS s; + res = get_qp(idx); + }; + return res; + } + + /** + * Note! this is not a thread-safe function + */ + template + T *get_qp(QPIdx idx) { + uint32_t key = F(idx); + if(qps_.find(key) == qps_.end()) return nullptr; - } - QP *res = qps_[qid]; - lock_.unlock(); - return dynamic_cast(res); + else + return dynamic_cast(qps_[key]); } RCQP *create_rc_qp(QPIdx idx, RNicHandler *dev,MemoryAttr *attr) { RCQP *res = nullptr; - lock_.lock(); - - uint64_t qid = get_rc_key(idx); - - if(qps_.find(qid) != qps_.end()) { - res = dynamic_cast(qps_[qid]); - goto END; - } - - if(attr == NULL) - res = new RCQP(dev,idx); - else - res = new RCQP(dev,idx,*attr); - qps_.insert(std::make_pair(qid,res)); - END: - lock_.unlock(); + { + SCS s; + uint64_t qid = get_rc_key(idx); + if(qps_.find(qid) != qps_.end()) { + res = dynamic_cast(qps_[qid]); + } else { + if(attr == NULL) + res = new RCQP(dev,idx); + else + res = new RCQP(dev,idx,*attr); + qps_.insert(std::make_pair(qid,res)); + } + }; return res; } UDQP *create_ud_qp(QPIdx idx, RNicHandler *dev,MemoryAttr *attr) { UDQP *res = nullptr; - lock_.lock(); uint64_t qid = get_ud_key(idx); - if(qps_.find(qid) != qps_.end()) { - res = dynamic_cast(qps_[qid]); - RDMA_LOG(LOG_WARNING) << "create an existing UD QP:" << idx.worker_id << " " << idx.index; - goto END; - } - if(attr == NULL) - res = new UDQP(dev,idx); - else - res = new UDQP(dev,idx,*attr); - qps_.insert(std::make_pair(qid,res)); - END: - lock_.unlock(); + { + SCS s; + if(qps_.find(qid) != qps_.end()) { + res = dynamic_cast(qps_[qid]); + } else { + if(attr == NULL) + res = new UDQP(dev,idx); + else + res = new UDQP(dev,idx,*attr); + qps_.insert(std::make_pair(qid,res)); + } + }; return res; } @@ -153,25 +191,25 @@ class RdmaCtrl::RdmaCtrlImpl { delete m; return false; } - - lock_.lock(); - if(mrs_.find(mr_id) != mrs_.end()) { - RDMA_LOG(LOG_WARNING) << "mr " << mr_id << " has already been registered!"; - delete m; - } else { - mrs_.insert(std::make_pair(mr_id,m)); - } - lock_.unlock(); - + { + SCS s; + if(mrs_.find(mr_id) != mrs_.end()) { + RDMA_LOG(LOG_WARNING) << "mr " << mr_id << " has already been registered!"; + delete m; + } else { + mrs_.insert(std::make_pair(mr_id,m)); + } + }; return true; } MemoryAttr get_local_mr(int mr_id) { MemoryAttr attr; - lock_.lock(); - if(mrs_.find(mr_id) != mrs_.end()) - attr = mrs_[mr_id]->rattr; - lock_.unlock(); + { + SCS s; + if(mrs_.find(mr_id) != mrs_.end()) + attr = mrs_[mr_id]->rattr; + } return attr; } @@ -219,17 +257,6 @@ class RdmaCtrl::RdmaCtrlImpl { return std::vector(cached_infos_.begin(),cached_infos_.end()); } - /** - * convert qp idx(node,worker,idx) -> key - */ - uint32_t get_rc_key (const QPIdx idx) { - return ::rdmaio::encode_qp_id(idx.node_id,RC_ID_BASE + idx.worker_id * 64 + idx.index); - } - - uint32_t get_ud_key(const QPIdx idx) { - return ::rdmaio::encode_qp_id(idx.worker_id,UD_ID_BASE + idx.index); - } - RdmaCtrl::DevIdx convert_port_idx(int idx) { if(cached_infos_.size() == 0) @@ -289,7 +316,7 @@ class RdmaCtrl::RdmaCtrlImpl { continue; } - if(!PreConnector::wait_recv(csfd,6000)) { RDMA_LOG(3) << "wrong"; + if(!PreConnector::wait_recv(csfd,6000)) { close(csfd); continue; } @@ -305,50 +332,49 @@ class RdmaCtrl::RdmaCtrlImpl { ConnReply reply; reply.ack = ERR; - lock_.lock(); - switch(arg.type) { - case ConnArg::MR: - if(mrs_.find(arg.payload.mr.mr_id) != mrs_.end()) { - memcpy((char *)(&(reply.payload.mr)), - (char *)(&(mrs_[arg.payload.mr.mr_id]->rattr)),sizeof(MemoryAttr)); - reply.ack = SUCC; + { // in a global critical section + SCS s; + switch(arg.type) { + case ConnArg::MR: + if(mrs_.find(arg.payload.mr.mr_id) != mrs_.end()) { + memcpy((char *)(&(reply.payload.mr)), + (char *)(&(mrs_[arg.payload.mr.mr_id]->rattr)),sizeof(MemoryAttr)); + reply.ack = SUCC; + }; + break; + case ConnArg::QP: { + QP *qp = NULL; + switch(arg.payload.qp.qp_type) { + case IBV_QPT_UD: + { + UDQP *ud_qp = get_qp( + create_ud_idx(arg.payload.qp.from_node,arg.payload.qp.from_worker)); + if(ud_qp != nullptr && ud_qp->ready()) { + qp = ud_qp; + } + } + break; + case IBV_QPT_RC: + { + RCQP *rc_qp = get_qp( + create_rc_idx(arg.payload.qp.from_node,arg.payload.qp.from_worker)); + qp = rc_qp; + } + break; + default: + RDMA_LOG(LOG_ERROR) << "unknown QP connection type: " << arg.payload.qp.qp_type; + } + if(qp != nullptr) { + reply.payload.qp = qp->get_attr(); + reply.ack = SUCC; + } + reply.payload.qp.node_id = node_id_; + break; } - break; - case ConnArg::QP: { - QP *qp = NULL; - switch(arg.payload.qp.qp_type) { - case IBV_QPT_UD: - { - lock_.unlock(); - UDQP *ud_qp = get_ud_qp(create_ud_idx(arg.payload.qp.from_node,arg.payload.qp.from_worker)); - if(ud_qp != nullptr && ud_qp->ready()) { - qp = ud_qp; - } - lock_.lock(); - } - break; - case IBV_QPT_RC: - { - lock_.unlock(); - QP *rc_qp = get_rc_qp(create_rc_idx(arg.payload.qp.from_node,arg.payload.qp.from_worker)); - qp = rc_qp; - lock_.lock(); - } - break; - default: - RDMA_LOG(LOG_ERROR) << "unknown QP connection type: " << arg.payload.qp.qp_type; - } - if(qp != nullptr) { - reply.payload.qp = qp->get_attr(); - reply.ack = SUCC; - } - reply.payload.qp.node_id = node_id_; - break; + default: + RDMA_LOG(LOG_WARNING) << "received unknown connect type " << arg.type; } - default: - RDMA_LOG(LOG_WARNING) << "received unknown connect type " << arg.type; - } - lock_.unlock(); + } // end simple critical section protection PreConnector::send_to(csfd,(char *)(&reply),sizeof(ConnReply)); PreConnector::wait_close(csfd); // wait for the client to close the connection @@ -377,8 +403,6 @@ class RdmaCtrl::RdmaCtrlImpl { const int tcp_base_port_; const std::string local_ip_; - std::mutex lock_; - pthread_t handler_tid_; bool running_ = true; @@ -392,7 +416,7 @@ class RdmaCtrl::RdmaCtrlImpl { for(auto s : cluster) { // get the target mr retry: - MemoryAttr mr; + MemoryAttr mr = {}; auto rc = QP::get_remote_mr(s,tcp_base_port_,mr_id,&mr); if(rc != SUCC) { usleep(2000); @@ -400,6 +424,9 @@ class RdmaCtrl::RdmaCtrlImpl { } mrs.push_back(mr); } + + RDMA_ASSERT(mrs.size() == cluster.size()); + while(true) { int connected = 0, i = 0; for(auto s : cluster) { @@ -408,16 +435,17 @@ class RdmaCtrl::RdmaCtrlImpl { i++; connected++; continue; } - RCQP *qp = create_rc_qp(QPIdx { .node_id = i,.worker_id = wid,.index = idx }, + RCQP *qp = create_rc_qp(QPIdx {.node_id = i,.worker_id = wid,.index = idx }, get_device(),&local_mr); RDMA_ASSERT(qp != nullptr); - qp->bind_remote_mr(mrs[i]); - - if(qp->connect(s,tcp_base_port_) == SUCC) { - ready_list[i++] = true; + if(qp->connect(s,tcp_base_port_, + QPIdx {.node_id = node_id_,.worker_id = wid, .index = idx}) == SUCC) { + ready_list[i] = true; connected++; + qp->bind_remote_mr(mrs[i]); } + i++; } if(connected == cluster.size()) break; diff --git a/rnic.hpp b/rnic.hpp index 6c4ae6f..88f357f 100644 --- a/rnic.hpp +++ b/rnic.hpp @@ -22,7 +22,9 @@ struct RNicInfo { } PortInfo; RNicInfo(const char *name,int id,ibv_context *ctx): - dev_name(name),dev_id(id) { + dev_id(id), + dev_name(name) + { query_port_infos(ctx); query_active_gids(ctx); } @@ -123,12 +125,18 @@ class RdmaCtrl; struct RNicHandler { RNicHandler(int dev_id,int port_id,ibv_context *ctx,ibv_pd *pd,int lid,int gid = 0): - dev_id(dev_id), port_id(port_id), - ctx(ctx),pd(pd),lid(lid),gid(gid) + dev_id(dev_id), + port_id(port_id), + ctx(ctx), + pd(pd), + lid(lid), + gid(gid) { } - address_t query_addr() { return query_addr(gid); } + address_t query_addr() { + return query_addr(gid); + } address_t query_addr(uint8_t gid_index) { @@ -148,8 +156,8 @@ struct RNicHandler { ~RNicHandler() { // delete ctx & pd RDMA_VERIFY(LOG_INFO,ibv_close_device(ctx) == 0) << "failed to close device " << dev_id; - RDMA_VERIFY(LOG_INFO,ibv_dealloc_pd(pd) == 0) << "failed to dealloc pd at device " << dev_id - << "; w error " << strerror(errno); + RDMA_VERIFY(LOG_INFO,ibv_dealloc_pd(pd) == 0) << "failed to dealloc pd at device " << dev_id + << "; w error " << strerror(errno); } public: @@ -160,7 +168,6 @@ struct RNicHandler { struct ibv_pd *pd; uint16_t lid; uint16_t gid; - }; diff --git a/ud_adapter.hpp b/ud_adapter.hpp index e2da600..c08d33e 100644 --- a/ud_adapter.hpp +++ b/ud_adapter.hpp @@ -137,6 +137,7 @@ class UDAdapter : public MsgAdapter, public UDRecvManager { ssges_[0].addr = (uint64_t)msg; ssges_[0].length = len; + if(send_qp_->need_poll()) { ibv_wc wc; auto ret = send_qp_->poll_till_completion(wc); RDMA_ASSERT(ret == SUCC) << "poll UD completion reply error: " << ret; @@ -165,7 +166,6 @@ class UDAdapter : public MsgAdapter, public UDRecvManager { | ((len < MAX_INLINE_SIZE) ? IBV_SEND_INLINE : 0); if(send_qp_->need_poll()) { - RDMA_LOG(4) << "need poll "; ibv_wc wc;auto ret = send_qp_->poll_till_completion(wc); RDMA_ASSERT(ret == SUCC) << "poll UD completion reply error: " << ret; send_qp_->pendings = 0;