Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22033] Release participant_stateless secure builtin writer history change when authentication has finished (backport #5386) #5394

Open
wants to merge 3 commits into
base: 2.10.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,13 @@ void SecurityManager::delete_participant_stateless_message_entities()
void SecurityManager::create_participant_stateless_message_pool()
{
participant_stateless_message_writer_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 20, 100 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
20, 100};
participant_stateless_message_reader_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 5000 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE),
10, 5000};

BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize() };
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE};
participant_stateless_message_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipantStatelessMessage", cfg);

PoolConfig writer_cfg = PoolConfig::from_history_attributes(participant_stateless_message_writer_hattr_);
Expand Down Expand Up @@ -1221,7 +1223,8 @@ void SecurityManager::delete_participant_volatile_message_secure_entities()
void SecurityManager::create_participant_volatile_message_secure_pool()
{
participant_volatile_message_secure_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 0 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, static_cast<uint32_t>(PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE),
10, 0 };

PoolConfig pool_cfg = PoolConfig::from_history_attributes(participant_volatile_message_secure_hattr_);
participant_volatile_message_secure_pool_ =
Expand Down Expand Up @@ -1720,6 +1723,7 @@ void SecurityManager::process_participant_volatile_message_secure(
const GUID_t remote_participant_key(message.message_identity().source_guid().guidPrefix,
c_EntityId_RTPSParticipant);
std::shared_ptr<ParticipantCryptoHandle> remote_participant_crypto;
DiscoveredParticipantInfo::AuthUniquePtr remote_participant_info;

// Search remote participant crypto handle.
{
Expand All @@ -1735,6 +1739,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}

remote_participant_crypto = dp_it->second->get_participant_crypto();
remote_participant_info = dp_it->second->get_auth();
}
else
{
Expand All @@ -1756,12 +1761,30 @@ void SecurityManager::process_participant_volatile_message_secure(
EPROSIMA_LOG_ERROR(SECURITY, "Cannot set remote participant crypto tokens ("
<< remote_participant_key << ") - (" << exception.what() << ")");
}
else
{
// Release the change from the participant_stateless_message_writer_pool_
// As both participants have already authorized each other

if (remote_participant_info &&
remote_participant_info->change_sequence_number_ != SequenceNumber_t::unknown())
{
participant_stateless_message_writer_history_->remove_change(
remote_participant_info->change_sequence_number_);
remote_participant_info->change_sequence_number_ = SequenceNumber_t::unknown();
}
}
}
else
{
std::lock_guard<shared_mutex> _(mutex_);
remote_participant_pending_messages_.emplace(remote_participant_key, std::move(message.message_data()));
}

if (remote_participant_info)
{
restore_discovered_participant_info(remote_participant_key, remote_participant_info);
}
}
else if (message.message_class_id().compare(GMCLASSID_SECURITY_READER_CRYPTO_TOKENS) == 0)
{
Expand Down Expand Up @@ -1917,7 +1940,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}
else
{
EPROSIMA_LOG_INFO(SECURITY, "Discarted ParticipantGenericMessage with class id " << message.message_class_id());
EPROSIMA_LOG_INFO(SECURITY, "Discarded ParticipantGenericMessage with class id " << message.message_class_id());
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cpp/rtps/security/SecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct EndpointSecurityAttributes;
*/
class SecurityManager : private WriterListener
{
static constexpr std::size_t PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE = 8192;
static constexpr std::size_t PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE = 1024;

public:

/**
Expand Down Expand Up @@ -401,14 +404,19 @@ class SecurityManager : private WriterListener
}

AuthenticationInfo(
AuthenticationInfo&& auth)
AuthenticationInfo&& auth) noexcept
: identity_handle_(std::move(auth.identity_handle_))
, handshake_handle_(std::move(auth.handshake_handle_))
, auth_status_(auth.auth_status_)
, expected_sequence_number_(auth.expected_sequence_number_)
, change_sequence_number_(std::move(auth.change_sequence_number_))
, event_(std::move(auth.event_))
{
auth.identity_handle_ = nullptr;
auth.handshake_handle_ = nullptr;
auth.auth_status_ = AUTHENTICATION_NOT_AVAILABLE;
auth.expected_sequence_number_ = 0;
auth.change_sequence_number_ = SequenceNumber_t::unknown();
}

int32_t handshake_requests_sent_;
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,16 +834,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1134,6 +1146,15 @@ class PubSubReader
return *this;
}

PubSubReader& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubReader& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,16 +700,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1080,6 +1092,15 @@ class PubSubWriter
return *this;
}

PubSubWriter& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubWriter& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
22 changes: 17 additions & 5 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,16 +599,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down
22 changes: 17 additions & 5 deletions test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -561,16 +561,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down
Loading
Loading