diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 20f487a1d67..5ca0b686db4 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -300,16 +300,19 @@ class ReaderProxy } /** - * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK. + * Called when an ACKNACK is received to set a new value for the minimum count accepted for following received + * ACKNACKs. + * * @param acknack_count The count of the received ACKNACK. - * @return true if internal count changed (i.e. new ACKNACK is accepted) + * @return true if internal count changed (i.e. received ACKNACK is accepted) */ bool check_and_set_acknack_count( uint32_t acknack_count) { - if (last_acknack_count_ < acknack_count) + if (acknack_count >= next_expected_acknack_count_) { - last_acknack_count_ = acknack_count; + next_expected_acknack_count_ = acknack_count; + ++next_expected_acknack_count_; return true; } @@ -423,8 +426,8 @@ class ReaderProxy TimedEvent* initial_heartbeat_event_; //! Are timed events enabled? std::atomic_bool timers_enabled_; - //! Last ack/nack count - uint32_t last_acknack_count_; + //! Next expected ack/nack count + uint32_t next_expected_acknack_count_; //! Last NACKFRAG count. uint32_t last_nackfrag_count_; diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 4ef2829e98c..a86b5614b2d 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -57,23 +57,27 @@ ReaderProxy::ReaderProxy( , nack_supression_event_(nullptr) , initial_heartbeat_event_(nullptr) , timers_enabled_(false) - , last_acknack_count_(0) + , next_expected_acknack_count_(0) , last_nackfrag_count_(0) { - nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->perform_nack_supression(guid()); - return false; - }, - TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); + auto participant = writer_->getRTPSParticipant(); + if (nullptr != participant) + { + nack_supression_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->perform_nack_supression(guid()); + return false; + }, + TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); - initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->intraprocess_heartbeat(this); - return false; - }, 0); + initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->intraprocess_heartbeat(this); + return false; + }, 0); + } stop(); } @@ -135,7 +139,7 @@ void ReaderProxy::start( } timers_enabled_.store(is_remote_and_reliable()); - if (is_local_reader()) + if (is_local_reader() && initial_heartbeat_event_) { initial_heartbeat_event_->restart_timer(); } @@ -166,24 +170,30 @@ void ReaderProxy::stop() disable_timers(); changes_for_reader_.clear(); - last_acknack_count_ = 0; + next_expected_acknack_count_ = 0; last_nackfrag_count_ = 0; changes_low_mark_ = SequenceNumber_t(); } void ReaderProxy::disable_timers() { - if (timers_enabled_.exchange(false)) + if (timers_enabled_.exchange(false) && nack_supression_event_) { nack_supression_event_->cancel_timer(); } - initial_heartbeat_event_->cancel_timer(); + if (initial_heartbeat_event_) + { + initial_heartbeat_event_->cancel_timer(); + } } void ReaderProxy::update_nack_supression_interval( const Duration_t& interval) { - nack_supression_event_->update_interval(interval); + if (nack_supression_event_) + { + nack_supression_event_->update_interval(interval); + } } void ReaderProxy::add_change( @@ -191,7 +201,7 @@ void ReaderProxy::add_change( bool is_relevant, bool restart_nack_supression) { - if (restart_nack_supression && timers_enabled_.load()) + if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_) { nack_supression_event_->restart_timer(); } @@ -205,7 +215,7 @@ void ReaderProxy::add_change( bool restart_nack_supression, const std::chrono::time_point& max_blocking_time) { - if (restart_nack_supression && timers_enabled_) + if (restart_nack_supression && timers_enabled_ && nack_supression_event_) { nack_supression_event_->restart_timer(max_blocking_time); } @@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status( // It will use acked_changes_set(). assert(is_reliable_); - if (restart_nack_supression && is_remote_and_reliable()) + if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_) { assert(timers_enabled_.load()); nack_supression_event_->restart_timer(); diff --git a/test/unittest/rtps/writer/ReaderProxyTests.cpp b/test/unittest/rtps/writer/ReaderProxyTests.cpp index 56bc95b3a44..7e29a3f8a9b 100644 --- a/test/unittest/rtps/writer/ReaderProxyTests.cpp +++ b/test/unittest/rtps/writer/ReaderProxyTests.cpp @@ -331,6 +331,38 @@ TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_te TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u); } +// Test expectations regarding acknack count. +// Serves as a regression test for redmine issue #20729. +TEST(ReaderProxyTests, acknack_count) +{ + StatefulWriter writer_mock; + WriterTimes w_times; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(w_times, alloc, &writer_mock); + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + // Check that the initial acknack count is 0. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it is incremented. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted if it is decremented. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it has a big increment. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(100u)); + // Check that previous values are rejected. + for (uint32_t i = 0; i <= 100u; ++i) + { + EXPECT_FALSE(rproxy.check_and_set_acknack_count(i)); + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima