Skip to content

Commit

Permalink
Monitor service properly managing instances (#4564)
Browse files Browse the repository at this point in the history
* Refs #20625: Add BB tests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20625: Add Monitor Service Fix. Use DataWriterHistory

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20625: Fix MonitorService Unittests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20625: Apply rev suggestion

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #20625: Add more comments in BB tests

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit 84960eb)

# Conflicts:
#	test/unittest/statistics/rtps/CMakeLists.txt
  • Loading branch information
Mario-DL authored and mergify[bot] committed Mar 25, 2024
1 parent 11e1e3e commit a6280de
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 52 deletions.
38 changes: 31 additions & 7 deletions src/cpp/statistics/rtps/monitor-service/MonitorService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

#include <statistics/rtps/monitor-service/MonitorService.hpp>

#include <fastdds/publisher/DataWriterHistory.hpp>
#include <fastdds/statistics/topic_names.hpp>
#include <fastrtps/utils/TimeConversion.h>

#include <rtps/history/PoolConfig.h>
#include <rtps/history/TopicPayloadPoolRegistry.hpp>

#include <statistics/rtps/StatisticsBase.hpp>

using namespace eprosima::fastrtps;
Expand Down Expand Up @@ -461,9 +462,13 @@ bool MonitorService::add_change(
MonitorServiceStatusData& status_data,
const bool& disposed)
{
InstanceHandle_t handle;
type_.getKey(&status_data, &handle, false);

CacheChange_t* change = status_writer_->new_change(
type_.getSerializedSizeProvider(&status_data),
(disposed ? fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED : fastrtps::rtps::ALIVE));
(disposed ? fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED : fastrtps::rtps::ALIVE),
handle);

if (nullptr != change)
{
Expand All @@ -476,8 +481,13 @@ bool MonitorService::add_change(
return false;
}

type_.getKey(&status_data, &change->instanceHandle);
status_writer_history_->add_change(change);
WriteParams wp;
auto datawriter_history = static_cast<eprosima::fastdds::dds::DataWriterHistory*>(status_writer_history_.get());

std::unique_lock<RecursiveTimedMutex> lock(status_writer_->getMutex());
auto max_blocking_time = std::chrono::steady_clock::now() +
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(Duration_t()));
datawriter_history->add_pub_change(change, wp, lock, max_blocking_time);
}
else
{
Expand Down Expand Up @@ -511,11 +521,25 @@ bool MonitorService::create_endpoint()
watts.endpoint.properties.properties().push_back(std::move(property));

HistoryAttributes hatt;
hatt.payloadMaxSize = BUILTIN_DATA_MAX_SIZE;
hatt.payloadMaxSize = type_.m_typeSize;
hatt.memoryPolicy = MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;
hatt.initialReservedCaches = 25;

status_writer_history_.reset(new WriterHistory(hatt));
hatt.maximumReservedCaches = 0;

TopicAttributes tatt;
tatt.historyQos.kind = KEEP_LAST_HISTORY_QOS;
tatt.historyQos.depth = 1;
tatt.topicKind = WITH_KEY;
tatt.topicName = MONITOR_SERVICE_TOPIC;
tatt.resourceLimitsQos.max_instances = 0;
tatt.resourceLimitsQos.max_samples_per_instance = 1;

status_writer_history_.reset(new eprosima::fastdds::dds::DataWriterHistory(tatt, type_.m_typeSize,
MemoryManagementPolicy_t::PREALLOCATED_WITH_REALLOC_MEMORY_MODE,
[](
const InstanceHandle_t& ) -> void
{
}));

PoolConfig writer_pool_cfg = PoolConfig::from_history_attributes(hatt);
status_writer_payload_pool_ = TopicPayloadPoolRegistry::get(MONITOR_SERVICE_TOPIC, writer_pool_cfg);
Expand Down
Loading

0 comments on commit a6280de

Please sign in to comment.