diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index 315e9f6cbe0..d24dfc3d0f1 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -310,7 +310,8 @@ bool TCPChannelResource::check_socket_send_buffer( size_t future_queue_size = size_t(bytesInSendQueue) + msg_size; - if (future_queue_size > size_t(parent_->configuration()->sendBufferSize)) + // TCP actually allocates twice the size of the buffer requested. + if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize)) { return false; } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index bdaf86b385a..68e3c66ccdd 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1331,17 +1331,21 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_with_sni) // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, secure_non_blocking_send) { + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); + uint16_t port = g_default_port; uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); + senderDescriptor.apply_security = true; senderDescriptor.sendBufferSize = msg_size; - senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - senderDescriptor.tls_config.verify_file = "ca.crt"; + senderDescriptor.tls_config.password = "fastddspwd"; + senderDescriptor.tls_config.cert_chain_file = "fastdds.crt"; + senderDescriptor.tls_config.private_key_file = "fastdds.key"; + senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem"; senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -1358,8 +1362,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1372,13 +1376,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) { return preverified; }); - ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose) - { - return "fastddspwd"; - }); - ssl_context.use_certificate_chain_file("fastdds.crt"); - ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem); - ssl_context.use_tmp_dh_file("dh_params.pem"); + ssl_context.load_verify_file("ca.crt"); uint32_t options = 0; options |= asio::ssl::context::default_workarounds; @@ -1387,8 +1385,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) options |= asio::ssl::context::no_compression; ssl_context.set_options(options); - // TCPChannelResourceSecure::connect() like connection asio::io_service io_service; + auto ioServiceFunction = [&]() + { +#if ASIO_VERSION >= 101200 + asio::executor_work_guard work(io_service.get_executor()); +#else + io_service::work work(io_service_); +#endif // if ASIO_VERSION >= 101200 + io_service.run(); + }; + std::thread ioServiceThread(ioServiceFunction); + + // TCPChannelResourceSecure::connect() like connection asio::ip::tcp::resolver resolver(io_service); auto endpoints = resolver.resolve( IPLocator::ip_to_string(serverLoc), @@ -1409,7 +1418,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) ) { ASSERT_TRUE(!ec); - asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server; + asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client; secure_socket->async_handshake(role, [](const std::error_code& ec) { @@ -1425,6 +1434,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) a connection. This channel will not be present in the server's channel_resources_ map as communication lacks most of the discovery messages using a raw socket as participant. */ + // auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); ASSERT_TRUE(sender_unbound_channel_resources.size() == 1); auto sender_channel_resource = @@ -1432,17 +1442,32 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = 0; + bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec); + ASSERT_EQ(ec, asio::error_code()); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); secure_socket->lowest_layer().close(ec); + io_service.stop(); + ioServiceThread.join(); } #endif // ifndef _WIN32 @@ -1880,7 +1905,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, non_blocking_send) { @@ -1901,8 +1926,8 @@ TEST_F(TCPv4Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1947,15 +1972,26 @@ TEST_F(TCPv4Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 0d5303bd5aa..af58d6968e1 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -318,7 +318,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv6Tests, non_blocking_send) { @@ -339,8 +339,8 @@ TEST_F(TCPv6Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv6; IPLocator::setIPv6(serverLoc, "::1"); @@ -385,15 +385,26 @@ TEST_F(TCPv6Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header - for (int i = 0; i < 5; i++) - { - sender_channel_resource->send(nullptr, 0, data, size, ec); - } + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel();