Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20729] Allow processing of AckNack submessages with count == 0 (backport #4639) #4774

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,19 @@ class ReaderProxy
}

/**
* Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
* Called when an ACKNACK is received to set a new value for the minimum count accepted for following received
* ACKNACKs.
*
* @param acknack_count The count of the received ACKNACK.
* @return true if internal count changed (i.e. new ACKNACK is accepted)
* @return true if internal count changed (i.e. received ACKNACK is accepted)
*/
bool check_and_set_acknack_count(
uint32_t acknack_count)
{
if (last_acknack_count_ < acknack_count)
if (acknack_count >= next_expected_acknack_count_)
{
last_acknack_count_ = acknack_count;
next_expected_acknack_count_ = acknack_count;
++next_expected_acknack_count_;
return true;
}

Expand Down Expand Up @@ -423,8 +426,8 @@ class ReaderProxy
TimedEvent* initial_heartbeat_event_;
//! Are timed events enabled?
std::atomic_bool timers_enabled_;
//! Last ack/nack count
uint32_t last_acknack_count_;
//! Next expected ack/nack count
uint32_t next_expected_acknack_count_;
//! Last NACKFRAG count.
uint32_t last_nackfrag_count_;

Expand Down
54 changes: 32 additions & 22 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,27 @@ ReaderProxy::ReaderProxy(
, nack_supression_event_(nullptr)
, initial_heartbeat_event_(nullptr)
, timers_enabled_(false)
, last_acknack_count_(0)
, next_expected_acknack_count_(0)
, last_nackfrag_count_(0)
{
nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(),
[&]() -> bool
{
writer_->perform_nack_supression(guid());
return false;
},
TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));
auto participant = writer_->getRTPSParticipant();
if (nullptr != participant)
{
nack_supression_event_ = new TimedEvent(participant->getEventResource(),
[&]() -> bool
{
writer_->perform_nack_supression(guid());
return false;
},
TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));

initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(),
[&]() -> bool
{
writer_->intraprocess_heartbeat(this);
return false;
}, 0);
initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(),
[&]() -> bool
{
writer_->intraprocess_heartbeat(this);
return false;
}, 0);
}

stop();
}
Expand Down Expand Up @@ -135,7 +139,7 @@ void ReaderProxy::start(
}

timers_enabled_.store(is_remote_and_reliable());
if (is_local_reader())
if (is_local_reader() && initial_heartbeat_event_)
{
initial_heartbeat_event_->restart_timer();
}
Expand Down Expand Up @@ -166,32 +170,38 @@ void ReaderProxy::stop()
disable_timers();

changes_for_reader_.clear();
last_acknack_count_ = 0;
next_expected_acknack_count_ = 0;
last_nackfrag_count_ = 0;
changes_low_mark_ = SequenceNumber_t();
}

void ReaderProxy::disable_timers()
{
if (timers_enabled_.exchange(false))
if (timers_enabled_.exchange(false) && nack_supression_event_)
{
nack_supression_event_->cancel_timer();
}
initial_heartbeat_event_->cancel_timer();
if (initial_heartbeat_event_)
{
initial_heartbeat_event_->cancel_timer();
}
}

void ReaderProxy::update_nack_supression_interval(
const Duration_t& interval)
{
nack_supression_event_->update_interval(interval);
if (nack_supression_event_)
{
nack_supression_event_->update_interval(interval);
}
}

void ReaderProxy::add_change(
const ChangeForReader_t& change,
bool is_relevant,
bool restart_nack_supression)
{
if (restart_nack_supression && timers_enabled_.load())
if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_)
{
nack_supression_event_->restart_timer();
}
Expand All @@ -205,7 +215,7 @@ void ReaderProxy::add_change(
bool restart_nack_supression,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
if (restart_nack_supression && timers_enabled_)
if (restart_nack_supression && timers_enabled_ && nack_supression_event_)
{
nack_supression_event_->restart_timer(max_blocking_time);
}
Expand Down Expand Up @@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status(
// It will use acked_changes_set().
assert(is_reliable_);

if (restart_nack_supression && is_remote_and_reliable())
if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_)
{
assert(timers_enabled_.load());
nack_supression_event_->restart_timer();
Expand Down
32 changes: 32 additions & 0 deletions test/unittest/rtps/writer/ReaderProxyTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,38 @@ TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_te
TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u);
}

// Test expectations regarding acknack count.
// Serves as a regression test for redmine issue #20729.
TEST(ReaderProxyTests, acknack_count)
{
StatefulWriter writer_mock;
WriterTimes w_times;
RemoteLocatorsAllocationAttributes alloc;
ReaderProxy rproxy(w_times, alloc, &writer_mock);

ReaderProxyData reader_attributes(0, 0);
reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
rproxy.start(reader_attributes);

// Check that the initial acknack count is 0.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(0u));
// Check that it is not accepted twice.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u));
// Check that it is accepted if it is incremented.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(1u));
// Check that it is not accepted twice.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(1u));
// Check that it is not accepted if it is decremented.
EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u));
// Check that it is accepted if it has a big increment.
EXPECT_TRUE(rproxy.check_and_set_acknack_count(100u));
// Check that previous values are rejected.
for (uint32_t i = 0; i <= 100u; ++i)
{
EXPECT_FALSE(rproxy.check_and_set_acknack_count(i));
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down
Loading