From 5b792c266af8dba8677b3f38439e67b0e6d397bc Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 2 Apr 2024 09:27:00 +0200 Subject: [PATCH 1/4] Refs #20729. Regression test. Signed-off-by: Miguel Company --- .../unittest/rtps/writer/ReaderProxyTests.cpp | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/unittest/rtps/writer/ReaderProxyTests.cpp b/test/unittest/rtps/writer/ReaderProxyTests.cpp index 238f9562cd1..bb6d0bee937 100644 --- a/test/unittest/rtps/writer/ReaderProxyTests.cpp +++ b/test/unittest/rtps/writer/ReaderProxyTests.cpp @@ -382,6 +382,38 @@ TEST(ReaderProxyTests, has_been_delivered_test) expect_result({0, 3}, false, false); } +// Test expectations regarding acknack count. +// Serves as a regression test for redmine issue #20729. +TEST(ReaderProxyTests, acknack_count) +{ + StatefulWriter writer_mock; + WriterTimes w_times; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(w_times, alloc, &writer_mock); + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + // Check that the initial acknack count is 0. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it is incremented. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted if it is decremented. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it has a big increment. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(100u)); + // Check that previous values are rejected. + for (uint32_t i = 0; i <= 100u; ++i) + { + EXPECT_FALSE(rproxy.check_and_set_acknack_count(i)); + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima From 530e8da7c769fc1cd9b642fafad027a0dcf34e38 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 2 Apr 2024 09:14:49 +0200 Subject: [PATCH 2/4] Refs #20729. Fix issue. Signed-off-by: Miguel Company --- include/fastdds/rtps/writer/ReaderProxy.h | 9 +++++---- src/cpp/rtps/writer/ReaderProxy.cpp | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 66f77d4c5e5..b0f605a06fe 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -307,9 +307,10 @@ class ReaderProxy bool check_and_set_acknack_count( uint32_t acknack_count) { - if (last_acknack_count_ < acknack_count) + if (acknack_count >= next_expected_acknack_count_) { - last_acknack_count_ = acknack_count; + next_expected_acknack_count_ = acknack_count; + ++next_expected_acknack_count_; return true; } @@ -442,8 +443,8 @@ class ReaderProxy TimedEvent* initial_heartbeat_event_; //! Are timed events enabled? std::atomic_bool timers_enabled_; - //! Last ack/nack count - uint32_t last_acknack_count_; + //! Next expected ack/nack count + uint32_t next_expected_acknack_count_; //! Last NACKFRAG count. uint32_t last_nackfrag_count_; diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 2655f87e42f..3a751c989d2 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -57,7 +57,7 @@ ReaderProxy::ReaderProxy( , nack_supression_event_(nullptr) , initial_heartbeat_event_(nullptr) , timers_enabled_(false) - , last_acknack_count_(0) + , next_expected_acknack_count_(0) , last_nackfrag_count_(0) { nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), @@ -166,7 +166,7 @@ void ReaderProxy::stop() disable_timers(); changes_for_reader_.clear(); - last_acknack_count_ = 0; + next_expected_acknack_count_ = 0; last_nackfrag_count_ = 0; changes_low_mark_ = SequenceNumber_t(); } From a9b1d6e7196ba3f061e5a53ae79d30d1ac0c53c4 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 3 Apr 2024 14:03:19 +0200 Subject: [PATCH 3/4] Refs #20729. Update doxydoc. Signed-off-by: Miguel Company --- include/fastdds/rtps/writer/ReaderProxy.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index b0f605a06fe..2610e988ce2 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -300,9 +300,11 @@ class ReaderProxy } /** - * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK. + * Called when an ACKNACK is received to set a new value for the minimum count accepted for following received + * ACKNACKs. + * * @param acknack_count The count of the received ACKNACK. - * @return true if internal count changed (i.e. new ACKNACK is accepted) + * @return true if internal count changed (i.e. received ACKNACK is accepted) */ bool check_and_set_acknack_count( uint32_t acknack_count) From 5c87d3520572351fb5839d937ca30bc57463e3da Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 3 Apr 2024 14:22:03 +0200 Subject: [PATCH 4/4] Refs #20729. Dont create timers if participant is nullptr. Signed-off-by: Miguel Company --- src/cpp/rtps/writer/ReaderProxy.cpp | 50 +++++++++++++++++------------ 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 3a751c989d2..e215280c206 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -60,20 +60,24 @@ ReaderProxy::ReaderProxy( , next_expected_acknack_count_(0) , last_nackfrag_count_(0) { - nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->perform_nack_supression(guid()); - return false; - }, - TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); + auto participant = writer_->getRTPSParticipant(); + if (nullptr != participant) + { + nack_supression_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->perform_nack_supression(guid()); + return false; + }, + TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); - initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->intraprocess_heartbeat(this); - return false; - }, 0); + initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->intraprocess_heartbeat(this); + return false; + }, 0); + } stop(); } @@ -135,7 +139,7 @@ void ReaderProxy::start( } timers_enabled_.store(is_remote_and_reliable()); - if (is_local_reader()) + if (is_local_reader() && initial_heartbeat_event_) { initial_heartbeat_event_->restart_timer(); } @@ -173,17 +177,23 @@ void ReaderProxy::stop() void ReaderProxy::disable_timers() { - if (timers_enabled_.exchange(false)) + if (timers_enabled_.exchange(false) && nack_supression_event_) { nack_supression_event_->cancel_timer(); } - initial_heartbeat_event_->cancel_timer(); + if (initial_heartbeat_event_) + { + initial_heartbeat_event_->cancel_timer(); + } } void ReaderProxy::update_nack_supression_interval( const Duration_t& interval) { - nack_supression_event_->update_interval(interval); + if (nack_supression_event_) + { + nack_supression_event_->update_interval(interval); + } } void ReaderProxy::add_change( @@ -191,7 +201,7 @@ void ReaderProxy::add_change( bool is_relevant, bool restart_nack_supression) { - if (restart_nack_supression && timers_enabled_.load()) + if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_) { nack_supression_event_->restart_timer(); } @@ -205,7 +215,7 @@ void ReaderProxy::add_change( bool restart_nack_supression, const std::chrono::time_point& max_blocking_time) { - if (restart_nack_supression && timers_enabled_) + if (restart_nack_supression && timers_enabled_ && nack_supression_event_) { nack_supression_event_->restart_timer(max_blocking_time); } @@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status( // It will use acked_changes_set(). assert(is_reliable_); - if (restart_nack_supression && is_remote_and_reliable()) + if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_) { assert(timers_enabled_.load()); nack_supression_event_->restart_timer();