Skip to content

Commit

Permalink
Add event framework based on epoll, kqueue and select
Browse files Browse the repository at this point in the history
Signed-off-by: Jeroen Koekkoek <[email protected]>
  • Loading branch information
k0ekk0ek committed Jan 27, 2022
1 parent 72ed95f commit 35c8bd4
Show file tree
Hide file tree
Showing 31 changed files with 3,093 additions and 1,410 deletions.
3 changes: 1 addition & 2 deletions src/core/ddsi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ set(srcs_ddsi
q_qosmatch.c
q_radmin.c
q_receive.c
q_sockwaitset.c
q_thread.c
q_transmit.c
q_inverse_uint32_set.c
Expand Down Expand Up @@ -152,14 +151,14 @@ set(hdrs_private_ddsi
q_radmin.h
q_receive.h
q_rtps.h
q_sockwaitset.h
q_thread.h
q_transmit.h
q_inverse_uint32_set.h
q_unused.h
q_whc.h
q_xevent.h
q_xmsg.h
q_receive.h
sysdeps.h)

if(ENABLE_LIFESPAN)
Expand Down
4 changes: 2 additions & 2 deletions src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
#include "dds/ddsrt/sync.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/event.h"

#include "dds/ddsi/ddsi_plist.h"
#include "dds/ddsi/ddsi_ownip.h"
#include "dds/ddsi/q_protocol.h"
#include "dds/ddsi/q_sockwaitset.h"
#include "dds/ddsi/q_config.h"

#if defined (__cplusplus)
Expand Down Expand Up @@ -80,7 +80,7 @@ struct recv_thread_arg {
struct ddsi_tran_conn *conn;
} single;
struct {
os_sockWaitset ws;
ddsrt_loop_t loop;
} many;
} u;
};
Expand Down
12 changes: 12 additions & 0 deletions src/core/ddsi/include/dds/ddsi/ddsi_tran.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

/* DDSI Transport module */

#include "dds/ddsrt/event.h"
#include "dds/ddsrt/ifaddrs.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsi/ddsi_locator.h"
Expand Down Expand Up @@ -50,6 +51,7 @@ typedef ssize_t (*ddsi_tran_write_fn_t) (ddsi_tran_conn_t, const ddsi_locator_t
typedef int (*ddsi_tran_locator_fn_t) (ddsi_tran_factory_t, ddsi_tran_base_t, ddsi_locator_t *);
typedef bool (*ddsi_tran_supports_fn_t) (const struct ddsi_tran_factory *, int32_t);
typedef ddsrt_socket_t (*ddsi_tran_handle_fn_t) (ddsi_tran_base_t);
typedef ddsrt_event_t *(*ddsi_tran_event_fn_t) (ddsi_tran_base_t);
typedef int (*ddsi_tran_listen_fn_t) (ddsi_tran_listener_t);
typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t);
typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, ddsi_locator_t *);
Expand Down Expand Up @@ -104,6 +106,7 @@ struct ddsi_tran_base
/* Functions */

ddsi_tran_handle_fn_t m_handle_fn;
ddsi_tran_event_fn_t m_event_fn;
};

struct ddsi_tran_conn
Expand Down Expand Up @@ -262,9 +265,15 @@ void ddsi_tran_free (ddsi_tran_base_t base);
DDS_INLINE_EXPORT inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base) {
return base->m_handle_fn (base);
}
DDS_INLINE_EXPORT inline ddsrt_event_t *ddsi_tran_event (ddsi_tran_base_t base) {
return base->m_event_fn (base);
}
DDS_INLINE_EXPORT inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn) {
return conn->m_base.m_handle_fn (&conn->m_base);
}
DDS_INLINE_EXPORT inline ddsrt_event_t *ddsi_conn_event (ddsi_tran_conn_t conn) {
return conn->m_base.m_event_fn (&conn->m_base);
}
DDS_INLINE_EXPORT inline uint32_t ddsi_conn_type (const struct ddsi_tran_conn *conn) {
return conn->m_base.m_trantype;
}
Expand Down Expand Up @@ -326,6 +335,9 @@ DDS_INLINE_EXPORT inline int ddsi_listener_listen (ddsi_tran_listener_t listener
DDS_INLINE_EXPORT inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener) {
return listener->m_accept_fn (listener);
}
DDS_INLINE_EXPORT inline ddsrt_event_t *ddsi_listener_event (ddsi_tran_listener_t listener) {
return listener->m_base.m_event_fn (&listener->m_base);
}
void ddsi_listener_unblock (ddsi_tran_listener_t listener);
void ddsi_listener_free (ddsi_tran_listener_t listener);

