Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROS1 Services support persistent #213

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Dropping the last ros1::NodeHandle results in the node cleaning up any advertises, subscriptions, and services with the ROS master.
- Generated code now includes various lint attributes to suppress warnings.
- TCPROS header parsing now ignores (the undocumented fields) response_type and request_type and doesn't produce warnings on them.
- ROS1 native service servers now respect the "persistent" header field, and automatically close the underlying TCP socket after a single request unless persistent is set to 1 in the connection header.

### Changed

Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Publication {
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
persistent: None,
};
trace!("Publisher connection header: {responding_conn_header:?}");

Expand Down
2 changes: 2 additions & 0 deletions roslibrust/src/ros1/service_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ impl ServiceClientLink {
service: Some(service_name.to_owned()),
topic_type: service_type.to_owned(),
tcp_nodelay: false,
// We do want a persistent connection to our service clients
persistent: Some(true),
};

let (call_tx, call_rx) = mpsc::unbounded_channel::<CallServiceRequest>();
Expand Down
13 changes: 10 additions & 3 deletions roslibrust/src/ros1/service_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl ServiceServerLink {
topic: None,
topic_type: service_type.to_string(),
tcp_nodelay: false,
persistent: None,
};
let bytes = response_header.to_bytes(false).unwrap();
if let Err(e) = stream.write_all(&bytes).await {
Expand All @@ -188,9 +189,6 @@ impl ServiceServerLink {
return;
}

// TODO we're not currently reading the persistent flag out of the connection header and treating
// all connections as persistent
// That means we expect one header exchange, and then multiple body exchanges
// Each loop is one body:
loop {
let full_body = match tcpros::receive_body(&mut stream).await {
Expand Down Expand Up @@ -226,6 +224,15 @@ impl ServiceServerLink {
stream.write_all(&full_response).await.unwrap();
}
}

// If a persistent service connection was requested keep requesting bodies
if let Some(true) = connection_header.persistent {
continue;
} else {
// This will result in the task shutting down, dropping the TCP socket and clean shutdown
debug!("Service request connection for {service_name} is not persistent, shutting down");
break;
}
}
}
}
1 change: 1 addition & 0 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl Subscription {
topic_type: topic_type.to_owned(),
tcp_nodelay: false,
service: None,
persistent: None,
};

Self {
Expand Down
16 changes: 14 additions & 2 deletions roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub struct ConnectionHeader {
pub topic: Option<String>,
pub topic_type: String,
pub tcp_nodelay: bool, // TODO this field should be optional and None for service clients and servers
// TODO service client may include "persistent" here
// TODO service server only has to respond with caller_id (all other fields optional)
pub persistent: Option<bool>,
// TODO service server only has to respond with caller_id (all other fields optional)
}

impl ConnectionHeader {
Expand All @@ -40,6 +40,7 @@ impl ConnectionHeader {
let mut service = None;
let mut topic_type = String::new();
let mut tcp_nodelay = false;
let mut persistent = None;

// TODO: Unhandled: error, persistent
while cursor.position() < header_data.len() as u64 {
Expand Down Expand Up @@ -80,6 +81,10 @@ impl ConnectionHeader {
let mut tcp_nodelay_str = String::new();
field[equals_pos + 1..].clone_into(&mut tcp_nodelay_str);
tcp_nodelay = &tcp_nodelay_str != "0";
} else if field.starts_with("persistent=") {
let mut persistent_str = String::new();
field[equals_pos + 1..].clone_into(&mut persistent_str);
persistent = Some(&persistent_str != "0");
} else if field.starts_with("probe=") {
// probe is apprantly an undocumented header field that is sent
// by certain ros tools when they initiate a service_client connection to a service server
Expand All @@ -106,6 +111,7 @@ impl ConnectionHeader {
service,
topic_type,
tcp_nodelay,
persistent,
};
trace!(
"Got connection header: {header:?} for topic {:?}",
Expand Down Expand Up @@ -162,6 +168,12 @@ impl ConnectionHeader {
header_data.write_u32::<LittleEndian>(topic_type.len() as u32)?;
header_data.write(topic_type.as_bytes())?;

if let Some(persistent) = self.persistent {
let persistent = format!("persistent={}", if persistent { 1 } else { 0 });
header_data.write_u32::<LittleEndian>(persistent.len() as u32)?;
header_data.write(persistent.as_bytes())?;
}

// Now that we know the length, stick its value in the first 4 bytes
let total_length = (header_data.len() - 4) as u32;
for (idx, byte) in total_length.to_le_bytes().iter().enumerate() {
Expand Down
Loading