Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20311] Add missing TypeLookup listeners (backport #4335) #4438

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TypeLookupManager
bool create_secure_endpoints();
#endif
*/

void request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);

void reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);
};

} /* namespace builtin */
Expand Down
17 changes: 14 additions & 3 deletions include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REPLY_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupReplyListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -85,5 +96,5 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REPLY_LISTENER_HPP_*/
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REQUEST_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupRequestListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -86,5 +97,5 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REQUEST_LISTENER_HPP_*/
105 changes: 83 additions & 22 deletions src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ ReaderHistory* TypeLookupManager::get_builtin_reply_reader_history()
*/
bool TypeLookupManager::create_endpoints()
{
bool ret = true;

// Built-in history attributes.
HistoryAttributes hatt;
hatt.initialReservedCaches = 20;
Expand All @@ -343,14 +345,15 @@ bool TypeLookupManager::create_endpoints()
// Built-in request writer
if (builtin_protocols_->m_att.typelookup_config.use_client)
{
request_listener_ = new TypeLookupRequestListener(this);
builtin_request_writer_history_ = new WriterHistory(hatt);

RTPSWriter* req_writer;
if (participant_->createWriter(
&req_writer,
watt,
builtin_request_writer_history_,
nullptr,
request_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_request_writer,
true))
{
Expand All @@ -360,23 +363,22 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup request writer creation failed.");
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply writer
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
reply_listener_ = new TypeLookupReplyListener(this);
builtin_reply_writer_history_ = new WriterHistory(hatt);

RTPSWriter* rep_writer;
if (participant_->createWriter(
&rep_writer,
watt,
builtin_reply_writer_history_,
nullptr,
reply_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_reply_writer,
true))
{
Expand All @@ -386,9 +388,7 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup reply writer creation failed.");
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
return false;
ret = false;
}
}

Expand All @@ -403,9 +403,12 @@ bool TypeLookupManager::create_endpoints()
ratt.endpoint.durabilityKind = fastrtps::rtps::VOLATILE;

// Built-in request reader
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
request_listener_ = new TypeLookupRequestListener(this);
if (nullptr == request_listener_)
{
request_listener_ = new TypeLookupRequestListener(this);
}
builtin_request_reader_history_ = new ReaderHistory(hatt);

RTPSReader* req_reader;
Expand All @@ -423,18 +426,17 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup request reader creation failed.");
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
delete request_listener_;
request_listener_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply reader
if (builtin_protocols_->m_att.typelookup_config.use_client)
if (ret && builtin_protocols_->m_att.typelookup_config.use_client)
{
reply_listener_ = new TypeLookupReplyListener(this);
if (nullptr == reply_listener_)
{
reply_listener_ = new TypeLookupReplyListener(this);
}
builtin_reply_reader_history_ = new ReaderHistory(hatt);

RTPSReader* rep_reader;
Expand All @@ -452,15 +454,50 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup reply reader creation failed.");
ret = false;
}
}

// Clean up if something failed.
if (!ret)
{
if (nullptr != builtin_request_writer_history_)
{
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
}

if (nullptr != builtin_reply_writer_history_)
{
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
}

if (nullptr != builtin_request_reader_history_)
{
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
}

if (nullptr != builtin_reply_reader_history_)
{
delete builtin_reply_reader_history_;
builtin_reply_reader_history_ = nullptr;
}

if (nullptr != request_listener_)
{
delete request_listener_;
request_listener_ = nullptr;
}
if (nullptr != reply_listener_)
{
delete reply_listener_;
reply_listener_ = nullptr;
return false;
}
}

return true;
return ret;
}

/* TODO Implement if security is needed.
Expand Down Expand Up @@ -558,7 +595,13 @@ bool TypeLookupManager::send_request(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && request_type_.serialize(&req, &payload))

bool serialize_ret = request_type_.serialize(&req, &payload);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -599,7 +642,13 @@ bool TypeLookupManager::send_reply(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && reply_type_.serialize(&rep, &payload))

bool serialize_ret = reply_type_.serialize(&rep, &payload);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -684,6 +733,18 @@ const fastrtps::rtps::GUID_t& TypeLookupManager::get_builtin_request_writer_guid
return c_Guid_Unknown;
}

void TypeLookupManager::request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_request_writer_history_->remove_change(change);
}

void TypeLookupManager::reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_reply_writer_history_->remove_change(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ void TypeLookupReplyListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupReplyListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->reply_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ void TypeLookupRequestListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupRequestListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->request_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Loading