Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rclcpp] add WaitSet class and modify entities to work without executor #1047

Merged
merged 21 commits into from
Apr 13, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
style and cpplint
Signed-off-by: William Woodall <[email protected]>
wjwwood committed Apr 13, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 08381e371145d024f93530b540d4bacc90a5afa2
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
@@ -159,7 +159,7 @@ class Context : public std::enable_shared_from_this<Context>
*
* \param[in] reason the description of why shutdown happened
* \return true if shutdown was successful, false if context was already shutdown
* \throw various exceptions derived from RCLErrorBase, if rcl_shutdown fails
* \throw various exceptions derived from rclcpp::exceptions::RCLError, if rcl_shutdown fails
*/
RCLCPP_PUBLIC
virtual
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
@@ -39,13 +39,13 @@ class GuardCondition
* Shared ownership of the context is held with the guard condition until
* destruction.
* \throws std::invalid_argument if the context is nullptr.
* \throws rclcpp::exceptions::RCLErrorBase based exceptions when underlying
* \throws rclcpp::exceptions::RCLError based exceptions when underlying
* rcl functions fail.
*/
RCLCPP_PUBLIC
explicit GuardCondition(
rclcpp::Context::SharedPtr context =
rclcpp::contexts::default_context::get_global_default_context());
rclcpp::contexts::default_context::get_global_default_context());

RCLCPP_PUBLIC
virtual
@@ -66,7 +66,7 @@ class GuardCondition
* This function is thread-safe, and may be called concurrently with waiting
* on this guard condition in a wait set.
*
* \throws rclcpp::exceptions::RCLErrorBase based exceptions when underlying
* \throws rclcpp::exceptions::RCLError based exceptions when underlying
* rcl functions fail.
*/
RCLCPP_PUBLIC
15 changes: 10 additions & 5 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
@@ -186,7 +186,8 @@ class Subscription : public SubscriptionBase
}

/// Called after construction to continue setup that requires shared_from_this().
void post_init_setup(
void
post_init_setup(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rclcpp::QoS & qos,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options)
@@ -252,7 +253,8 @@ class Subscription : public SubscriptionBase

void
handle_loaned_message(
void * loaned_message, const rclcpp::MessageInfo & message_info) override
void * loaned_message,
const rclcpp::MessageInfo & message_info) override
{
auto typed_message = static_cast<CallbackMessageT *>(loaned_message);
// message is loaned, so we have to make sure that the deleter does not deallocate the message
@@ -263,18 +265,21 @@ class Subscription : public SubscriptionBase

/// Return the borrowed message.
/** \param message message to be returned */
void return_message(std::shared_ptr<void> & message) override
void
return_message(std::shared_ptr<void> & message) override
{
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
message_memory_strategy_->return_message(typed_message);
}

void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
{
message_memory_strategy_->return_serialized_message(message);
}

bool use_take_shared_method() const
bool
use_take_shared_method() const
{
return any_callback_.use_take_shared_method();
}
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ class TimerBase
/**
* \return true if the timer has been cancelled, false otherwise
* \throws std::runtime_error if the rcl_get_error_state returns 0
* \throws RCLErrorBase some child class exception based on ret
* \throws rclcpp::exceptions::RCLError some child class exception based on ret
*/
RCLCPP_PUBLIC
bool
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/utilities.hpp
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ init_and_remove_ros_arguments(
* \param[in] argv Argument vector.
* \returns Members of the argument vector that are not ROS arguments.
* \throws anything throw_from_rcl_error can throw
* \throws rclcpp::exceptions::RCLErrorBase if the parsing fails
* \throws rclcpp::exceptions::RCLError if the parsing fails
*/
RCLCPP_PUBLIC
std::vector<std::string>
Original file line number Diff line number Diff line change
@@ -94,7 +94,11 @@ class StoragePolicyCommon
}

// (Re)build the wait set for the first time.
this->storage_rebuild_rcl_wait_set_with_sets(subscriptions, guard_conditions, timers, waitables);
this->storage_rebuild_rcl_wait_set_with_sets(
subscriptions,
guard_conditions,
timers,
waitables);
}

~StoragePolicyCommon()
Original file line number Diff line number Diff line change
@@ -32,18 +32,18 @@ class SynchronizationPolicyCommon
SynchronizationPolicyCommon() = default;
~SynchronizationPolicyCommon() = default;

std::function<bool ()>
std::function<bool()>
create_loop_predicate(
std::chrono::nanoseconds time_to_wait_ns,
std::chrono::steady_clock::time_point start)
{
if (time_to_wait_ns >= std::chrono::nanoseconds(0)) {
// If time_to_wait_ns is >= 0 schedule against a deadline.
auto deadline = start + time_to_wait_ns;
return [deadline]() -> bool { return std::chrono::steady_clock::now() < deadline; };
return [deadline]() -> bool {return std::chrono::steady_clock::now() < deadline;};
} else {
// In the case of time_to_wait_ns < 0, just always return true to loop forever.
return []() -> bool { return true; };
return []() -> bool {return true;};
}
}

