Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage API - listen sockets support #218

Open
wants to merge 5 commits into
base: vNext
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/core/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,21 @@ void event_handler_manager::unregister_timers_event_and_delete(timer_handler *ha
post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_socket_timer_and_delete(sockinfo_tcp *sock_tcp)
void event_handler_manager::unregister_socket_timer_event(sockinfo_tcp *sock_tcp)
{
evh_logdbg("Unregistering TCP socket timer: %p", sock_tcp);
reg_action_t reg_action;
memset(&reg_action, 0, sizeof(reg_action));
reg_action.type = UNREGISTER_TCP_SOCKET_TIMER;
reg_action.info.timer.user_data = sock_tcp;
post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_socket_timer_and_delete(sockinfo_tcp *sock_tcp)
{
evh_logdbg("Unregistering TCP socket timer and destroying: %p", sock_tcp);
reg_action_t reg_action;
memset(&reg_action, 0, sizeof(reg_action));
reg_action.type = UNREGISTER_TCP_SOCKET_TIMER_AND_DELETE;
reg_action.info.timer.user_data = sock_tcp;
post_new_reg_action(reg_action);
Expand Down Expand Up @@ -449,6 +459,8 @@ const char *event_handler_manager::reg_action_str(event_action_type_e reg_action
switch (reg_action_type) {
case REGISTER_TCP_SOCKET_TIMER:
return "REGISTER_TCP_SOCKET_TIMER";
case UNREGISTER_TCP_SOCKET_TIMER:
return "UNREGISTER_TCP_SOCKET_TIMER";
case UNREGISTER_TCP_SOCKET_TIMER_AND_DELETE:
return "UNREGISTER_TCP_SOCKET_TIMER_AND_DELETE";
case REGISTER_TIMER:
Expand Down Expand Up @@ -731,6 +743,10 @@ void event_handler_manager::handle_registration_action(reg_action_t &reg_action)
sock = reinterpret_cast<sockinfo_tcp *>(reg_action.info.timer.user_data);
sock->get_tcp_timer_collection()->add_new_timer(sock);
break;
case UNREGISTER_TCP_SOCKET_TIMER:
sock = reinterpret_cast<sockinfo_tcp *>(reg_action.info.timer.user_data);
sock->get_tcp_timer_collection()->remove_timer(sock);
break;
case UNREGISTER_TCP_SOCKET_TIMER_AND_DELETE:
sock = reinterpret_cast<sockinfo_tcp *>(reg_action.info.timer.user_data);
sock->get_tcp_timer_collection()->remove_timer(sock);
Expand Down
2 changes: 2 additions & 0 deletions src/core/event/event_handler_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef std::map<void * /*event_handler_id*/, event_handler_rdma_cm * /*p_event_

typedef enum {
REGISTER_TCP_SOCKET_TIMER,
UNREGISTER_TCP_SOCKET_TIMER,
UNREGISTER_TCP_SOCKET_TIMER_AND_DELETE,
REGISTER_TIMER,
WAKEUP_TIMER, /* NOT AVAILABLE FOR GROUPED TIMERS */
Expand Down Expand Up @@ -195,6 +196,7 @@ class event_handler_manager : public wakeup_pipe {
void unregister_timers_event_and_delete(timer_handler *handler);

void register_socket_timer_event(sockinfo_tcp *sock_tcp);
void unregister_socket_timer_event(sockinfo_tcp *sock_tcp);
void unregister_socket_timer_and_delete(sockinfo_tcp *sock_tcp);

void register_ibverbs_event(int fd, event_handler_ibverbs *handler, void *channel,
Expand Down
8 changes: 7 additions & 1 deletion src/core/event/poll_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ poll_group::poll_group(const struct xlio_poll_group_attr *attr)
: m_socket_event_cb(attr->socket_event_cb)
, m_socket_comp_cb(attr->socket_comp_cb)
, m_socket_rx_cb(attr->socket_rx_cb)
, m_socket_accept_cb(attr->socket_accept_cb)
, m_group_flags(attr->flags)
{
/*
Expand Down Expand Up @@ -180,7 +181,7 @@ void poll_group::add_socket(sockinfo_tcp *si)
g_p_fd_collection->set_socket(si->get_fd(), si);
}

void poll_group::close_socket(sockinfo_tcp *si, bool force /*=false*/)
void poll_group::remove_socket(sockinfo_tcp *si)
{
g_p_fd_collection->clear_socket(si->get_fd());
m_sockets_list.erase(si);
Expand All @@ -189,6 +190,11 @@ void poll_group::close_socket(sockinfo_tcp *si, bool force /*=false*/)
if (iter != std::end(m_dirty_sockets)) {
m_dirty_sockets.erase(iter);
}
}

void poll_group::close_socket(sockinfo_tcp *si, bool force /*=false*/)
{
remove_socket(si);

bool closed = si->prepare_to_close(force);
if (closed) {
Expand Down
2 changes: 2 additions & 0 deletions src/core/event/poll_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class poll_group {
void add_ring(ring *rng, ring_alloc_logic_attr *attr);

void add_socket(sockinfo_tcp *si);
void remove_socket(sockinfo_tcp *si);
void close_socket(sockinfo_tcp *si, bool force = false);

unsigned get_flags() const { return m_group_flags; }
Expand All @@ -73,6 +74,7 @@ class poll_group {
xlio_socket_event_cb_t m_socket_event_cb;
xlio_socket_comp_cb_t m_socket_comp_cb;
xlio_socket_rx_cb_t m_socket_rx_cb;
xlio_socket_accept_cb_t m_socket_accept_cb;

private:
std::vector<ring *> m_rings;
Expand Down
2 changes: 2 additions & 0 deletions src/core/lwip/tcp_in.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,10 @@ static void tcp_listen_input(struct tcp_pcb *pcb, tcp_in_data *in_data)
if (in_data->flags & TCP_ACK) {
/* For incoming segments with the ACK flag set, respond with a RST. */
LWIP_DEBUGF(TCP_RST_DEBUG, ("tcp_listen_input: ACK in LISTEN, sending reset\n"));
#if 0
tcp_rst(in_data->ackno + 1, in_data->seqno + in_data->tcplen, in_data->tcphdr->dest,
in_data->tcphdr->src, NULL);
#endif
} else if (in_data->flags & TCP_SYN) {
LWIP_DEBUGF(TCP_DEBUG,
("TCP connection request %" U16_F " -> %" U16_F ".\n", in_data->tcphdr->src,
Expand Down
56 changes: 56 additions & 0 deletions src/core/sock/sock-extra.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,18 @@ struct xlio_api_t *extra_api()
SET_EXTRA_API(xlio_poll_group_poll, xlio_poll_group_poll, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_create, xlio_socket_create, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_destroy, xlio_socket_destroy, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_update, xlio_socket_update, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_setsockopt, xlio_socket_setsockopt, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_getsockname, xlio_socket_getsockname, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_getpeername, xlio_socket_getpeername, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_bind, xlio_socket_bind, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_connect, xlio_socket_connect, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_listen, xlio_socket_listen, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_get_pd, xlio_socket_get_pd, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_detach_group, xlio_socket_detach_group,
XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_attach_group, xlio_socket_attach_group,
XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_send, xlio_socket_send, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_socket_sendv, xlio_socket_sendv, XLIO_EXTRA_API_XLIO_SOCKET);
SET_EXTRA_API(xlio_poll_group_flush, xlio_poll_group_flush, XLIO_EXTRA_API_XLIO_SOCKET);
Expand Down Expand Up @@ -505,6 +513,12 @@ extern "C" int xlio_socket_destroy(xlio_socket_t sock)
return 0;
}

extern "C" int xlio_socket_update(xlio_socket_t sock, unsigned flags, uintptr_t userdata_sq)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
return si->update_xlio_socket(flags, userdata_sq);
}

extern "C" int xlio_socket_setsockopt(xlio_socket_t sock, int level, int optname,
const void *optval, socklen_t optlen)
{
Expand All @@ -518,6 +532,20 @@ extern "C" int xlio_socket_setsockopt(xlio_socket_t sock, int level, int optname
return rc;
}

extern "C" int xlio_socket_getsockname(xlio_socket_t sock, struct sockaddr *addr,
socklen_t *addrlen)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
return si->getsockname(addr, addrlen);
}

extern "C" int xlio_socket_getpeername(xlio_socket_t sock, struct sockaddr *addr,
socklen_t *addrlen)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
return si->getpeername(addr, addrlen);
}

extern "C" int xlio_socket_bind(xlio_socket_t sock, const struct sockaddr *addr, socklen_t addrlen)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
Expand All @@ -544,6 +572,19 @@ extern "C" int xlio_socket_connect(xlio_socket_t sock, const struct sockaddr *to
return rc;
}

extern "C" int xlio_socket_listen(xlio_socket_t sock)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
poll_group *group = si->get_poll_group();

if (!group->m_socket_accept_cb) {
errno = ENOTCONN;
return -1;
}
// TODO handle positive return code from prepareListen() and convert it to errno
return si->prepareListen() ?: si->listen(-1);
}

extern "C" struct ibv_pd *xlio_socket_get_pd(xlio_socket_t sock)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
Expand All @@ -552,6 +593,21 @@ extern "C" struct ibv_pd *xlio_socket_get_pd(xlio_socket_t sock)
return ctx ? ctx->get_ibv_pd() : nullptr;
}

int xlio_socket_detach_group(xlio_socket_t sock)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);

return si->detach_xlio_group();
}

int xlio_socket_attach_group(xlio_socket_t sock, xlio_poll_group_t group)
{
sockinfo_tcp *si = reinterpret_cast<sockinfo_tcp *>(sock);
poll_group *grp = reinterpret_cast<poll_group *>(group);

return si->attach_xlio_group(grp);
}

static void xlio_buf_free(struct xlio_buf *buf)
{
mem_buf_desc_t *desc = mem_buf_desc_t::from_xlio_buf(buf);
Expand Down
4 changes: 1 addition & 3 deletions src/core/sock/sockinfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -981,9 +981,7 @@ bool sockinfo::attach_receiver(flow_tuple_with_local_if &flow_key)
// Registered as receiver successfully
si_logdbg("Attached %s to ring %p", flow_key.to_str().c_str(), p_nd_resources->p_ring);

/* Verify 5 tuple over 3 tuple
* and replace flow rule with the strongest
*/
// Verify 5 tuple over 3 tuple and replace flow rule with the strongest
if (flow_key.is_5_tuple()) {
// Check and remove lesser 3 tuple
flow_tuple_with_local_if flow_key_3t(
Expand Down
Loading