Skip to content

Commit

Permalink
RTPS WriterHistory refactor (#4966)
Browse files Browse the repository at this point in the history
* Refs #21082. Remove templated version of `new_change`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Call simple version of `new_change` in rtps examples.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Call simple version of `new_change` in `TypeLookupManager`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on EDP.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on PDP.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on SecurityManager.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on unit tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on WLP.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on MonitorService.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on blackbox tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Fix blackbox tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `History::do_reserve_cache`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `new_change` overload receiving `std::function`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `is_pool_initialized`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. WriterHistory keeps a change pool.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Implementation of `release_change` moved to WriterHistory.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `release_change` from `RTPSWriter`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Add `create_change` to `WriterHistory`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Change calls from `new_change` to `create_change`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `new_change` from `RTPSWriter`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove `remove_older_changes` from `RTPSWriter`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Add payload pool to `WriterHistory`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Add new `create_change` overload.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Using new `create_change` overload where relevant.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on `IPersistenceService`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor on `DataWriterHistory`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Several methods to create writers removed.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Move data sharing pool initialization to DataWriterImpl.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor creation of rtps writer.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Refactor PersistentWriter.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove constructors taking pools.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. RTPSWriter does not handle pools.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Add pool getters to WriterHistory.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Remove pool references on StatexxxWriter.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Move pools from `Endpoint` to `BaseReader`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Avoid accessing history attributes.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Fixes on SecurityManager.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Fix build after rebase.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Please linters.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Fix release order in `WriterHistory` destructor.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Move datasharing pool initialization to `RTPSWriter`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21082. Fix negative tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21137. Apply review suggestions.

Signed-off-by: Miguel Company <[email protected]>

* Refs #21137. Apply suggestions from code review

Signed-off-by: Miguel Company <[email protected]>

Co-authored-by: Eduardo Ponz Segrelles <[email protected]>

* Refs #21137. Add note to versions.md

Signed-off-by: eduponz <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
Signed-off-by: eduponz <[email protected]>
Co-authored-by: Eduardo Ponz Segrelles <[email protected]>
  • Loading branch information
MiguelCompany and EduPonz committed Jun 27, 2024
1 parent 4c9f1d6 commit d3ca40c
Show file tree
Hide file tree
Showing 77 changed files with 867 additions and 1,307 deletions.
5 changes: 1 addition & 4 deletions examples/cpp/rtps/AsSocket/TestWriterSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ void TestWriterSocket::run(
{
for (int i = 0; i < nmsgs; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
#if defined(_WIN32)
ch->serializedPayload.length =
sprintf_s((char*)ch->serializedPayload.data, 255, "My example string %d", i) + 1;
Expand Down
12 changes: 3 additions & 9 deletions examples/cpp/rtps/Persistent/TestWriterPersistent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,12 @@ void TestWriterPersistent::run(

for (int i = 0; i < samples; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
if (!ch) // In the case history is full, remove some old changes
{
std::cout << "cleaning history...";
mp_writer->remove_older_changes(20);
ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
mp_history->remove_min_change();
ch = mp_history->create_change(255, ALIVE);
}

#if defined(_WIN32)
Expand Down
12 changes: 3 additions & 9 deletions examples/cpp/rtps/Registered/TestWriterRegistered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,12 @@ void TestWriterRegistered::run(

for (int i = 0; i < samples; ++i )
{
CacheChange_t* ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
CacheChange_t* ch = mp_history->create_change(255, ALIVE);
if (!ch) // In the case history is full, remove some old changes
{
std::cout << "cleaning history...";
mp_writer->remove_older_changes(20);
ch = mp_writer->new_change([]() -> uint32_t
{
return 255;
}, ALIVE);
mp_history->remove_min_change();
ch = mp_history->create_change(255, ALIVE);
}

#if defined(_WIN32)
Expand Down
11 changes: 0 additions & 11 deletions include/fastdds/rtps/Endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ class Endpoint

virtual ~Endpoint()
{
// As releasing the change pool will delete the cache changes it owns,
// the payload pool may be called to release their payloads, so we should
// ensure that the payload pool is destroyed after the change pool.
change_pool_.reset();
payload_pool_.reset();
}

public:
Expand Down Expand Up @@ -120,12 +115,6 @@ class Endpoint
//!Endpoint Mutex
mutable RecursiveTimedMutex mp_mutex;

//!Pool of serialized payloads.
std::shared_ptr<IPayloadPool> payload_pool_;

//!Pool of cache changes.
std::shared_ptr<IChangePool> change_pool_;

//!Fixed size of payloads
uint32_t fixed_payload_size_ = 0;

Expand Down
83 changes: 11 additions & 72 deletions include/fastdds/rtps/RTPSDomain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,92 +113,32 @@ class RTPSDomain

/**
* Create a RTPSWriter in a participant.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param change_pool Shared pointer to the IChangePool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
* @param p Pointer to the RTPSParticipant.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant using a custom payload pool.
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param change_pool Shared pointer to the IChangePool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
* so its use may result in undefined behaviour.
*/
FASTDDS_EXPORTED_API static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

/**
* Create a RTPSWriter in a participant.
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* @param p Pointer to the RTPSParticipant.
* @param entity_id Specific entity id to use for the created writer.
* @param watt Writer Attributes.
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
*
* @return Pointer to the created RTPSWriter.
*
* \warning The returned pointer is invalidated after a call to removeRTPSWriter() or stopAll(),
Expand All @@ -208,7 +148,6 @@ class RTPSDomain
RTPSParticipant* p,
const EntityId_t& entity_id,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);

Expand Down
4 changes: 0 additions & 4 deletions include/fastdds/rtps/history/History.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,6 @@ class History
//!Print the seqNum of the changes in the History (for debuggisi, mng purposes).
void print_changes_seqNum2();

FASTDDS_EXPORTED_API virtual bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) = 0;

FASTDDS_EXPORTED_API virtual void do_release_cache(
CacheChange_t* ch) = 0;

Expand Down
4 changes: 0 additions & 4 deletions include/fastdds/rtps/history/ReaderHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ class ReaderHistory : public History

protected:

FASTDDS_EXPORTED_API bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) override;

FASTDDS_EXPORTED_API void do_release_cache(
CacheChange_t* ch) override;

Expand Down
115 changes: 106 additions & 9 deletions include/fastdds/rtps/history/WriterHistory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,23 @@
#ifndef FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP
#define FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP

#include <fastdds/rtps/history/History.hpp>
#include <cstdint>
#include <memory>

#include <fastdds/fastdds_dll.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/common/CacheChange.hpp>
#include <fastdds/rtps/common/ChangeKind_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/history/History.hpp>
#include <fastdds/rtps/history/IChangePool.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

class HistoryAttributes;
class RTPSWriter;
class WriteParams;

Expand All @@ -48,12 +58,81 @@ class WriterHistory : public rtps::History
public:

/**
* Constructor of the WriterHistory.
* @brief Construct a WriterHistory.
*
* @param att Attributes configuring the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att);

/**
* @brief Construct a WriterHistory with a custom payload pool.
*
* @param att Attributes configuring the WriterHistory.
* @param payload_pool Pool of payloads to be used by the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool);

/**
* @brief Construct a WriterHistory with custom payload and change pools.
*
* @param att Attributes configuring the WriterHistory.
* @param payload_pool Pool of payloads to be used by the WriterHistory.
* @param change_pool Pool of changes to be used by the WriterHistory.
*/
FASTDDS_EXPORTED_API WriterHistory(
const HistoryAttributes& att);
const HistoryAttributes& att,
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool);

FASTDDS_EXPORTED_API virtual ~WriterHistory() override;

/**
* @brief Get the payload pool used by this history.
*
* @return Reference to the payload pool used by this history.
*/
FASTDDS_EXPORTED_API const std::shared_ptr<IPayloadPool>& get_payload_pool() const;

/**
* @brief Get the change pool used by this history.
*
* @return Reference to the change pool used by this history.
*/
FASTDDS_EXPORTED_API const std::shared_ptr<IChangePool>& get_change_pool() const;

/**
* @brief Create a new CacheChange_t object.
*
* @param change_kind Kind of the change.
* @param handle InstanceHandle_t of the change.
*
* @return Pointer to the new CacheChange_t object.
*
* @pre A writer has been associated with this history
*/
FASTDDS_EXPORTED_API CacheChange_t* create_change(
ChangeKind_t change_kind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);

/**
* @brief Create a new CacheChange_t object with a specific payload size.
*
* @param payload_size Size of the payload.
* @param change_kind Kind of the change.
* @param handle InstanceHandle_t of the change.
*
* @return Pointer to the new CacheChange_t object.
*
* @pre A writer has been associated with this history
*/
FASTDDS_EXPORTED_API CacheChange_t* create_change(
uint32_t payload_size,
ChangeKind_t change_kind,
InstanceHandle_t handle = c_InstanceHandle_Unknown);

/**
* Add a CacheChange_t to the WriterHistory.
* @param a_change Pointer to the CacheChange_t to be added.
Expand Down Expand Up @@ -141,11 +220,24 @@ class WriterHistory : public rtps::History
return m_lastCacheChangeSeqNum + 1;
}

protected:
/**
* Release a change when it is not being used anymore.
*
* @param ch Pointer to the cache change to be released.
*
* @returns whether the operation succeeded or not
*
* @pre
* @li A writer has been associated with this history
* @li @c ch is not @c nullptr
* @li @c ch points to a cache change obtained from a call to @c this->create_change
*
* @post memory pointed to by @c ch is not accessed
*/
FASTDDS_EXPORTED_API bool release_change(
CacheChange_t* ch);

FASTDDS_EXPORTED_API bool do_reserve_cache(
CacheChange_t** change,
uint32_t size) override;
protected:

FASTDDS_EXPORTED_API void do_release_cache(
CacheChange_t* ch) override;
Expand Down Expand Up @@ -211,9 +303,9 @@ class WriterHistory : public rtps::History
}

//!Last CacheChange Sequence Number added to the History.
SequenceNumber_t m_lastCacheChangeSeqNum;
SequenceNumber_t m_lastCacheChangeSeqNum {};
//!Pointer to the associated RTPSWriter;
RTPSWriter* mp_writer;
RTPSWriter* mp_writer = nullptr;

uint32_t high_mark_for_frag_ = 0;

Expand Down Expand Up @@ -248,6 +340,11 @@ class WriterHistory : public rtps::History

void set_fragments(
CacheChange_t* change);

/// Reference to the change pool used by this history.
std::shared_ptr<IChangePool> change_pool_;
/// Reference to the payload pool used by this history.
std::shared_ptr<IPayloadPool> payload_pool_;
};

} // namespace rtps
Expand Down
Loading

0 comments on commit d3ca40c

Please sign in to comment.