Skip to content

Commit

Permalink
Enforce SHM ports open mode exclusions (#4635)
Browse files Browse the repository at this point in the history
* Refs #20701. Added unit regression tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20701. Added blackbox regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20701. Fix issue.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
(cherry picked from commit 3d159dc)

# Conflicts:
#	test/blackbox/common/BlackboxTestsTransportSHM.cpp
  • Loading branch information
MiguelCompany authored and mergify[bot] committed Apr 3, 2024
1 parent 39b3d58 commit b68c9c9
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 1 deletion.
29 changes: 28 additions & 1 deletion src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,12 +898,22 @@ class SharedMemGlobal

void lock_read_exclusive()
{
if (OpenMode::ReadShared == open_mode())
{
throw std::runtime_error("port is opened ReadShared");
}

std::string lock_name = std::string(node_->domain_name) + "_port" + std::to_string(node_->port_id) + "_el";
read_exclusive_lock_ = std::unique_ptr<RobustExclusiveLock>(new RobustExclusiveLock(lock_name));
}

void lock_read_shared()
{
if (OpenMode::ReadExclusive == open_mode())
{
throw std::runtime_error("port is opened ReadExclusive");
}

std::string lock_name = std::string(node_->domain_name) + "_port" + std::to_string(node_->port_id) + "_sl";
read_shared_lock_ = std::unique_ptr<RobustSharedLock>(new RobustSharedLock(lock_name));
}
Expand Down Expand Up @@ -1124,7 +1134,24 @@ class SharedMemGlobal
std::stringstream ss;

ss << port_node->port_id << " (" << port_node->uuid.to_string() <<
") because is ReadExclusive locked";
") because it was already locked";

err_reason = ss.str();
port.reset();
}
}
else if (open_mode == Port::OpenMode::ReadShared)
{
try
{
port->lock_read_shared();
}
catch (const std::exception&)
{
std::stringstream ss;

ss << port_node->port_id << " (" << port_node->uuid.to_string() <<
") because it had a ReadExclusive lock";

err_reason = ss.str();
port.reset();
Expand Down
145 changes: 145 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportSHM.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "BlackboxTests.hpp"

#include "PubSubParticipant.hpp"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

Expand All @@ -28,6 +29,8 @@ using namespace eprosima::fastrtps;

using SharedMemTransportDescriptor = eprosima::fastdds::rtps::SharedMemTransportDescriptor;
using test_SharedMemTransportDescriptor = eprosima::fastdds::rtps::test_SharedMemTransportDescriptor;
using Locator = eprosima::fastdds::rtps::Locator;
using LocatorList = eprosima::fastdds::rtps::LocatorList;

TEST(SHM, TransportPubSub)
{
Expand Down Expand Up @@ -70,6 +73,148 @@ TEST(SHM, TransportPubSub)
reader.wait_participant_undiscovery();
}

<<<<<<< HEAD
=======
/* Regression test for redmine issue #20701
*
* This test checks that the SHM transport will not listen on the same port
* in unicast and multicast at the same time.
* It does so by specifying custom default locators on a DataReader and then
* checking that the port mutation took place, thus producing a different port.
*/
TEST(SHM, SamePortUnicastMulticast)
{
PubSubReader<HelloWorldPubSubType> participant(TEST_TOPIC_NAME);

Locator locator;
locator.kind = LOCATOR_KIND_SHM;
locator.port = global_port;

LocatorList unicast_list;
LocatorList multicast_list;

// Note: this is using knowledge of the SHM locator address format since
// SHMLocator is not exposed to the user.
locator.address[0] = 'U';
unicast_list.push_back(locator);

// Note: this is using knowledge of the SHM locator address format since
// SHMLocator is not exposed to the user.
locator.address[0] = 'M';
multicast_list.push_back(locator);

// Create the reader with the custom transport and locators
auto testTransport = std::make_shared<SharedMemTransportDescriptor>();
participant
.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport)
.set_default_unicast_locators(unicast_list)
.set_default_multicast_locators(multicast_list)
.init();

ASSERT_TRUE(participant.isInitialized());

// Retrieve the listening locators and check that one port is different
LocatorList reader_locators;
participant.get_native_reader().get_listening_locators(reader_locators);

ASSERT_EQ(reader_locators.size(), 2u);
auto it = reader_locators.begin();
auto first_port = it->port;
++it;
auto second_port = it->port;
EXPECT_NE(first_port, second_port);
EXPECT_TRUE(first_port == global_port || second_port == global_port);
}

// Regression test for redmine #19500
TEST(SHM, IgnoreNonExistentSegment)
{
using namespace eprosima::fastdds::dds;

// Set up log
BlackboxMockConsumer* helper_consumer = new BlackboxMockConsumer();
Log::ClearConsumers(); // Remove default consumers
Log::RegisterConsumer(std::unique_ptr<LogConsumer>(helper_consumer)); // Registering a consumer transfer ownership
// Filter specific message
Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Warning);
Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT_SHM"));
Log::SetErrorStringFilter(std::regex("Error receiving data.*"));

PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

writer
.asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();
ASSERT_TRUE(writer.isInitialized());

reader
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();

ASSERT_TRUE(reader.isInitialized());

reader.wait_discovery();

// Create and quickly destroy several participants in several threads
#ifdef _WIN32
constexpr size_t num_threads = 1;
#else
constexpr size_t num_threads = 10;
#endif // _WIN32
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; i++)
{
threads.push_back(std::thread([]()
{
#ifdef _WIN32
constexpr size_t num_parts = 2;
#else
constexpr size_t num_parts = 10;
#endif // _WIN32
for (size_t i = 0; i < num_parts; ++i)
{
PubSubWriter<Data1mbPubSubType> late_writer(TEST_TOPIC_NAME);
late_writer
.asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();
ASSERT_TRUE(late_writer.isInitialized());
}
}));
}

// Destroy the writer participant.
writer.destroy();

// Check that reader receives the unmatched.
reader.wait_participant_undiscovery();

for (auto& thread : threads)
{
thread.join();
}
// Check logs
Log::Flush();
EXPECT_EQ(helper_consumer->ConsumedEntries().size(), 0u);

// Clean-up
Log::Reset(); // This calls to ClearConsumers, which deletes the registered consumer
}

>>>>>>> 3d159dc8c (Enforce SHM ports open mode exclusions (#4635))
TEST(SHM, Test300KFragmentation)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand Down
30 changes: 30 additions & 0 deletions test/unittest/transport/SharedMemTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,36 @@ TEST_F(SHMTransportTests, port_lock_read_exclusive)
port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive);
}

// Regression test for redmine issue #20701
TEST_F(SHMTransportTests, port_lock_read_shared_then_exclusive)
{
auto shared_mem_manager = SharedMemManager::create(domain_name);

shared_mem_manager->remove_port(0);

auto port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared);
ASSERT_THROW(shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive),
std::exception);

port.reset();
port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive);
}

// Regression test for redmine issue #20701
TEST_F(SHMTransportTests, port_lock_read_exclusive_then_shared)
{
auto shared_mem_manager = SharedMemManager::create(domain_name);

shared_mem_manager->remove_port(0);

auto port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive);
ASSERT_THROW(shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared),
std::exception);

port.reset();
port = shared_mem_manager->open_port(0, 1, 1000, SharedMemGlobal::Port::OpenMode::ReadShared);
}

TEST_F(SHMTransportTests, robust_exclusive_lock)
{
const std::string lock_name = "robust_exclusive_lock_test1_el";
Expand Down

0 comments on commit b68c9c9

Please sign in to comment.