Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide setting of socket recv buffer size on raw ethernet sockets #1929

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/core/ddsi/src/ddsi__tran.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ typedef int (*ddsi_is_loopbackaddr_fn_t) (const struct ddsi_tran_factory *tran,
typedef int (*ddsi_is_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc);
typedef int (*ddsi_is_ssm_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const ddsi_locator_t *loc);
typedef int (*ddsi_is_valid_port_fn_t) (const struct ddsi_tran_factory *tran, uint32_t port);
typedef uint32_t (*ddsi_receive_buffer_size_fn_t) (const struct ddsi_tran_factory *fact);
typedef uint32_t (*m_get_locator_port_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc);
typedef void (*m_set_locator_port_fn_t) (const struct ddsi_tran_factory *factory, ddsi_locator_t *loc, uint32_t port);
typedef uint32_t (*m_get_locator_aux_fn_t) (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc);
Expand Down Expand Up @@ -225,7 +224,6 @@ struct ddsi_tran_factory
ddsi_locator_to_string_fn_t m_locator_to_string_fn;
ddsi_enumerate_interfaces_fn_t m_enumerate_interfaces_fn;
ddsi_is_valid_port_fn_t m_is_valid_port_fn;
ddsi_receive_buffer_size_fn_t m_receive_buffer_size_fn;
ddsi_locator_from_sockaddr_fn_t m_locator_from_sockaddr_fn;
m_get_locator_port_fn_t m_get_locator_port_fn;
m_set_locator_port_fn_t m_set_locator_port_fn;
Expand Down Expand Up @@ -262,6 +260,10 @@ struct ddsi_tran_factory
/// no default address exists.
const char *m_default_spdp_address;

/// Actual minimum receive buffer size in use
/// Atomically loaded/stored so we don't have to lie about constness
ddsrt_atomic_uint32_t m_receive_buf_size;

struct ddsi_domaingv *gv;

/* Relationships */
Expand Down Expand Up @@ -330,7 +332,7 @@ inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t

/** @component transport */
inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory) {
return factory->m_receive_buffer_size_fn (factory);
return ddsrt_atomic_ld32 (&factory->m_receive_buf_size);
}

/** @component transport */
Expand Down Expand Up @@ -452,6 +454,12 @@ void ddsi_listener_unblock (struct ddsi_tran_listener * listener);
/** @component transport */
void ddsi_listener_free (struct ddsi_tran_listener * listener);

/** @component transport */
dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size);

/** @component transport */
dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size);

#if defined (__cplusplus)
}
#endif
Expand Down
23 changes: 15 additions & 8 deletions src/core/ddsi/src/ddsi_raweth.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ typedef struct ddsi_raweth_conn {
int m_ifindex;
} *ddsi_raweth_conn_t;


struct ddsi_ethernet_header {
unsigned char dmac[ETH_ALEN];
unsigned char smac[ETH_ALEN];
Expand Down Expand Up @@ -304,6 +303,18 @@ static dds_return_t ddsi_raweth_create_conn (struct ddsi_tran_conn **conn_out, s
return DDS_RETCODE_ERROR;
}

if ((rc = ddsi_tran_set_rcvbuf(fact, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0)
{
ddsrt_close(sock);
return DDS_RETCODE_ERROR;
}

if ((rc = ddsi_tran_set_sndbuf(fact, sock, &gv->config.socket_sndbuf_size, 65536)) < 0)
{
ddsrt_close(sock);
return DDS_RETCODE_ERROR;
}

memset(&addr, 0, sizeof(addr));
addr.sll_family = AF_PACKET;
addr.sll_protocol = htons(ETH_P_ALL);
Expand Down Expand Up @@ -529,12 +540,6 @@ static int ddsi_raweth_is_valid_port (const struct ddsi_tran_factory *fact, uint
return (eport >= 1 && eport <= 65535) && (vlanid < 4095) && vlancfi == 0;
}

static uint32_t ddsi_raweth_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static int ddsi_raweth_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
(void) tran;
Expand Down Expand Up @@ -598,17 +603,19 @@ int ddsi_raweth_init (struct ddsi_domaingv *gv)
fact->m_locator_to_string_fn = ddsi_raweth_to_string;
fact->m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces;
fact->m_is_valid_port_fn = ddsi_raweth_is_valid_port;
fact->m_receive_buffer_size_fn = ddsi_raweth_receive_buffer_size;
fact->m_locator_from_sockaddr_fn = ddsi_raweth_locator_from_sockaddr;
fact->m_get_locator_port_fn = ddsi_raweth_get_locator_port;
fact->m_set_locator_port_fn = ddsi_raweth_set_locator_port;
fact->m_get_locator_aux_fn = ddsi_raweth_get_locator_aux;
fact->m_set_locator_aux_fn = ddsi_raweth_set_locator_aux;
ddsrt_atomic_st32 (&fact->m_receive_buf_size, UINT32_MAX);

ddsi_factory_add (gv, fact);
GVLOG (DDS_LC_CONFIG, "raweth initialized\n");
return 0;
}


#else

int ddsi_raweth_init (struct ddsi_domaingv *gv) { (void) gv; return 0; }
Expand Down
9 changes: 2 additions & 7 deletions src/core/ddsi/src/ddsi_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,12 +1182,6 @@ static int ddsi_tcp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (0 < port && port <= 65535);
}

static uint32_t ddsi_tcp_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static char *ddsi_tcp_locator_to_string (char *dst, size_t sizeof_dst, const ddsi_locator_t *loc, struct ddsi_tran_conn * conn, int with_port)
{
(void) conn;
Expand Down Expand Up @@ -1240,9 +1234,10 @@ int ddsi_tcp_init (struct ddsi_domaingv *gv)
fact->fact.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr;
fact->fact.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address;
fact->fact.m_is_valid_port_fn = ddsi_tcp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_tcp_receive_buffer_size;
fact->fact.m_locator_from_sockaddr_fn = ddsi_tcp_locator_from_sockaddr;

ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, 0);

#if DDSRT_HAVE_IPV6
if (gv->config.transport_selector == DDSI_TRANS_TCP6)
{
Expand Down
119 changes: 119 additions & 0 deletions src/core/ddsi/src/ddsi_tran.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/string.h"
#include "dds/ddsrt/ifaddrs.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsi/ddsi_log.h"
#include "dds/ddsi/ddsi_domaingv.h"
#include "ddsi__tran.h"
Expand Down Expand Up @@ -407,3 +408,121 @@ int ddsi_enumerate_interfaces (struct ddsi_tran_factory * factory, enum ddsi_tra
{
return factory->m_enumerate_interfaces_fn (factory, transport_selector, interfs);
}


















static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
// if (min, max)= and initbuf= then request= and result=
// (def, def) < defmin defmin whatever it is
// (def, N) anything N whatever it is
// (M, def) < M M error if < M
// (M, N<M) < M M error if < M
// (M, N>=M) anything N error if < M
// defmin = 1MB for receive buffer, 0B for send buffer
const bool always_set_size = // whether to call setsockopt unconditionally
((config->min.isdefault && !config->max.isdefault) ||
(!config->min.isdefault && !config->max.isdefault && config->max.value >= config->min.value));
const uint32_t socket_min_buf_size = // error if it ends up below this
!config->min.isdefault ? config->min.value : 0;
const uint32_t socket_req_buf_size = // size to request
(!config->max.isdefault && config->max.value > socket_min_buf_size) ? config->max.value
: !config->min.isdefault ? config->min.value
: default_min_size;

uint32_t actsize;
socklen_t optlen = (socklen_t) sizeof (actsize);
dds_return_t rc;

rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen);
if (rc == DDS_RETCODE_BAD_PARAMETER || rc == DDS_RETCODE_UNSUPPORTED)
{
/* not all stacks support getting/setting RCVBUF */
GVLOG (DDS_LC_CONFIG, "cannot retrieve socket %s buffer size\n", name);
return DDS_RETCODE_OK;
}
else if (rc != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc));
return rc;
}