Expand Down
110 changes: 0 additions & 110 deletions src/core/ddsi/include/dds/ddsi/q_sockwaitset.h

This file was deleted.

58 changes: 46 additions & 12 deletions src/core/ddsi/src/ddsi_raweth.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/log.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsi/q_entity.h"
#include "q_receive.h"

#if defined(__linux) && !LWIP_SOCKET
#include <linux/if_packet.h>
Expand All @@ -33,7 +35,7 @@

typedef struct ddsi_raweth_conn {
struct ddsi_tran_conn m_base;
ddsrt_socket_t m_sock;
ddsrt_event_t m_event;
int m_ifindex;
} *ddsi_raweth_conn_t;

Expand Down Expand Up @@ -72,7 +74,7 @@ static ssize_t ddsi_raweth_conn_read (ddsi_tran_conn_t conn, unsigned char * buf
msghdr.msg_iovlen = 1;

do {
rc = ddsrt_recvmsg(((ddsi_raweth_conn_t) conn)->m_sock, &msghdr, 0, &ret);
rc = ddsrt_recvmsg(((ddsi_raweth_conn_t) conn)->m_event.source.socket.socketfd, &msghdr, 0, &ret);
} while (rc == DDS_RETCODE_INTERRUPTED);

if (ret > 0)
Expand Down Expand Up @@ -103,7 +105,7 @@ static ssize_t ddsi_raweth_conn_read (ddsi_tran_conn_t conn, unsigned char * buf
rc != DDS_RETCODE_BAD_PARAMETER &&
rc != DDS_RETCODE_NO_CONNECTION)
{
DDS_CERROR(&conn->m_base.gv->logconfig, "UDP recvmsg sock %d: ret %d retcode %d\n", (int) ((ddsi_raweth_conn_t) conn)->m_sock, (int) ret, rc);
DDS_CERROR(&conn->m_base.gv->logconfig, "UDP recvmsg sock %d: ret %d retcode %d\n", (int) ((ddsi_raweth_conn_t) conn)->m_event.source.socket.socketfd, (int) ret, rc);
}
return ret;
}
Expand Down Expand Up @@ -134,7 +136,7 @@ static ssize_t ddsi_raweth_conn_write (ddsi_tran_conn_t conn, const ddsi_locator
sendflags |= MSG_NOSIGNAL;
#endif
do {
rc = ddsrt_sendmsg (uc->m_sock, &msg, sendflags, &ret);
rc = ddsrt_sendmsg (uc->m_event.source.socket.socketfd, &msg, sendflags, &ret);
} while ((rc == DDS_RETCODE_INTERRUPTED) ||
(rc == DDS_RETCODE_TRY_AGAIN) ||
(rc == DDS_RETCODE_NOT_ALLOWED && retry-- > 0));
Expand All @@ -148,9 +150,36 @@ static ssize_t ddsi_raweth_conn_write (ddsi_tran_conn_t conn, const ddsi_locator
return (rc == DDS_RETCODE_OK ? ret : -1);
}

static dds_return_t ddsi_raweth_read_callback(ddsrt_event_t *event, uint32_t flags, const void *data, void *user_data)
{
struct ddsi_raweth_conn *conn = (ddsi_raweth_conn_t)((uintptr_t)event - offsetof(struct ddsi_raweth_conn, m_event));
struct recv_thread *recv = user_data;

if (!(flags & DDSRT_READ))
return DDS_RETCODE_OK;

(void) data;
assert (conn);
assert (recv);

const ddsi_guid_prefix_t *guid_prefix;
if (event->user_data)
guid_prefix = (ddsi_guid_prefix_t *)&((struct participant *)user_data)->e.guid.prefix;
else
guid_prefix = NULL;

do_packet (recv->ts, recv->arg.gv, (ddsi_tran_conn_t)conn, guid_prefix, recv->arg.rbpool);
return DDS_RETCODE_OK;
}

static ddsrt_socket_t ddsi_raweth_conn_handle (ddsi_tran_base_t base)
{
return ((ddsi_raweth_conn_t) base)->m_sock;
return ((ddsi_raweth_conn_t) base)->m_event.source.socket.socketfd;
}

static ddsrt_event_t *ddsi_raweth_conn_event (ddsi_tran_base_t base)
{
return &((ddsi_raweth_conn_t) base)->m_event;
}

static bool ddsi_raweth_supports (const struct ddsi_tran_factory *fact, int32_t kind)
Expand All @@ -164,7 +193,7 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t
ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) base;
int ret = -1;
(void) fact;
if (uc->m_sock != DDSRT_INVALID_SOCKET)
if (uc->m_event.source.socket.socketfd != DDSRT_INVALID_SOCKET)
{
loc->kind = NN_LOCATOR_KIND_RAWETH;
loc->port = uc->m_base.m_base.m_port;
Expand Down Expand Up @@ -219,19 +248,24 @@ static dds_return_t ddsi_raweth_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tr
}

