diff --git a/README.txt b/README.txt index 9be039e04a..b6f2fa6327 100644 --- a/README.txt +++ b/README.txt @@ -830,11 +830,15 @@ timer expiration (once every 100ms). Application threads may be blocked till in Default value is 0 (deferred handling) VMA_INTERNAL_THREAD_ARM_CQ -Wakeup the internal thread for each packet that the CQ receive. +Wakeup the internal thread for activity on TX/RX CQ. Poll and process the packet and bring it to the socket layer. This can minimize latency in case of a busy application which is not available to receive the packet when it arrived. However, this might decrease performance in case of high pps rate application. +Disable Arm CQ is 0 +Check RX CQ is 1 +Check TX CQ is 2 +Check all CQs is 3 Default value is 0 (Disabled) VMA_WAIT_AFTER_JOIN_MSEC diff --git a/src/vma/dev/buffer_pool.h b/src/vma/dev/buffer_pool.h index 606f37b095..1c229a7fa6 100644 --- a/src/vma/dev/buffer_pool.h +++ b/src/vma/dev/buffer_pool.h @@ -41,6 +41,12 @@ inline static void free_lwip_pbuf(struct pbuf_custom *pbuf_custom) { + mem_buf_desc_t* p_desc = (mem_buf_desc_t *)pbuf_custom; + + if (p_desc->m_flags & mem_buf_desc_t::ZCOPY) { + p_desc->tx.zc.callback(p_desc); + } + pbuf_custom->pbuf.type = 0; pbuf_custom->pbuf.flags = 0; pbuf_custom->pbuf.ref = 0; } diff --git a/src/vma/dev/net_device_val.cpp b/src/vma/dev/net_device_val.cpp index fd5a434b0f..492fa78e59 100644 --- a/src/vma/dev/net_device_val.cpp +++ b/src/vma/dev/net_device_val.cpp @@ -179,7 +179,9 @@ const char* ring_alloc_logic_attr::to_str() return m_str; } -net_device_val::net_device_val(struct net_device_val_desc *desc) : m_lock("net_device_val lock") +net_device_val::net_device_val(struct net_device_val_desc *desc) : + m_lock("net_device_val lock"), + m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq) { bool valid = false; ib_ctx_handler* ib_ctx; @@ -1133,16 +1135,33 @@ int net_device_val::global_ring_poll_and_process_element(uint64_t *p_poll_sn, vo auto_unlocker lock(m_lock); rings_hash_map_t::iterator ring_iter; for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) { - int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - nd_logerr("Error in ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); + ret_total += ret; + } + } + + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = THE_RING->poll_and_process_element_tx(p_poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + nd_logerr("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); + ret_total += ret; + } } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) - nd_logfunc("ring[%p] Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn); - ret_total += ret; } return ret_total; } @@ -1153,13 +1172,29 @@ int 
net_device_val::global_ring_request_notification(uint64_t poll_sn) auto_unlocker lock(m_lock); rings_hash_map_t::iterator ring_iter; for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) { - int ret = THE_RING->request_notification(CQT_RX, poll_sn); - if (ret < 0) { - nd_logerr("Error ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = THE_RING->request_notification(CQT_RX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); + ret_total += ret; + } + + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = THE_RING->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); + ret_total += ret; } - nd_logfunc("ring[%p] Returned with: %d (sn=%d)", THE_RING, ret, poll_sn); - ret_total += ret; } return ret_total; } diff --git a/src/vma/dev/net_device_val.h b/src/vma/dev/net_device_val.h index 45c0941ef8..3911d0816d 100644 --- a/src/vma/dev/net_device_val.h +++ b/src/vma/dev/net_device_val.h @@ -316,6 +316,9 @@ class net_device_val std::string m_name; /* container for ifname */ char m_str[BUFF_SIZE]; /* detailed information about device */ char m_base_name[IFNAMSIZ]; /* base name of device basing ifname */ + + /* Global environment variables section */ + const int m_sysvar_internal_thread_arm_cq; }; class net_device_val_eth : public net_device_val diff --git a/src/vma/dev/qp_mgr.cpp b/src/vma/dev/qp_mgr.cpp index 4a78bbe6fc..f0fa8a7b51 100644 --- a/src/vma/dev/qp_mgr.cpp +++ b/src/vma/dev/qp_mgr.cpp @@ -100,7 +100,6 @@ qp_mgr::qp_mgr(const ring_simple* p_ring, const ib_ctx_handler* p_context, m_ibv_rx_sg_array = new ibv_sge[m_n_sysvar_rx_num_wr_to_post_recv]; m_ibv_rx_wr_array = new ibv_recv_wr[m_n_sysvar_rx_num_wr_to_post_recv]; - set_unsignaled_count(); memset(&m_rate_limit, 0, sizeof(struct vma_rate_limit_t)); qp_logfunc(""); @@ -336,7 +335,6 @@ void qp_mgr::up() release_tx_buffers(); /* clean any link to completions with error we might have */ - set_unsignaled_count(); m_p_last_tx_mem_buf_desc = NULL; modify_qp_to_ready_state(); @@ -497,7 +495,6 @@ void qp_mgr::trigger_completion_for_all_sent_packets() // Close the Tx unsignaled send list set_unsignaled_count(); - m_p_last_tx_mem_buf_desc = NULL; if (!m_p_ring->m_tx_num_wr_free) { qp_logdbg("failed to trigger completion for all packets due to no available wr"); @@ -599,9 +596,19 @@ inline int qp_mgr::send_to_wire(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_at int qp_mgr::send(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_attr attr) { mem_buf_desc_t* p_mem_buf_desc = (mem_buf_desc_t *)p_send_wqe->wr_id; + /* Control tx completions: + * - VMA_TX_WRE_BATCHING - The number of Tx Work Request Elements used + * until a completion signal is requested. + * - ZCOPY packets should notify application as soon as possible to + * confirm one that user buffers are free to reuse. So force completion + * signal for such work requests. + * - First call of send() should do completion. 
It means that + * m_n_unsignaled_count must be zero for this time. + */ + bool request_comp = (is_completion_need() || + (p_mem_buf_desc->m_flags & mem_buf_desc_t::ZCOPY)); qp_logfunc("VERBS send, unsignaled_count: %d", m_n_unsignaled_count); - bool request_comp = is_completion_need(); #ifdef VMA_TIME_MEASURE TAKE_T_TX_POST_SEND_START; @@ -636,7 +643,6 @@ int qp_mgr::send(vma_ibv_send_wr* p_send_wqe, vma_wr_tx_packet_attr attr) int ret; set_unsignaled_count(); - m_p_last_tx_mem_buf_desc = NULL; // Poll the Tx CQ uint64_t dummy_poll_sn = 0; diff --git a/src/vma/dev/qp_mgr.h b/src/vma/dev/qp_mgr.h index 9a7b52c7e0..38898aed73 100644 --- a/src/vma/dev/qp_mgr.h +++ b/src/vma/dev/qp_mgr.h @@ -172,7 +172,10 @@ friend class cq_mgr_mp; int configure(struct ibv_comp_channel* p_rx_comp_event_channel); virtual int prepare_ibv_qp(vma_ibv_qp_init_attr& qp_init_attr) = 0; - inline void set_unsignaled_count(void) { m_n_unsignaled_count = m_n_sysvar_tx_num_wr_to_signal - 1; } + inline void set_unsignaled_count(void) { + m_n_unsignaled_count = m_n_sysvar_tx_num_wr_to_signal - 1; + m_p_last_tx_mem_buf_desc = NULL; + } virtual cq_mgr* init_rx_cq_mgr(struct ibv_comp_channel* p_rx_comp_event_channel); virtual cq_mgr* init_tx_cq_mgr(void); diff --git a/src/vma/dev/qp_mgr_eth_mlx5.cpp b/src/vma/dev/qp_mgr_eth_mlx5.cpp index 3557629f54..e97d223aa5 100644 --- a/src/vma/dev/qp_mgr_eth_mlx5.cpp +++ b/src/vma/dev/qp_mgr_eth_mlx5.cpp @@ -887,7 +887,6 @@ void qp_mgr_eth_mlx5::trigger_completion_for_all_sent_packets() // Close the Tx unsignaled send list set_unsignaled_count(); - m_p_last_tx_mem_buf_desc = NULL; if (!m_p_ring->m_tx_num_wr_free) { qp_logdbg("failed to trigger completion for all packets due to no available wr"); diff --git a/src/vma/dev/ring.h b/src/vma/dev/ring.h index 6e8ad6414e..b9cbf3597c 100644 --- a/src/vma/dev/ring.h +++ b/src/vma/dev/ring.h @@ -102,6 +102,7 @@ class ring virtual int drain_and_proccess() = 0; virtual int wait_for_notification_and_process_element(int cq_channel_fd, uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL) = 0; virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL) = 0; + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn) = 0; virtual void adapt_cq_moderation() = 0; virtual void mem_buf_desc_return_single_to_owner_tx(mem_buf_desc_t* p_mem_buf_desc) = 0; diff --git a/src/vma/dev/ring_bond.cpp b/src/vma/dev/ring_bond.cpp index cd5745a855..00f8819ddb 100644 --- a/src/vma/dev/ring_bond.cpp +++ b/src/vma/dev/ring_bond.cpp @@ -451,6 +451,31 @@ int ring_bond::poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_r } } +int ring_bond::poll_and_process_element_tx(uint64_t* p_cq_poll_sn) +{ + if (m_lock_ring_tx.trylock()) { + errno = EAGAIN; + return 0; + } + + int temp = 0; + int ret = 0; + for (uint32_t i = 0; i < m_bond_rings.size(); i++) { + if (m_bond_rings[i]->is_up()) { + temp = m_bond_rings[i]->poll_and_process_element_tx(p_cq_poll_sn); + if (temp > 0) { + ret += temp; + } + } + } + m_lock_ring_tx.unlock(); + if (ret > 0) { + return ret; + } else { + return temp; + } +} + int ring_bond::drain_and_proccess() { if (m_lock_ring_rx.trylock()) { diff --git a/src/vma/dev/ring_bond.h b/src/vma/dev/ring_bond.h index adfe5566f7..5c5e1b4a3a 100644 --- a/src/vma/dev/ring_bond.h +++ b/src/vma/dev/ring_bond.h @@ -54,7 +54,8 @@ class ring_bond : public ring { virtual void print_val(); virtual int request_notification(cq_type_t cq_type, uint64_t poll_sn); - virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, 
void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn); virtual void adapt_cq_moderation(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); virtual bool reclaim_recv_buffers(mem_buf_desc_t* rx_reuse_lst); diff --git a/src/vma/dev/ring_simple.cpp b/src/vma/dev/ring_simple.cpp index db54810474..30fcf175d0 100644 --- a/src/vma/dev/ring_simple.cpp +++ b/src/vma/dev/ring_simple.cpp @@ -356,6 +356,13 @@ int ring_simple::poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd return ret; } +int ring_simple::poll_and_process_element_tx(uint64_t* p_cq_poll_sn) +{ + int ret = 0; + RING_TRY_LOCK_RUN_AND_UPDATE_RET(m_lock_ring_tx, m_p_cq_mgr_tx->poll_and_process_element_tx(p_cq_poll_sn)); + return ret; +} + int ring_simple::socketxtreme_poll(struct vma_completion_t *vma_completions, unsigned int ncompletions, int flags) { int ret = 0; diff --git a/src/vma/dev/ring_simple.h b/src/vma/dev/ring_simple.h index f7cac457af..9a01e4a71c 100644 --- a/src/vma/dev/ring_simple.h +++ b/src/vma/dev/ring_simple.h @@ -62,7 +62,8 @@ class ring_simple : public ring_slave virtual ~ring_simple(); virtual int request_notification(cq_type_t cq_type, uint64_t poll_sn); - virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn); virtual void adapt_cq_moderation(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); virtual bool reclaim_recv_buffers(mem_buf_desc_t* rx_reuse_lst); diff --git a/src/vma/dev/ring_tap.h b/src/vma/dev/ring_tap.h index cfb2934128..ef5f675c02 100644 --- a/src/vma/dev/ring_tap.h +++ b/src/vma/dev/ring_tap.h @@ -46,6 +46,7 @@ class ring_tap : public ring_slave virtual bool attach_flow(flow_tuple& flow_spec_5t, pkt_rcvr_sink* sink); virtual bool detach_flow(flow_tuple& flow_spec_5t, pkt_rcvr_sink* sink); virtual int poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); + virtual int poll_and_process_element_tx(uint64_t* p_cq_poll_sn) { NOT_IN_USE(p_cq_poll_sn); return 0; } virtual int wait_for_notification_and_process_element(int cq_channel_fd, uint64_t* p_cq_poll_sn, void* pv_fd_ready_array = NULL); virtual int drain_and_proccess(); virtual bool reclaim_recv_buffers(descq_t *rx_reuse); diff --git a/src/vma/event/event_handler_manager.cpp b/src/vma/event/event_handler_manager.cpp index 107c59ec5a..c7014e2b17 100644 --- a/src/vma/event/event_handler_manager.cpp +++ b/src/vma/event/event_handler_manager.cpp @@ -229,7 +229,7 @@ void event_handler_manager::register_command_event(int fd, command* cmd) event_handler_manager::event_handler_manager() : m_reg_action_q_lock("reg_action_q_lock"), - m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq_enabled), + m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq), m_n_sysvar_vma_time_measure_num_samples(safe_mce_sys().vma_time_measure_num_samples), m_n_sysvar_timer_resolution_msec(safe_mce_sys().timer_resolution_msec) { diff --git a/src/vma/iomux/epfd_info.cpp b/src/vma/iomux/epfd_info.cpp index 850dfa9491..60639be47d 100644 --- a/src/vma/iomux/epfd_info.cpp +++ b/src/vma/iomux/epfd_info.cpp @@ -55,10 +55,13 @@ int epfd_info::remove_fd_from_epoll_os(int fd) epfd_info::epfd_info(int epfd, int size) : 
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"), - m_lock_poll_os("epfd_lock_poll_os"), m_sysvar_thread_mode(safe_mce_sys().thread_mode), - m_b_os_data_available(false) + m_lock_poll_os("epfd_lock_poll_os"), + m_b_os_data_available(false), + m_sysvar_thread_mode(safe_mce_sys().thread_mode), + m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq) { __log_funcall(""); + int max_sys_fd = get_sys_max_fd_num(); if (m_size<=max_sys_fd) { @@ -609,17 +612,35 @@ int epfd_info::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_re m_ring_map_lock.lock(); for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) { - int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0 && errno != EAGAIN) { - __log_err("Error in ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); - m_ring_map_lock.unlock(); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + __log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } + } + + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = iter->first->poll_and_process_element_tx(p_poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0 && errno != EAGAIN) { + __log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + if (ret > 0) { + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); + ret_total += ret; + } } - BULLSEYE_EXCLUDE_BLOCK_END - if (ret > 0) - __log_func("ring[%p] Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn); - ret_total += ret; } m_ring_map_lock.unlock(); @@ -646,16 +667,31 @@ int epfd_info::ring_request_notification(uint64_t poll_sn) m_ring_map_lock.lock(); for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) { - int ret = iter->first->request_notification(CQT_RX, poll_sn); - BULLSEYE_EXCLUDE_BLOCK_START - if (ret < 0) { - __log_err("Error ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); - m_ring_map_lock.unlock(); - return ret; + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) { + int ret = iter->first->request_notification(CQT_RX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + __log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + __log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); + ret_total += ret; + } + + if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) { + int ret = iter->first->request_notification(CQT_TX, poll_sn); + BULLSEYE_EXCLUDE_BLOCK_START + if (ret < 0) { + __log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno); + m_ring_map_lock.unlock(); + return ret; + } + BULLSEYE_EXCLUDE_BLOCK_END + __log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn); + ret_total += ret; } - 
BULLSEYE_EXCLUDE_BLOCK_END - __log_func("ring[%p] Returned with: %d (sn=%d)", iter->first, ret, poll_sn); - ret_total += ret; } m_ring_map_lock.unlock(); diff --git a/src/vma/iomux/epfd_info.h b/src/vma/iomux/epfd_info.h index 3b3e434082..41aa2f97f0 100644 --- a/src/vma/iomux/epfd_info.h +++ b/src/vma/iomux/epfd_info.h @@ -129,13 +129,16 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake ring_map_t m_ring_map; lock_mutex_recursive m_ring_map_lock; lock_spin m_lock_poll_os; - const thread_mode_t m_sysvar_thread_mode; ready_cq_fd_q_t m_ready_cq_fd_q; epoll_stats_t m_local_stats; epoll_stats_t *m_stats; int m_log_invalid_events; bool m_b_os_data_available; // true when non offloaded data is available + /* Global environment variables section */ + const thread_mode_t m_sysvar_thread_mode; + const int m_sysvar_internal_thread_arm_cq; + int add_fd(int fd, epoll_event *event); int del_fd(int fd, bool passthrough = false); int mod_fd(int fd, epoll_event *event); diff --git a/src/vma/iomux/epoll_wait_call.cpp b/src/vma/iomux/epoll_wait_call.cpp index 07530e1544..e5bb2c78ff 100644 --- a/src/vma/iomux/epoll_wait_call.cpp +++ b/src/vma/iomux/epoll_wait_call.cpp @@ -117,6 +117,15 @@ int epoll_wait_call::get_current_events() mutual_events &= ~EPOLLOUT; } + // handle zcopy notification mechanism + if (mutual_events & EPOLLERR) { + int unused; + if (handle_epoll_event(p_socket_object->is_errorable(&unused), EPOLLERR, p_socket_object, i)) { + got_event = true; + } + mutual_events &= ~EPOLLERR; + } + if (mutual_events) { if (handle_epoll_event(true, mutual_events, p_socket_object, i)) { got_event = true; diff --git a/src/vma/iomux/poll_call.cpp b/src/vma/iomux/poll_call.cpp index 42130c8f6f..e96c663565 100644 --- a/src/vma/iomux/poll_call.cpp +++ b/src/vma/iomux/poll_call.cpp @@ -71,7 +71,7 @@ poll_call::poll_call(int *off_rfds_buffer, offloaded_mode_t *off_modes_buffer, i socket_fd_api* temp_sock_fd_api = fd_collection_get_sockfd(fd); if (temp_sock_fd_api && (temp_sock_fd_api->get_type()==FD_TYPE_SOCKET)) { offloaded_mode_t off_mode = OFF_NONE; - if (m_orig_fds[i].events & POLLIN) + if (m_orig_fds[i].events & (POLLIN | POLLERR | POLLHUP)) off_mode = (offloaded_mode_t)(off_mode | OFF_READ); if (m_orig_fds[i].events & POLLOUT) off_mode = (offloaded_mode_t)(off_mode | OFF_WRITE); diff --git a/src/vma/lwip/pbuf.c b/src/vma/lwip/pbuf.c index 7a90513d1a..6c499fcb4f 100644 --- a/src/vma/lwip/pbuf.c +++ b/src/vma/lwip/pbuf.c @@ -200,6 +200,9 @@ pbuf_header(struct pbuf *p, s16_t header_size_increment) return 1; /* AlexV: we need to check that the header EXPANTION is legal for PBUF_REF & PBUF_ROM pbufs! 
*/ p->payload = (u8_t *)p->payload - header_size_increment; + } else if (type == PBUF_ZEROCOPY) { + /* temporary do the same as for PBUF_RAM until zcopy support is not ready */ + p->payload = (u8_t *)p->payload - header_size_increment; } else { /* Unknown type */ LWIP_ASSERT("bad pbuf type", 0); diff --git a/src/vma/lwip/pbuf.h b/src/vma/lwip/pbuf.h index d59bf87e59..a39a427a4a 100644 --- a/src/vma/lwip/pbuf.h +++ b/src/vma/lwip/pbuf.h @@ -57,7 +57,8 @@ typedef enum { PBUF_RAM, /* pbuf data is stored in RAM */ PBUF_ROM, /* pbuf data is stored in ROM */ PBUF_REF, /* pbuf comes from the pbuf pool */ - PBUF_POOL /* pbuf payload refers to RAM */ + PBUF_POOL, /* pbuf payload refers to RAM */ + PBUF_ZEROCOPY /* pbuf data used directly from user */ } pbuf_type; diff --git a/src/vma/lwip/tcp.c b/src/vma/lwip/tcp.c index 43e343f92f..cb0bd2d34c 100644 --- a/src/vma/lwip/tcp.c +++ b/src/vma/lwip/tcp.c @@ -1084,10 +1084,10 @@ tcp_tx_pbuf_alloc(struct tcp_pcb * pcb, u16_t length, pbuf_type type) { struct pbuf * p; - if (!pcb->pbuf_alloc) { + if (!pcb->pbuf_alloc || pcb->pbuf_alloc->type != type) { // pbuf_alloc is not valid, we should allocate a new pbuf. - p = external_tcp_tx_pbuf_alloc(pcb); + p = external_tcp_tx_pbuf_alloc(pcb, type); if (!p) return NULL; p->next = NULL; @@ -1129,7 +1129,7 @@ tcp_tx_pbuf_free(struct tcp_pcb * pcb, struct pbuf * p) while (p) { p_next = p->next; p->next = NULL; - if (p->type == PBUF_RAM) { + if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) { external_tcp_tx_pbuf_free(pcb, p); } else { pbuf_free(p); diff --git a/src/vma/lwip/tcp.h b/src/vma/lwip/tcp.h index 22d8bb79aa..83509a44d4 100644 --- a/src/vma/lwip/tcp.h +++ b/src/vma/lwip/tcp.h @@ -66,7 +66,7 @@ void register_sys_readv(sys_readv_fn fn); #endif #if LWIP_3RD_PARTY_BUFS -typedef struct pbuf * (*tcp_tx_pbuf_alloc_fn)(void* p_conn); +typedef struct pbuf * (*tcp_tx_pbuf_alloc_fn)(void* p_conn, pbuf_type type); void register_tcp_tx_pbuf_alloc(tcp_tx_pbuf_alloc_fn fn); @@ -528,6 +528,7 @@ err_t tcp_shutdown(struct tcp_pcb *pcb, int shut_rx, int shut_tx); #define TCP_WRITE_DUMMY 0x10 #define TCP_WRITE_TSO 0x20 #define TCP_WRITE_FILE 0x40 +#define TCP_WRITE_ZEROCOPY 0x80 err_t tcp_write (struct tcp_pcb *pcb, const void *dataptr, u32_t len, u8_t apiflags); diff --git a/src/vma/lwip/tcp_impl.h b/src/vma/lwip/tcp_impl.h index 2419178c53..5eb9b11166 100644 --- a/src/vma/lwip/tcp_impl.h +++ b/src/vma/lwip/tcp_impl.h @@ -305,6 +305,7 @@ struct tcp_seg { #define TF_SEG_OPTS_WNDSCALE (u8_t)0x08U /* Include window scaling option */ #define TF_SEG_OPTS_DUMMY_MSG (u8_t)TCP_WRITE_DUMMY /* Include dummy send option */ #define TF_SEG_OPTS_TSO (u8_t)TCP_WRITE_TSO /* Use TSO send mode */ +#define TF_SEG_OPTS_ZEROCOPY (u8_t)TCP_WRITE_ZEROCOPY /* Use zerocopy send mode */ struct tcp_hdr *tcphdr; /* the TCP header */ }; diff --git a/src/vma/lwip/tcp_in.c b/src/vma/lwip/tcp_in.c index 1f8b693ab8..e4eee96e44 100644 --- a/src/vma/lwip/tcp_in.c +++ b/src/vma/lwip/tcp_in.c @@ -820,7 +820,7 @@ tcp_shrink_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t ackno) seg->p->next = cur_p; p->next = NULL; - if (p->type == PBUF_RAM) { + if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) { external_tcp_tx_pbuf_free(pcb, p); } else { pbuf_free(p); @@ -844,7 +844,7 @@ tcp_shrink_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t ackno) seg->p->next = cur_p; p->next = NULL; - if (p->type == PBUF_RAM) { + if (p->type == PBUF_RAM || p->type == PBUF_ZEROCOPY) { external_tcp_tx_pbuf_free(pcb, p); } else { pbuf_free(p); diff --git 
a/src/vma/lwip/tcp_out.c b/src/vma/lwip/tcp_out.c index efe944fd3f..97376247f3 100644 --- a/src/vma/lwip/tcp_out.c +++ b/src/vma/lwip/tcp_out.c @@ -286,7 +286,7 @@ tcp_create_segment(struct tcp_pcb *pcb, struct pbuf *p, u8_t flags, u32_t seqno, */ static struct pbuf * tcp_pbuf_prealloc(u16_t length, u16_t max_length, - u16_t *oversize, struct tcp_pcb *pcb, u8_t tcp_write_flag_more, + u16_t *oversize, struct tcp_pcb *pcb, pbuf_type type, u8_t tcp_write_flag_more, u8_t first_seg) { struct pbuf *p; @@ -310,7 +310,7 @@ tcp_pbuf_prealloc(u16_t length, u16_t max_length, alloc = LWIP_MIN(max_length, LWIP_MEM_ALIGN_SIZE(length + pcb->tcp_oversize_val)); } } - p = tcp_tx_pbuf_alloc(pcb, alloc, PBUF_RAM); + p = tcp_tx_pbuf_alloc(pcb, alloc, type); if (p == NULL) { return NULL; } @@ -440,6 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) struct iovec piov[piov_max_size]; int piov_cur_index = 0; int piov_cur_len = 0; + pbuf_type type = (apiflags & TCP_WRITE_ZEROCOPY ? PBUF_ZEROCOPY : PBUF_RAM); int byte_queued = pcb->snd_nxt - pcb->lastack; if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY)) @@ -464,6 +465,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) #endif /* LWIP_TSO */ optflags |= (apiflags & TCP_WRITE_DUMMY ? TF_SEG_OPTS_DUMMY_MSG : 0); + optflags |= (apiflags & TCP_WRITE_ZEROCOPY ? TF_SEG_OPTS_ZEROCOPY : 0); #if LWIP_TCP_TIMESTAMPS if ((pcb->flags & TF_TIMESTAMP)) { @@ -534,7 +536,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) #endif /* TCP_OVERSIZE_DBGCHECK */ if (pcb->unsent_oversize > 0) { - if (!(apiflags & TCP_WRITE_FILE)) { + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY))) { oversize = pcb->unsent_oversize; LWIP_ASSERT("inconsistent oversize vs. space", oversize_used <= space); oversize_used = oversize < len ? oversize : len; @@ -555,10 +557,10 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) * the end. */ #if LWIP_TSO - if (!(apiflags & TCP_WRITE_FILE) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0) && + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY)) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0) && (tot_p < (int)pcb->tso.max_send_sge)) { #else - if (!(apiflags & TCP_WRITE_FILE) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0)) { + if (!(apiflags & (TCP_WRITE_FILE | TCP_WRITE_ZEROCOPY)) && (pos < len) && (space > 0) && (pcb->last_unsent->len > 0)) { #endif /* LWIP_TSO */ u16_t seglen = space < len - pos ? space : len - pos; @@ -567,7 +569,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) * can use PBUF_RAW here since the data appears in the middle of * a segment. A header will never be prepended. 
*/ /* Data is copied */ - if ((concat_p = tcp_pbuf_prealloc(seglen, space, &oversize, pcb, TCP_WRITE_FLAG_MORE, 1)) == NULL) { + if ((concat_p = tcp_pbuf_prealloc(seglen, space, &oversize, pcb, type, TCP_WRITE_FLAG_MORE, 1)) == NULL) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_write : could not allocate memory for pbuf copy size %"U16_F"\n", seglen)); @@ -606,7 +608,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) /* If copy is set, memory should be allocated and data copied * into pbuf */ - if ((p = tcp_pbuf_prealloc(seglen + optlen, max_len, &oversize, pcb, TCP_WRITE_FLAG_MORE, queue == NULL)) == NULL) { + if ((p = tcp_pbuf_prealloc(seglen + optlen, max_len, &oversize, pcb, type, TCP_WRITE_FLAG_MORE, queue == NULL)) == NULL) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_write : could not allocate memory for pbuf copy size %"U16_F"\n", seglen)); goto memerr; } @@ -1058,6 +1060,7 @@ tcp_split_one_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t lentosend, struct pbuf *cur_p = NULL; u16_t max_length = 0; u16_t oversize = 0; + pbuf_type type = PBUF_RAM; cur_seg = seg; max_length = cur_seg->p->len; @@ -1066,7 +1069,7 @@ tcp_split_one_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t lentosend, u32_t lentoqueue = cur_seg->len - lentosend; /* Allocate memory for p_buf and fill in fields. */ - if (NULL == (cur_p = tcp_pbuf_prealloc(lentoqueue + optlen, max_length, &oversize, pcb, 0, 0))) { + if (NULL == (cur_p = tcp_pbuf_prealloc(lentoqueue + optlen, max_length, &oversize, pcb, type, 0, 0))) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_split_one_segment: could not allocate memory for pbuf copy size %"U16_F"\n", (lentoqueue + optlen))); goto out; } @@ -1264,6 +1267,7 @@ tcp_split_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t wnd) u16_t oversize = 0; u8_t optlen = 0, optflags = 0; u16_t mss_local = 0; + pbuf_type type = PBUF_RAM; LWIP_ASSERT("tcp_split_segment: sanity check", (seg && seg->p)); @@ -1294,7 +1298,7 @@ tcp_split_segment(struct tcp_pcb *pcb, struct tcp_seg *seg, u32_t wnd) if (seg->p->len > ((TCP_HLEN + optlen) + lentosend)) {/* First buffer is too big, split it */ u32_t lentoqueue = seg->p->len - (TCP_HLEN + optlen) - lentosend; - if (NULL == (p = tcp_pbuf_prealloc(lentoqueue + optlen, mss_local, &oversize, pcb, 0, 0))) { + if (NULL == (p = tcp_pbuf_prealloc(lentoqueue + optlen, mss_local, &oversize, pcb, type, 0, 0))) { LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_split_segment: could not allocate memory for pbuf copy size %"U16_F"\n", (lentoqueue + optlen))); return; } diff --git a/src/vma/main.cpp b/src/vma/main.cpp index 69a66dd8c4..aad8ce651b 100644 --- a/src/vma/main.cpp +++ b/src/vma/main.cpp @@ -532,7 +532,7 @@ void print_vma_global_settings() VLOG_PARAM_NUMBER("Delay after join (msec)", safe_mce_sys().wait_after_join_msec, MCE_DEFAULT_WAIT_AFTER_JOIN_MSEC, SYS_VAR_WAIT_AFTER_JOIN_MSEC); VLOG_STR_PARAM_STRING("Internal Thread Affinity", safe_mce_sys().internal_thread_affinity_str, MCE_DEFAULT_INTERNAL_THREAD_AFFINITY_STR, SYS_VAR_INTERNAL_THREAD_AFFINITY, safe_mce_sys().internal_thread_affinity_str); VLOG_STR_PARAM_STRING("Internal Thread Cpuset", safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET, SYS_VAR_INTERNAL_THREAD_CPUSET, safe_mce_sys().internal_thread_cpuset); - VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq_enabled, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().internal_thread_arm_cq_enabled ? 
"Enabled " : "Disabled"); + VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().arm_cq_str((mce_sys_var::arm_cq_t)safe_mce_sys().internal_thread_arm_cq)); VLOG_PARAM_NUMSTR("Internal Thread TCP Handling", safe_mce_sys().internal_thread_tcp_timer_handling, MCE_DEFAULT_INTERNAL_THREAD_TCP_TIMER_HANDLING, SYS_VAR_INTERNAL_THREAD_TCP_TIMER_HANDLING, internal_thread_tcp_timer_handling_str(safe_mce_sys().internal_thread_tcp_timer_handling)); VLOG_PARAM_STRING("Thread mode", safe_mce_sys().thread_mode, MCE_DEFAULT_THREAD_MODE, SYS_VAR_THREAD_MODE, thread_mode_str(safe_mce_sys().thread_mode)); VLOG_PARAM_NUMSTR("Buffer batching mode", safe_mce_sys().buffer_batching_mode, MCE_DEFAULT_BUFFER_BATCHING_MODE, SYS_VAR_BUFFER_BATCHING_MODE, buffer_batching_mode_str(safe_mce_sys().buffer_batching_mode)); diff --git a/src/vma/proto/dst_entry.h b/src/vma/proto/dst_entry.h index 65ec18af97..04f8182c4b 100644 --- a/src/vma/proto/dst_entry.h +++ b/src/vma/proto/dst_entry.h @@ -65,6 +65,7 @@ struct socket_data { typedef struct { vma_wr_tx_packet_attr flags; uint16_t mss; + uint32_t length; } vma_send_attr; class dst_entry : public cache_observer, public tostr, public neigh_observer diff --git a/src/vma/proto/dst_entry_tcp.cpp b/src/vma/proto/dst_entry_tcp.cpp index 7582e0d3f0..e6bdfcc53c 100644 --- a/src/vma/proto/dst_entry_tcp.cpp +++ b/src/vma/proto/dst_entry_tcp.cpp @@ -81,6 +81,11 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s p_tcp_iov = (tcp_iovec*)p_iov; + /* Suppress flags that should not be used anymore + * to avoid conflicts with VMA_TX_PACKET_L3_CSUM and VMA_TX_PACKET_L4_CSUM + */ + attr.flags = (vma_wr_tx_packet_attr)(attr.flags & ~(VMA_TX_PACKET_ZEROCOPY | VMA_TX_FILE)); + attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM | VMA_TX_PACKET_L4_CSUM); /* Supported scenarios: @@ -104,10 +109,10 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s */ p_pkt = (tx_packet_template_t*)((uint8_t*)p_tcp_iov[0].iovec.iov_base - m_header.m_aligned_l2_l3_len); - /* iov_len is a size of TCP header and data + /* attr.length is payload size and L4 header size * m_total_hdr_len is a size of L2/L3 header */ - total_packet_len = p_tcp_iov[0].iovec.iov_len + m_header.m_total_hdr_len; + total_packet_len = attr.length + m_header.m_total_hdr_len; /* copy just L2/L3 headers to p_pkt */ m_header.copy_l2_ip_hdr(p_pkt); @@ -125,10 +130,17 @@ ssize_t dst_entry_tcp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s } else if (is_set(attr.flags, (vma_wr_tx_packet_attr)(VMA_TX_PACKET_TSO))) { /* update send work request. 
do not expect noninlined scenario */ send_wqe_h.init_not_inline_wqe(send_wqe, m_sge, sz_iov); - send_wqe_h.enable_tso(send_wqe, - (void *)((uint8_t*)p_pkt + hdr_alignment_diff), - m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, - attr.mss); + if (attr.mss < (attr.length - p_pkt->hdr.m_tcp_hdr.doff * 4)) { + send_wqe_h.enable_tso(send_wqe, + (void *)((uint8_t*)p_pkt + hdr_alignment_diff), + m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, + attr.mss); + } else { + send_wqe_h.enable_tso(send_wqe, + (void *)((uint8_t*)p_pkt + hdr_alignment_diff), + m_header.m_total_hdr_len + p_pkt->hdr.m_tcp_hdr.doff * 4, + 0); + } m_p_send_wqe = &send_wqe; m_sge[0].addr = (uintptr_t)((uint8_t *)&p_pkt->hdr.m_tcp_hdr + p_pkt->hdr.m_tcp_hdr.doff * 4); m_sge[0].length = p_tcp_iov[0].iovec.iov_len - p_pkt->hdr.m_tcp_hdr.doff * 4; @@ -458,7 +470,7 @@ void dst_entry_tcp::put_buffer(mem_buf_desc_t * p_desc) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(&p_desc->lwip_pbuf.pbuf); } } } diff --git a/src/vma/proto/dst_entry_udp.cpp b/src/vma/proto/dst_entry_udp.cpp index bff9bb247f..d42fb4b1dd 100644 --- a/src/vma/proto/dst_entry_udp.cpp +++ b/src/vma/proto/dst_entry_udp.cpp @@ -308,6 +308,12 @@ ssize_t dst_entry_udp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s { // Calc user data payload size ssize_t sz_data_payload = 0; + + /* Suppress flags that should not be used anymore + * to avoid conflicts with VMA_TX_PACKET_L3_CSUM and VMA_TX_PACKET_L4_CSUM + */ + attr.flags = (vma_wr_tx_packet_attr)(attr.flags & ~(VMA_TX_PACKET_ZEROCOPY | VMA_TX_FILE)); + for (ssize_t i = 0; i < sz_iov; i++) sz_data_payload += p_iov[i].iov_len; @@ -324,7 +330,7 @@ ssize_t dst_entry_udp::fast_send(const iovec* p_iov, const ssize_t sz_iov, vma_s attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM | VMA_TX_PACKET_L4_CSUM); return fast_send_not_fragmented(p_iov, sz_iov, attr.flags, sz_udp_payload, sz_data_payload); } else { - attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM); + attr.flags = (vma_wr_tx_packet_attr)(attr.flags | VMA_TX_PACKET_L3_CSUM); return fast_send_fragmented(p_iov, sz_iov, attr.flags, sz_udp_payload, sz_data_payload); } } diff --git a/src/vma/proto/mem_buf_desc.h b/src/vma/proto/mem_buf_desc.h index 8bf4cef9dc..88e9c50833 100644 --- a/src/vma/proto/mem_buf_desc.h +++ b/src/vma/proto/mem_buf_desc.h @@ -35,6 +35,8 @@ #define MEM_BUF_DESC_H #include +#include + #include "utils/atomic.h" #include "vma/util/vma_list.h" #include "vma/lwip/pbuf.h" @@ -58,23 +60,43 @@ struct timestamps_t * received data in TX) */ class mem_buf_desc_t { +public: + enum flags { + TYPICAL = 0, + CLONED = 0x01, + ZCOPY = 0x02 + }; + public: mem_buf_desc_t(uint8_t *buffer, size_t size, pbuf_free_custom_fn custom_free_function) : - p_buffer(buffer), lkey(0), p_next_desc(0), - p_prev_desc(0), sz_buffer(size), sz_data(0), + p_buffer(buffer), + m_flags(mem_buf_desc_t::TYPICAL), + lkey(0), + p_next_desc(0), + p_prev_desc(0), + sz_buffer(size), + sz_data(0), p_desc_owner(0) { + memset(&lwip_pbuf, 0, sizeof(lwip_pbuf)); memset(&rx, 0, sizeof(rx)); memset(&tx, 0, sizeof(tx)); + memset(&ee, 0, sizeof(ee)); reset_ref_count(); lwip_pbuf.custom_free_function = custom_free_function; } - struct pbuf_custom lwip_pbuf; //Do not change the location of this field. 
+ /* This filed should be first in this class + * It encapsulates pbuf structure from lwip + * and extra fields to proceed customer specific requirements + */ + struct pbuf_custom lwip_pbuf; uint8_t* const p_buffer; - static inline size_t buffer_node_offset(void) {return NODE_OFFSET(mem_buf_desc_t, buffer_node);} + static inline size_t buffer_node_offset(void) { + return NODE_OFFSET(mem_buf_desc_t, buffer_node); + } list_node buffer_node; union { @@ -113,13 +135,31 @@ class mem_buf_desc_t { struct udphdr* p_udp_h; struct tcphdr* p_tcp_h; }; + struct { + /* This structure allows to track tx zerocopy flow + * including start send id and range in count field + * with total bytes length as len + * where + * id -> ee.ee_info + * id + count -1 -> ee.ee_data + */ + uint32_t id; + uint32_t len; + uint16_t count; + void *ctx; + void (*callback)(mem_buf_desc_t *); + } zc; } tx; }; + /* This field is needed for error queue processing */ + struct sock_extended_err ee; + private: atomic_t n_ref_count; // number of interested receivers (sockinfo) [can be modified only in cq_mgr context] -public: +public: + int m_flags; /* object description */ uint32_t lkey; // Buffers lkey for QP access mem_buf_desc_t* p_next_desc; // A general purpose linked list of mem_buf_desc mem_buf_desc_t* p_prev_desc; @@ -130,14 +170,43 @@ class mem_buf_desc_t { // Rx: cq_mgr owns the mem_buf_desc and the associated data buffer ring_slave* p_desc_owner; - inline int get_ref_count() const {return atomic_read(&n_ref_count);} - inline void reset_ref_count() {atomic_set(&n_ref_count, 0);} - inline int inc_ref_count() {return atomic_fetch_and_inc(&n_ref_count);} - inline int dec_ref_count() {return atomic_fetch_and_dec(&n_ref_count);} + inline mem_buf_desc_t* clone() { + mem_buf_desc_t* p_desc = new mem_buf_desc_t(*this); + INIT_LIST_HEAD(&p_desc->buffer_node.head); + p_desc->m_flags |= mem_buf_desc_t::CLONED; + return p_desc; + } + + inline int get_ref_count() const { + return atomic_read(&n_ref_count); + } - inline unsigned int lwip_pbuf_inc_ref_count() {return ++lwip_pbuf.pbuf.ref;} - inline unsigned int lwip_pbuf_dec_ref_count() {if (likely(lwip_pbuf.pbuf.ref)) --lwip_pbuf.pbuf.ref; return lwip_pbuf.pbuf.ref;} - inline unsigned int lwip_pbuf_get_ref_count() const {return lwip_pbuf.pbuf.ref;} + inline void reset_ref_count() { + atomic_set(&n_ref_count, 0); + } + + inline int inc_ref_count() { + return atomic_fetch_and_inc(&n_ref_count); + } + + inline int dec_ref_count() { + return atomic_fetch_and_dec(&n_ref_count); + } + + inline unsigned int lwip_pbuf_inc_ref_count() { + return ++lwip_pbuf.pbuf.ref; + } + + inline unsigned int lwip_pbuf_dec_ref_count() { + if (likely(lwip_pbuf.pbuf.ref)) { + --lwip_pbuf.pbuf.ref; + } + return lwip_pbuf.pbuf.ref; + } + + inline unsigned int lwip_pbuf_get_ref_count() const { + return lwip_pbuf.pbuf.ref; + } }; typedef vma_list_t descq_t; diff --git a/src/vma/proto/vma_lwip.h b/src/vma/proto/vma_lwip.h index a334541735..1bc345c950 100644 --- a/src/vma/proto/vma_lwip.h +++ b/src/vma/proto/vma_lwip.h @@ -51,11 +51,25 @@ typedef enum vma_wr_tx_packet_attr { VMA_TX_PACKET_TSO = TCP_WRITE_TSO, /* 0x20 */ /* sendfile operation. */ VMA_TX_FILE = TCP_WRITE_FILE, /* 0x40 */ + /* zcopy write operation (MSG_ZEROCOPY). */ + VMA_TX_PACKET_ZEROCOPY = TCP_WRITE_ZEROCOPY, /* 0x80 */ - /* MLX5_ETH_WQE_L3_CSUM offload to HW L3 (IP) header checksum */ - VMA_TX_PACKET_L3_CSUM = (1 << 6), /* hardcoded values. 
It is the same as VMA_TX_FILE but there is no conflict */ - /* MLX5_ETH_WQE_L4_CSUM offload to HW L4 (TCP/UDP) header checksum */ - VMA_TX_PACKET_L4_CSUM = (1 << 7), /* hardcoded values */ + /* MLX5_ETH_WQE_L3_CSUM offload to HW L3 (IP) header checksum + * Important: + * - hardcoded value used directly to program send to wire + * - it is the same as VMA_TX_FILE but there is no conflict as far as + * VMA_TX_FILE is passed into dst_entry::fast_send() operation + * and it is not needed later doing send to wire + */ + VMA_TX_PACKET_L3_CSUM = (1 << 6), + /* MLX5_ETH_WQE_L4_CSUM offload to HW L4 (TCP/UDP) header checksum + * Important: + * - hardcoded value used directly to program send to wire + * - it is the same as TCP_WRITE_ZEROCOPY but there is no conflict as far as + * TCP_WRITE_ZEROCOPY is passed into dst_entry::fast_send() operation + * and it is not needed later doing send to wire + */ + VMA_TX_PACKET_L4_CSUM = (1 << 7), /* blocking send operation */ VMA_TX_PACKET_BLOCK = (1 << 8), /* Force SW checksum */ diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 29d4e0665e..cdb67ff4d0 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -66,6 +66,7 @@ sockinfo::sockinfo(int fd): m_b_pktinfo(false), m_b_rcvtstamp(false), m_b_rcvtstampns(false), + m_b_zc(false), m_n_tsing_flags(0), m_protocol(PROTO_UNDEFINED), m_lock_rcv(MODULE_NAME "::m_lock_rcv"), @@ -114,6 +115,9 @@ sockinfo::sockinfo(int fd): memset(&m_so_ratelimit, 0, sizeof(vma_rate_limit_t)); set_flow_tag(m_fd + 1); + atomic_set(&m_zckey, 0); + m_last_zcdesc = NULL; + m_socketxtreme.ec.clear(); m_socketxtreme.completion = NULL; m_socketxtreme.last_buff_lst = NULL; @@ -138,7 +142,18 @@ sockinfo::~sockinfo() delete[] m_p_rings_fds; m_p_rings_fds = NULL; } - vma_stats_instance_remove_socket_block(m_p_socket_stats); + + while (!m_error_queue.empty()) { + mem_buf_desc_t* buff = m_error_queue.get_and_pop_front(); + if (buff->m_flags & mem_buf_desc_t::CLONED) { + delete buff; + } else { + si_logerr("Detected invalid element in socket error queue as %p with flags 0x%x", + buff, buff->m_flags); + } + } + + vma_stats_instance_remove_socket_block(m_p_socket_stats); } void sockinfo::set_blocking(bool is_blocked) @@ -1640,6 +1655,30 @@ void sockinfo::handle_recv_timestamping(struct cmsg_state *cm_state) insert_cmsg(cm_state, SOL_SOCKET, SO_TIMESTAMPING, &tsing, sizeof(tsing)); } +void sockinfo::handle_recv_errqueue(struct cmsg_state *cm_state) +{ + mem_buf_desc_t *buff = NULL; + + if (m_error_queue.empty()) { + return; + } + + m_error_queue_lock.lock(); + buff = m_error_queue.get_and_pop_front(); + m_error_queue_lock.unlock(); + + if (!(buff->m_flags & mem_buf_desc_t::CLONED)) { + si_logerr("Detected invalid element in socket error queue as %p with flags 0x%x", + buff, buff->m_flags); + return; + } + + insert_cmsg(cm_state, 0, IP_RECVERR, &buff->ee, sizeof(buff->ee)); + cm_state->mhdr->msg_flags |= MSG_ERRQUEUE; + + delete buff; +} + void sockinfo::insert_cmsg(struct cmsg_state * cm_state, int level, int type, void *data, int len) { if (!cm_state->cmhdr || @@ -1673,7 +1712,7 @@ void sockinfo::insert_cmsg(struct cmsg_state * cm_state, int level, int type, vo cm_state->cmhdr = next; } -void sockinfo::handle_cmsg(struct msghdr * msg) +void sockinfo::handle_cmsg(struct msghdr * msg, int flags) { struct cmsg_state cm_state; @@ -1683,6 +1722,7 @@ void sockinfo::handle_cmsg(struct msghdr * msg) if (m_b_pktinfo) handle_ip_pktinfo(&cm_state); if (m_b_rcvtstamp || m_n_tsing_flags) handle_recv_timestamping(&cm_state); + 
if (flags & MSG_ERRQUEUE) handle_recv_errqueue(&cm_state); cm_state.mhdr->msg_controllen = cm_state.cmsg_bytes_consumed; } diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index 7c6e7d4581..6b1edce210 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -90,6 +90,22 @@ enum { #define SO_REUSEPORT 15 #endif +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif + +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 59 +#endif + +#ifndef SO_EE_CODE_ZEROCOPY_COPIED +#define SO_EE_CODE_ZEROCOPY_COPIED 1 +#endif + +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + struct cmsg_state { struct msghdr *mhdr; @@ -200,6 +216,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou bool m_b_pktinfo; bool m_b_rcvtstamp; bool m_b_rcvtstampns; + bool m_b_zc; uint8_t m_n_tsing_flags; in_protocol_t m_protocol; @@ -248,6 +265,26 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou ring_alloc_logic_attr m_ring_alloc_log_tx; uint32_t m_pcp; + /* Socket error queue that keeps local errors and internal data required + * to provide notification ability. + */ + descq_t m_error_queue; + lock_spin m_error_queue_lock; + + /* TX zcopy counter + * The notification itself for tx zcopy operation is a simple scalar value. + * Each socket maintains an internal unsigned 32-bit counter. + * Each send call with MSG_ZEROCOPY that successfully sends data increments + * the counter. The counter is not incremented on failure or if called with + * length zero. + * The counter counts system call invocations, not bytes. + * It wraps after UINT_MAX calls. + */ + atomic_t m_zckey; + + /* Last memory descriptor with zcopy operation method */ + mem_buf_desc_t* m_last_zcdesc; + struct { /* Track internal events to return in socketxtreme_poll() * Current design support single event for socket at a particular time @@ -327,8 +364,9 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual void handle_ip_pktinfo(struct cmsg_state *cm_state) = 0; inline void handle_recv_timestamping(struct cmsg_state *cm_state); + inline void handle_recv_errqueue(struct cmsg_state *cm_state); void insert_cmsg(struct cmsg_state *cm_state, int level, int type, void *data, int len); - void handle_cmsg(struct msghdr * msg); + void handle_cmsg(struct msghdr * msg, int flags); void process_timestamps(mem_buf_desc_t* p_desc); virtual bool try_un_offloading(); // un-offload the socket if possible @@ -530,6 +568,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou case SO_TIMESTAMP: return "SO_TIMESTAMP"; case SO_TIMESTAMPNS: return "SO_TIMESTAMPNS"; case SO_BINDTODEVICE: return "SO_BINDTODEVICE"; + case SO_ZEROCOPY: return "SO_ZEROCOPY"; case SO_VMA_RING_ALLOC_LOGIC: return "SO_VMA_RING_ALLOC_LOGIC"; case SO_MAX_PACING_RATE: return "SO_MAX_PACING_RATE"; case SO_VMA_FLOW_TAG: return "SO_VMA_FLOW_TAG"; diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index ccff41b951..57555d0db1 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -824,6 +824,10 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) } #endif + if ((__flags & MSG_ZEROCOPY) && (m_b_zc)) { + apiflags |= VMA_TX_PACKET_ZEROCOPY; + } + for (int i = 0; i < sz_iov; i++) { si_tcp_logfunc("iov:%d base=%p len=%d", i, p_iov[i].iov_base, p_iov[i].iov_len); @@ -948,6 +952,19 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) m_p_socket_stats->n_tx_ready_byte_count += total_tx; } + 
/* Each send call with MSG_ZEROCOPY that successfully sends + * data increments the counter. + * The counter is not incremented on failure or if called with length zero. + */ + if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && + (total_tx > 0)) { + if (m_last_zcdesc->tx.zc.id != (uint32_t)atomic_read(&m_zckey)) { + si_tcp_logerr("Invalid tx zcopy operation"); + } else { + atomic_fetch_and_inc(&m_zckey); + } + } + unlock_tcp_con(); #ifdef VMA_TIME_MEASURE @@ -988,7 +1005,7 @@ err_t sockinfo_tcp::ip_output(struct pbuf *p, void* v_p_conn, uint16_t flags) dst_entry *p_dst = p_si_tcp->m_p_connected_dst_entry; int max_count = p_si_tcp->m_pcb.tso.max_send_sge; tcp_iovec lwip_iovec[max_count]; - vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0}; + vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0, 0}; int count = 0; /* maximum number of sge can not exceed this value */ @@ -996,6 +1013,7 @@ err_t sockinfo_tcp::ip_output(struct pbuf *p, void* v_p_conn, uint16_t flags) lwip_iovec[count].iovec.iov_base = p->payload; lwip_iovec[count].iovec.iov_len = p->len; lwip_iovec[count].p_desc = (mem_buf_desc_t*)p; + attr.length += p->len; p = p->next; count++; } @@ -1893,7 +1911,7 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov int total_rx = 0; int poll_count = 0; int bytes_to_tcp_recved; - size_t total_iov_sz = 1; + size_t total_iov_sz = 0; int out_flags = 0; int in_flags = *p_flags; bool block_this_run = BLOCK_THIS_RUN(m_b_blocking, in_flags); @@ -1915,23 +1933,47 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov TAKE_T_RX_START; #endif - if (unlikely((in_flags & MSG_WAITALL) && !(in_flags & MSG_PEEK))) { - total_iov_sz = 0; - for (int i = 0; i < sz_iov; i++) { - total_iov_sz += p_iov[i].iov_len; + /* In general, without any special flags, socket options, or ioctls being set, + * a recv call on a blocking TCP socket will return any number of bytes less than + * or equal to the size being requested. But unless the socket is closed remotely, + * interrupted by signal, or in an error state, + * it will block until at least 1 byte is available. + * With MSG_ERRQUEUE flag user application can request just information from + * error queue without any income data. 
+ */ + if (p_iov && (sz_iov > 0)) { + total_iov_sz = 1; + if (unlikely((in_flags & MSG_WAITALL) && !(in_flags & MSG_PEEK))) { + total_iov_sz = 0; + for (int i = 0; i < sz_iov; i++) { + total_iov_sz += p_iov[i].iov_len; + } + if (total_iov_sz == 0) + return 0; } - if (total_iov_sz == 0) - return 0; } si_tcp_logfunc("rx: iov=%p niovs=%d", p_iov, sz_iov); - /* poll rx queue till we have something */ + + /* poll rx queue till we have something */ lock_tcp_con(); + + /* error queue request should be handled first + * It allows to return immediately during failure with correct + * error notification without data processing + */ + if (__msg && __msg->msg_control && (in_flags & MSG_ERRQUEUE)) { + if (m_error_queue.empty()) { + errno = EAGAIN; + unlock_tcp_con(); + return -1; + } + } return_reuse_buffers_postponed(); unlock_tcp_con(); while (m_rx_ready_byte_count < total_iov_sz) { - if (unlikely(g_b_exit ||!is_rtr() || (rx_wait_lockless(poll_count, block_this_run) < 0))) { + if (unlikely(g_b_exit || !is_rtr() || (rx_wait_lockless(poll_count, block_this_run) < 0))) { return handle_rx_error(block_this_run); } } @@ -1940,8 +1982,14 @@ ssize_t sockinfo_tcp::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov si_tcp_logfunc("something in rx queues: %d %p", m_n_rx_pkt_ready_list_count, m_rx_pkt_ready_list.front()); - total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); - if (__msg) handle_cmsg(__msg); + if (total_iov_sz > 0) { + total_rx = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); + } + + /* Handle all control message requests */ + if (__msg && __msg->msg_control) { + handle_cmsg(__msg, in_flags); + } /* * RCVBUFF Accounting: Going 'out' of the internal buffer: if some bytes are not tcp_recved yet - do that. 
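Note on the application-visible contract (an illustration, not part of the patch itself): the receive-path changes above, together with tcp_tx_zc_handle() further below, follow the kernel MSG_ZEROCOPY notification model. Every successful send() carrying MSG_ZEROCOPY consumes one value of the per-socket counter (m_zckey), and completions are reported on the socket error queue as a struct sock_extended_err with ee_origin == SO_EE_ORIGIN_ZEROCOPY and the inclusive counter range [ee_info, ee_data]. The cmsg is inserted at level 0 (which equals SOL_IP on Linux) with type IP_RECVERR, and the EPOLLERR/POLLERR plumbing added in epoll_wait_call.cpp and poll_call.cpp lets the application wait for it. A minimal consumer sketch, with hypothetical fd/buffer names and error handling trimmed (the SO_ZEROCOPY/MSG_ZEROCOPY fallback defines come from sockinfo.h in this patch or from recent kernel headers):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/errqueue.h>

/* Hypothetical helper: send one buffer with MSG_ZEROCOPY and block until the
 * matching completion shows up on the socket error queue. */
static int zc_send_and_wait(int fd, const void *buf, size_t len)
{
	int one = 1;

	/* Opt in once per socket; handled by sockinfo_tcp::setsockopt(SO_ZEROCOPY). */
	if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0)
		return -1;

	/* Each successful MSG_ZEROCOPY send consumes one notification id (m_zckey). */
	if (send(fd, buf, len, MSG_ZEROCOPY) < 0)
		return -1;

	for (;;) {
		char control[128];
		struct msghdr msg;
		struct cmsghdr *cm;

		memset(&msg, 0, sizeof(msg));
		msg.msg_control = control;
		msg.msg_controllen = sizeof(control);

		/* rx() above returns -1/EAGAIN while m_error_queue is still empty. */
		if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0) {
			if (errno == EAGAIN)
				continue; /* or wait for EPOLLERR/POLLERR instead of spinning */
			return -1;
		}
		for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
			struct sock_extended_err *ee;

			if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR)
				continue;
			ee = (struct sock_extended_err *)CMSG_DATA(cm);
			if (ee->ee_origin != SO_EE_ORIGIN_ZEROCOPY)
				continue;
			/* Sends numbered [ee_info, ee_data] are complete; their user
			 * buffers may now be reused. */
			printf("zerocopy completions: %u..%u\n", ee->ee_info, ee->ee_data);
			return 0;
		}
	}
}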
@@ -2770,7 +2818,7 @@ void sockinfo_tcp::auto_accept_connection(sockinfo_tcp *parent, sockinfo_tcp *ch child->m_socketxtreme.ec.completion.src = parent->m_socketxtreme.ec.completion.src; child->m_socketxtreme.ec.completion.listen_fd = child->m_parent->get_fd(); } - child->set_events(VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED); + NOTIFY_ON_EVENTS(child, VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED); } else { vlog_printf(VLOG_ERROR, "VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED: can't find listen socket for new connected socket with [fd=%d]", @@ -3291,7 +3339,8 @@ bool sockinfo_tcp::is_errorable(int *errors) *errors |= POLLHUP; } - if (m_conn_state == TCP_CONN_ERROR) { + if ((m_conn_state == TCP_CONN_ERROR) || + (!m_error_queue.empty())) { *errors |= POLLERR; } @@ -3795,6 +3844,15 @@ int sockinfo_tcp::setsockopt(int __level, int __optname, ret = SOCKOPT_HANDLE_BY_OS; break; } + case SO_ZEROCOPY: + if (__optval) { + lock_tcp_con(); + m_b_zc = *(bool *)__optval; + unlock_tcp_con(); + } + ret = SOCKOPT_HANDLE_BY_OS; + si_tcp_logdbg("(SO_ZEROCOPY) m_b_zc: %d", m_b_zc); + break; default: ret = SOCKOPT_HANDLE_BY_OS; supported = false; @@ -3926,6 +3984,15 @@ int sockinfo_tcp::getsockopt_offload(int __level, int __optname, void *__optval, case SO_MAX_PACING_RATE: ret = sockinfo::getsockopt(__level, __optname, __optval, __optlen); break; + case SO_ZEROCOPY: + if (*__optlen >= sizeof(int)) { + *(int *)__optval = m_b_zc; + si_tcp_logdbg("(SO_ZEROCOPY) m_b_zc: %d", m_b_zc); + ret = 0; + } else { + errno = EINVAL; + } + break; default: ret = SOCKOPT_HANDLE_BY_OS; break; @@ -4516,13 +4583,17 @@ int sockinfo_tcp::free_buffs(uint16_t len) return 0; } -struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn) +struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type) { sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container); dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry); mem_buf_desc_t* p_desc = NULL; + if (likely(p_dst)) { p_desc = p_dst->get_buffer(); + if (p_desc && (type == PBUF_ZEROCOPY)) { + p_desc = p_si_tcp->tcp_tx_zc_alloc(p_desc); + } } return (struct pbuf *)p_desc; } @@ -4545,9 +4616,110 @@ void sockinfo_tcp::tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(p_buff); + } + } +} + +mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc) +{ + p_desc->m_flags |= mem_buf_desc_t::ZCOPY; + p_desc->tx.zc.id = atomic_read(&m_zckey); + p_desc->tx.zc.count = 1; + p_desc->tx.zc.len = p_desc->lwip_pbuf.pbuf.len; + p_desc->tx.zc.ctx = (void *)this; + p_desc->tx.zc.callback = tcp_tx_zc_callback; + + if (m_last_zcdesc && + (m_last_zcdesc != p_desc) && + (m_last_zcdesc->lwip_pbuf.pbuf.ref > 0) && + (m_last_zcdesc->tx.zc.id == p_desc->tx.zc.id)) { + m_last_zcdesc->tx.zc.len = m_last_zcdesc->lwip_pbuf.pbuf.len; + m_last_zcdesc->tx.zc.count = 0; + } + m_last_zcdesc = p_desc; + + return p_desc; +} + +void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc) +{ + sockinfo_tcp* sock = NULL; + + if (!p_desc) { + return; + } + + if (!p_desc->tx.zc.ctx || !p_desc->tx.zc.count) { + goto cleanup; + } + + sock = (sockinfo_tcp *)p_desc->tx.zc.ctx; + + if (sock->m_state != SOCKINFO_OPENED) { + goto cleanup; + } + + sock->tcp_tx_zc_handle(p_desc); + +cleanup: + /* Clean up */ + p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY; + memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc)); +} + +void 
sockinfo_tcp::tcp_tx_zc_handle(mem_buf_desc_t* p_desc) +{ + uint32_t lo, hi; + uint16_t count; + uint32_t prev_lo, prev_hi; + mem_buf_desc_t* err_queue = NULL; + sockinfo_tcp* sock = this; + + count = p_desc->tx.zc.count; + lo = p_desc->tx.zc.id; + hi = lo + count - 1; + memset(&p_desc->ee, 0, sizeof(p_desc->ee)); + p_desc->ee.ee_errno = 0; + p_desc->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY; + p_desc->ee.ee_data = hi; + p_desc->ee.ee_info = lo; +// p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED; + + m_error_queue_lock.lock(); + + /* Update last error queue element in case it has the same type */ + err_queue = sock->m_error_queue.back(); + if (err_queue && + (err_queue->ee.ee_origin == p_desc->ee.ee_origin) && + (err_queue->ee.ee_code == p_desc->ee.ee_code)) { + uint64_t sum_count = 0; + + prev_hi = err_queue->ee.ee_data; + prev_lo = err_queue->ee.ee_info; + sum_count = prev_hi - prev_lo + 1ULL + count; + + if (lo == prev_lo) { + if (hi > prev_hi) + err_queue->ee.ee_data = hi; + } else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) { + err_queue = NULL; + } else { + err_queue->ee.ee_data += count; } } + + /* Add information into error queue element */ + if (!err_queue) { + err_queue = p_desc->clone(); + sock->m_error_queue.push_back(err_queue); + } + + m_error_queue_lock.unlock(); + + /* Signal events on socket */ + NOTIFY_ON_EVENTS(sock, EPOLLERR); + sock->do_wakeup(); } struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn) diff --git a/src/vma/sock/sockinfo_tcp.h b/src/vma/sock/sockinfo_tcp.h index 53c14c4c85..323de25ae3 100644 --- a/src/vma/sock/sockinfo_tcp.h +++ b/src/vma/sock/sockinfo_tcp.h @@ -180,11 +180,15 @@ class sockinfo_tcp : public sockinfo, public timer_handler virtual void update_header_field(data_updater *updater); virtual bool rx_input_cb(mem_buf_desc_t* p_rx_pkt_mem_buf_desc_info, void* pv_fd_ready_array); - static struct pbuf * tcp_tx_pbuf_alloc(void* p_conn); + static struct pbuf * tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type); static void tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff); static struct tcp_seg * tcp_seg_alloc(void* p_conn); static void tcp_seg_free(void* p_conn, struct tcp_seg * seg); + mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc); + static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc); + void tcp_tx_zc_handle(mem_buf_desc_t* p_desc); + bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL); bool inline is_writeable(); bool inline is_errorable(int *errors); diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 30810af7e7..98dce87dea 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -1310,7 +1310,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov, m_rx_udp_poll_os_ratio_counter++; if (m_n_rx_pkt_ready_list_count > 0) { // Found a ready packet in the list - if (__msg) handle_cmsg(__msg); + if (__msg) handle_cmsg(__msg, in_flags); ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); goto out; } @@ -1330,7 +1330,7 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov, if (likely(rx_wait_ret == 0)) { // Got 0, means we might have a ready packet if (m_n_rx_pkt_ready_list_count > 0) { - if (__msg) handle_cmsg(__msg); + if (__msg) handle_cmsg(__msg, in_flags); ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags); goto out; } else { @@ -1657,7 +1657,7 @@ ssize_t sockinfo_udp::tx(vma_tx_call_attr_t &tx_arg) { #ifdef 
DEFINED_TSO - vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0}; + vma_send_attr attr = {(vma_wr_tx_packet_attr)0, 0, 0}; bool b_blocking = m_b_blocking; if (unlikely(__flags & MSG_DONTWAIT)) b_blocking = false; diff --git a/src/vma/util/sys_vars.cpp b/src/vma/util/sys_vars.cpp index 49b04cb999..1f8f7c6972 100644 --- a/src/vma/util/sys_vars.cpp +++ b/src/vma/util/sys_vars.cpp @@ -583,7 +583,7 @@ void mce_sys_var::get_env_params() progress_engine_wce_max = MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX; cq_keep_qp_full = MCE_DEFAULT_CQ_KEEP_QP_FULL; qp_compensation_level = MCE_DEFAULT_QP_COMPENSATION_LEVEL; - internal_thread_arm_cq_enabled = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED; + internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ; offloaded_sockets = MCE_DEFAULT_OFFLOADED_SOCKETS; timer_resolution_msec = MCE_DEFAULT_TIMER_RESOLUTION_MSEC; @@ -1161,12 +1161,16 @@ void mce_sys_var::get_env_params() tcp_timer_resolution_msec = timer_resolution_msec; } - if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL) - internal_thread_arm_cq_enabled = atoi(env_ptr) ? true : false; + if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL) { + internal_thread_arm_cq = (arm_cq_t)atoi(env_ptr); + if (internal_thread_arm_cq < 0 || internal_thread_arm_cq > mce_sys_var::ARM_CQ_ALL) { + internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ; + } + } - if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) { - snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr); - } + if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) { + snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr); + } // handle internal thread affinity - default is CPU-0 if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_AFFINITY)) != NULL) { diff --git a/src/vma/util/sys_vars.h b/src/vma/util/sys_vars.h index f0357b19f3..e4defc9097 100644 --- a/src/vma/util/sys_vars.h +++ b/src/vma/util/sys_vars.h @@ -296,6 +296,25 @@ struct mce_sys_var { HYPER_VMWARE }; + enum arm_cq_t { + ARM_CQ_OFF = 0, + ARM_CQ_RX = 0x01, + ARM_CQ_TX = 0x02, + ARM_CQ_ALL = 0x03 + } ; + + inline const char* arm_cq_str(arm_cq_t value) + { + switch (value) { + case ARM_CQ_OFF: return "(Disabled)"; + case ARM_CQ_RX: return "(Arm RX CQ)"; + case ARM_CQ_TX: return "(Arm TX CQ)"; + case ARM_CQ_ALL: return "(Arm All)"; + default: break; + } + return "unsupported"; + } + public: void get_env_params(); @@ -406,7 +425,7 @@ struct mce_sys_var { char internal_thread_cpuset[FILENAME_MAX]; char internal_thread_affinity_str[FILENAME_MAX]; cpu_set_t internal_thread_affinity; - bool internal_thread_arm_cq_enabled; + int internal_thread_arm_cq; internal_thread_tcp_timer_handling_t internal_thread_tcp_timer_handling; bool handle_bf; @@ -647,7 +666,7 @@ extern mce_sys_var & safe_mce_sys(); #define MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX (10000) #define MCE_DEFAULT_CQ_KEEP_QP_FULL (true) #define MCE_DEFAULT_QP_COMPENSATION_LEVEL (256) -#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED (false) +#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ (mce_sys_var::ARM_CQ_OFF) #define MCE_DEFAULT_QP_FORCE_MC_ATTACH (false) #define MCE_DEFAULT_OFFLOADED_SOCKETS (true) #define MCE_DEFAULT_TIMER_RESOLUTION_MSEC (10)
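VMA_INTERNAL_THREAD_ARM_CQ is now a bitmask rather than a boolean: bit 0 (ARM_CQ_RX) selects the RX CQs and bit 1 (ARM_CQ_TX) the TX CQs, so running with VMA_INTERNAL_THREAD_ARM_CQ=3 arms and polls both, matching the new README values. Below is a small self-contained sketch (illustrative only) that condenses the parsing from sys_vars.cpp and the mask checks used by the internal-thread loops in net_device_val.cpp and epfd_info.cpp:

#include <cstdio>
#include <cstdlib>

// Mirrors mce_sys_var::arm_cq_t from sys_vars.h.
enum arm_cq_t { ARM_CQ_OFF = 0, ARM_CQ_RX = 0x01, ARM_CQ_TX = 0x02, ARM_CQ_ALL = 0x03 };

int main()
{
	int arm_cq = ARM_CQ_OFF; // MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ

	if (const char *env = std::getenv("VMA_INTERNAL_THREAD_ARM_CQ")) {
		arm_cq = std::atoi(env);
		if (arm_cq < ARM_CQ_OFF || arm_cq > ARM_CQ_ALL)
			arm_cq = ARM_CQ_OFF; // out-of-range values fall back to the default
	}

	// The internal thread loops over all rings and, per ring, only requests
	// notifications / polls the CQ types selected by the mask.
	std::printf("RX CQ: %s, TX CQ: %s\n",
	            (arm_cq & ARM_CQ_RX) ? "arm + poll" : "skip",
	            (arm_cq & ARM_CQ_TX) ? "arm + poll" : "skip");
	return 0;
}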
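The coalescing rule in sockinfo_tcp::tcp_tx_zc_handle() is easier to see in isolation: a new completion range is folded into the most recently queued notification only when it shares the same lower bound, or when it starts exactly one past the previous upper bound and the merged count still fits in 32 bits; otherwise a separate element is queued (the real code additionally requires matching ee_origin/ee_code). A simplified, self-contained model with hypothetical names:

#include <cstdint>
#include <cstdio>
#include <deque>

struct zc_notice { uint32_t lo, hi; }; // stands in for ee_info/ee_data

static void queue_completion(std::deque<zc_notice> &q, uint32_t lo, uint32_t hi)
{
	const uint32_t count = hi - lo + 1;

	if (!q.empty()) {
		zc_notice &last = q.back();
		const uint64_t merged = (uint64_t)(last.hi - last.lo) + 1 + count;

		if (lo == last.lo) {             // duplicate start: keep the wider range
			if (hi > last.hi)
				last.hi = hi;
			return;
		}
		if (merged < (1ULL << 32) && lo == last.hi + 1) {
			last.hi += count;            // contiguous: extend the existing notification
			return;
		}
	}
	q.push_back({lo, hi});               // otherwise report it as a separate element
}

int main()
{
	std::deque<zc_notice> q;
	queue_completion(q, 0, 0);           // send #0 completed
	queue_completion(q, 1, 2);           // sends #1-#2 completed -> merged into [0, 2]
	queue_completion(q, 5, 5);           // gap -> new element [5, 5]
	for (const zc_notice &n : q)
		std::printf("[%u, %u]\n", n.lo, n.hi);
	return 0;
}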