if (always_set_size || actsize < socket_req_buf_size)
{
(void) ddsrt_setsockopt (sock, SOL_SOCKET, socket_option, &socket_req_buf_size, sizeof (actsize));

/* We don't check the return code from setsockopt, because some O/Ss tend
to silently cap the buffer size. The only way to make sure is to read
the option value back and check it is now set correctly. */
if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, socket_option, &actsize, &optlen)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get %s failed: %s\n", socket_option_name, dds_strretcode (rc));
return rc;
}

if (actsize >= socket_req_buf_size)
GVLOG (DDS_LC_CONFIG, "socket %s buffer size set to %"PRIu32" bytes\n", name, actsize);
else if (actsize >= socket_min_buf_size)
GVLOG (DDS_LC_CONFIG,
"failed to increase socket %s buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n",
name, socket_req_buf_size, actsize);
else
{
/* If the configuration states it must be >= X, then error out if the
kernel doesn't give us at least X */
GVLOG (DDS_LC_CONFIG | DDS_LC_ERROR,
"failed to increase socket %s buffer size to at least %"PRIu32" bytes, current is %"PRIu32" bytes\n",
name, socket_min_buf_size, actsize);
rc = DDS_RETCODE_NOT_ENOUGH_SPACE;
}
}

return (rc < 0) ? rc : (actsize > (uint32_t) INT32_MAX) ? INT32_MAX : (int32_t) actsize;
}

dds_return_t ddsi_tran_set_rcvbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
dds_return_t rc;

if ((rc = set_socket_buffer (fact->gv, sock, SO_RCVBUF, "SO_RCVBUF", "receive", config, default_min_size)) < 0)
return rc;

if (rc > 0) {
// set fact->receive_buf_size to the smallest observed value
uint32_t old;
do {
old = ddsrt_atomic_ld32 (&fact->m_receive_buf_size);
if ((uint32_t) rc >= old)
break;
} while (!ddsrt_atomic_cas32 (&fact->m_receive_buf_size, old, (uint32_t) rc));
}
return rc;
}

dds_return_t ddsi_tran_set_sndbuf (struct ddsi_tran_factory *fact, ddsrt_socket_t sock, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
return set_socket_buffer (fact->gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, default_min_size);
}








27 changes: 7 additions & 20 deletions src/core/ddsi/src/ddsi_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ static dds_return_t set_dont_route (struct ddsi_domaingv const * const gv, ddsrt
return rc;
}

#if 0
static dds_return_t set_socket_buffer (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, int32_t socket_option, const char *socket_option_name, const char *name, const struct ddsi_config_socket_buf_size *config, uint32_t default_min_size)
{
// if (min, max)= and initbuf= then request= and result=
Expand Down Expand Up @@ -359,6 +360,7 @@ static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_soc
{
return set_socket_buffer (gv, sock, SO_SNDBUF, "SO_SNDBUF", "send", config, 65536);
}
#endif

static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv const * const gv, struct ddsi_network_interface const * const intf, ddsrt_socket_t sock)
{
Expand Down Expand Up @@ -513,20 +515,12 @@ static dds_return_t ddsi_udp_create_conn (struct ddsi_tran_conn **conn_out, stru
}
}

if ((rc = set_rcvbuf (gv, sock, &gv->config.socket_rcvbuf_size)) < 0)
if ((rc = ddsi_tran_set_rcvbuf(fact_cmn, sock, &gv->config.socket_rcvbuf_size, 1048576)) < 0)
goto fail_w_socket;
if (rc > 0) {
// set fact->receive_buf_size to the smallest observed value
uint32_t old;
do {
old = ddsrt_atomic_ld32 (&fact->receive_buf_size);
if ((uint32_t) rc >= old)
break;
} while (!ddsrt_atomic_cas32 (&fact->receive_buf_size, old, (uint32_t) rc));
}

