Skip to content

Commit

Permalink
Fix Discovery Server over TCP using LocatorSelectorEntry (#4586)
Browse files Browse the repository at this point in the history
* Refs #20628: Add TCP DS blackbox test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Configuration to use logical port 0 as default in DS

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check interface change before creating new send resources

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Use new OpenOutputChannels in TCP

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Update LocatorSelectorEntry

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Use locator_selector_entry to call OpenOutputChannels

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Refactor PDPClient to handle initial TCP connections

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Refactor PDPServer to handle initial TCP connections

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Minor fix

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Fix windows

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Method in RTPSPartImpl

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check loc.kind for default logical port

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Apply suggestions

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Create Resource in update for UDP too

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Remove unnecessary headers

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Clarify Log_info

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz committed Apr 9, 2024
1 parent 5eeee69 commit 4595ea8
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 28 deletions.
22 changes: 22 additions & 0 deletions include/fastdds/rtps/common/LocatorSelectorEntry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ struct LocatorSelectorEntry
state.multicast.clear();
}

static LocatorSelectorEntry create_fully_selected_entry(
const LocatorList_t& unicast_locators,
const LocatorList_t& multicast_locators)
{
// Create an entry with space for all locators
LocatorSelectorEntry entry(unicast_locators.size(), multicast_locators.size());
// Add and select unicast locators
for (const Locator_t& locator : unicast_locators)
{
entry.state.unicast.push_back(entry.unicast.size());
entry.unicast.push_back(locator);
}
// Add and select multicast locators
for (const Locator_t& locator : multicast_locators)
{
entry.state.multicast.push_back(entry.multicast.size());
entry.multicast.push_back(locator);
}
// Return created entry
return entry;
}

//! GUID of the remote entity.
GUID_t remote_guid;
//! List of unicast locators to send data to the remote entity.
Expand Down
13 changes: 11 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,9 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);

#if HAVE_SECURITY
if (!mp_RTPSParticipant->is_secure())
Expand Down Expand Up @@ -843,6 +844,14 @@ void PDPClient::update_remote_servers_list()

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand Down
16 changes: 14 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,9 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(it.metatrafficUnicastLocatorList);
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);

if (!secure)
{
Expand Down Expand Up @@ -1188,6 +1189,14 @@ void PDPServer::update_remote_servers_list()

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
it.metatrafficUnicastLocatorList, it.metatrafficMulticastLocatorList);
mp_RTPSParticipant->createSenderResources(entry);
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand All @@ -1203,6 +1212,9 @@ void PDPServer::update_remote_servers_list()
{
discovery_db_.add_server(server.guidPrefix);
}

// Need to reactivate the server thread to send the DATA(p) to the new servers
awake_server_thread();
}

bool PDPServer::process_writers_acknowledgements()
Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ bool NetworkFactory::build_send_resources(
return returned_value;
}

bool NetworkFactory::build_send_resources(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool returned_value = false;

for (auto& transport : mRegisteredTransports)
{
returned_value |= transport->OpenOutputChannels(sender_resource_list, locator_selector_entry);
}

return returned_value;
}

bool NetworkFactory::BuildReceiverResources(
Locator_t& local,
std::vector<std::shared_ptr<ReceiverResource>>& returned_resources_list,
Expand Down
10 changes: 10 additions & 0 deletions src/cpp/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class NetworkFactory
fastdds::rtps::SendResourceList&,
const Locator_t& locator);

/**
* Walk over the list of transports, opening every possible channel that can send through
* the locators contained in @param locator_selector_entry and returning a vector of Sender Resources associated with it.
* @param locator_selector_entry LocatorSelectorEntry containing metadata and the locators through which to send.
* @return true if at least one send resource was created, false otherwise.
*/
bool build_send_resources(
fastdds::rtps::SendResourceList&,
const LocatorSelectorEntry& locator_selector_entry);

/**
* Walk over the list of transports, opening every possible channel that we can listen to
* from the given locator, and returns a vector of Receiver Resources for this goal.
Expand Down
122 changes: 110 additions & 12 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,68 @@ RTPSParticipantImpl::RTPSParticipantImpl(
switch (m_att.builtin.discovery_config.discoveryProtocol)
{
case DiscoveryProtocol::BACKUP:
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SERVER:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP server for discovery server without providing a proper listening port.");
break;
}
if (!m_att.builtin.metatrafficUnicastLocatorList.empty())
{
std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(),
m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
break;
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SUPER_CLIENT:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT && pT->listening_ports.empty())
if (pT)
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to use discovery server over TCP without providing a proper listening port.");
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP client for discovery server without providing a proper listening port." <<
" No TCP participants will be able to connect to this participant, but it will be able make connections.");
}
for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
default:
Expand Down Expand Up @@ -1458,8 +1508,37 @@ void RTPSParticipantImpl::update_attributes(
auto pdp = mp_builtinProtocols->mp_PDP;
bool update_pdp = false;

// Check if discovery servers need to be updated
eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers =
patt.builtin.discovery_config.m_DiscoveryServers;
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers)
{
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
}

// Check if there are changes
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers
if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers
|| patt.userData != m_att.userData
|| local_interfaces_changed)
{
Expand Down Expand Up @@ -1496,7 +1575,7 @@ void RTPSParticipantImpl::update_attributes(
for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers)
{
bool contained = false;
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
for (auto incoming_server : converted_discovery_servers)
{
if (existing_server.guidPrefix == incoming_server.guidPrefix)
{
Expand Down Expand Up @@ -1555,9 +1634,12 @@ void RTPSParticipantImpl::update_attributes(
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
}

createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
if (local_interfaces_changed)
{
createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
}
if (!modified_locators.empty())
{
createSenderResources(modified_locators);
Expand All @@ -1569,8 +1651,8 @@ void RTPSParticipantImpl::update_attributes(
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER ||
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
{
// Add incoming servers iff we don't know about them already or the listening locator has been modified
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
// Add incoming servers if we don't know about them already or the listening locator has been modified
for (auto incoming_server : converted_discovery_servers)
{
eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it;
for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin();
Expand Down Expand Up @@ -1927,6 +2009,14 @@ void RTPSParticipantImpl::createSenderResources(
m_network_Factory.build_send_resources(send_resource_list_, locator);
}

void RTPSParticipantImpl::createSenderResources(
const LocatorSelectorEntry& locator_selector_entry)
{
std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_);

m_network_Factory.build_send_resources(send_resource_list_, locator_selector_entry);
}

bool RTPSParticipantImpl::deleteUserEndpoint(
const GUID_t& endpoint)
{
Expand Down Expand Up @@ -2593,9 +2683,17 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta(
case LOCATOR_KIND_TCPv4:
set_wan_address(ret);
IPLocator::setPhysicalPort(ret, Tcp4ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
break;
case LOCATOR_KIND_TCPv6:
IPLocator::setPhysicalPort(ret, Tcp6ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
break;
}
return ret;
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,15 @@ class RTPSParticipantImpl
void createSenderResources(
const Locator_t& locator);

/**
* Creates sender resources for the given locator selector entry by calling the NetworkFactory's
* build_send_resources method.
*
* @param locator_selector The locator selector entry for which sender resources need to be created.
*/
void createSenderResources(
const LocatorSelectorEntry& locator_selector);

bool networkFactoryHasRegisteredTransports() const;

/**
Expand Down
Loading

0 comments on commit 4595ea8

Please sign in to comment.