Skip to content

Commit

Permalink
Fix flaky latency tests on mac (#5009)
Browse files Browse the repository at this point in the history
* Refs #21232: Fix Latency test destruction

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21232: Change local_reader() to return a BaseReader

Signed-off-by: Mario Domínguez López <[email protected]>

* Refs #21232: Typo

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
Signed-off-by: Mario Domínguez López <[email protected]>
Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Jul 2, 2024
1 parent 379da7d commit 9961e61
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/ReaderLocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ bool ReaderLocator::send(
return true;
}

RTPSReader* ReaderLocator::local_reader()
BaseReader* ReaderLocator::local_reader()
{
if (!local_reader_)
{
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/rtps/writer/ReaderLocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace rtps {

class RTPSParticipantImpl;
class RTPSWriter;
class RTPSReader;
class BaseReader;
class IDataSharingNotifier;

/**
Expand Down Expand Up @@ -67,10 +67,10 @@ class ReaderLocator : public RTPSMessageSenderInterface
return is_local_reader_;
}

RTPSReader* local_reader();
BaseReader* local_reader();

void local_reader(
RTPSReader* local_reader)
BaseReader* local_reader)
{
local_reader_ = local_reader;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ class ReaderLocator : public RTPSMessageSenderInterface
LocatorSelectorEntry async_locator_info_;
bool expects_inline_qos_;
bool is_local_reader_;
RTPSReader* local_reader_;
BaseReader* local_reader_;
std::vector<GuidPrefix_t> guid_prefix_as_vector_;
std::vector<GUID_t> guid_as_vector_;
IDataSharingNotifier* datasharing_notifier_;
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/writer/ReaderProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace eprosima {
namespace fastdds {
namespace rtps {

class BaseReader;
class StatefulWriter;
class TimedEvent;
class RTPSReader;
Expand Down Expand Up @@ -289,7 +290,7 @@ class ReaderProxy
* Get the local reader on the same process (if any).
* @return The local reader on the same process.
*/
inline RTPSReader* local_reader()
inline BaseReader* local_reader()
{
return locator_info_.local_reader();
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,14 +402,14 @@ bool StatefulWriter::intraprocess_delivery(
CacheChange_t* change,
ReaderProxy* reader_proxy)
{
RTPSReader* reader = reader_proxy->local_reader();
BaseReader* reader = reader_proxy->local_reader();
if (reader)
{
if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
{
change->write_params.sample_identity(change->write_params.related_sample_identity());
}
return BaseReader::downcast(reader)->process_data_msg(change);
return reader->process_data_msg(change);
}
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions test/mock/rtps/ReaderLocator/rtps/writer/ReaderLocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace rtps {

class RTPSParticipantImpl;
class RTPSWriter;
class RTPSReader;
class BaseReader;
class IDataSharingNotifier;

/**
Expand Down Expand Up @@ -202,7 +202,7 @@ class ReaderLocator : public RTPSMessageSenderInterface
return false;
}

RTPSReader* local_reader()
BaseReader* local_reader()
{
return nullptr;
}
Expand Down
49 changes: 26 additions & 23 deletions test/performance/latency/LatencyTestPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,6 @@ LatencyTestPublisher::LatencyTestPublisher()

LatencyTestPublisher::~LatencyTestPublisher()
{
// Static type endpoints shpuld have been removed for each payload iteration
if (dynamic_types_)
{
destroy_data_endpoints();
}
else if (nullptr != data_writer_
|| nullptr != data_reader_
|| nullptr != latency_data_pub_topic_
|| nullptr != latency_data_sub_topic_
|| !latency_data_type_)
{
EPROSIMA_LOG_ERROR(LATENCYPUBLISHER, "ERROR unregistering the DATA type and/or removing the endpoints");
}

subscriber_->delete_datareader(command_reader_);
participant_->delete_subscriber(subscriber_);

publisher_->delete_datawriter(command_writer_);
participant_->delete_publisher(publisher_);

participant_->delete_topic(latency_command_sub_topic_);
participant_->delete_topic(latency_command_pub_topic_);

std::string TestCommandType("TestCommandType");
participant_->unregister_type(TestCommandType);

Expand Down Expand Up @@ -682,6 +659,32 @@ void LatencyTestPublisher::run()
}
}

void LatencyTestPublisher::destroy_user_entities()
{
// Static type endpoints should have been removed for each payload iteration
if (dynamic_types_)
{
destroy_data_endpoints();
}
else if (nullptr != data_writer_
|| nullptr != data_reader_
|| nullptr != latency_data_pub_topic_
|| nullptr != latency_data_sub_topic_
|| !latency_data_type_)
{
EPROSIMA_LOG_ERROR(LATENCYPUBLISHER, "ERROR unregistering the DATA type and/or removing the endpoints");
}

subscriber_->delete_datareader(command_reader_);
participant_->delete_subscriber(subscriber_);

publisher_->delete_datawriter(command_writer_);
participant_->delete_publisher(publisher_);

participant_->delete_topic(latency_command_sub_topic_);
participant_->delete_topic(latency_command_pub_topic_);
}

void LatencyTestPublisher::export_csv(
const std::string& data_name,
const std::string& str_reliable,
Expand Down
2 changes: 2 additions & 0 deletions test/performance/latency/LatencyTestPublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class LatencyTestPublisher

void run();

void destroy_user_entities();

private:

bool init_dynamic_types();
Expand Down
49 changes: 26 additions & 23 deletions test/performance/latency/LatencyTestSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,6 @@ LatencyTestSubscriber::LatencyTestSubscriber()

LatencyTestSubscriber::~LatencyTestSubscriber()
{
// Static type endpoints should have been remove for each payload iteration
if (dynamic_types_)
{
destroy_data_endpoints();
}
else if (nullptr != data_writer_
|| nullptr != data_reader_
|| nullptr != latency_data_pub_topic_
|| nullptr != latency_data_sub_topic_
|| !latency_data_type_)
{
EPROSIMA_LOG_ERROR(LATENCYSUBSCRIBER, "ERROR unregistering the DATA type and/or removing the endpoints");
}

subscriber_->delete_datareader(command_reader_);
participant_->delete_subscriber(subscriber_);

publisher_->delete_datawriter(command_writer_);
participant_->delete_publisher(publisher_);

participant_->delete_topic(latency_command_sub_topic_);
participant_->delete_topic(latency_command_pub_topic_);

std::string TestCommandType("TestCommandType");
participant_->unregister_type(TestCommandType);

Expand Down Expand Up @@ -641,6 +618,32 @@ void LatencyTestSubscriber::run()
}
}

void LatencyTestSubscriber::destroy_user_entities()
{
// Static type endpoints should have been remove for each payload iteration
if (dynamic_types_)
{
destroy_data_endpoints();
}
else if (nullptr != data_writer_
|| nullptr != data_reader_
|| nullptr != latency_data_pub_topic_
|| nullptr != latency_data_sub_topic_
|| !latency_data_type_)
{
EPROSIMA_LOG_ERROR(LATENCYSUBSCRIBER, "ERROR unregistering the DATA type and/or removing the endpoints");
}

subscriber_->delete_datareader(command_reader_);
participant_->delete_subscriber(subscriber_);

publisher_->delete_datawriter(command_writer_);
participant_->delete_publisher(publisher_);

participant_->delete_topic(latency_command_sub_topic_);
participant_->delete_topic(latency_command_pub_topic_);
}

bool LatencyTestSubscriber::test(
uint32_t datasize)
{
Expand Down
2 changes: 2 additions & 0 deletions test/performance/latency/LatencyTestSubscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class LatencyTestSubscriber

void run();

void destroy_user_entities();

bool test(
uint32_t datasize);

Expand Down
9 changes: 9 additions & 0 deletions test/performance/latency/main_LatencyTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ int main(
dynamic_types, data_sharing, data_loans, shared_memory, forced_domain, data_sizes))
{
latency_publisher.run();
latency_publisher.destroy_user_entities();
}
else
{
Expand All @@ -518,6 +519,7 @@ int main(
xml_config_file, dynamic_types, data_sharing, data_loans, shared_memory, forced_domain, data_sizes))
{
latency_subscriber.run();
latency_subscriber.destroy_user_entities();
}
else
{
Expand Down Expand Up @@ -568,6 +570,13 @@ int main(
{
sub.join();
}

for (auto& sub : latency_subscribers)
{
sub->destroy_user_entities();
}

latency_publisher.destroy_user_entities();
}
else
{
Expand Down

0 comments on commit 9961e61

Please sign in to comment.