53 changes: 28 additions & 25 deletions rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp
Original file line number Diff line number Diff line change
@@ -16,8 +16,9 @@
#define RCLCPP__WAIT_SET_POLICIES__DYNAMIC_STORAGE_HPP_

#include <algorithm>
#include <array>
#include <memory>
#include <utility>
#include <vector>

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
@@ -41,7 +42,9 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo

class SubscriptionEntry
{
public:
// (wjwwood): indent of 'public:' is weird, I know. uncrustify is dumb.

public:
std::shared_ptr<rclcpp::SubscriptionBase> subscription;
rclcpp::SubscriptionWaitSetMask mask;

@@ -61,7 +64,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
};
class WeakSubscriptionEntry
{
public:
public:
std::weak_ptr<rclcpp::SubscriptionBase> subscription;
rclcpp::SubscriptionWaitSetMask mask;

@@ -100,7 +103,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo

class WaitableEntry
{
public:
public:
std::shared_ptr<rclcpp::Waitable> waitable;
std::shared_ptr<void> associated_entity;

@@ -121,7 +124,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
};
class WeakWaitableEntry
{
public:
public:
std::weak_ptr<rclcpp::Waitable> waitable;
std::weak_ptr<void> associated_entity;

@@ -192,7 +195,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
return std::any_of(
entities.cbegin(),
entities.cend(),
[&entity](const auto & inner) { return &entity == inner.lock().get(); });
[&entity](const auto & inner) {return &entity == inner.lock().get();});
}

template<class EntityT, class SequenceOfEntitiesT>
@@ -203,7 +206,7 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
return std::find_if(
entities.cbegin(),
entities.cend(),
[&entity](const auto & inner) { return &entity == inner.lock().get(); });
[&entity](const auto & inner) {return &entity == inner.lock().get();});
}

void
@@ -323,26 +326,26 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
}
// Setup common locking function.
auto lock_all = [](const auto & weak_ptrs, auto & shared_ptrs) {
shared_ptrs.resize(weak_ptrs.size());
size_t index = 0;
for (const auto & weak_ptr : weak_ptrs) {
shared_ptrs[index++] = weak_ptr.lock();
}
};
shared_ptrs.resize(weak_ptrs.size());
size_t index = 0;
for (const auto & weak_ptr : weak_ptrs) {
shared_ptrs[index++] = weak_ptr.lock();
}
};
// Lock all the weak pointers and hold them until released.
lock_all(guard_conditions_, shared_guard_conditions_);
lock_all(timers_, shared_timers_);

// We need a specialized version of this for waitables.
auto lock_all_waitables = [](const auto & weak_ptrs, auto & shared_ptrs) {
shared_ptrs.resize(weak_ptrs.size());
size_t index = 0;
for (const auto & weak_ptr : weak_ptrs) {
shared_ptrs[index++] = WaitableEntry{
weak_ptr.waitable.lock(),
weak_ptr.associated_entity.lock()};
}
};
shared_ptrs.resize(weak_ptrs.size());
size_t index = 0;
for (const auto & weak_ptr : weak_ptrs) {
shared_ptrs[index++] = WaitableEntry{
weak_ptr.waitable.lock(),
weak_ptr.associated_entity.lock()};
}
};
lock_all_waitables(waitables_, shared_waitables_);
}

@@ -355,10 +358,10 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
}
// "Unlock" all shared pointers by resetting them.
auto reset_all = [](auto & shared_ptrs) {
for (auto & shared_ptr : shared_ptrs) {
shared_ptr.reset();
}
};
for (auto & shared_ptr : shared_ptrs) {
shared_ptr.reset();
}
};
reset_all(shared_guard_conditions_);
reset_all(shared_timers_);
reset_all(shared_waitables_);
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
#include <chrono>
#include <functional>
#include <memory>
#include <utility>

