Skip to content

Commit

Permalink
Merge pull request #386 from mxgrey/rcl_lifecycles
Browse files Browse the repository at this point in the history
Correct rcl entity lifecycles and fix spurious test failures
  • Loading branch information
maspe36 authored Apr 5, 2024
2 parents 0578a76 + 58b2c66 commit 646869b
Show file tree
Hide file tree
Showing 15 changed files with 512 additions and 263 deletions.
5 changes: 4 additions & 1 deletion rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ path = "src/lib.rs"
# Please keep the list of dependencies alphabetically sorted,
# and also state why each dependency is needed.
[dependencies]
# Needed for dynamically finding type support libraries
# Needed for dynamically finding type support libraries
ament_rs = { version = "0.2", optional = true }

# Needed for uploading documentation to docs.rs
cfg-if = "1.0.0"

# Needed for clients
futures = "0.3"

# Needed for dynamic messages
libloading = { version = "0.8", optional = true }

# Needed for the Message trait, among others
rosidl_runtime_rs = "0.4"

Expand Down
68 changes: 41 additions & 27 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,38 @@ use rosidl_runtime_rs::Message;

use crate::error::{RclReturnCode, ToResult};
use crate::MessageCow;
use crate::{rcl_bindings::*, RclrsError};
use crate::{rcl_bindings::*, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_client_t {}

/// Internal struct used by clients.
/// Manage the lifecycle of an `rcl_client_t`, including managing its dependencies
/// on `rcl_node_t` and `rcl_context_t` by ensuring that these dependencies are
/// [dropped after][1] the `rcl_client_t`.
///
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
pub struct ClientHandle {
rcl_client_mtx: Mutex<rcl_client_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
rcl_client: Mutex<rcl_client_t>,
node_handle: Arc<NodeHandle>,
pub(crate) in_use_by_wait_set: Arc<AtomicBool>,
}

impl ClientHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_client_t> {
self.rcl_client_mtx.lock().unwrap()
self.rcl_client.lock().unwrap()
}
}