memset (uc, 0, sizeof (*uc));
uc->m_sock = sock;
uc->m_event.flags = DDSRT_READ;
uc->m_event.loop = NULL;
uc->m_event.callback = ddsi_raweth_read_callback;
uc->m_event.user_data = NULL;
uc->m_event.source.socket.socketfd = sock;
uc->m_ifindex = addr.sll_ifindex;
ddsi_factory_conn_init (fact, intf, &uc->m_base);
uc->m_base.m_base.m_port = port;
uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
uc->m_base.m_base.m_multicast = mcast;
uc->m_base.m_base.m_handle_fn = ddsi_raweth_conn_handle;
uc->m_base.m_base.m_event_fn = ddsi_raweth_conn_event;
uc->m_base.m_locator_fn = ddsi_raweth_conn_locator;
uc->m_base.m_read_fn = ddsi_raweth_conn_read;
uc->m_base.m_write_fn = ddsi_raweth_conn_write;
uc->m_base.m_disable_multiplexing_fn = 0;

DDS_CTRACE (&fact->gv->logconfig, "ddsi_raweth_create_conn %s socket %d port %u\n", mcast ? "multicast" : "unicast", uc->m_sock, uc->m_base.m_base.m_port);
DDS_CTRACE (&fact->gv->logconfig, "ddsi_raweth_create_conn %s socket %d port %u\n", mcast ? "multicast" : "unicast", uc->m_event.source.socket.socketfd, uc->m_base.m_base.m_port);
*conn_out = &uc->m_base;
return DDS_RETCODE_OK;
}
Expand Down Expand Up @@ -265,7 +299,7 @@ static int ddsi_raweth_join_mc (ddsi_tran_conn_t conn, const ddsi_locator_t *src
{
ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) conn;
(void)srcloc;
return joinleave_asm_mcgroup(uc->m_sock, 1, mcloc, interf);
return joinleave_asm_mcgroup(uc->m_event.source.socket.socketfd, 1, mcloc, interf);
}
}

Expand All @@ -277,7 +311,7 @@ static int ddsi_raweth_leave_mc (ddsi_tran_conn_t conn, const ddsi_locator_t *sr
{
ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) conn;
(void)srcloc;
return joinleave_asm_mcgroup(uc->m_sock, 0, mcloc, interf);
return joinleave_asm_mcgroup(uc->m_event.source.socket.socketfd, 0, mcloc, interf);
}
}

Expand All @@ -287,9 +321,9 @@ static void ddsi_raweth_release_conn (ddsi_tran_conn_t conn)
DDS_CTRACE (&conn->m_base.gv->logconfig,
"ddsi_raweth_release_conn %s socket %d port %d\n",
conn->m_base.m_multicast ? "multicast" : "unicast",
uc->m_sock,
uc->m_event.source.socket.socketfd,
uc->m_base.m_base.m_port);
ddsrt_close (uc->m_sock);
ddsrt_close (uc->m_event.source.socket.socketfd);
ddsrt_free (conn);
}

Expand Down
Loading

0 comments on commit 35c8bd4

Please sign in to comment.