From cf5aae32423b6275be6967d7a96bc64a98c687e7 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Thu, 4 Jun 2020 20:26:46 +0300
Subject: [PATCH 01/21] issue: 1792164 Use NOTIFY_ON_EVENTS in all places

NOTIFY_ON_EVENTS should be used in all places to provide a single way
of passing any epoll events.

Signed-off-by: Igor Ivanov
---
 src/vma/sock/sockinfo_tcp.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index ccff41b951..dc69dd0b8f 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -2770,7 +2770,7 @@ void sockinfo_tcp::auto_accept_connection(sockinfo_tcp *parent, sockinfo_tcp *ch
 child->m_socketxtreme.ec.completion.src = parent->m_socketxtreme.ec.completion.src;
 child->m_socketxtreme.ec.completion.listen_fd = child->m_parent->get_fd();
 }
- child->set_events(VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED);
+ NOTIFY_ON_EVENTS(child, VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED);
 } else {
 vlog_printf(VLOG_ERROR,
 "VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED: can't find listen socket for new connected socket with [fd=%d]",

From e81bdb1608caeb91649dd6876e2845afa758bba9 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Fri, 5 Jun 2020 18:20:21 +0300
Subject: [PATCH 02/21] issue: 1792164 Introduce socket error queue

Signed-off-by: Igor Ivanov
---
 src/vma/proto/mem_buf_desc.h | 78 ++++++++++++++++++++++++++++------
 src/vma/sock/sockinfo.h | 2 +
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/src/vma/proto/mem_buf_desc.h b/src/vma/proto/mem_buf_desc.h
index 8bf4cef9dc..dffd029f0a 100644
--- a/src/vma/proto/mem_buf_desc.h
+++ b/src/vma/proto/mem_buf_desc.h
@@ -35,6 +35,8 @@
 #define MEM_BUF_DESC_H

 #include <netinet/in.h>
+#include <linux/errqueue.h>
+
 #include "utils/atomic.h"
 #include "vma/util/vma_list.h"
 #include "vma/lwip/pbuf.h"
@@ -58,23 +60,42 @@ struct timestamps_t
 * received data in TX)
 */
class mem_buf_desc_t {
+public:
+ enum flags {
+ TYPICAL = 0,
+ CLONED = 0x01,
+ };
+
 public:
 mem_buf_desc_t(uint8_t *buffer, size_t size, pbuf_free_custom_fn custom_free_function) :
- p_buffer(buffer), lkey(0), p_next_desc(0),
- p_prev_desc(0), sz_buffer(size), sz_data(0),
+ p_buffer(buffer),
+ m_flags(mem_buf_desc_t::TYPICAL),
+ lkey(0),
+ p_next_desc(0),
+ p_prev_desc(0),
+ sz_buffer(size),
+ sz_data(0),
 p_desc_owner(0)
 {
+ memset(&lwip_pbuf, 0, sizeof(lwip_pbuf));
 memset(&rx, 0, sizeof(rx));
 memset(&tx, 0, sizeof(tx));
+ memset(&ee, 0, sizeof(ee));
 reset_ref_count();
 lwip_pbuf.custom_free_function = custom_free_function;
 }

- struct pbuf_custom lwip_pbuf; //Do not change the location of this field.
+ /* This field should be first in this class.
+ * It encapsulates the pbuf structure from lwip
+ * and extra fields to meet customer-specific requirements.
+ */
+ struct pbuf_custom lwip_pbuf;
 uint8_t* const p_buffer;

- static inline size_t buffer_node_offset(void) {return NODE_OFFSET(mem_buf_desc_t, buffer_node);}
+ static inline size_t buffer_node_offset(void) {
+ return NODE_OFFSET(mem_buf_desc_t, buffer_node);
+ }

 list_node<mem_buf_desc_t, mem_buf_desc_t::buffer_node_offset> buffer_node;

 union {
@@ -116,10 +137,14 @@ class mem_buf_desc_t {
 } tx;
 };

+ /* This field is needed for error queue processing */
+ struct sock_extended_err ee;
+
private:
 atomic_t n_ref_count; // number of interested receivers (sockinfo) [can be modified only in cq_mgr context]

-public:
+public:
+ int m_flags; /* object description */
 uint32_t lkey; // Buffers lkey for QP access
 mem_buf_desc_t* p_next_desc; // A general purpose linked list of mem_buf_desc
 mem_buf_desc_t* p_prev_desc;
@@ -130,14 +155,43 @@ class mem_buf_desc_t {
 // Rx: cq_mgr owns the mem_buf_desc and the associated data buffer
 ring_slave* p_desc_owner;

- inline int get_ref_count() const {return atomic_read(&n_ref_count);}
- inline void reset_ref_count() {atomic_set(&n_ref_count, 0);}
- inline int inc_ref_count() {return atomic_fetch_and_inc(&n_ref_count);}
- inline int dec_ref_count() {return atomic_fetch_and_dec(&n_ref_count);}
+ inline mem_buf_desc_t* clone() {
+ mem_buf_desc_t* p_desc = new mem_buf_desc_t(*this);
+ INIT_LIST_HEAD(&p_desc->buffer_node.head);
+ p_desc->m_flags |= mem_buf_desc_t::CLONED;
+ return p_desc;
+ }
+
+ inline int get_ref_count() const {
+ return atomic_read(&n_ref_count);
+ }

- inline unsigned int lwip_pbuf_inc_ref_count() {return ++lwip_pbuf.pbuf.ref;}
- inline unsigned int lwip_pbuf_dec_ref_count() {if (likely(lwip_pbuf.pbuf.ref)) --lwip_pbuf.pbuf.ref; return lwip_pbuf.pbuf.ref;}
- inline unsigned int lwip_pbuf_get_ref_count() const {return lwip_pbuf.pbuf.ref;}
+ inline void reset_ref_count() {
+ atomic_set(&n_ref_count, 0);
+ }
+
+ inline int inc_ref_count() {
+ return atomic_fetch_and_inc(&n_ref_count);
+ }
+
+ inline int dec_ref_count() {
+ return atomic_fetch_and_dec(&n_ref_count);
+ }
+
+ inline unsigned int lwip_pbuf_inc_ref_count() {
+ return ++lwip_pbuf.pbuf.ref;
+ }
+
+ inline unsigned int lwip_pbuf_dec_ref_count() {
+ if (likely(lwip_pbuf.pbuf.ref)) {
+ --lwip_pbuf.pbuf.ref;
+ }
+ return lwip_pbuf.pbuf.ref;
+ }
+
+ inline unsigned int lwip_pbuf_get_ref_count() const {
+ return lwip_pbuf.pbuf.ref;
+ }
 };

 typedef vma_list_t<mem_buf_desc_t, mem_buf_desc_t::buffer_node_offset> descq_t;

diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h
index 7c6e7d4581..d373887348 100644
--- a/src/vma/sock/sockinfo.h
+++ b/src/vma/sock/sockinfo.h
@@ -248,6 +248,8 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
 ring_alloc_logic_attr m_ring_alloc_log_tx;
 uint32_t m_pcp;

+ descq_t m_error_queue;
+
 struct {
 /* Track internal events to return in socketxtreme_poll()
 * Current design support single event for socket at a particular time

From a4583443b90f43e25326f07d1623a3885160b11c Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Fri, 5 Jun 2020 19:10:44 +0300
Subject: [PATCH 03/21] issue: 1792164 Change handle_cmsg() prototype

Add a flags argument that comes from the original recv() call. It is
needed to return information from the error queue, which should be done
when MSG_ERRQUEUE is passed.
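For reference, the application-side counterpart of this path is the
standard Linux error-queue read. A minimal sketch of that flow follows
(standard kernel MSG_ZEROCOPY/MSG_ERRQUEUE API usage; the function name
and buffer size are illustrative only, not code from this patch set):

    /* Drain MSG_ZEROCOPY completions from the socket error queue. */
    #include <errno.h>
    #include <linux/errqueue.h>
    #include <netinet/in.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>

    static void drain_errqueue(int fd)
    {
        char control[128];
        struct msghdr msg;
        struct cmsghdr *cm;

        memset(&msg, 0, sizeof(msg));
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        /* MSG_ERRQUEUE reads the error queue instead of the data stream */
        if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
            if (errno != EAGAIN)
                perror("recvmsg(MSG_ERRQUEUE)");
            return;
        }
        for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
            if (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) {
                struct sock_extended_err *ee =
                        (struct sock_extended_err *)CMSG_DATA(cm);
                if (ee->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
                    /* sends with ids ee_info..ee_data completed;
                     * their buffers may be reused now */
                    printf("zerocopy done: %u..%u\n",
                           ee->ee_info, ee->ee_data);
                }
            }
        }
    }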
Signed-off-by: Igor Ivanov --- src/vma/sock/sockinfo.cpp | 2 +- src/vma/sock/sockinfo.h | 2 +- src/vma/sock/sockinfo_tcp.cpp | 2 +- src/vma/sock/sockinfo_udp.cpp | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 29d4e0665e..86775e153a 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -1673,7 +1673,7 @@ void sockinfo::insert_cmsg(struct cmsg_state * cm_state, int level, int type, vo cm_state->cmhdr = next; } -void sockinfo::handle_cmsg(struct msghdr * msg) +void sockinfo::handle_cmsg(struct msghdr * msg, int flags) { struct cmsg_state cm_state; diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index d373887348..83b6973b74 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -330,7 +330,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual void handle_ip_pktinfo(struct cmsg_state *cm_state) = 0; inline void handle_recv_timestamping(struct cmsg_state *cm_state); void insert_cmsg(struct cmsg_state *cm_state, int level, int type, void *data, int len); - void handle_cmsg(struct msghdr * msg); + void handle_cmsg(struct msghdr * msg, int flags); void process_timestamps(mem_buf_desc_t* p_desc); virtual bool try_un_offloading(); // un-offload the socket if possible diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index dc69dd0b8f..9b499b2e77 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -1941,7 +1941,7 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov si_tcp_logfunc("something in rx queues: %d %p", m_n_rx_pkt_ready_list_count, m_rx_pkt_ready_list.front()); total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); - if (__msg) handle_cmsg(__msg); + if (__msg) handle_cmsg(__msg, in_flags); /* * RCVBUFF Accounting: Going 'out' of the internal buffer: if some bytes are not tcp_recved yet - do that. 
diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 30810af7e7..9efe4f850a 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -1310,7 +1310,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov, m_rx_udp_poll_os_ratio_counter++; if (m_n_rx_pkt_ready_list_count > 0) { // Found a ready packet in the list - if (__msg) handle_cmsg(__msg); + if (__msg) handle_cmsg(__msg, in_flags); ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); goto out; } @@ -1330,7 +1330,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov, if (likely(rx_wait_ret == 0)) { // Got 0, means we might have a ready packet if (m_n_rx_pkt_ready_list_count > 0) { - if (__msg) handle_cmsg(__msg); + if (__msg) handle_cmsg(__msg, in_flags); ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); goto out; } else { From 3b233a46ed8b281440d2b9d8f15b6737f3fcc003 Mon Sep 17 00:00:00 2001 From: Igor Ivanov Date: Fri, 5 Jun 2020 20:08:50 +0300 Subject: [PATCH 04/21] issue: 1792164 Add MSG_ERRQUEUE handler Signed-off-by: Igor Ivanov --- src/vma/sock/sockinfo.cpp | 36 +++++++++++++++++++++++++++++++++++- src/vma/sock/sockinfo.h | 1 + 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 86775e153a..dcb0177509 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -138,7 +138,18 @@ sockinfo::~sockinfo() delete[] m_p_rings_fds; m_p_rings_fds = NULL; } - vma_stats_instance_remove_socket_block(m_p_socket_stats); + + while (!m_error_queue.empty()) { + mem_buf_desc_t* buff = m_error_queue.get_and_pop_front(); + if (buff->m_flags & mem_buf_desc_t::CLONED) { + delete buff; + } else { + si_logerr("Detected invalid element in socket error queue as %p with flags 0x%x", + buff, buff->m_flags); + } + } + + vma_stats_instance_remove_socket_block(m_p_socket_stats); } void sockinfo::set_blocking(bool is_blocked) @@ -1640,6 +1651,28 @@ void sockinfo::handle_recv_timestamping(struct cmsg_state *cm_state) insert_cmsg(cm_state, SOL_SOCKET, SO_TIMESTAMPING, &tsing, sizeof(tsing)); } +void sockinfo::handle_recv_errqueue(struct cmsg_state *cm_state) +{ + mem_buf_desc_t *buff = NULL; + + if (m_error_queue.empty()) { + return; + } + + buff = m_error_queue.get_and_pop_front(); + + if (!(buff->m_flags & mem_buf_desc_t::CLONED)) { + si_logerr("Detected invalid element in socket error queue as %p with flags 0x%x", + buff, buff->m_flags); + return; + } + + insert_cmsg(cm_state, 0, IP_RECVERR, &buff->ee, sizeof(buff->ee)); + cm_state->mhdr->msg_flags |= MSG_ERRQUEUE; + + delete buff; +} + void sockinfo::insert_cmsg(struct cmsg_state * cm_state, int level, int type, void *data, int len) { if (!cm_state->cmhdr || @@ -1683,6 +1716,7 @@ void sockinfo::handle_cmsg(struct msghdr * msg, int flags) if (m_b_pktinfo) handle_ip_pktinfo(&cm_state); if (m_b_rcvtstamp || m_n_tsing_flags) handle_recv_timestamping(&cm_state); + if (flags & MSG_ERRQUEUE) handle_recv_errqueue(&cm_state); cm_state.mhdr->msg_controllen = cm_state.cmsg_bytes_consumed; } diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index 83b6973b74..a4d8cb083d 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -329,6 +329,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual void handle_ip_pktinfo(struct cmsg_state *cm_state) = 0; inline void 
handle_recv_timestamping(struct cmsg_state *cm_state);
+ inline void handle_recv_errqueue(struct cmsg_state *cm_state);
 void insert_cmsg(struct cmsg_state *cm_state, int level, int type, void *data, int len);
 void handle_cmsg(struct msghdr * msg, int flags);
 void process_timestamps(mem_buf_desc_t* p_desc);

From 28b8b23e45e3e6e06407401969e018f51c264213 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Fri, 5 Jun 2020 20:12:36 +0300
Subject: [PATCH 05/21] issue: 1792164 Add tx zcopy description into mem_buf_desc_t

Signed-off-by: Igor Ivanov
---
 src/vma/proto/mem_buf_desc.h | 15 +++++++++++++++
 src/vma/sock/sockinfo.cpp | 2 ++
 src/vma/sock/sockinfo.h | 14 ++++++++++++++
 3 files changed, 31 insertions(+)

diff --git a/src/vma/proto/mem_buf_desc.h b/src/vma/proto/mem_buf_desc.h
index dffd029f0a..88e9c50833 100644
--- a/src/vma/proto/mem_buf_desc.h
+++ b/src/vma/proto/mem_buf_desc.h
@@ -64,6 +64,7 @@ class mem_buf_desc_t {
 enum flags {
 TYPICAL = 0,
 CLONED = 0x01,
+ ZCOPY = 0x02
 };

 public:
@@ -134,6 +135,20 @@ class mem_buf_desc_t {
 struct udphdr* p_udp_h;
 struct tcphdr* p_tcp_h;
 };
+ struct {
+ /* This structure allows tracking the tx zerocopy flow,
+ * including the starting send id and the range in the count
+ * field, with the total byte length in len, where
+ * id -> ee.ee_info
+ * id + count - 1 -> ee.ee_data
+ */
+ uint32_t id;
+ uint32_t len;
+ uint16_t count;
+ void *ctx;
+ void (*callback)(mem_buf_desc_t *);
+ } zc;
 } tx;
 };

diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp
index dcb0177509..9fc027d05e 100644
--- a/src/vma/sock/sockinfo.cpp
+++ b/src/vma/sock/sockinfo.cpp
@@ -114,6 +114,8 @@ sockinfo::sockinfo(int fd):
 memset(&m_so_ratelimit, 0, sizeof(vma_rate_limit_t));
 set_flow_tag(m_fd + 1);

+ atomic_set(&m_zckey, 0);
+
 m_socketxtreme.ec.clear();
 m_socketxtreme.completion = NULL;
 m_socketxtreme.last_buff_lst = NULL;

diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h
index a4d8cb083d..cf8c001e98 100644
--- a/src/vma/sock/sockinfo.h
+++ b/src/vma/sock/sockinfo.h
@@ -248,8 +248,22 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
 ring_alloc_logic_attr m_ring_alloc_log_tx;
 uint32_t m_pcp;

+ /* Socket error queue that keeps local errors and the internal data
+ * required to provide the notification ability.
+ */
 descq_t m_error_queue;

+ /* TX zcopy counter
+ * The notification itself for a tx zcopy operation is a simple scalar value.
+ * Each socket maintains an internal unsigned 32-bit counter.
+ * Each send call with MSG_ZEROCOPY that successfully sends data increments
+ * the counter. The counter is not incremented on failure or if called with
+ * length zero.
+ * The counter counts system call invocations, not bytes.
+ * It wraps after UINT_MAX calls.
+ */
+ atomic_t m_zckey;
+
 struct {
 /* Track internal events to return in socketxtreme_poll()
 * Current design support single event for socket at a particular time

From 815376c7a5ae3184cba859cb3273a175d282302c Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Mon, 15 Jun 2020 19:06:13 +0300
Subject: [PATCH 06/21] issue: 1792164 Define zero copy constants

Zero copy was introduced in Linux kernel 4.14, so previous versions do
not have the related options.
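As background on how these constants are used by an application, a
minimal sketch follows (standard kernel semantics assuming a connected
TCP socket; the helper name is illustrative, not code from this patch
set):

    /* Opt in once, then request zerocopy per send call. */
    #include <sys/types.h>
    #include <sys/socket.h>

    #ifndef SO_ZEROCOPY
    #define SO_ZEROCOPY 59          /* same fallback as defined below */
    #endif
    #ifndef MSG_ZEROCOPY
    #define MSG_ZEROCOPY 0x4000000  /* same fallback as defined below */
    #endif

    static ssize_t send_zc(int fd, const void *buf, size_t len)
    {
        int one = 1;

        /* Without this opt-in the kernel silently ignores MSG_ZEROCOPY */
        if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0)
            return -1;

        /* buf must remain unchanged until the completion for this send
         * arrives on the error queue (see the MSG_ERRQUEUE sketch above) */
        return send(fd, buf, len, MSG_ZEROCOPY);
    }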
Signed-off-by: Igor Ivanov
---
 src/vma/sock/sockinfo.h | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h
index cf8c001e98..fe26413152 100644
--- a/src/vma/sock/sockinfo.h
+++ b/src/vma/sock/sockinfo.h
@@ -90,6 +90,22 @@ enum {
 #define SO_REUSEPORT 15
 #endif

+#ifndef SO_EE_ORIGIN_ZEROCOPY
+#define SO_EE_ORIGIN_ZEROCOPY 5
+#endif
+
+#ifndef SO_ZEROCOPY
+#define SO_ZEROCOPY 59
+#endif
+
+#ifndef SO_EE_CODE_ZEROCOPY_COPIED
+#define SO_EE_CODE_ZEROCOPY_COPIED 1
+#endif
+
+#ifndef MSG_ZEROCOPY
+#define MSG_ZEROCOPY 0x4000000
+#endif
+
 struct cmsg_state
 {
 struct msghdr *mhdr;

From 74de0617964061e87afdf9dbe3866d6d5a901fad Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Tue, 9 Jun 2020 17:30:22 +0300
Subject: [PATCH 07/21] issue: 1792164 Add SO_ZEROCOPY processing

Passing the MSG_ZEROCOPY flag is the most obvious step to enable copy
avoidance, but not the only one. The kernel is permissive when
applications pass undefined flags to the send system call: by default
it simply ignores these. To avoid enabling copy avoidance mode for
legacy processes that accidentally already pass this flag, a process
must first signal intent by setting the SO_ZEROCOPY socket option.

Signed-off-by: Igor Ivanov
---
 src/vma/sock/sockinfo.cpp | 1 +
 src/vma/sock/sockinfo.h | 2 ++
 src/vma/sock/sockinfo_tcp.cpp | 18 ++++++++++++++++++
 3 files changed, 21 insertions(+)

diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp
index 9fc027d05e..5c97525bbd 100644
--- a/src/vma/sock/sockinfo.cpp
+++ b/src/vma/sock/sockinfo.cpp
@@ -66,6 +66,7 @@ sockinfo::sockinfo(int fd):
 m_b_pktinfo(false),
 m_b_rcvtstamp(false),
 m_b_rcvtstampns(false),
+ m_b_zc(false),
 m_n_tsing_flags(0),
 m_protocol(PROTO_UNDEFINED),
 m_lock_rcv(MODULE_NAME "::m_lock_rcv"),

diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h
index fe26413152..f9fbe41c60 100644
--- a/src/vma/sock/sockinfo.h
+++ b/src/vma/sock/sockinfo.h
@@ -216,6 +216,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
 bool m_b_pktinfo;
 bool m_b_rcvtstamp;
 bool m_b_rcvtstampns;
+ bool m_b_zc;
 uint8_t m_n_tsing_flags;

 in_protocol_t m_protocol;
@@ -563,6 +564,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou
 case SO_TIMESTAMP: return "SO_TIMESTAMP";
 case SO_TIMESTAMPNS: return "SO_TIMESTAMPNS";
 case SO_BINDTODEVICE: return "SO_BINDTODEVICE";
+ case SO_ZEROCOPY: return "SO_ZEROCOPY";
 case SO_VMA_RING_ALLOC_LOGIC: return "SO_VMA_RING_ALLOC_LOGIC";
 case SO_MAX_PACING_RATE: return "SO_MAX_PACING_RATE";
 case SO_VMA_FLOW_TAG: return "SO_VMA_FLOW_TAG";

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index 9b499b2e77..e1090e6bc5 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -3795,6 +3795,15 @@ int sockinfo_tcp::setsockopt(int __level, int __optname,
 ret = SOCKOPT_HANDLE_BY_OS;
 break;
 }
+ case SO_ZEROCOPY:
+ if (__optval) {
+ lock_tcp_con();
+ m_b_zc = *(bool *)__optval;
+ unlock_tcp_con();
+ }
+ ret = SOCKOPT_HANDLE_BY_OS;
+ si_tcp_logdbg("(SO_ZEROCOPY) m_b_zc: %d", m_b_zc);
+ break;
 default:
 ret = SOCKOPT_HANDLE_BY_OS;
 supported = false;
@@ -3926,6 +3935,15 @@ int sockinfo_tcp::getsockopt_offload(int __level, int __optname, void *__optval,
 case SO_MAX_PACING_RATE:
 ret = sockinfo::getsockopt(__level, __optname, __optval, __optlen);
 break;
+ case SO_ZEROCOPY:
+ if (*__optlen >= sizeof(int)) {
+ *(int *)__optval = m_b_zc;
+ si_tcp_logdbg("(SO_ZEROCOPY) m_b_zc: %d", m_b_zc);
+ ret = 0;
+ }
+ else {
+ errno = EINVAL;
+ }
+ break;
 default:
 ret = SOCKOPT_HANDLE_BY_OS;
 break;

From 3457d7d5cd1339fc12b48560f5dc361e2bbf2055 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Tue, 9 Jun 2020 19:46:29 +0300
Subject: [PATCH 08/21] issue: 1792164 Add PBUF_ZEROCOPY type of allocated pbuf

Extend the pbuf allocation functions with a new pbuf_type parameter to
pass the requested type of memory to the socket. The socket layer needs
this information to manage different types of mem_buf_desc_t elements.

Signed-off-by: Igor Ivanov
---
 src/vma/lwip/pbuf.h | 3 ++-
 src/vma/lwip/tcp.c | 6 +++---
 src/vma/lwip/tcp.h | 2 +-
 src/vma/lwip/tcp_in.c | 4 ++--
 src/vma/lwip/tcp_out.c | 15 +++++++++------
 src/vma/sock/sockinfo_tcp.cpp | 4 +++-
 src/vma/sock/sockinfo_tcp.h | 2 +-
 7 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/src/vma/lwip/pbuf.h b/src/vma/lwip/pbuf.h
index d59bf87e59..a39a427a4a 100644
--- a/src/vma/lwip/pbuf.h
+++ b/src/vma/lwip/pbuf.h
@@ -57,7 +57,8 @@ typedef enum {
 PBUF_RAM, /* pbuf data is stored in RAM */
 PBUF_ROM, /* pbuf data is stored in ROM */
 PBUF_REF, /* pbuf comes from the pbuf pool */
- PBUF_POOL /* pbuf payload refers to RAM */
+ PBUF_POOL, /* pbuf payload refers to RAM */
+ PBUF_ZEROCOPY /* pbuf data used directly from user */
 } pbuf_type;

diff --git a/src/vma/lwip/tcp.c b/src/vma/lwip/tcp.c
index 43e343f92f..cb0bd2d34c 100644
--- a/src/vma/lwip/tcp.c
+++ b/src/vma/lwip/tcp.c
@@ -1084,10 +1084,10 @@ tcp_tx_pbuf_alloc(struct tcp_pcb * pcb, u16_t length, pbuf_type type)
 {
 struct pbuf * p;

- if (!pcb->pbuf_alloc) {
+ if (!pcb->pbuf_alloc || pcb->pbuf_alloc->type != type) {

 // pbuf_alloc is not valid, we should allocate a new pbuf.
- p = external_tcp_tx_pbuf_alloc(pcb);
+ p = external_tcp_tx_pbuf_alloc(pcb, type);
 if (!p) return NULL;

 p->next = NULL;
@@ -1129,7 +1129,7 @@ tcp_tx_pbuf_free(struct tcp_pcb * pcb, struct pbuf * p)
 while (p) {
 p_next = p->next;
 p->next = NULL;
- if (p->type == PBUF_RAM) {
+ if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) {
 external_tcp_tx_pbuf_free(pcb, p);
 } else {
 pbuf_free(p);

diff --git a/src/vma/lwip/tcp.h b/src/vma/lwip/tcp.h
index 22d8bb79aa..394ab624f7 100644
--- a/src/vma/lwip/tcp.h
+++ b/src/vma/lwip/tcp.h
@@ -66,7 +66,7 @@ void register_sys_readv(sys_readv_fn fn);
 #endif

 #if LWIP_3RD_PARTY_BUFS
-typedef struct pbuf * (*tcp_tx_pbuf_alloc_fn)(void* p_conn);
+typedef struct pbuf * (*tcp_tx_pbuf_alloc_fn)(void* p_conn, pbuf_type type);

 void register_tcp_tx_pbuf_alloc(tcp_tx_pbuf_alloc_fn fn);

diff --git a/src/vma/lwip/tcp_in.c b/src/vma/lwip/tcp_in.c
index 1f8b693ab8..e4eee96e44 100644
--- a/src/vma/lwip/tcp_in.c
+++ b/src/vma/lwip/tcp_in.c
@@ -820,7 +820,7 @@ tcp_shrink_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t ackno)
 seg->p->next = cur_p;
 p->next = NULL;

- if (p->type == PBUF_RAM) {
+ if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) {
 external_tcp_tx_pbuf_free(pcb, p);
 } else {
 pbuf_free(p);
@@ -844,7 +844,7 @@ tcp_shrink_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t ackno)
 seg->p->next = cur_p;
 p->next = NULL;

- if (p->type == PBUF_RAM) {
+ if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) {
 external_tcp_tx_pbuf_free(pcb, p);
 } else {
 pbuf_free(p);

diff --git a/src/vma/lwip/tcp_out.c b/src/vma/lwip/tcp_out.c
index efe944fd3f..2c6af15e0b 100644
--- a/src/vma/lwip/tcp_out.c
+++ b/src/vma/lwip/tcp_out.c
@@ -286,7 +286,7 @@ tcp_create_segment(struct tcp_pcb *pcb, struct pbuf *p, u8_t flags, u32_t seqno,
 */
static struct pbuf *
tcp_pbuf_prealloc(u16_t length, u16_t max_length,
- u16_t *oversize, struct tcp_pcb
*pcb, u8_t tcp_write_flag_more, + u16_t *oversize, struct tcp_pcb *pcb, pbuf_type type, u8_t tcp_write_flag_more, u8_t first_seg) { struct pbuf *p; @@ -310,7 +310,7 @@ tcp_pbuf_prealloc(u16_t length, u16_t max_length, alloc = LWIP_MIN(max_length, LWIP_MEM_ALIGN_SIZE(length + pcb->tcp_oversize_val)); } } - p = tcp_tx_pbuf_alloc(pcb, alloc, PBUF_RAM); + p = tcp_tx_pbuf_alloc(pcb, alloc, type); if (p == NULL) { return NULL; } @@ -440,6 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) struct iovec piov[piov_max_size]; int piov_cur_index = 0; int piov_cur_len = 0; + pbuf_type type = PBUF_RAM; int byte_queued = pcb->snd_nxt - pcb->lastack; if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY)) @@ -567,7 +568,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) * can use PBUF_RAW here since the data appears in the middle of * a segment. A header will never be prepended. */ /* Data is copied */ - if ((concat_p = tcp_pbuf_prealloc(seglen, space, &oversize, pcb, TCP_WRITE_FLAG_MORE, 1)) == NULL) { + if ((concat_p = tcp_pbuf_prealloc(seglen, space, &oversize, pcb, type, TCP_WRITE_FLAG_MORE, 1)) == NULL) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_write : could not allocate memory for pbuf copy size %"U16_F"\n", seglen)); @@ -606,7 +607,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) /* If copy is set, memory should be allocated and data copied * into pbuf */ - if ((p = tcp_pbuf_prealloc(seglen + optlen, max_len, &oversize, pcb, TCP_WRITE_FLAG_MORE, queue == NULL)) == NULL) { + if ((p = tcp_pbuf_prealloc(seglen + optlen, max_len, &oversize, pcb, type, TCP_WRITE_FLAG_MORE, queue == NULL)) == NULL) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_write : could not allocate memory for pbuf copy size %"U16_F"\n", seglen)); goto memerr; } @@ -1058,6 +1059,7 @@ tcp_split_one_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t lentosend, struct pbuf *cur_p = NULL; u16_t max_length = 0; u16_t oversize = 0; + pbuf_type type = PBUF_RAM; cur_seg = seg; max_length = cur_seg->p->len; @@ -1066,7 +1068,7 @@ tcp_split_one_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t lentosend, u32_t lentoqueue = cur_seg->len - lentosend; /* Allocate memory for p_buf and fill in fields. 
*/
- if (NULL == (cur_p = tcp_pbuf_prealloc(lentoqueue + optlen, max_length, &oversize, pcb, 0, 0))) {
+ if (NULL == (cur_p = tcp_pbuf_prealloc(lentoqueue + optlen, max_length, &oversize, pcb, type, 0, 0))) {
 LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_split_one_segment: could not allocate memory for pbuf copy size %"U16_F"\n", (lentoqueue + optlen)));
 goto out;
 }
@@ -1264,6 +1266,7 @@ tcp_split_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t wnd)
 u16_t oversize = 0;
 u8_t optlen = 0, optflags = 0;
 u16_t mss_local = 0;
+ pbuf_type type = PBUF_RAM;

 LWIP_ASSERT("tcp_split_segment: sanity check", (seg && seg->p));
@@ -1294,7 +1297,7 @@ tcp_split_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t wnd)
 if (seg->p->len > ((TCP_HLEN + optlen) + lentosend)) {/* First buffer is too big, split it */
 u32_t lentoqueue = seg->p->len - (TCP_HLEN + optlen) - lentosend;

- if (NULL == (p = tcp_pbuf_prealloc(lentoqueue + optlen, mss_local, &oversize, pcb, 0, 0))) {
+ if (NULL == (p = tcp_pbuf_prealloc(lentoqueue + optlen, mss_local, &oversize, pcb, type, 0, 0))) {
 LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_split_segment: could not allocate memory for pbuf copy size %"U16_F"\n", (lentoqueue + optlen)));
 return;
 }

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index e1090e6bc5..cb9931230c 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -4534,11 +4534,13 @@ int sockinfo_tcp::free_buffs(uint16_t len)
 return 0;
 }

-struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn)
+struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type)
 {
 sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container);
 dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry);
 mem_buf_desc_t* p_desc = NULL;
+
+ NOT_IN_USE(type);
 if (likely(p_dst)) {
 p_desc = p_dst->get_buffer();
 }

diff --git a/src/vma/sock/sockinfo_tcp.h b/src/vma/sock/sockinfo_tcp.h
index 53c14c4c85..9324546dd7 100644
--- a/src/vma/sock/sockinfo_tcp.h
+++ b/src/vma/sock/sockinfo_tcp.h
@@ -180,7 +180,7 @@ class sockinfo_tcp : public sockinfo, public timer_handler
 virtual void update_header_field(data_updater *updater);
 virtual bool rx_input_cb(mem_buf_desc_t* p_rx_pkt_mem_buf_desc_info, void* pv_fd_ready_array);

- static struct pbuf * tcp_tx_pbuf_alloc(void* p_conn);
+ static struct pbuf * tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type);
 static void tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff);
 static struct tcp_seg * tcp_seg_alloc(void* p_conn);
 static void tcp_seg_free(void* p_conn, struct tcp_seg * seg);

From 7aa343d3b386a3681dc037b2dc6b625f68ff693e Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Tue, 9 Jun 2020 21:19:36 +0300
Subject: [PATCH 09/21] issue: 1792164 Introduce flags to process zero copy send

These flags are added:
VMA_TX_PACKET_ZEROCOPY - used on the sockinfo/dst_entry layers
TCP_WRITE_ZEROCOPY - used inside lwip tcp_write()
TF_SEG_OPTS_ZEROCOPY - marks a tcp segment with the zero copy attribute

Signed-off-by: Igor Ivanov
---
 src/vma/lwip/tcp.h | 1 +
 src/vma/lwip/tcp_impl.h | 1 +
 src/vma/proto/dst_entry_tcp.cpp | 5 +++++
 src/vma/proto/dst_entry_udp.cpp | 8 +++++++-
 src/vma/proto/vma_lwip.h | 22 ++++++++++++++++++----
 5 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/src/vma/lwip/tcp.h b/src/vma/lwip/tcp.h
index 394ab624f7..83509a44d4 100644
--- a/src/vma/lwip/tcp.h
+++ b/src/vma/lwip/tcp.h
@@ -528,6 +528,7 @@ err_t tcp_shutdown(struct tcp_pcb *pcb, int shut_rx, int shut_tx);
 #define TCP_WRITE_DUMMY 0x10
 #define
TCP_WRITE_TSO 0x20 #define TCP_WRITE_FILE 0x40 +#define TCP_WRITE_ZEROCOPY 0x80 err_t tcp_write (struct tcp_pcb *pcb, const void *dataptr, u32_t len, u8_t apiflags); diff --git a/src/vma/lwip/tcp_impl.h b/src/vma/lwip/tcp_impl.h index 2419178c53..5eb9b11166 100644 --- a/src/vma/lwip/tcp_impl.h +++ b/src/vma/lwip/tcp_impl.h @@ -305,6 +305,7 @@ struct tcp_seg { #define TF_SEG_OPTS_WNDSCALE (u8_t)0x08U /* Include window scaling option */ #define TF_SEG_OPTS_DUMMY_MSG (u8_t)TCP_WRITE_DUMMY /* Include dummy send option */ #define TF_SEG_OPTS_TSO (u8_t)TCP_WRITE_TSO /* Use TSO send mode */ +#define TF_SEG_OPTS_ZEROCOPY (u8_t)TCP_WRITE_ZEROCOPY /* Use zerocopy send mode */ struct tcp_hdr *tcphdr; /* the TCP header */ }; diff --git a/src/vma/proto/dst_entry_tcp.cpp b/src/vma/proto/dst_entry_tcp.cpp index 7582e0d3f0..f7bed9abdd 100644 --- a/src/vma/proto/dst_entry_tcp.cpp +++ b/src/vma/proto/dst_entry_tcp.cpp @@ -81,6 +81,11 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s p_tcp_iov = (tcp_iovec*)p_iov; + /* Suppress flags that should not be used anymore + * to avoid conflicts with VMA_TX_PACKET_L3_CSUM and VMA_TX_PACKET_L4_CSUM + */ + attr.flags = (vma_wr_tx_packet_attr)(attr.flags & ~(VMA_TX_PACKET_ZEROCOPY | VMA_TX_FILE)); + attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM | VMA_TX_PACKET_L4_CSUM); /* Supported scenarios: diff --git a/src/vma/proto/dst_entry_udp.cpp b/src/vma/proto/dst_entry_udp.cpp index bff9bb247f..d42fb4b1dd 100644 --- a/src/vma/proto/dst_entry_udp.cpp +++ b/src/vma/proto/dst_entry_udp.cpp @@ -308,6 +308,12 @@ ssize_t dst_entry_udp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s { // Calc user data payload size ssize_t sz_data_payload = 0; + + /* Suppress flags that should not be used anymore + * to avoid conflicts with VMA_TX_PACKET_L3_CSUM and VMA_TX_PACKET_L4_CSUM + */ + attr.flags = (vma_wr_tx_packet_attr)(attr.flags & ~(VMA_TX_PACKET_ZEROCOPY | VMA_TX_FILE)); + for (ssize_t i = 0; i < sz_iov; i++) sz_data_payload += p_iov[i].iov_len; @@ -324,7 +330,7 @@ ssize_t dst_entry_udp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM | VMA_TX_PACKET_L4_CSUM); return fast_send_not_fragmented(p_iov, sz_iov, attr.flags, sz_udp_payload, sz_data_payload); } else { - attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM); + attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM); return fast_send_fragmented(p_iov, sz_iov, attr.flags, sz_udp_payload, sz_data_payload); } } diff --git a/src/vma/proto/vma_lwip.h b/src/vma/proto/vma_lwip.h index a334541735..1bc345c950 100644 --- a/src/vma/proto/vma_lwip.h +++ b/src/vma/proto/vma_lwip.h @@ -51,11 +51,25 @@ typedef enum vma_wr_tx_packet_attr { VMA_TX_PACKET_TSO = TCP_WRITE_TSO, /* 0x20 */ /* sendfile operation. */ VMA_TX_FILE = TCP_WRITE_FILE, /* 0x40 */ + /* zcopy write operation (MSG_ZEROCOPY). */ + VMA_TX_PACKET_ZEROCOPY = TCP_WRITE_ZEROCOPY, /* 0x80 */ - /* MLX5_ETH_WQE_L3_CSUM offload to HW L3 (IP) header checksum */ - VMA_TX_PACKET_L3_CSUM = (1 << 6), /* hardcoded values. 
It is the same as VMA_TX_FILE but there is no conflict */
- /* MLX5_ETH_WQE_L4_CSUM offload to HW L4 (TCP/UDP) header checksum */
- VMA_TX_PACKET_L4_CSUM = (1 << 7), /* hardcoded values */
+ /* MLX5_ETH_WQE_L3_CSUM offload to HW L3 (IP) header checksum
+ * Important:
+ * - hardcoded value used directly to program the send to the wire
+ * - it is the same as VMA_TX_FILE, but there is no conflict since
+ * VMA_TX_FILE is only passed into the dst_entry::fast_send() operation
+ * and is not needed later when sending to the wire
+ */
+ VMA_TX_PACKET_L3_CSUM = (1 << 6),
+ /* MLX5_ETH_WQE_L4_CSUM offload to HW L4 (TCP/UDP) header checksum
+ * Important:
+ * - hardcoded value used directly to program the send to the wire
+ * - it is the same as TCP_WRITE_ZEROCOPY, but there is no conflict since
+ * TCP_WRITE_ZEROCOPY is only passed into the dst_entry::fast_send() operation
+ * and is not needed later when sending to the wire
+ */
+ VMA_TX_PACKET_L4_CSUM = (1 << 7),
 /* blocking send operation */
 VMA_TX_PACKET_BLOCK = (1 << 8),
 /* Force SW checksum */

From 969ade649360d3d45108d6c741585dbf656bb014 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Wed, 10 Jun 2020 20:00:10 +0300
Subject: [PATCH 10/21] issue: 1792164 Add MSG_ZEROCOPY processing

These changes make the MSG_ZEROCOPY send flow workable, including the
notification mechanism. It is needed to notify the process when it is
safe to reuse a previously passed buffer, and it queues completion
notifications on the socket error queue.
However, copy avoidance is not done internally yet, so all data is
still copied into internal buffers just as without MSG_ZEROCOPY.
Full zcopy support will be implemented later.

Signed-off-by: Igor Ivanov
---
 src/vma/dev/buffer_pool.h | 6 ++
 src/vma/lwip/pbuf.c | 3 +
 src/vma/lwip/tcp_out.c | 2 +-
 src/vma/proto/dst_entry_tcp.cpp | 2 +-
 src/vma/sock/sockinfo_tcp.cpp | 98 ++++++++++++++++++++++++++++-
 src/vma/sock/sockinfo_tcp.h | 3 +
 6 files changed, 110 insertions(+), 4 deletions(-)

diff --git a/src/vma/dev/buffer_pool.h b/src/vma/dev/buffer_pool.h
index 606f37b095..1c229a7fa6 100644
--- a/src/vma/dev/buffer_pool.h
+++ b/src/vma/dev/buffer_pool.h
@@ -41,6 +41,12 @@
 inline static void free_lwip_pbuf(struct pbuf_custom *pbuf_custom)
 {
+ mem_buf_desc_t* p_desc = (mem_buf_desc_t *)pbuf_custom;
+
+ if (p_desc->m_flags & mem_buf_desc_t::ZCOPY) {
+ p_desc->tx.zc.callback(p_desc);
+ }
+
 pbuf_custom->pbuf.type = 0;
 pbuf_custom->pbuf.flags = 0;
 pbuf_custom->pbuf.ref = 0;
 }

diff --git a/src/vma/lwip/pbuf.c b/src/vma/lwip/pbuf.c
index 7a90513d1a..6c499fcb4f 100644
--- a/src/vma/lwip/pbuf.c
+++ b/src/vma/lwip/pbuf.c
@@ -200,6 +200,9 @@ pbuf_header(struct pbuf *p, s16_t header_size_increment)
 return 1;
 /* AlexV: we need to check that the header EXPANTION is legal for PBUF_REF & PBUF_ROM pbufs! */
 p->payload = (u8_t *)p->payload - header_size_increment;
+ } else if (type == PBUF_ZEROCOPY) {
+ /* temporarily do the same as for PBUF_RAM until zcopy support is ready */
+ p->payload = (u8_t *)p->payload - header_size_increment;
 } else {
 /* Unknown type */
 LWIP_ASSERT("bad pbuf type", 0);

diff --git a/src/vma/lwip/tcp_out.c b/src/vma/lwip/tcp_out.c
index 2c6af15e0b..1c7d9b108f 100644
--- a/src/vma/lwip/tcp_out.c
+++ b/src/vma/lwip/tcp_out.c
@@ -440,7 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags)
 struct iovec piov[piov_max_size];
 int piov_cur_index = 0;
 int piov_cur_len = 0;
- pbuf_type type = PBUF_RAM;
+ pbuf_type type = (apiflags & TCP_WRITE_ZEROCOPY ?
PBUF_ZEROCOPY : PBUF_RAM); int byte_queued = pcb->snd_nxt - pcb->lastack; if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY)) diff --git a/src/vma/proto/dst_entry_tcp.cpp b/src/vma/proto/dst_entry_tcp.cpp index f7bed9abdd..9629e6be55 100644 --- a/src/vma/proto/dst_entry_tcp.cpp +++ b/src/vma/proto/dst_entry_tcp.cpp @@ -463,7 +463,7 @@ void dst_entry_tcp::put_buffer(mem_buf_desc_t * p_desc) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(&p_desc->lwip_pbuf.pbuf); } } } diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index cb9931230c..472be834ac 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -824,6 +824,10 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) } #endif + if ((__flags & MSG_ZEROCOPY) && (m_b_zc)) { + apiflags |= VMA_TX_PACKET_ZEROCOPY; + } + for (int i = 0; i < sz_iov; i++) { si_tcp_logfunc("iov:%d base=%p len=%d", i, p_iov[i].iov_base, p_iov[i].iov_len); @@ -948,6 +952,14 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) m_p_socket_stats->n_tx_ready_byte_count += total_tx; } + /* Each send call with MSG_ZEROCOPY that successfully sends + * data increments the counter. + * The counter is not incremented on failure or if called with length zero. + */ + if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && (total_tx > 0)) { + atomic_fetch_and_inc(&m_zckey); + } + unlock_tcp_con(); #ifdef VMA_TIME_MEASURE @@ -4540,9 +4552,11 @@ struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type) dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry); mem_buf_desc_t* p_desc = NULL; - NOT_IN_USE(type); if (likely(p_dst)) { p_desc = p_dst->get_buffer(); + if (p_desc && (type == PBUF_ZEROCOPY)) { + p_desc = p_si_tcp->tcp_tx_zc_alloc(p_desc); + } } return (struct pbuf *)p_desc; } @@ -4565,11 +4579,91 @@ void sockinfo_tcp::tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(p_buff); } } } +mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc) +{ + p_desc->m_flags |= mem_buf_desc_t::ZCOPY; + p_desc->tx.zc.id = atomic_read(&m_zckey); + p_desc->tx.zc.count = 1; + p_desc->tx.zc.len = p_desc->lwip_pbuf.pbuf.len; + p_desc->tx.zc.ctx = (void *)this; + p_desc->tx.zc.callback = tcp_tx_zc_callback; + + return p_desc; +} + +void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc) +{ + uint32_t lo, hi; + uint16_t count; + uint32_t prev_lo, prev_hi; + mem_buf_desc_t* err_queue = NULL; + sockinfo_tcp* sock = NULL; + + if (!p_desc) { + return; + } + + if (!p_desc->tx.zc.ctx || !p_desc->tx.zc.count) { + goto cleanup; + } + + sock = (sockinfo_tcp *)p_desc->tx.zc.ctx; + + if (sock->m_state != SOCKINFO_OPENED) { + goto cleanup; + } + + count = p_desc->tx.zc.count; + lo = p_desc->tx.zc.id; + hi = lo + count - 1; + memset(&p_desc->ee, 0, sizeof(p_desc->ee)); + p_desc->ee.ee_errno = 0; + p_desc->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY; + p_desc->ee.ee_data = hi; + p_desc->ee.ee_info = lo; +// p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED; + + /* Update last error queue element in case it has the same type */ + err_queue = sock->m_error_queue.back(); + if (err_queue && + (err_queue->ee.ee_origin == p_desc->ee.ee_origin) && + (err_queue->ee.ee_code == p_desc->ee.ee_code)) { + uint64_t sum_count = 0; + + prev_hi = 
err_queue->ee.ee_data;
+ prev_lo = err_queue->ee.ee_info;
+ sum_count = prev_hi - prev_lo + 1ULL + count;
+
+ if (lo == prev_lo) {
+ err_queue->ee.ee_data = hi;
+ } else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) {
+ err_queue = NULL;
+ } else {
+ err_queue->ee.ee_data += count;
+ }
+ }
+
+ /* Add information into error queue element */
+ if (!err_queue) {
+ err_queue = p_desc->clone();
+ sock->m_error_queue.push_back(err_queue);
+ }
+
+ /* Signal events on socket */
+ NOTIFY_ON_EVENTS(sock, EPOLLERR);
+ sock->do_wakeup();
+
+cleanup:
+ /* Clean up */
+ p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY;
+ memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc));
+}
+
 struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn)
 {
 sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container);

diff --git a/src/vma/sock/sockinfo_tcp.h b/src/vma/sock/sockinfo_tcp.h
index 9324546dd7..a46ecc57d9 100644
--- a/src/vma/sock/sockinfo_tcp.h
+++ b/src/vma/sock/sockinfo_tcp.h
@@ -185,6 +185,9 @@ class sockinfo_tcp : public sockinfo, public timer_handler
 static struct tcp_seg * tcp_seg_alloc(void* p_conn);
 static void tcp_seg_free(void* p_conn, struct tcp_seg * seg);

+ mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc);
+ static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc);
+
 bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL);
 bool inline is_writeable();
 bool inline is_errorable(int *errors);

From 94ffe80f972b49190f534d8b2e1342e70e3277fb Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Thu, 11 Jun 2020 17:54:12 +0300
Subject: [PATCH 11/21] issue: 1792164 Force tx completion for zerocopy buffers

ZCOPY packets should notify the application as soon as possible to
confirm that user buffers are free to reuse. So force a completion
signal for such work requests.

Signed-off-by: Igor Ivanov
---
 src/vma/dev/qp_mgr.cpp | 16 +++++++++++-----
 src/vma/dev/qp_mgr.h | 5 ++++-
 src/vma/dev/qp_mgr_eth_mlx5.cpp | 1 -
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/vma/dev/qp_mgr.cpp b/src/vma/dev/qp_mgr.cpp
index 4a78bbe6fc..f0fa8a7b51 100644
--- a/src/vma/dev/qp_mgr.cpp
+++ b/src/vma/dev/qp_mgr.cpp
@@ -100,7 +100,6 @@ qp_mgr::qp_mgr(const ring_simple* p_ring, const ib_ctx_handler* p_context,
 m_ibv_rx_sg_array = new ibv_sge[m_n_sysvar_rx_num_wr_to_post_recv];
 m_ibv_rx_wr_array = new ibv_recv_wr[m_n_sysvar_rx_num_wr_to_post_recv];

- set_unsignaled_count();
 memset(&m_rate_limit, 0, sizeof(struct vma_rate_limit_t));

 qp_logfunc("");
@@ -336,7 +335,6 @@ void qp_mgr::up()
 release_tx_buffers();

 /* clean any link to completions with error we might have */
- set_unsignaled_count();
 m_p_last_tx_mem_buf_desc = NULL;

 modify_qp_to_ready_state();
@@ -497,7 +495,6 @@ void qp_mgr::trigger_completion_for_all_sent_packets()

 // Close the Tx unsignaled send list
 set_unsignaled_count();
- m_p_last_tx_mem_buf_desc = NULL;

 if (!m_p_ring->m_tx_num_wr_free) {
 qp_logdbg("failed to trigger completion for all packets due to no available wr");
@@ -599,9 +596,19 @@ inline int qp_mgr::send_to_wire(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_at
 int qp_mgr::send(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_attr attr)
 {
 mem_buf_desc_t* p_mem_buf_desc = (mem_buf_desc_t *)p_send_wqe->wr_id;
+ /* Control tx completions:
+ * - VMA_TX_WRE_BATCHING - The number of Tx Work Request Elements used
+ * until a completion signal is requested.
+ * - ZCOPY packets should notify the application as soon as possible to
+ * confirm that user buffers are free to reuse. So force a completion
+ * signal for such work requests.
+ * - The first call of send() should request a completion. It means
+ * that m_n_unsignaled_count must be zero at that time.
+ */
+ bool request_comp = (is_completion_need() ||
+ (p_mem_buf_desc->m_flags & mem_buf_desc_t::ZCOPY));

 qp_logfunc("VERBS send, unsignaled_count: %d", m_n_unsignaled_count);
- bool request_comp = is_completion_need();

 #ifdef VMA_TIME_MEASURE
 TAKE_T_TX_POST_SEND_START;
@@ -636,7 +643,6 @@ int qp_mgr::send(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_attr attr)
 int ret;

 set_unsignaled_count();
- m_p_last_tx_mem_buf_desc = NULL;

 // Poll the Tx CQ
 uint64_t dummy_poll_sn = 0;

diff --git a/src/vma/dev/qp_mgr.h b/src/vma/dev/qp_mgr.h
index 9a7b52c7e0..38898aed73 100644
--- a/src/vma/dev/qp_mgr.h
+++ b/src/vma/dev/qp_mgr.h
@@ -172,7 +172,10 @@ friend class cq_mgr_mp;
 int configure(struct ibv_comp_channel* p_rx_comp_event_channel);
 virtual int prepare_ibv_qp(vma_ibv_qp_init_attr& qp_init_attr) = 0;

- inline void set_unsignaled_count(void) { m_n_unsignaled_count = m_n_sysvar_tx_num_wr_to_signal - 1; }
+ inline void set_unsignaled_count(void) {
+ m_n_unsignaled_count = m_n_sysvar_tx_num_wr_to_signal - 1;
+ m_p_last_tx_mem_buf_desc = NULL;
+ }

 virtual cq_mgr* init_rx_cq_mgr(struct ibv_comp_channel* p_rx_comp_event_channel);
 virtual cq_mgr* init_tx_cq_mgr(void);

diff --git a/src/vma/dev/qp_mgr_eth_mlx5.cpp b/src/vma/dev/qp_mgr_eth_mlx5.cpp
index 3557629f54..e97d223aa5 100644
--- a/src/vma/dev/qp_mgr_eth_mlx5.cpp
+++ b/src/vma/dev/qp_mgr_eth_mlx5.cpp
@@ -887,7 +887,6 @@ void qp_mgr_eth_mlx5::trigger_completion_for_all_sent_packets()

 // Close the Tx unsignaled send list
 set_unsignaled_count();
- m_p_last_tx_mem_buf_desc = NULL;

 if (!m_p_ring->m_tx_num_wr_free) {
 qp_logdbg("failed to trigger completion for all packets due to no available wr");

From c239eff46dbbca84f1173df0e0d93aa82d96b99d Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Tue, 30 Jun 2020 17:33:31 +0300
Subject: [PATCH 12/21] issue: 1792164 Track last memory descriptor with identical zcopy counter

TCP write can create several memory descriptors for a single write call
with an identical zcopy id. Notification should be done only when the
last one is freed.

This change does not guarantee correctness completely: during the same
write() call a memory descriptor can be assigned the current zcopy id
after a previous memory descriptor has already gotten its tx completion
and ack. A zcopy operation should allocate a memory buffer to track the
unique counter correctly, so tcp_write() should avoid adding a portion
of the data to an existing pbuf.

Signed-off-by: Igor Ivanov
---
 src/vma/lwip/tcp_out.c | 7 ++++---
 src/vma/sock/sockinfo.cpp | 1 +
 src/vma/sock/sockinfo.h | 3 +++
 src/vma/sock/sockinfo_tcp.cpp | 18 ++++++++++++++++--
 4 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/src/vma/lwip/tcp_out.c b/src/vma/lwip/tcp_out.c
index 1c7d9b108f..97376247f3 100644
--- a/src/vma/lwip/tcp_out.c
+++ b/src/vma/lwip/tcp_out.c
@@ -465,6 +465,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags)
 #endif /* LWIP_TSO */

 optflags |= (apiflags & TCP_WRITE_DUMMY ? TF_SEG_OPTS_DUMMY_MSG : 0);
+ optflags |= (apiflags & TCP_WRITE_ZEROCOPY ?
TF_SEG_OPTS_ZEROCOPY : 0); #if LWIP_TCP_TIMESTAMPS if ((pcb->flags & TF_TIMESTAMP)) { @@ -535,7 +536,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) #endif /* TCP_OVERSIZE_DBGCHECK */ if (pcb->unsent_oversize > 0) { - if (!(apiflags & TCP_WRITE_FILE)) { + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY))) { oversize = pcb->unsent_oversize; LWIP_ASSERT("inconsistent oversize vs. space", oversize_used <= space); oversize_used = oversize < len ? oversize : len; @@ -556,10 +557,10 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) * the end. */ #if LWIP_TSO - if (!(apiflags & TCP_WRITE_FILE) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0) && + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY)) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0) && (tot_p < (int)pcb->tso.max_send_sge)) { #else - if (!(apiflags & TCP_WRITE_FILE) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0)) { + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY)) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0)) { #endif /* LWIP_TSO */ u16_t seglen = space < len - pos ? space : len - pos; diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 5c97525bbd..51044b09b6 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -116,6 +116,7 @@ sockinfo::sockinfo(int fd): set_flow_tag(m_fd + 1); atomic_set(&m_zckey, 0); + m_last_zcdesc = NULL; m_socketxtreme.ec.clear(); m_socketxtreme.completion = NULL; diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index f9fbe41c60..dee7325b0e 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -281,6 +281,9 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou */ atomic_t m_zckey; + /* Last memory descriptor with zcopy operation method */ + mem_buf_desc_t* m_last_zcdesc; + struct { /* Track internal events to return in socketxtreme_poll() * Current design support single event for socket at a particular time diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index 472be834ac..c4f343d638 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -956,8 +956,13 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) * data increments the counter. * The counter is not incremented on failure or if called with length zero. 
*/
- if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && (total_tx > 0)) {
- atomic_fetch_and_inc(&m_zckey);
+ if ((apiflags & VMA_TX_PACKET_ZEROCOPY) &&
+ (total_tx > 0)) {
+ if (m_last_zcdesc->tx.zc.id != (uint32_t)atomic_read(&m_zckey)) {
+ si_tcp_logerr("Invalid tx zcopy operation");
+ } else {
+ atomic_fetch_and_inc(&m_zckey);
+ }
 }

 unlock_tcp_con();
@@ -4593,6 +4598,15 @@ mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc)
 p_desc->tx.zc.ctx = (void *)this;
 p_desc->tx.zc.callback = tcp_tx_zc_callback;

+ if (m_last_zcdesc &&
+ (m_last_zcdesc != p_desc) &&
+ (m_last_zcdesc->lwip_pbuf.pbuf.ref > 0) &&
+ (m_last_zcdesc->tx.zc.id == p_desc->tx.zc.id)) {
+ m_last_zcdesc->tx.zc.len = m_last_zcdesc->lwip_pbuf.pbuf.len;
+ m_last_zcdesc->tx.zc.count = 0;
+ }
+ m_last_zcdesc = p_desc;
+
 return p_desc;
 }

From b22e4e9c2a02102974e3d6ac93406e82684c7e6a Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Tue, 30 Jun 2020 18:32:44 +0300
Subject: [PATCH 13/21] issue: 1792164 Do TX polling from internal thread

To process TX completions effectively, VMA should poll TX from the
internal thread too; otherwise a tx memory descriptor cannot be freed
in time, because the user application would have to call another
write() operation to force it.

Signed-off-by: Igor Ivanov
---
 src/vma/dev/net_device_val.cpp | 37 ++++++++++++++++++++++++++++------
 src/vma/dev/ring.h | 1 +
 src/vma/dev/ring_bond.cpp | 25 +++++++++++++++++++++++
 src/vma/dev/ring_bond.h | 3 ++-
 src/vma/dev/ring_simple.cpp | 7 +++++++
 src/vma/dev/ring_simple.h | 3 ++-
 src/vma/dev/ring_tap.h | 1 +
 src/vma/iomux/epfd_info.cpp | 37 ++++++++++++++++++++++++++++------
 8 files changed, 100 insertions(+), 14 deletions(-)

diff --git a/src/vma/dev/net_device_val.cpp b/src/vma/dev/net_device_val.cpp
index fd5a434b0f..9cf3b0e383 100644
--- a/src/vma/dev/net_device_val.cpp
+++ b/src/vma/dev/net_device_val.cpp
@@ -1136,13 +1136,26 @@ int net_device_val::global_ring_poll_and_process_element(uint64_t *p_poll_sn, vo
 int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array);
 BULLSEYE_EXCLUDE_BLOCK_START
 if (ret < 0 && errno != EAGAIN) {
- nd_logerr("Error in ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
+ nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
 return ret;
 }
 BULLSEYE_EXCLUDE_BLOCK_END
- if (ret > 0)
- nd_logfunc("ring[%p] Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
- ret_total += ret;
+ if (ret > 0) {
+ nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
+ ret_total += ret;
+ }
+
+ ret = THE_RING->poll_and_process_element_tx(p_poll_sn);
+ BULLSEYE_EXCLUDE_BLOCK_START
+ if (ret < 0 && errno != EAGAIN) {
+ nd_logerr("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
+ return ret;
+ }
+ BULLSEYE_EXCLUDE_BLOCK_END
+ if (ret > 0) {
+ nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
+ ret_total += ret;
+ }
 }
 return ret_total;
 }
@@ -1154,11 +1167,23 @@ int net_device_val::global_ring_request_notification(uint64_t poll_sn)
 rings_hash_map_t::iterator ring_iter;
 for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) {
 int ret = THE_RING->request_notification(CQT_RX, poll_sn);
+ BULLSEYE_EXCLUDE_BLOCK_START
 if (ret < 0) {
- nd_logerr("Error ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
+ nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
 return ret;
 }
- nd_logfunc("ring[%p] Returned with: %d
(sn=%d)", THE_RING, ret, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); + ret_total += ret; + + ret = THE_RING->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); ret_total += ret; } return ret_total; diff --git a/src/vma/dev/ring.h b/src/vma/dev/ring.h index 6e8ad6414e..b9cbf3597c 100644 --- a/src/vma/dev/ring.h +++ b/src/vma/dev/ring.h @@ -102,6 +102,7 @@ class ring virtual int drain_and_proccess() = 0; virtual int wait_for_notification_and_process_element(int cq_channel_fd, uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL) = 0; virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL) = 0; + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn) = 0; virtual void adapt_cq_moderation() = 0; virtual void mem_buf_desc_return_single_to_owner_tx(mem_buf_desc_t* p_mem_buf_desc) = 0; diff --git a/src/vma/dev/ring_bond.cpp b/src/vma/dev/ring_bond.cpp index cd5745a855..00f8819ddb 100644 --- a/src/vma/dev/ring_bond.cpp +++ b/src/vma/dev/ring_bond.cpp @@ -451,6 +451,31 @@ int ring_bond::poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_r } } +int ring_bond::poll_and_process_element_tx(uint64_t* p_cq_poll_sn) +{ + if (m_lock_ring_tx.trylock()) { + errno = EAGAIN; + return 0; + } + + int temp = 0; + int ret = 0; + for (uint32_t i = 0; i < m_bond_rings.size(); i++) { + if (m_bond_rings[i]->is_up()) { + temp = m_bond_rings[i]->poll_and_process_element_tx(p_cq_poll_sn); + if (temp > 0) { + ret += temp; + } + } + } + m_lock_ring_tx.unlock(); + if (ret > 0) { + return ret; + } else { + return temp; + } +} + int ring_bond::drain_and_proccess() { if (m_lock_ring_rx.trylock()) { diff --git a/src/vma/dev/ring_bond.h b/src/vma/dev/ring_bond.h index adfe5566f7..5c5e1b4a3a 100644 --- a/src/vma/dev/ring_bond.h +++ b/src/vma/dev/ring_bond.h @@ -54,7 +54,8 @@ class ring_bond : public ring { virtual void print_val(); virtual int request_notification(cq_type_t cq_type, uint64_t poll_sn); - virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn); virtual void adapt_cq_moderation(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); virtual bool reclaim_recv_buffers(mem_buf_desc_t* rx_reuse_lst); diff --git a/src/vma/dev/ring_simple.cpp b/src/vma/dev/ring_simple.cpp index db54810474..30fcf175d0 100644 --- a/src/vma/dev/ring_simple.cpp +++ b/src/vma/dev/ring_simple.cpp @@ -356,6 +356,13 @@ int ring_simple::poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd return ret; } +int ring_simple::poll_and_process_element_tx(uint64_t* p_cq_poll_sn) +{ + int ret = 0; + RING_TRY_LOCK_RUN_AND_UPDATE_RET(m_lock_ring_tx, m_p_cq_mgr_tx->poll_and_process_element_tx(p_cq_poll_sn)); + return ret; +} + int ring_simple::socketxtreme_poll(struct vma_completion_t *vma_completions, unsigned int ncompletions, int flags) { int ret = 0; diff --git a/src/vma/dev/ring_simple.h b/src/vma/dev/ring_simple.h index f7cac457af..9a01e4a71c 100644 --- a/src/vma/dev/ring_simple.h +++ b/src/vma/dev/ring_simple.h @@ -62,7 +62,8 @@ class ring_simple : public 
ring_slave virtual ~ring_simple(); virtual int request_notification(cq_type_t cq_type, uint64_t poll_sn); - virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn); virtual void adapt_cq_moderation(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); virtual bool reclaim_recv_buffers(mem_buf_desc_t* rx_reuse_lst); diff --git a/src/vma/dev/ring_tap.h b/src/vma/dev/ring_tap.h index cfb2934128..ef5f675c02 100644 --- a/src/vma/dev/ring_tap.h +++ b/src/vma/dev/ring_tap.h @@ -46,6 +46,7 @@ class ring_tap : public ring_slave virtual bool attach_flow(flow_tuple& flow_spec_5t, pkt_rcvr_sink* sink); virtual bool detach_flow(flow_tuple& flow_spec_5t, pkt_rcvr_sink* sink); virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn) { NOT_IN_USE(p_cq_poll_sn); return 0; } virtual int wait_for_notification_and_process_element(int cq_channel_fd, uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); virtual int drain_and_proccess(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); diff --git a/src/vma/iomux/epfd_info.cpp b/src/vma/iomux/epfd_info.cpp index 850dfa9491..3dc1398763 100644 --- a/src/vma/iomux/epfd_info.cpp +++ b/src/vma/iomux/epfd_info.cpp @@ -612,14 +612,28 @@ int epfd_info::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_re int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); BULLSEYE_EXCLUDE_BLOCK_START if (ret < 0 && errno != EAGAIN) { - __log_err("Error in ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + __log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); m_ring_map_lock.unlock(); return ret; } BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) - __log_func("ring[%p] Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); - ret_total += ret; + if (ret > 0) { + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } + + ret = iter->first->poll_and_process_element_tx(p_poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + __log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } } m_ring_map_lock.unlock(); @@ -649,12 +663,23 @@ int epfd_info::ring_request_notification(uint64_t poll_sn) int ret = iter->first->request_notification(CQT_RX, poll_sn); BULLSEYE_EXCLUDE_BLOCK_START if (ret < 0) { - __log_err("Error ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + __log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); + ret_total += ret; + + ret = iter->first->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + __log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); m_ring_map_lock.unlock(); return ret; } BULLSEYE_EXCLUDE_BLOCK_END - __log_func("ring[%p] Returned with: %d 
(sn=%d)", iter->first, ret, poll_sn); + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); ret_total += ret; } From 306d31271651042270c9b961689ca55d9d1f6d5f Mon Sep 17 00:00:00 2001 From: Igor Ivanov Date: Tue, 30 Jun 2020 20:01:26 +0300 Subject: [PATCH 14/21] issue: 1792164 Extend VMA_INTERNAL_THREAD_ARM_CQ variable Flexible tunning is added to control RX and TX polling. Signed-off-by: Igor Ivanov --- README.txt | 6 +- src/vma/dev/net_device_val.cpp | 84 ++++++++++++---------- src/vma/dev/net_device_val.h | 3 + src/vma/event/event_handler_manager.cpp | 2 +- src/vma/iomux/epfd_info.cpp | 95 ++++++++++++++----------- src/vma/iomux/epfd_info.h | 5 +- src/vma/main.cpp | 2 +- src/vma/util/sys_vars.cpp | 16 +++-- src/vma/util/sys_vars.h | 23 +++++- 9 files changed, 145 insertions(+), 91 deletions(-) diff --git a/README.txt b/README.txt index 9be039e04a..b6f2fa6327 100644 --- a/README.txt +++ b/README.txt @@ -830,11 +830,15 @@ timer expiration (once every 100ms). Application threads may be blocked till in Default value is 0 (deferred handling) VMA_INTERNAL_THREAD_ARM_CQ -Wakeup the internal thread for each packet that the CQ receive. +Wakeup the internal thread for activity on TX/RX CQ. Poll and process the packet and bring it to the socket layer. This can minimize latency in case of a busy application which is not available to receive the packet when it arrived. However, this might decrease performance in case of high pps rate application. +Disable Arm CQ is 0 +Check RX CQ is 1 +Check TX CQ is 2 +Check all CQs is 3 Default value is 0 (Disabled) VMA_WAIT_AFTER_JOIN_MSEC diff --git a/src/vma/dev/net_device_val.cpp b/src/vma/dev/net_device_val.cpp index 9cf3b0e383..492fa78e59 100644 --- a/src/vma/dev/net_device_val.cpp +++ b/src/vma/dev/net_device_val.cpp @@ -179,7 +179,9 @@ const char* ring_alloc_logic_attr::to_str() return m_str; } -net_device_val::net_device_val(struct net_device_val_desc *desc) : m_lock("net_device_val lock") +net_device_val::net_device_val(struct net_device_val_desc *desc) : + m_lock("net_device_val lock"), + m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq) { bool valid = false; ib_ctx_handler* ib_ctx; @@ -1133,28 +1135,32 @@ int net_device_val::global_ring_poll_and_process_element(uint64_t *p_poll_sn, vo auto_unlocker lock(m_lock); rings_hash_map_t::iterator ring_iter; for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) { - int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); - return ret; - } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) { - nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); - ret_total += ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); + ret_total += ret; + } } - ret = THE_RING->poll_and_process_element_tx(p_poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - nd_logerr("Error in TX ring->poll_and_process_element() of %p 
(errno=%d %m)", THE_RING, errno); - return ret; - } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) { - nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); - ret_total += ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = THE_RING->poll_and_process_element_tx(p_poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + nd_logerr("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); + ret_total += ret; + } } } return ret_total; @@ -1166,25 +1172,29 @@ int net_device_val::global_ring_request_notification(uint64_t poll_sn) auto_unlocker lock(m_lock); rings_hash_map_t::iterator ring_iter; for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) { - int ret = THE_RING->request_notification(CQT_RX, poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0) { - nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = THE_RING->request_notification(CQT_RX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); + ret_total += ret; } - BULLSEYE_EXCLUDE_BLOCK_END - nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); - ret_total += ret; - ret = THE_RING->request_notification(CQT_TX, poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0) { - nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = THE_RING->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); + ret_total += ret; } - BULLSEYE_EXCLUDE_BLOCK_END - nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); - ret_total += ret; } return ret_total; } diff --git a/src/vma/dev/net_device_val.h b/src/vma/dev/net_device_val.h index 45c0941ef8..3911d0816d 100644 --- a/src/vma/dev/net_device_val.h +++ b/src/vma/dev/net_device_val.h @@ -316,6 +316,9 @@ class net_device_val std::string m_name; /* container for ifname */ char m_str[BUFF_SIZE]; /* detailed information about device */ char m_base_name[IFNAMSIZ]; /* base name of device basing ifname */ + + /* Global environment variables section */ + const int m_sysvar_internal_thread_arm_cq; }; class net_device_val_eth : public net_device_val diff --git a/src/vma/event/event_handler_manager.cpp b/src/vma/event/event_handler_manager.cpp index 107c59ec5a..c7014e2b17 100644 --- a/src/vma/event/event_handler_manager.cpp +++ b/src/vma/event/event_handler_manager.cpp @@ -229,7 +229,7 @@ void event_handler_manager::register_command_event(int fd, command* cmd) event_handler_manager::event_handler_manager() : m_reg_action_q_lock("reg_action_q_lock"), - m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq_enabled), + 
m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq), m_n_sysvar_vma_time_measure_num_samples(safe_mce_sys().vma_time_measure_num_samples), m_n_sysvar_timer_resolution_msec(safe_mce_sys().timer_resolution_msec) { diff --git a/src/vma/iomux/epfd_info.cpp b/src/vma/iomux/epfd_info.cpp index 3dc1398763..60639be47d 100644 --- a/src/vma/iomux/epfd_info.cpp +++ b/src/vma/iomux/epfd_info.cpp @@ -55,10 +55,13 @@ int epfd_info::remove_fd_from_epoll_os(int fd) epfd_info::epfd_info(int epfd, int size) : lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"), - m_lock_poll_os("epfd_lock_poll_os"), m_sysvar_thread_mode(safe_mce_sys().thread_mode), - m_b_os_data_available(false) + m_lock_poll_os("epfd_lock_poll_os"), + m_b_os_data_available(false), + m_sysvar_thread_mode(safe_mce_sys().thread_mode), + m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq) { __log_funcall(""); + int max_sys_fd = get_sys_max_fd_num(); if (m_size<=max_sys_fd) { @@ -609,30 +612,34 @@ int epfd_info::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_re m_ring_map_lock.lock(); for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) { - int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - __log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); - m_ring_map_lock.unlock(); - return ret; - } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) { - __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); - ret_total += ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + __log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } } - ret = iter->first->poll_and_process_element_tx(p_poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - __log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); - m_ring_map_lock.unlock(); - return ret; - } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) { - __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); - ret_total += ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = iter->first->poll_and_process_element_tx(p_poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + __log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } } } @@ -660,27 +667,31 @@ int epfd_info::ring_request_notification(uint64_t poll_sn) m_ring_map_lock.lock(); for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) { - int ret = iter->first->request_notification(CQT_RX, poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0) { - __log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); - 
m_ring_map_lock.unlock(); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = iter->first->request_notification(CQT_RX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + __log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); + ret_total += ret; } - BULLSEYE_EXCLUDE_BLOCK_END - __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); - ret_total += ret; - ret = iter->first->request_notification(CQT_TX, poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0) { - __log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); - m_ring_map_lock.unlock(); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = iter->first->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + __log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); + ret_total += ret; } - BULLSEYE_EXCLUDE_BLOCK_END - __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); - ret_total += ret; } m_ring_map_lock.unlock(); diff --git a/src/vma/iomux/epfd_info.h b/src/vma/iomux/epfd_info.h index 3b3e434082..41aa2f97f0 100644 --- a/src/vma/iomux/epfd_info.h +++ b/src/vma/iomux/epfd_info.h @@ -129,13 +129,16 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake ring_map_t m_ring_map; lock_mutex_recursive m_ring_map_lock; lock_spin m_lock_poll_os; - const thread_mode_t m_sysvar_thread_mode; ready_cq_fd_q_t m_ready_cq_fd_q; epoll_stats_t m_local_stats; epoll_stats_t *m_stats; int m_log_invalid_events; bool m_b_os_data_available; // true when non offloaded data is available + /* Global environment variables section */ + const thread_mode_t m_sysvar_thread_mode; + const int m_sysvar_internal_thread_arm_cq; + int add_fd(int fd, epoll_event *event); int del_fd(int fd, bool passthrough = false); int mod_fd(int fd, epoll_event *event); diff --git a/src/vma/main.cpp b/src/vma/main.cpp index 69a66dd8c4..aad8ce651b 100644 --- a/src/vma/main.cpp +++ b/src/vma/main.cpp @@ -532,7 +532,7 @@ void print_vma_global_settings() VLOG_PARAM_NUMBER("Delay after join (msec)", safe_mce_sys().wait_after_join_msec, MCE_DEFAULT_WAIT_AFTER_JOIN_MSEC, SYS_VAR_WAIT_AFTER_JOIN_MSEC); VLOG_STR_PARAM_STRING("Internal Thread Affinity", safe_mce_sys().internal_thread_affinity_str, MCE_DEFAULT_INTERNAL_THREAD_AFFINITY_STR, SYS_VAR_INTERNAL_THREAD_AFFINITY, safe_mce_sys().internal_thread_affinity_str); VLOG_STR_PARAM_STRING("Internal Thread Cpuset", safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET, SYS_VAR_INTERNAL_THREAD_CPUSET, safe_mce_sys().internal_thread_cpuset); - VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq_enabled, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().internal_thread_arm_cq_enabled ? 
"Enabled " : "Disabled"); + VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().arm_cq_str((mce_sys_var::arm_cq_t)safe_mce_sys().internal_thread_arm_cq)); VLOG_PARAM_NUMSTR("Internal Thread TCP Handling", safe_mce_sys().internal_thread_tcp_timer_handling, MCE_DEFAULT_INTERNAL_THREAD_TCP_TIMER_HANDLING, SYS_VAR_INTERNAL_THREAD_TCP_TIMER_HANDLING, internal_thread_tcp_timer_handling_str(safe_mce_sys().internal_thread_tcp_timer_handling)); VLOG_PARAM_STRING("Thread mode", safe_mce_sys().thread_mode, MCE_DEFAULT_THREAD_MODE, SYS_VAR_THREAD_MODE, thread_mode_str(safe_mce_sys().thread_mode)); VLOG_PARAM_NUMSTR("Buffer batching mode", safe_mce_sys().buffer_batching_mode, MCE_DEFAULT_BUFFER_BATCHING_MODE, SYS_VAR_BUFFER_BATCHING_MODE, buffer_batching_mode_str(safe_mce_sys().buffer_batching_mode)); diff --git a/src/vma/util/sys_vars.cpp b/src/vma/util/sys_vars.cpp index 49b04cb999..1f8f7c6972 100644 --- a/src/vma/util/sys_vars.cpp +++ b/src/vma/util/sys_vars.cpp @@ -583,7 +583,7 @@ void mce_sys_var::get_env_params() progress_engine_wce_max = MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX; cq_keep_qp_full = MCE_DEFAULT_CQ_KEEP_QP_FULL; qp_compensation_level = MCE_DEFAULT_QP_COMPENSATION_LEVEL; - internal_thread_arm_cq_enabled = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED; + internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ; offloaded_sockets = MCE_DEFAULT_OFFLOADED_SOCKETS; timer_resolution_msec = MCE_DEFAULT_TIMER_RESOLUTION_MSEC; @@ -1161,12 +1161,16 @@ void mce_sys_var::get_env_params() tcp_timer_resolution_msec = timer_resolution_msec; } - if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL) - internal_thread_arm_cq_enabled = atoi(env_ptr) ? 
true : false;
+	if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL) {
+		internal_thread_arm_cq = (arm_cq_t)atoi(env_ptr);
+		if (internal_thread_arm_cq < 0 || internal_thread_arm_cq > mce_sys_var::ARM_CQ_ALL) {
+			internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ;
+		}
+	}
 
-	if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) {
-		snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr);
-	}
+	if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) {
+		snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr);
+	}
 
 	// handle internal thread affinity - default is CPU-0
 	if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_AFFINITY)) != NULL) {
diff --git a/src/vma/util/sys_vars.h b/src/vma/util/sys_vars.h
index f0357b19f3..e4defc9097 100644
--- a/src/vma/util/sys_vars.h
+++ b/src/vma/util/sys_vars.h
@@ -296,6 +296,25 @@ struct mce_sys_var {
 		HYPER_VMWARE
 	};
 
+	enum arm_cq_t {
+		ARM_CQ_OFF = 0,
+		ARM_CQ_RX  = 0x01,
+		ARM_CQ_TX  = 0x02,
+		ARM_CQ_ALL = 0x03
+	};
+
+	inline const char* arm_cq_str(arm_cq_t value)
+	{
+		switch (value) {
+		case ARM_CQ_OFF: return "(Disabled)";
+		case ARM_CQ_RX:  return "(Arm RX CQ)";
+		case ARM_CQ_TX:  return "(Arm TX CQ)";
+		case ARM_CQ_ALL: return "(Arm All)";
+		default: break;
+		}
+		return "unsupported";
+	}
+
 public:
 	void get_env_params();
 
@@ -406,7 +425,7 @@ struct mce_sys_var {
 	char internal_thread_cpuset[FILENAME_MAX];
 	char internal_thread_affinity_str[FILENAME_MAX];
 	cpu_set_t internal_thread_affinity;
-	bool internal_thread_arm_cq_enabled;
+	int internal_thread_arm_cq;
 	internal_thread_tcp_timer_handling_t internal_thread_tcp_timer_handling;
 
 	bool handle_bf;
@@ -647,7 +666,7 @@ extern mce_sys_var & safe_mce_sys();
 #define MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX (10000)
 #define MCE_DEFAULT_CQ_KEEP_QP_FULL (true)
 #define MCE_DEFAULT_QP_COMPENSATION_LEVEL (256)
-#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED (false)
+#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ (mce_sys_var::ARM_CQ_OFF)
 #define MCE_DEFAULT_QP_FORCE_MC_ATTACH (false)
 #define MCE_DEFAULT_OFFLOADED_SOCKETS (true)
 #define MCE_DEFAULT_TIMER_RESOLUTION_MSEC (10)

From 50996ba82951ead5362ecd575b32e6f84e6879e1 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Wed, 1 Jul 2020 13:19:05 +0300
Subject: [PATCH 15/21] issue: 1792164 Support MSG_ERRQUEUE in sockinfo::rx()

rx() processing should allow returning error queue information and
incoming data in a single call. Depending on the user application,
rx() logic should return:
1. only incoming data
2. only error queue data
3. incoming and error queue data
Error processing logic is adjusted accordingly.
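
For illustration, the three shapes map onto standard recvmsg() usage
roughly as follows (a sketch only: fd, buf and cbuf are hypothetical
names, and nothing VMA-specific is assumed, just the POSIX socket API):

    #include <string.h>
    #include <sys/socket.h>
    #include <sys/uio.h>

    static void rx_request_shapes(int fd, char *buf, size_t len,
                                  char *cbuf, size_t clen)
    {
        struct iovec iov = { buf, len };
        struct msghdr msg;

        /* 1. only incoming data */
        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        recvmsg(fd, &msg, 0);

        /* 2. only error queue data: no payload iovec, MSG_ERRQUEUE set */
        memset(&msg, 0, sizeof(msg));
        msg.msg_control = cbuf;
        msg.msg_controllen = clen;
        recvmsg(fd, &msg, MSG_ERRQUEUE);

        /* 3. incoming data plus control information in a single call */
        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = cbuf;
        msg.msg_controllen = clen;
        recvmsg(fd, &msg, 0);
    }

Case 2 matches the kernel's MSG_ZEROCOPY notification flow; case 3 is
the combined form this patch teaches rx() to serve.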
Signed-off-by: Igor Ivanov
---
 src/vma/sock/sockinfo_tcp.cpp | 43 ++++++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index c4f343d638..472a8dddb6 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -1910,7 +1910,7 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
 	int total_rx = 0;
 	int poll_count = 0;
 	int bytes_to_tcp_recved;
-	size_t total_iov_sz = 1;
+	size_t total_iov_sz = 0;
 	int out_flags = 0;
 	int in_flags = *p_flags;
 	bool block_this_run = BLOCK_THIS_RUN(m_b_blocking, in_flags);
@@ -1932,23 +1932,43 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
 	TAKE_T_RX_START;
 #endif
 
-	if (unlikely((in_flags & MSG_WAITALL) && !(in_flags & MSG_PEEK))) {
-		total_iov_sz = 0;
-		for (int i = 0; i < sz_iov; i++) {
-			total_iov_sz += p_iov[i].iov_len;
+	/* In general, without any special flags, socket options, or ioctls being set,
+	 * a recv call on a blocking TCP socket will return any number of bytes less than
+	 * or equal to the size being requested. But unless the socket is closed remotely,
+	 * interrupted by a signal, or in an error state,
+	 * it will block until at least 1 byte is available.
+	 * With the MSG_ERRQUEUE flag a user application can request only information
+	 * from the error queue, without any incoming data.
+	 */
+	if (p_iov && (sz_iov > 0)) {
+		total_iov_sz = 1;
+		if (unlikely((in_flags & MSG_WAITALL) && !(in_flags & MSG_PEEK))) {
+			total_iov_sz = 0;
+			for (int i = 0; i < sz_iov; i++) {
+				total_iov_sz += p_iov[i].iov_len;
+			}
+			if (total_iov_sz == 0)
+				return 0;
 		}
-		if (total_iov_sz == 0)
-			return 0;
 	}
 
 	si_tcp_logfunc("rx: iov=%p niovs=%d", p_iov, sz_iov);
-	/* poll rx queue till we have something */
+
+	/* poll rx queue till we have something */
 	lock_tcp_con();
+	if (__msg) {
+		handle_cmsg(__msg, in_flags);
+		if (__msg->msg_controllen == 0) {
+			errno = EAGAIN;
+			unlock_tcp_con();
+			return -1;
+		}
+	}
 	return_reuse_buffers_postponed();
 	unlock_tcp_con();
 
 	while (m_rx_ready_byte_count < total_iov_sz) {
-		if (unlikely(g_b_exit ||!is_rtr() || (rx_wait_lockless(poll_count, block_this_run) < 0))) {
+		if (unlikely(g_b_exit || !is_rtr() || (rx_wait_lockless(poll_count, block_this_run) < 0))) {
 			return handle_rx_error(block_this_run);
 		}
 	}
@@ -1957,8 +1977,9 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
 	si_tcp_logfunc("something in rx queues: %d %p", m_n_rx_pkt_ready_list_count, m_rx_pkt_ready_list.front());
 
-	total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
-	if (__msg) handle_cmsg(__msg, in_flags);
+	if (total_iov_sz > 0) {
+		total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
+	}
 
 	/*
 	 * RCVBUFF Accounting: Going 'out' of the internal buffer: if some bytes are not tcp_recved yet - do that. 
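
A note on the consumer side: the struct sock_extended_err entries that
this series attaches to mem_buf_desc_t are read by applications the same
way as the kernel's native error queue. A minimal consumer sketch,
assuming IPv4 (SOL_IP/IP_RECVERR) and only standard Linux headers:

    #include <stdio.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <linux/errqueue.h>

    /* Drain one error-queue entry; returns -1 (EAGAIN) when it is empty. */
    static int drain_errqueue(int fd)
    {
        char cbuf[128];
        struct msghdr msg;
        struct cmsghdr *cm;

        memset(&msg, 0, sizeof(msg));
        msg.msg_control = cbuf;
        msg.msg_controllen = sizeof(cbuf);

        if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0)
            return -1;

        for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
            if (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR) {
                struct sock_extended_err *ee =
                        (struct sock_extended_err *)CMSG_DATA(cm);
                if (ee->ee_origin == SO_EE_ORIGIN_ZEROCOPY)
                    printf("zerocopy completed: ids %u..%u\n",
                           ee->ee_info, ee->ee_data);
            }
        }
        return 0;
    }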
From f80809145e5e391693ac486aca94a8114306f015 Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Wed, 1 Jul 2020 16:50:07 +0300
Subject: [PATCH 16/21] issue: 1792164 Support checking POLLERR events in
 poll()

Signed-off-by: Igor Ivanov
---
 src/vma/iomux/poll_call.cpp   | 2 +-
 src/vma/sock/sockinfo_tcp.cpp | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/vma/iomux/poll_call.cpp b/src/vma/iomux/poll_call.cpp
index 42130c8f6f..e96c663565 100644
--- a/src/vma/iomux/poll_call.cpp
+++ b/src/vma/iomux/poll_call.cpp
@@ -71,7 +71,7 @@ poll_call::poll_call(int *off_rfds_buffer, offloaded_mode_t *off_modes_buffer, i
 		socket_fd_api* temp_sock_fd_api = fd_collection_get_sockfd(fd);
 		if (temp_sock_fd_api && (temp_sock_fd_api->get_type()==FD_TYPE_SOCKET)) {
 			offloaded_mode_t off_mode = OFF_NONE;
-			if (m_orig_fds[i].events & POLLIN)
+			if (m_orig_fds[i].events & (POLLIN | POLLERR | POLLHUP))
 				off_mode = (offloaded_mode_t)(off_mode | OFF_READ);
 			if (m_orig_fds[i].events & POLLOUT)
 				off_mode = (offloaded_mode_t)(off_mode | OFF_WRITE);
diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index 472a8dddb6..d4bb1d9334 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -3329,7 +3329,8 @@ bool sockinfo_tcp::is_errorable(int *errors)
 		*errors |= POLLHUP;
 	}
 
-	if (m_conn_state == TCP_CONN_ERROR) {
+	if ((m_conn_state == TCP_CONN_ERROR) ||
+	    (!m_error_queue.empty())) {
 		*errors |= POLLERR;
 	}
 
From fc9acbe6209d923c6b1480657384257c7b5c1a0e Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Wed, 7 Oct 2020 18:57:59 +0300
Subject: [PATCH 17/21] issue: 1792164 Improve LSO send flow

An LSO operation cannot be done when the payload data is smaller than
the MSS. This change allows LSO to be used in the right way.

Signed-off-by: Igor Ivanov
---
 src/vma/proto/dst_entry.h       |  1 +
 src/vma/proto/dst_entry_tcp.cpp | 19 +++++++++++++------
 src/vma/sock/sockinfo_tcp.cpp   |  3 ++-
 src/vma/sock/sockinfo_udp.cpp   |  2 +-
 4 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/src/vma/proto/dst_entry.h b/src/vma/proto/dst_entry.h
index 65ec18af97..04f8182c4b 100644
--- a/src/vma/proto/dst_entry.h
+++ b/src/vma/proto/dst_entry.h
@@ -65,6 +65,7 @@ struct socket_data {
 typedef struct {
 	vma_wr_tx_packet_attr flags;
 	uint16_t mss;
+	uint32_t length;
 } vma_send_attr;
 
 class dst_entry : public cache_observer, public tostr, public neigh_observer
diff --git a/src/vma/proto/dst_entry_tcp.cpp b/src/vma/proto/dst_entry_tcp.cpp
index 9629e6be55..e6bdfcc53c 100644
--- a/src/vma/proto/dst_entry_tcp.cpp
+++ b/src/vma/proto/dst_entry_tcp.cpp
@@ -109,10 +109,10 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s
 	 */
 	p_pkt = (tx_packet_template_t*)((uint8_t*)p_tcp_iov[0].iovec.iov_base - m_header.m_aligned_l2_l3_len);
 
-	/* iov_len is a size of TCP header and data
+	/* attr.length is payload size and L4 header size
 	 * m_total_hdr_len is a size of L2/L3 header
 	 */
-	total_packet_len = p_tcp_iov[0].iovec.iov_len + m_header.m_total_hdr_len;
+	total_packet_len = attr.length + m_header.m_total_hdr_len;
 
 	/* copy just L2/L3 headers to p_pkt */
 	m_header.copy_l2_ip_hdr(p_pkt);
@@ -130,10 +130,17 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s
 	} else if (is_set(attr.flags, (vma_wr_tx_packet_attr)(VMA_TX_PACKET_TSO))) {
 		/* update send work request. 
do not expect noninlined scenario */ send_wqe_h.init_not_inline_wqe(send_wqe, m_sge, sz_iov); - send_wqe_h.enable_tso(send_wqe, - (void *)((uint8_t*)p_pkt + hdr_alignment_diff), - m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, - attr.mss); + if (attr.mss < (attr.length - p_pkt->hdr.m_tcp_hdr.doff * 4)) { + send_wqe_h.enable_tso(send_wqe, + (void *)((uint8_t*)p_pkt + hdr_alignment_diff), + m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, + attr.mss); + } else { + send_wqe_h.enable_tso(send_wqe, + (void *)((uint8_t*)p_pkt + hdr_alignment_diff), + m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, + 0); + } m_p_send_wqe = &send_wqe; m_sge[0].addr = (uintptr_t)((uint8_t *)&p_pkt->hdr.m_tcp_hdr + p_pkt->hdr.m_tcp_hdr.doff * 4); m_sge[0].length = p_tcp_iov[0].iovec.iov_len - p_pkt->hdr.m_tcp_hdr.doff * 4; diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index d4bb1d9334..11f04e27bb 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -1005,7 +1005,7 @@ err_t sockinfo_tcp::ip_output(struct pbuf *p, void* v_p_conn, uint16_t flags) dst_entry *p_dst = p_si_tcp->m_p_connected_dst_entry; int max_count = p_si_tcp->m_pcb.tso.max_send_sge; tcp_iovec lwip_iovec[max_count]; - vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0}; + vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0, 0}; int count = 0; /* maximum number of sge can not exceed this value */ @@ -1013,6 +1013,7 @@ err_t sockinfo_tcp::ip_output(struct pbuf *p, void* v_p_conn, uint16_t flags) lwip_iovec[count].iovec.iov_base = p->payload; lwip_iovec[count].iovec.iov_len = p->len; lwip_iovec[count].p_desc = (mem_buf_desc_t*)p; + attr.length += p->len; p = p->next; count++; } diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 9efe4f850a..98dce87dea 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -1657,7 +1657,7 @@ ssize_t sockinfo_udp::tx(vma_tx_call_attr_t &tx_arg) { #ifdef DEFINED_TSO - vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0}; + vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0, 0}; bool b_blocking = m_b_blocking; if (unlikely(__flags & MSG_DONTWAIT)) b_blocking = false; From 1392cb66a4b353131a810f9d05a727b0a41edda1 Mon Sep 17 00:00:00 2001 From: Igor Ivanov Date: Mon, 5 Oct 2020 18:44:22 +0300 Subject: [PATCH 18/21] issue: 1792164 Fix race access to error queue Signed-off-by: Igor Ivanov --- src/vma/sock/sockinfo.cpp | 2 ++ src/vma/sock/sockinfo.h | 1 + src/vma/sock/sockinfo_tcp.cpp | 29 ++++++++++++++++++++--------- src/vma/sock/sockinfo_tcp.h | 1 + 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 51044b09b6..cdb67ff4d0 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -1663,7 +1663,9 @@ void sockinfo::handle_recv_errqueue(struct cmsg_state *cm_state) return; } + m_error_queue_lock.lock(); buff = m_error_queue.get_and_pop_front(); + m_error_queue_lock.unlock(); if (!(buff->m_flags & mem_buf_desc_t::CLONED)) { si_logerr("Detected invalid element in socket error queue as %p with flags 0x%x", diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index dee7325b0e..6b1edce210 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -269,6 +269,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou * to provide notification ability. 
*/
 	descq_t m_error_queue;
+	lock_spin m_error_queue_lock;
 
 	/* TX zcopy counter
 	 * The notification itself for tx zcopy operation is a simple scalar value.
diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index 11f04e27bb..1d6650e4e7 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -4635,10 +4635,6 @@ mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc)
 
 void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
 {
-	uint32_t lo, hi;
-	uint16_t count;
-	uint32_t prev_lo, prev_hi;
-	mem_buf_desc_t* err_queue = NULL;
 	sockinfo_tcp* sock = NULL;
 
 	if (!p_desc) {
@@ -4655,6 +4651,22 @@ void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
 		goto cleanup;
 	}
 
+	sock->tcp_tx_zc_handle(p_desc);
+
+cleanup:
+	/* Clean up */
+	p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY;
+	memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc));
+}
+
+void sockinfo_tcp::tcp_tx_zc_handle(mem_buf_desc_t* p_desc)
+{
+	uint32_t lo, hi;
+	uint16_t count;
+	uint32_t prev_lo, prev_hi;
+	mem_buf_desc_t* err_queue = NULL;
+	sockinfo_tcp* sock = this;
+
 	count = p_desc->tx.zc.count;
 	lo = p_desc->tx.zc.id;
 	hi = lo + count - 1;
@@ -4665,6 +4677,8 @@ void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
 	p_desc->ee.ee_info = lo;
 	// p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
 
+	m_error_queue_lock.lock();
+
 	/* Update last error queue element in case it has the same type */
 	err_queue = sock->m_error_queue.back();
 	if (err_queue &&
@@ -4691,14 +4705,11 @@ void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
 		sock->m_error_queue.push_back(err_queue);
 	}
 
+	m_error_queue_lock.unlock();
+
 	/* Signal events on socket */
 	NOTIFY_ON_EVENTS(sock, EPOLLERR);
 	sock->do_wakeup();
-
-cleanup:
-	/* Clean up */
-	p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY;
-	memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc));
 }
 
 struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn)
diff --git a/src/vma/sock/sockinfo_tcp.h b/src/vma/sock/sockinfo_tcp.h
index a46ecc57d9..323de25ae3 100644
--- a/src/vma/sock/sockinfo_tcp.h
+++ b/src/vma/sock/sockinfo_tcp.h
@@ -187,6 +187,7 @@ class sockinfo_tcp : public sockinfo, public timer_handler
 
 	mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc);
 	static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc);
+	void tcp_tx_zc_handle(mem_buf_desc_t* p_desc);
 
 	bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL);
 	bool inline is_writeable();

From b0b5b528f2cc23ff9815cc5328d1cb25f860bfc6 Mon Sep 17 00:00:00 2001
From: Dmytro Podgornyi
Date: Wed, 11 Nov 2020 07:35:14 -0600
Subject: [PATCH 19/21] issue: 1792164 Remove epoll event when it is consumed

The zcopy notification mechanism (error queue) adds an EPOLLERR event to
the respective epfd_info object, and the event is never removed. This
leads to the issue that epoll_wait() returns the EPOLLERR event
endlessly and doesn't enter polling loops.

Fix this by removing the EPOLLERR event once the socket is no longer
"errorable". The fix avoids fake EPOLLERR events and allows
epoll_wait_helper() to perform polling.
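
For reference, the consumer loop this fix targets looks roughly like the
sketch below; drain_errqueue() and handle_data() are hypothetical
helpers (the former reads MSG_ERRQUEUE as outlined under patch 15):

    #include <sys/epoll.h>

    void drain_errqueue(int fd); /* hypothetical, see the earlier sketch */
    void handle_data(int fd);    /* hypothetical data handler */

    static void wait_loop(int epfd, int fd)
    {
        struct epoll_event ev, out;

        ev.events = EPOLLIN | EPOLLERR;
        ev.data.fd = fd;
        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

        for (;;) {
            if (epoll_wait(epfd, &out, 1, -1) <= 0)
                continue;
            if (out.events & EPOLLERR)
                drain_errqueue(out.data.fd); /* consuming clears the event */
            if (out.events & EPOLLIN)
                handle_data(out.data.fd);
        }
    }

Without the fix, EPOLLERR keeps firing even after the error queue has
been drained, so the loop never settles into its polling path.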
---
 src/vma/iomux/epoll_wait_call.cpp | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/vma/iomux/epoll_wait_call.cpp b/src/vma/iomux/epoll_wait_call.cpp
index 07530e1544..e5bb2c78ff 100644
--- a/src/vma/iomux/epoll_wait_call.cpp
+++ b/src/vma/iomux/epoll_wait_call.cpp
@@ -117,6 +117,15 @@ int epoll_wait_call::get_current_events()
 				mutual_events &= ~EPOLLOUT;
 			}
 
+			// handle zcopy notification mechanism
+			if (mutual_events & EPOLLERR) {
+				int unused;
+				if (handle_epoll_event(p_socket_object->is_errorable(&unused), EPOLLERR, p_socket_object, i)) {
+					got_event = true;
+				}
+				mutual_events &= ~EPOLLERR;
+			}
+
 			if (mutual_events) {
 				if (handle_epoll_event(true, mutual_events, p_socket_object, i)) {
 					got_event = true;

From d2db645f328eeb8ab4fb3d4801a9becea3e2634b Mon Sep 17 00:00:00 2001
From: Dmytro Podgornyi
Date: Wed, 11 Nov 2020 12:01:47 -0600
Subject: [PATCH 20/21] issue: 1792164 Handle duplicate zcopy notifications

In a retransmit scenario, it is possible to get duplicate ids in the
zcopy callback. In this case, ee_data is rewritten with a value which
may be lower than the previous one. This leads to missed notifications.
As a workaround, don't overwrite ee_data with a lower value.
---
 src/vma/sock/sockinfo_tcp.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index 1d6650e4e7..5fface693a 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -4691,7 +4691,8 @@ void sockinfo_tcp::tcp_tx_zc_handle(mem_buf_desc_t* p_desc)
 
 	sum_count = prev_hi - prev_lo + 1ULL + count;
 	if (lo == prev_lo) {
-		err_queue->ee.ee_data = hi;
+		if (hi > prev_hi)
+			err_queue->ee.ee_data = hi;
 	} else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) {
 		err_queue = NULL;
 	} else {

From 07a71ec29cdbd2ab4254bdf56e1dce125a93aa6e Mon Sep 17 00:00:00 2001
From: Igor Ivanov
Date: Thu, 21 Jan 2021 13:37:46 +0200
Subject: [PATCH 21/21] issue: 2439102 Fix issue with processing control msg
 in recvmsg()

A control message should be handled only when the user passes a buffer
for it. An error queue request must be processed before any data.

Signed-off-by: Igor Ivanov
---
 src/vma/sock/sockinfo_tcp.cpp | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp
index 5fface693a..57555d0db1 100644
--- a/src/vma/sock/sockinfo_tcp.cpp
+++ b/src/vma/sock/sockinfo_tcp.cpp
@@ -1957,9 +1957,13 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
 
 	/* poll rx queue till we have something */
 	lock_tcp_con();
-	if (__msg) {
-		handle_cmsg(__msg, in_flags);
-		if (__msg->msg_controllen == 0) {
+
+	/* An error queue request should be handled first.
+	 * It allows returning immediately on failure with a correct
+	 * error notification, without data processing.
+	 */
+	if (__msg && __msg->msg_control && (in_flags & MSG_ERRQUEUE)) {
+		if (m_error_queue.empty()) {
 			errno = EAGAIN;
 			unlock_tcp_con();
 			return -1;
@@ -1982,6 +1986,11 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov
 		total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
 	}
 
+	/* Handle all control message requests */
+	if (__msg && __msg->msg_control) {
+		handle_cmsg(__msg, in_flags);
+	}
+
 	/*
 	 * RCVBUFF Accounting: Going 'out' of the internal buffer: if some bytes are not tcp_recved yet - do that.
 	 * The packet might not be 'acked' (tcp_recved)
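
The zcopy range coalescing in tcp_tx_zc_handle(), together with the
patch 20 tweak, reduces to a small merge rule over [lo, hi] id ranges. A
standalone sketch of that rule (names are illustrative; the real code
additionally guards the 32-bit counter wrap via sum_count and serializes
callers with m_error_queue_lock):

    #include <stdint.h>

    /* prev holds the [lo, hi] id range of the queue's last notification. */
    struct zc_range { uint32_t lo, hi; };

    static int zc_try_merge(struct zc_range *prev, uint32_t lo, uint32_t hi)
    {
        if (lo == prev->lo) {           /* duplicate id from a retransmit */
            if (hi > prev->hi)
                prev->hi = hi;          /* extend, but never rewind (patch 20) */
            return 1;
        }
        if (lo == prev->hi + 1) {       /* contiguous ids: grow the range */
            prev->hi = hi;
            return 1;
        }
        return 0;                       /* gap: caller queues a new element */
    }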