#include "rclcpp/exceptions.hpp"
#include "rclcpp/guard_condition.hpp"
@@ -52,7 +53,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
const rclcpp::SubscriptionWaitSetMask & mask,
std::function<
void (std::shared_ptr<rclcpp::SubscriptionBase> &&, const rclcpp::SubscriptionWaitSetMask &)
void(std::shared_ptr<rclcpp::SubscriptionBase>&&, const rclcpp::SubscriptionWaitSetMask &)
> add_subscription_function)
{
// Explicitly no thread synchronization.
@@ -68,7 +69,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
std::shared_ptr<rclcpp::SubscriptionBase> && subscription,
const rclcpp::SubscriptionWaitSetMask & mask,
std::function<
void (std::shared_ptr<rclcpp::SubscriptionBase> &&, const rclcpp::SubscriptionWaitSetMask &)
void(std::shared_ptr<rclcpp::SubscriptionBase>&&, const rclcpp::SubscriptionWaitSetMask &)
> remove_subscription_function)
{
// Explicitly no thread synchronization.
@@ -82,7 +83,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
void
sync_add_guard_condition(
std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
std::function<void (std::shared_ptr<rclcpp::GuardCondition> &&)> add_guard_condition_function)
std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> add_guard_condition_function)
{
// Explicitly no thread synchronization.
add_guard_condition_function(std::move(guard_condition));
@@ -95,7 +96,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
void
sync_remove_guard_condition(
std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
std::function<void (std::shared_ptr<rclcpp::GuardCondition> &&)> remove_guard_condition_function)
std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> remove_guard_condition_function)
{
// Explicitly no thread synchronization.
remove_guard_condition_function(std::move(guard_condition));
@@ -108,7 +109,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
void
sync_add_timer(
std::shared_ptr<rclcpp::TimerBase> && timer,
std::function<void (std::shared_ptr<rclcpp::TimerBase> &&)> add_timer_function)
std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> add_timer_function)
{
// Explicitly no thread synchronization.
add_timer_function(std::move(timer));
@@ -121,7 +122,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
void
sync_remove_timer(
std::shared_ptr<rclcpp::TimerBase> && timer,
std::function<void (std::shared_ptr<rclcpp::TimerBase> &&)> remove_timer_function)
std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> remove_timer_function)
{
// Explicitly no thread synchronization.
remove_timer_function(std::move(timer));
@@ -136,7 +137,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
std::shared_ptr<rclcpp::Waitable> && waitable,
std::shared_ptr<void> && associated_entity,
std::function<
void (std::shared_ptr<rclcpp::Waitable> &&, std::shared_ptr<void> &&)
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void>&&)
> add_waitable_function)
{
// Explicitly no thread synchronization.
@@ -150,7 +151,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
void
sync_remove_waitable(
std::shared_ptr<rclcpp::Waitable> && waitable,
std::function<void (std::shared_ptr<rclcpp::Waitable> &&)> remove_waitable_function)
std::function<void(std::shared_ptr<rclcpp::Waitable>&&)> remove_waitable_function)
{
// Explicitly no thread synchronization.
remove_waitable_function(std::move(waitable));
@@ -161,7 +162,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
* Does not throw, but storage function may throw.
*/
void
sync_prune_deleted_entities(std::function<void ()> prune_deleted_entities_function)
sync_prune_deleted_entities(std::function<void()> prune_deleted_entities_function)
{
// Explicitly no thread synchronization.
prune_deleted_entities_function();
@@ -172,9 +173,9 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
WaitResultT
sync_wait(
std::chrono::nanoseconds time_to_wait_ns,
std::function<void ()> rebuild_rcl_wait_set,
std::function<void()> rebuild_rcl_wait_set,
std::function<rcl_wait_set_t & ()> get_rcl_wait_set,
std::function<WaitResultT (WaitResultKind wait_result_kind)> create_wait_result)
std::function<WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
{
// Assumption: this function assumes that some measure has been taken to
// ensure none of the entities being waited on by the wait set are allowed
@@ -187,7 +188,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon

// Setup looping predicate.
auto start = std::chrono::steady_clock::now();
std::function<bool ()> should_loop = this->create_loop_predicate(time_to_wait_ns, start);
std::function<bool()> should_loop = this->create_loop_predicate(time_to_wait_ns, start);

// Wait until exit condition is met.
do {
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ class StaticStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCom

class SubscriptionEntry
{
public:
public:
std::shared_ptr<rclcpp::SubscriptionBase> subscription;
rclcpp::SubscriptionWaitSetMask mask;

5 changes: 3 additions & 2 deletions rclcpp/include/rclcpp/wait_set_template.hpp
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@

#include <chrono>
#include <memory>
#include <utility>

#include "rcl/wait.h"

@@ -70,7 +71,7 @@ class WaitSetTemplate final : private StoragePolicy, private SynchronizationPoli
const typename StoragePolicy::TimersIterable & timers = {},
const typename StoragePolicy::WaitablesIterable & waitables = {},
rclcpp::Context::SharedPtr context =
rclcpp::contexts::default_context::get_global_default_context())
rclcpp::contexts::default_context::get_global_default_context())
: StoragePolicy(
subscriptions,
guard_conditions,
@@ -475,7 +476,7 @@ class WaitSetTemplate final : private StoragePolicy, private SynchronizationPoli
* when time_to_wait is < 0, or
* \returns Empty if the wait set is empty, avoiding the possibility of
* waiting indefinitely on an empty wait set.
* \throws rclcpp::exceptions::RCLErrorBase on unhandled rcl errors
* \throws rclcpp::exceptions::RCLError on unhandled rcl errors
*/
template<class Rep = int64_t, class Period = std::milli>
RCUTILS_WARN_UNUSED
Loading