diff --git a/src/core/ddsi/src/ddsi__tran.h b/src/core/ddsi/src/ddsi__tran.h index cef8837cd3..c23fbee0f8 100644 --- a/src/core/ddsi/src/ddsi__tran.h +++ b/src/core/ddsi/src/ddsi__tran.h @@ -116,7 +116,6 @@ typedef int (*ddsi_is_loopbackaddr_fn_t) (const struct ddsi_tran_factory *tran, typedef int (*ddsi_is_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc); typedef int (*ddsi_is_ssm_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc); typedef int (*ddsi_is_valid_port_fn_t) (const struct ddsi_tran_factory *tran, uint32_t port); -typedef uint32_t (*ddsi_receive_buffer_size_fn_t) (const struct ddsi_tran_factory *fact); typedef uint32_t (*m_get_locator_port_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc); typedef void (*m_set_locator_port_fn_t) (const struct ddsi_tran_factory *factory, ddsi_locator_t *loc, uint32_t port); typedef uint32_t (*m_get_locator_aux_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc); @@ -225,7 +224,6 @@ struct ddsi_tran_factory ddsi_locator_to_string_fn_t m_locator_to_string_fn; ddsi_enumerate_interfaces_fn_t m_enumerate_interfaces_fn; ddsi_is_valid_port_fn_t m_is_valid_port_fn; - ddsi_receive_buffer_size_fn_t m_receive_buffer_size_fn; ddsi_locator_from_sockaddr_fn_t m_locator_from_sockaddr_fn; m_get_locator_port_fn_t m_get_locator_port_fn; m_set_locator_port_fn_t m_set_locator_port_fn; @@ -262,6 +260,10 @@ struct ddsi_tran_factory /// no default address exists. const char *m_default_spdp_address; + /// Actual minimum receive buffer size in use + /// Atomically loaded/stored so we don't have to lie about constness + ddsrt_atomic_uint32_t m_receive_buf_size; + struct ddsi_domaingv *gv; /* Relationships */ @@ -330,7 +332,7 @@ inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t /** @component transport */ inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory) { - return factory->m_receive_buffer_size_fn (factory); + return ddsrt_atomic_ld32 (&factory->m_receive_buf_size); } /** @component transport */ @@ -452,6 +454,12 @@ void ddsi_listener_unblock (struct ddsi_tran_listener * listener); /** @component transport */ void ddsi_listener_free (struct ddsi_tran_listener * listener); +/** @component transport */ +dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size); + +/** @component transport */ +dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index a745d747ea..6caeeedde4 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -37,7 +37,6 @@ typedef struct ddsi_raweth_conn { int m_ifindex; } *ddsi_raweth_conn_t; - struct ddsi_ethernet_header { unsigned char dmac[ETH_ALEN]; unsigned char smac[ETH_ALEN]; @@ -304,6 +303,18 @@ static dds_return_t ddsi_raweth_create_conn (struct ddsi_tran_conn **conn_out, s return DDS_RETCODE_ERROR; } + if ((rc = ddsi_tran_set_rcvbuf(fact, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0) + { + ddsrt_close(sock); + return DDS_RETCODE_ERROR; + } + + if ((rc = ddsi_tran_set_sndbuf(fact, sock, &gv->config.socket_sndbuf_size, 65536)) < 0) + { + ddsrt_close(sock); + return DDS_RETCODE_ERROR; + } + memset(&addr, 0, sizeof(addr)); addr.sll_family = AF_PACKET; addr.sll_protocol = htons(ETH_P_ALL); @@ -529,12 +540,6 @@ static int ddsi_raweth_is_valid_port (const struct ddsi_tran_factory *fact, uint return (eport >= 1 && eport <= 65535) && (vlanid < 4095) && vlancfi == 0; } -static uint32_t ddsi_raweth_receive_buffer_size (const struct ddsi_tran_factory *fact) -{ - (void) fact; - return 0; -} - static int ddsi_raweth_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr) { (void) tran; @@ -598,17 +603,19 @@ int ddsi_raweth_init (struct ddsi_domaingv *gv) fact->m_locator_to_string_fn = ddsi_raweth_to_string; fact->m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces; fact->m_is_valid_port_fn = ddsi_raweth_is_valid_port; - fact->m_receive_buffer_size_fn = ddsi_raweth_receive_buffer_size; fact->m_locator_from_sockaddr_fn = ddsi_raweth_locator_from_sockaddr; fact->m_get_locator_port_fn = ddsi_raweth_get_locator_port; fact->m_set_locator_port_fn = ddsi_raweth_set_locator_port; fact->m_get_locator_aux_fn = ddsi_raweth_get_locator_aux; fact->m_set_locator_aux_fn = ddsi_raweth_set_locator_aux; + ddsrt_atomic_st32 (&fact->m_receive_buf_size, UINT32_MAX); + ddsi_factory_add (gv, fact); GVLOG (DDS_LC_CONFIG, "raweth initialized\n"); return 0; } + #else int ddsi_raweth_init (struct ddsi_domaingv *gv) { (void) gv; return 0; } diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index 37e28408d9..034b5a9b9b 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -1182,12 +1182,6 @@ static int ddsi_tcp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_ return (0 < port && port <= 65535); } -static uint32_t ddsi_tcp_receive_buffer_size (const struct ddsi_tran_factory *fact) -{ - (void) fact; - return 0; -} - static char *ddsi_tcp_locator_to_string (char *dst, size_t sizeof_dst, const ddsi_locator_t *loc, struct ddsi_tran_conn * conn, int with_port) { (void) conn; @@ -1240,9 +1234,10 @@ int ddsi_tcp_init (struct ddsi_domaingv *gv) fact->fact.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr; fact->fact.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address; fact->fact.m_is_valid_port_fn = ddsi_tcp_is_valid_port; - fact->fact.m_receive_buffer_size_fn = ddsi_tcp_receive_buffer_size; fact->fact.m_locator_from_sockaddr_fn = ddsi_tcp_locator_from_sockaddr; + ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, 0); + #if DDSRT_HAVE_IPV6 if (gv->config.transport_selector == DDSI_TRANS_TCP6) { diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 52de6b88e6..e10f32256a 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -16,6 +16,7 @@ #include "dds/ddsrt/heap.h" #include "dds/ddsrt/string.h" #include "dds/ddsrt/ifaddrs.h" +#include "dds/ddsrt/sockets.h" #include "dds/ddsi/ddsi_log.h" #include "dds/ddsi/ddsi_domaingv.h" #include "ddsi__tran.h" @@ -407,3 +408,121 @@ int ddsi_enumerate_interfaces (struct ddsi_tran_factory * factory, enum ddsi_tra { return factory->m_enumerate_interfaces_fn (factory, transport_selector, interfs); } + + + + + + + + + + + + + + + + + + +static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size) +{ + // if (min, max)= and initbuf= then request= and result= + // (def, def) < defmin defmin whatever it is + // (def, N) anything N whatever it is + // (M, def) < M M error if < M + // (M, N=M) anything N error if < M + // defmin = 1MB for receive buffer, 0B for send buffer + const bool always_set_size = // whether to call setsockopt unconditionally + ((config->min.isdefault && !config->max.isdefault) || + (!config->min.isdefault && !config->max.isdefault && config->max.value >= config->min.value)); + const uint32_t socket_min_buf_size = // error if it ends up below this + !config->min.isdefault ? config->min.value : 0; + const uint32_t socket_req_buf_size = // size to request + (!config->max.isdefault && config->max.value > socket_min_buf_size) ? config->max.value + : !config->min.isdefault ? config->min.value + : default_min_size; + + uint32_t actsize; + socklen_t optlen = (socklen_t) sizeof (actsize); + dds_return_t rc; + + rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen); + if (rc == DDS_RETCODE_BAD_PARAMETER || rc == DDS_RETCODE_UNSUPPORTED) + { + /* not all stacks support getting/setting RCVBUF */ + GVLOG (DDS_LC_CONFIG, "cannot retrieve socket %s buffer size\n", name); + return DDS_RETCODE_OK; + } + else if (rc != DDS_RETCODE_OK) + { + GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc)); + return rc; + } + + if (always_set_size || actsize < socket_req_buf_size) + { + (void) ddsrt_setsockopt (sock, SOL_SOCKET, socket_option, &socket_req_buf_size, sizeof (actsize)); + + /* We don't check the return code from setsockopt, because some O/Ss tend + to silently cap the buffer size. The only way to make sure is to read + the option value back and check it is now set correctly. */ + if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen)) != DDS_RETCODE_OK) + { + GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc)); + return rc; + } + + if (actsize >= socket_req_buf_size) + GVLOG (DDS_LC_CONFIG, "socket %s buffer size set to %"PRIu32" bytes\n", name, actsize); + else if (actsize >= socket_min_buf_size) + GVLOG (DDS_LC_CONFIG, + "failed to increase socket %s buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n", + name, socket_req_buf_size, actsize); + else + { + /* If the configuration states it must be >= X, then error out if the + kernel doesn't give us at least X */ + GVLOG (DDS_LC_CONFIG | DDS_LC_ERROR, + "failed to increase socket %s buffer size to at least %"PRIu32" bytes, current is %"PRIu32" bytes\n", + name, socket_min_buf_size, actsize); + rc = DDS_RETCODE_NOT_ENOUGH_SPACE; + } + } + + return (rc < 0) ? rc : (actsize > (uint32_t) INT32_MAX) ? INT32_MAX : (int32_t) actsize; +} + +dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size) +{ + dds_return_t rc; + + if ((rc = set_socket_buffer (fact->gv, sock, SO_RCVBUF, "SO_RCVBUF", "receive", config, default_min_size)) < 0) + return rc; + + if (rc > 0) { + // set fact->receive_buf_size to the smallest observed value + uint32_t old; + do { + old = ddsrt_atomic_ld32 (&fact->m_receive_buf_size); + if ((uint32_t) rc >= old) + break; + } while (!ddsrt_atomic_cas32 (&fact->m_receive_buf_size, old, (uint32_t) rc)); + } + return rc; +} + +dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size) +{ + return set_socket_buffer (fact->gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, default_min_size); +} + + + + + + + + diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index ced1444252..278039520a 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -281,6 +281,7 @@ static dds_return_t set_dont_route (struct ddsi_domaingv const * const gv, ddsrt return rc; } +#if 0 static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size) { // if (min, max)= and initbuf= then request= and result= @@ -359,6 +360,7 @@ static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_soc { return set_socket_buffer (gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, 65536); } +#endif static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv const * const gv, struct ddsi_network_interface const * const intf, ddsrt_socket_t sock) { @@ -513,20 +515,12 @@ static dds_return_t ddsi_udp_create_conn (struct ddsi_tran_conn **conn_out, stru } } - if ((rc = set_rcvbuf (gv, sock, &gv->config.socket_rcvbuf_size)) < 0) + if ((rc = ddsi_tran_set_rcvbuf(fact_cmn, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0) goto fail_w_socket; - if (rc > 0) { - // set fact->receive_buf_size to the smallest observed value - uint32_t old; - do { - old = ddsrt_atomic_ld32 (&fact->receive_buf_size); - if ((uint32_t) rc >= old) - break; - } while (!ddsrt_atomic_cas32 (&fact->receive_buf_size, old, (uint32_t) rc)); - } - if (set_sndbuf (gv, sock, &gv->config.socket_sndbuf_size) < 0) - goto fail_w_socket; + if ((rc = ddsi_tran_set_sndbuf(fact_cmn, sock, &gv->config.socket_sndbuf_size, 65536)) < 0) + goto fail_w_socket; + if (gv->config.dontRoute && set_dont_route (gv, sock, ipv6) != DDS_RETCODE_OK) goto fail_w_socket; @@ -865,12 +859,6 @@ static int ddsi_udp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_ return (0 < port && port <= 65535); } -static uint32_t ddsi_udp_receive_buffer_size (const struct ddsi_tran_factory *fact_cmn) -{ - const struct ddsi_udp_tran_factory *fact = (const struct ddsi_udp_tran_factory *) fact_cmn; - return ddsrt_atomic_ld32 (&fact->receive_buf_size); -} - static int ddsi_udp_locator_from_sockaddr (const struct ddsi_tran_factory *tran_cmn, ddsi_locator_t *loc, const struct sockaddr *sockaddr) { struct ddsi_udp_tran_factory const * const tran = (const struct ddsi_udp_tran_factory *) tran_cmn; @@ -915,7 +903,6 @@ int ddsi_udp_init (struct ddsi_domaingv*gv) fact->fact.m_locator_to_string_fn = ddsi_udp_locator_to_string; fact->fact.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces; fact->fact.m_is_valid_port_fn = ddsi_udp_is_valid_port; - fact->fact.m_receive_buffer_size_fn = ddsi_udp_receive_buffer_size; fact->fact.m_locator_from_sockaddr_fn = ddsi_udp_locator_from_sockaddr; #if DDSRT_HAVE_IPV6 if (gv->config.transport_selector == DDSI_TRANS_UDP6) @@ -925,7 +912,7 @@ int ddsi_udp_init (struct ddsi_domaingv*gv) fact->fact.m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1"; } #endif - ddsrt_atomic_st32 (&fact->receive_buf_size, UINT32_MAX); + ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, UINT32_MAX); ddsi_factory_add (gv, &fact->fact); GVLOG (DDS_LC_CONFIG, "udp initialized\n"); diff --git a/src/core/ddsi/src/ddsi_vnet.c b/src/core/ddsi/src/ddsi_vnet.c index 78a32be515..9fc4b54f59 100644 --- a/src/core/ddsi/src/ddsi_vnet.c +++ b/src/core/ddsi/src/ddsi_vnet.c @@ -176,12 +176,6 @@ static int ddsi_vnet_is_valid_port (const struct ddsi_tran_factory *fact, uint32 return true; } -static uint32_t ddsi_vnet_receive_buffer_size (const struct ddsi_tran_factory *fact) -{ - (void) fact; - return 0; -} - static int ddsi_vnet_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr) { (void) sockaddr; @@ -224,8 +218,9 @@ int ddsi_vnet_init (struct ddsi_domaingv *gv, const char *name, int32_t locator_ fact->m_base.m_locator_to_string_fn = ddsi_vnet_to_string; fact->m_base.m_enumerate_interfaces_fn = ddsi_vnet_enumerate_interfaces; fact->m_base.m_is_valid_port_fn = ddsi_vnet_is_valid_port; - fact->m_base.m_receive_buffer_size_fn = ddsi_vnet_receive_buffer_size; fact->m_base.m_locator_from_sockaddr_fn = ddsi_vnet_locator_from_sockaddr; + ddsrt_atomic_st32 (&fact->m_base.m_receive_buf_size, 0); + ddsi_factory_add (gv, &fact->m_base); GVLOG (DDS_LC_CONFIG, "vnet %s initialized\n", name); return 0;