Skip to content

Commit

Permalink
[SubscribeHandler]: added getNumSubscribers() and setNoMessageTimeout…
Browse files Browse the repository at this point in the history
…() methods
  • Loading branch information
matemat13 committed May 13, 2024
1 parent 8566e70 commit 730b2a9
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 19 deletions.
69 changes: 51 additions & 18 deletions include/mrs_lib/impl/subscribe_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ namespace mrs_lib
m_queue_size(options.queue_size),
m_transport_hints(options.transport_hints)
{
// initialize the callback for the TimeoutManager
if (options.timeout_callback)
m_timeout_mgr_callback = std::bind(options.timeout_callback, topicName(), std::placeholders::_1);
else
m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, topicName(), std::placeholders::_1);

if (options.no_message_timeout != mrs_lib::no_timeout)
{
// initialize a new TimeoutManager if not provided by the user
if (!m_timeout_manager)
m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(2.0/options.no_message_timeout.toSec()));

// initialize the callback for the TimeoutManager
std::function<void(const ros::Time&)> timeout_mgr_callback;
if (options.timeout_callback)
timeout_mgr_callback = std::bind(options.timeout_callback, topicName(), std::placeholders::_1);
else
timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, topicName(), std::placeholders::_1);
m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(options.no_message_timeout * 0.5));

// register the timeout callback with the TimeoutManager
m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, timeout_mgr_callback);
m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, m_timeout_mgr_callback);
}

const std::string msg = "Subscribed to topic '" + m_topic_name + "' -> '" + topicName() + "'";
Expand Down Expand Up @@ -142,20 +141,53 @@ namespace mrs_lib
}
//}

/* getNumPublishers() method //{ */
virtual uint32_t getNumPublishers() const
{
return m_sub.getNumPublishers();
};
//}

/* setNoMessageTimeout() method //{ */
virtual void setNoMessageTimeout(const ros::Duration& timeout)
{
if (timeout == mrs_lib::no_timeout)
{
// if there is a timeout callback already registered but the user wants to disable it, pause it
if (m_timeout_manager != nullptr && m_timeout_id.has_value())
m_timeout_manager->pause(m_timeout_id.value());
// otherwise, there is no callback, so nothing to do
}
else
{
// if there is no callback manager, create it
if (m_timeout_manager == nullptr)
m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(timeout * 0.5));

// if there is an existing timeout callback registered, change its timeout
if (m_timeout_id.has_value())
m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback);
// otherwise, register it
else
m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback);
}
}
//}

/* start() method //{ */
virtual void start()
{
if (m_timeout_manager)
m_timeout_manager->start(m_timeout_id);
if (m_timeout_manager && m_timeout_id.has_value())
m_timeout_manager->start(m_timeout_id.value());
m_sub = m_nh.subscribe(m_topic_name, m_queue_size, &Impl::data_callback, this, m_transport_hints);
}
//}

/* stop() method //{ */
virtual void stop()
{
if (m_timeout_manager)
m_timeout_manager->pause(m_timeout_id);
if (m_timeout_manager && m_timeout_id.has_value())
m_timeout_manager->pause(m_timeout_id.value());
m_sub.shutdown();
}
//}
Expand All @@ -179,7 +211,8 @@ namespace mrs_lib

protected:
std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
mrs_lib::TimeoutManager::timeout_id_t m_timeout_id;
std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;

protected:
ros::Time m_latest_message_time;
Expand Down Expand Up @@ -223,8 +256,8 @@ namespace mrs_lib
{
{
std::lock_guard lck(m_new_data_mtx);
if (m_timeout_manager)
m_timeout_manager->reset(m_timeout_id);
if (m_timeout_manager && m_timeout_id.has_value())
m_timeout_manager->reset(m_timeout_id.value());
process_new_message(msg);
}

Expand Down Expand Up @@ -309,8 +342,8 @@ namespace mrs_lib
{
{
std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
if (this->m_timeout_manager)
this->m_timeout_manager->reset(this->m_timeout_id);
if (this->m_timeout_manager && this->m_timeout_id.has_value())
this->m_timeout_manager->reset(this->m_timeout_id.value());
impl_class_t::process_new_message(msg);
}

Expand Down
17 changes: 16 additions & 1 deletion include/mrs_lib/subscribe_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ namespace mrs_lib
/*!
* \brief Blocks until new data becomes available or until the timeout runs out or until a spurious wake-up.
*
* \return the message if a new message is available after waking up, \p nullptr otherwise.
* \param timeout after this duration, this method will return a nullptr if no new data becomes available.
* \return the message if a new message is available after waking up, \p nullptr otherwise.
*/
virtual typename MessageType::ConstPtr waitForNew(const ros::WallDuration& timeout) {assert(m_pimpl); return m_pimpl->waitForNew(timeout);};

Expand All @@ -161,6 +162,20 @@ namespace mrs_lib
*/
virtual std::string subscribedTopicName() const {assert(m_pimpl); return m_pimpl->m_topic_name;};

/*!
* \brief Returns number of publishers registered at the topic.
*
* \return number of publishers.
*/
virtual uint32_t getNumPublishers() const {assert(m_pimpl); return m_pimpl->getNumPublishers();};

/*!
* \brief Sets the timeout for no received message.
*
* \param timeout The new timeout for no received messages.
*/
virtual void setNoMessageTimeout(const ros::Duration& timeout) {assert(m_pimpl); return m_pimpl->setNoMessageTimeout(timeout);};

/*!
* \brief Enables the callbacks for the handled topic.
*
Expand Down

0 comments on commit 730b2a9

Please sign in to comment.