impl Drop for ClientHandle {
fn drop(&mut self) {
let rcl_client = self.rcl_client_mtx.get_mut().unwrap();
let rcl_node_mtx = &mut *self.rcl_node_mtx.lock().unwrap();
// SAFETY: No preconditions for this function
let rcl_client = self.rcl_client.get_mut().unwrap();
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
unsafe {
rcl_client_fini(rcl_client, rcl_node_mtx);
rcl_client_fini(rcl_client, &mut *rcl_node);
}
}
}
Expand Down Expand Up @@ -74,7 +80,7 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(rcl_node_mtx: Arc<Mutex<rcl_node_t>>, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
Expand All @@ -92,24 +98,32 @@ where
// SAFETY: No preconditions for this function.
let client_options = unsafe { rcl_client_get_default_options() };

unsafe {
// SAFETY: The rcl_client is zero-initialized as expected by this function.
// The rcl_node is kept alive because it is co-owned by the client.
// The topic name and the options are copied by this function, so they can be dropped
// afterwards.
rcl_client_init(
&mut rcl_client,
&*rcl_node_mtx.lock().unwrap(),
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

// SAFETY:
// * The rcl_client was zero-initialized as expected by this function.
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the client.
// * The topic name and the options are copied by this function, so they can be dropped
// afterwards.
// * The entity lifecycle mutex is locked to protect against the risk of global
// variables in the rmw implementation being unsafely modified during initialization.
unsafe {
rcl_client_init(
&mut rcl_client,
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&client_options,
)
.ok()?;
}
}

let handle = Arc::new(ClientHandle {
rcl_client_mtx: Mutex::new(rcl_client),
rcl_node_mtx,
rcl_client: Mutex::new(rcl_client),
node_handle,
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
});

Expand Down Expand Up @@ -245,8 +259,8 @@ where
///
pub fn service_is_ready(&self) -> Result<bool, RclrsError> {
let mut is_ready = false;
let client = &mut *self.handle.rcl_client_mtx.lock().unwrap();
let node = &mut *self.handle.rcl_node_mtx.lock().unwrap();
let client = &mut *self.handle.rcl_client.lock().unwrap();
let node = &mut *self.handle.node_handle.rcl_node.lock().unwrap();

unsafe {
// SAFETY both node and client are guaranteed to be valid here
Expand Down
168 changes: 143 additions & 25 deletions rclrs/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,31 @@ use std::vec::Vec;
use crate::rcl_bindings::*;
use crate::{RclrsError, ToResult};

/// This is locked whenever initializing or dropping any middleware entity
/// because we have found issues in RCL and some RMW implementations that
/// make it unsafe to simultaneously initialize and/or drop middleware
/// entities such as `rcl_context_t` and `rcl_node_t` as well middleware
/// primitives such as `rcl_publisher_t`, `rcl_subscription_t`, etc.
/// It seems these C and C++ based libraries will regularly use
/// unprotected global variables in their object initialization and cleanup.
///
/// Further discussion with the RCL team may help to improve the RCL
/// documentation to specifically call out where these risks are present. For
/// now we lock this mutex for any RCL function that carries reasonable suspicion
/// of a risk.
pub(crate) static ENTITY_LIFECYCLE_MUTEX: Mutex<()> = Mutex::new(());

impl Drop for rcl_context_t {
fn drop(&mut self) {
unsafe {
// The context may be invalid when rcl_init failed, e.g. because of invalid command
// line arguments.
// SAFETY: No preconditions for this function.

// SAFETY: No preconditions for rcl_context_is_valid.
if rcl_context_is_valid(self) {
// SAFETY: These functions have no preconditions besides a valid rcl_context
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
// global variables in the rmw implementation being unsafely modified during cleanup.
rcl_shutdown(self);
rcl_context_fini(self);
}
Expand All @@ -39,16 +56,26 @@ unsafe impl Send for rcl_context_t {}
/// - the allocator used (left as the default by `rclrs`)
///
pub struct Context {
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) handle: Arc<ContextHandle>,
}

/// This struct manages the lifetime and access to the `rcl_context_t`. It will also
/// account for the lifetimes of any dependencies, if we need to add
/// dependencies in the future (currently there are none). It is not strictly
/// necessary to decompose `Context` and `ContextHandle` like this, but we are
/// doing it to be consistent with the lifecycle management of other rcl
/// bindings in this library.
pub(crate) struct ContextHandle {
pub(crate) rcl_context: Mutex<rcl_context_t>,
}

impl Context {
/// Creates a new context.
///
/// Usually, this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// Usually this would be called with `std::env::args()`, analogously to `rclcpp::init()`.
/// See also the official "Passing ROS arguments to nodes via the command-line" tutorial.
///
/// Creating a context can fail in case the args contain invalid ROS arguments.
/// Creating a context will fail if the args contain invalid ROS arguments.
///
/// # Example
/// ```
Expand All @@ -58,6 +85,21 @@ impl Context {
/// assert!(Context::new(invalid_remapping).is_err());
/// ```
pub fn new(args: impl IntoIterator<Item = String>) -> Result<Self, RclrsError> {
Self::new_with_options(args, InitOptions::new())
}

/// Same as [`Context::new`] except you can additionally provide initialization options.
///
/// # Example
/// ```
/// use rclrs::{Context, InitOptions};
/// let context = Context::new_with_options([], InitOptions::new().with_domain_id(Some(5))).unwrap();
/// assert_eq!(context.domain_id(), 5);
/// ````
pub fn new_with_options(
args: impl IntoIterator<Item = String>,
options: InitOptions,
) -> Result<Self, RclrsError> {
// SAFETY: Getting a zero-initialized value is always safe
let mut rcl_context = unsafe { rcl_get_zero_initialized_context() };
let cstring_args: Vec<CString> = args
Expand All @@ -74,48 +116,124 @@ impl Context {
unsafe {
// SAFETY: No preconditions for this function.
let allocator = rcutils_get_default_allocator();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;
// SAFETY: This function does not store the ephemeral init_options and c_args
// pointers. Passing in a zero-initialized rcl_context is expected.
let ret = rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok();
let mut rcl_init_options = options.into_rcl(allocator)?;
// SAFETY:
// * This function does not store the ephemeral init_options and c_args pointers.
// * Passing in a zero-initialized rcl_context is mandatory.
// * The entity lifecycle mutex is locked to protect against the risk of global variables
// in the rmw implementation being unsafely modified during initialization.
let ret = {
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
rcl_init(
c_args.len() as i32,
if c_args.is_empty() {
std::ptr::null()
} else {
c_args.as_ptr()
},
&rcl_init_options,
&mut rcl_context,
)
.ok()
};
// SAFETY: It's safe to pass in an initialized object.
// Early return will not leak memory, because this is the last fini function.
rcl_init_options_fini(&mut rcl_init_options).ok()?;
// Move the check after the last fini()
ret?;
}
Ok(Self {
rcl_context_mtx: Arc::new(Mutex::new(rcl_context)),
handle: Arc::new(ContextHandle {
rcl_context: Mutex::new(rcl_context),
}),
})
}

/// Returns the ROS domain ID that the context is using.
///
/// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1].
/// It can be set through the `ROS_DOMAIN_ID` environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/About-Domain-ID.html
pub fn domain_id(&self) -> usize {
let mut domain_id: usize = 0;
let ret = unsafe {
rcl_context_get_domain_id(
&mut *self.handle.rcl_context.lock().unwrap(),
&mut domain_id,
)
};

debug_assert_eq!(ret, 0);
domain_id
}

/// Checks if the context is still valid.
///
/// This will return `false` when a signal has caused the context to shut down (currently
/// unimplemented).
pub fn ok(&self) -> bool {
// This will currently always return true, but once we have a signal handler, the signal
// handler could call `rcl_shutdown()`, hence making the context invalid.
let rcl_context = &mut *self.rcl_context_mtx.lock().unwrap();
let rcl_context = &mut *self.handle.rcl_context.lock().unwrap();
// SAFETY: No preconditions for this function.
unsafe { rcl_context_is_valid(rcl_context) }
}
}

/// Additional options for initializing the Context.
#[derive(Default, Clone)]
pub struct InitOptions {
/// The domain ID that should be used by the Context. Set to None to ask for
/// the default behavior, which is to set the domain ID according to the
/// [ROS_DOMAIN_ID][1] environment variable.
///
/// [1]: https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Domain-ID.html#the-ros-domain-id
domain_id: Option<usize>,
}

impl InitOptions {
/// Create a new InitOptions with all default values.
pub fn new() -> InitOptions {
Self::default()
}

/// Transform an InitOptions into a new one with a certain domain_id
pub fn with_domain_id(mut self, domain_id: Option<usize>) -> InitOptions {
self.domain_id = domain_id;
self
}

/// Set the domain_id of an InitOptions, or reset it to the default behavior
/// (determined by environment variables) by providing None.
pub fn set_domain_id(&mut self, domain_id: Option<usize>) {
self.domain_id = domain_id;
}

/// Get the domain_id that will be provided by these InitOptions.
pub fn domain_id(&self) -> Option<usize> {
self.domain_id
}

fn into_rcl(self, allocator: rcutils_allocator_s) -> Result<rcl_init_options_t, RclrsError> {
unsafe {
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_init_options = rcl_get_zero_initialized_init_options();
// SAFETY: Passing in a zero-initialized value is expected.
// In the case where this returns not ok, there's nothing to clean up.
rcl_init_options_init(&mut rcl_init_options, allocator).ok()?;

// We only need to set the domain_id if the user asked for something
// other than None. When the user asks for None, that is equivalent
// to the default value in rcl_init_options.
if let Some(domain_id) = self.domain_id {
rcl_init_options_set_domain_id(&mut rcl_init_options, domain_id);
}
Ok(rcl_init_options)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 3 additions & 1 deletion rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ impl SingleThreadedExecutor {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) })
.filter(|node| unsafe {
rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap())
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
Expand Down
Loading

0 comments on commit 646869b

Please sign in to comment.