if (set_sndbuf (gv, sock, &gv->config.socket_sndbuf_size) < 0)
goto fail_w_socket;
if ((rc = ddsi_tran_set_sndbuf(fact_cmn, sock, &gv->config.socket_sndbuf_size, 65536)) < 0)
goto fail_w_socket;

if (gv->config.dontRoute && set_dont_route (gv, sock, ipv6) != DDS_RETCODE_OK)
goto fail_w_socket;

Expand Down Expand Up @@ -865,12 +859,6 @@ static int ddsi_udp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (0 < port && port <= 65535);
}

static uint32_t ddsi_udp_receive_buffer_size (const struct ddsi_tran_factory *fact_cmn)
{
const struct ddsi_udp_tran_factory *fact = (const struct ddsi_udp_tran_factory *) fact_cmn;
return ddsrt_atomic_ld32 (&fact->receive_buf_size);
}

static int ddsi_udp_locator_from_sockaddr (const struct ddsi_tran_factory *tran_cmn, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
struct ddsi_udp_tran_factory const * const tran = (const struct ddsi_udp_tran_factory *) tran_cmn;
Expand Down Expand Up @@ -915,7 +903,6 @@ int ddsi_udp_init (struct ddsi_domaingv*gv)
fact->fact.m_locator_to_string_fn = ddsi_udp_locator_to_string;
fact->fact.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces;
fact->fact.m_is_valid_port_fn = ddsi_udp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_udp_receive_buffer_size;
fact->fact.m_locator_from_sockaddr_fn = ddsi_udp_locator_from_sockaddr;
#if DDSRT_HAVE_IPV6
if (gv->config.transport_selector == DDSI_TRANS_UDP6)
Expand All @@ -925,7 +912,7 @@ int ddsi_udp_init (struct ddsi_domaingv*gv)
fact->fact.m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1";
}
#endif
ddsrt_atomic_st32 (&fact->receive_buf_size, UINT32_MAX);
ddsrt_atomic_st32 (&fact->fact.m_receive_buf_size, UINT32_MAX);

ddsi_factory_add (gv, &fact->fact);
GVLOG (DDS_LC_CONFIG, "udp initialized\n");
Expand Down
9 changes: 2 additions & 7 deletions src/core/ddsi/src/ddsi_vnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@ static int ddsi_vnet_is_valid_port (const struct ddsi_tran_factory *fact, uint32
return true;
}

static uint32_t ddsi_vnet_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}

static int ddsi_vnet_locator_from_sockaddr (const struct ddsi_tran_factory *tran, ddsi_locator_t *loc, const struct sockaddr *sockaddr)
{
(void) sockaddr;
Expand Down Expand Up @@ -224,8 +218,9 @@ int ddsi_vnet_init (struct ddsi_domaingv *gv, const char *name, int32_t locator_
fact->m_base.m_locator_to_string_fn = ddsi_vnet_to_string;
fact->m_base.m_enumerate_interfaces_fn = ddsi_vnet_enumerate_interfaces;
fact->m_base.m_is_valid_port_fn = ddsi_vnet_is_valid_port;
fact->m_base.m_receive_buffer_size_fn = ddsi_vnet_receive_buffer_size;
fact->m_base.m_locator_from_sockaddr_fn = ddsi_vnet_locator_from_sockaddr;
ddsrt_atomic_st32 (&fact->m_base.m_receive_buf_size, 0);

ddsi_factory_add (gv, &fact->m_base);
GVLOG (DDS_LC_CONFIG, "vnet %s initialized\n", name);
return 0;
Expand Down
Loading