diff --git a/SECURITY.md b/SECURITY.md index 730cf6f4..b6133d72 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -14,9 +14,9 @@ Please see the [DDS Security Specification](https://www.omg.org/spec/DDS-SECURIT In order to use the security functionality, enable the Cargo feature `security` in RustDDS. By default, it is not enabled, because it adds a large body of code and some processing overhead. -Security needs to be confgured in order to be used. There are several mandatory configuration files that need to be supplied to RustDDS. These configuration files and their format and semantics are not unique to RustDDS, but specified in the OMG DDS Security specification. The security configration files should also be interoperable between compliant DDS implementations. +Security needs to be configured in order to be used. There are several mandatory configuration files that need to be supplied to RustDDS. These configuration files and their format and semantics are not unique to RustDDS, but specified in the OMG DDS Security specification. The security configuration files should also be interoperable between compliant DDS implementations. -Configuring security for DomainParticipants needs two Certification Authority roles, or CAs. A CA is someone who has the ability to issue and sign the various configuration files. The two CAs are the Identity Certification Authority and the Permissions Certificate Authority. +Configuring security for DomainParticipants needs two Certificate Authority roles, or CAs. A CA is someone who has the ability to issue and sign the various configuration files. The two CAs are the Identity Certificate Authority and the Permissions Certificate Authority. It is possible that a single CA performs both of these roles. This is a matter of security configuration. @@ -35,8 +35,8 @@ The following security configuration files are needed: * X.509 Certificate `.pem` file * This file gives the Subject Name and corresponding public key for a DomainParticipant. -* Signed by Identity CA. -* Not secret. Sent as plaintext to other DomainParticiapnts during authentication. +* Signed by the Identity CA. +* Not secret. Sent as plaintext to other DomainParticipants during authentication. ## Participant Private Key @@ -46,8 +46,8 @@ The following security configuration files are needed: ## Permissions CA Certificate -* Used to verify the auhenticity of permisisons documents, both our own and those presented to us over the authentication protocol. -* X.509 Certificate (`.pem`) +* Used to verify the authenticity of permissions documents, both our own and those presented to us over the authentication protocol. +* X.509 Certificate `.pem` file ## Participant Permissions diff --git a/examples/async_shapes_demo/main.rs b/examples/async_shapes_demo/main.rs index 461ad1fb..7f45b03c 100644 --- a/examples/async_shapes_demo/main.rs +++ b/examples/async_shapes_demo/main.rs @@ -57,6 +57,7 @@ fn main() { .cloned() .unwrap_or("BLUE".to_owned()); + // Domain Participant let domain_participant = DomainParticipant::new(*domain_id) .unwrap_or_else(|e| panic!("DomainParticipant construction failed: {e:?}")); @@ -131,11 +132,12 @@ fn main() { ); // Set Ctrl-C handler - let (stop_sender, stop_receiver) = smol::channel::bounded(2); + let (stop_sender, stop_receiver) = smol::channel::bounded(3); ctrlc::set_handler(move || { // We will send two stop coammnds, one for reader, the other for writer. 
stop_sender.send_blocking(()).unwrap_or(()); stop_sender.send_blocking(()).unwrap_or(()); + stop_sender.send_blocking(()).unwrap_or(()); // ignore errors, as we are quitting anyway }) .expect("Error setting Ctrl-C handler"); @@ -188,6 +190,22 @@ fn main() { random_gen.gen_range(-5..-1) }; + let dp_event_loop = async { + let mut run = true; + let mut stop = stop_receiver.recv().fuse(); + let dp_status_listener = domain_participant.status_listener(); + let mut dp_status_stream = dp_status_listener.as_async_stream(); + + while run { + futures::select! { + _ = stop => run = false, + e = dp_status_stream.select_next_some() => { + println!("DP Status: {e:?}"); + } + } // select! + } // while + }; + let read_loop = async { match reader_opt { None => (), @@ -263,7 +281,7 @@ fn main() { }; // Run both read and write concurrently, until both are done. - smol::block_on(async { futures::join!(read_loop, write_loop) }); + smol::block_on(async { futures::join!(read_loop, write_loop, dp_event_loop) }); } fn configure_logging() { diff --git a/prepare-for-commit.sh b/prepare-for-commit.sh index 97824634..ff146f0e 100755 --- a/prepare-for-commit.sh +++ b/prepare-for-commit.sh @@ -8,6 +8,6 @@ cargo +nightly fmt echo cargo +nightly clippy --tests --examples cargo +nightly clippy --tests --examples -# Run linter without default features (=without security in the security branch) -echo cargo +nightly clippy --no-default-features --tests --examples -cargo +nightly clippy --no-default-features --tests --examples \ No newline at end of file +# Run linter with all features, including security +echo cargo +nightly clippy --tests --examples --all-features +cargo +nightly clippy --tests --examples --all-features \ No newline at end of file diff --git a/src/dds/participant.rs b/src/dds/participant.rs index 2b6454b7..4bbcf86a 100644 --- a/src/dds/participant.rs +++ b/src/dds/participant.rs @@ -1,22 +1,36 @@ // use mio::Token; use std::{ collections::HashMap, + io, io::ErrorKind, net::Ipv4Addr, + pin::Pin, sync::{atomic, Arc, Mutex, RwLock, Weak}, + task::{Context, Poll}, thread, thread::JoinHandle, time::{Duration, Instant}, }; use mio_extras::channel as mio_channel; -use mio_06::Token; +use mio_06::{self, Evented}; +use mio_08::{self, Interest, Registry}; +use futures::stream::{FusedStream, Stream}; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; use crate::{ create_error_out_of_resources, create_error_poisoned, - dds::{pubsub::*, qos::*, result::*, topic::*, typedesc::TypeDesc}, + dds::{ + pubsub::*, + qos::*, + result::*, + statusevents::{ + sync_status_channel, DomainParticipantStatusEvent, StatusChannelReceiver, StatusChannelSender, + }, + topic::*, + typedesc::TypeDesc, + }, discovery::{ discovery::{Discovery, DiscoveryCommand}, discovery_db::DiscoveryDB, @@ -30,6 +44,7 @@ use crate::{ writer::WriterIngredients, }, structure::{dds_cache::DDSCache, entity::RTPSEntity, guid::*, locator::Locator}, + StatusEvented, }; #[cfg(feature = "security")] use crate::{ @@ -219,6 +234,9 @@ impl DomainParticipantBuilder { let (discovery_command_sender, discovery_command_receiver) = mio_channel::sync_channel::(64); + // Channel used to report noteworthy events to DomainParticipant + let (status_sender, status_receiver) = sync_status_channel(16)?; + #[cfg(not(feature = "security"))] let security_plugins_handle = None; #[cfg(feature = "security")] @@ -233,6 +251,8 @@ impl DomainParticipantBuilder { discovery_update_notification_receiver, discovery_command_sender, spdp_liveness_sender, + status_sender.clone(), + 
status_receiver, security_plugins_handle.clone(), )?; let self_locators = dp.self_locators(); @@ -258,6 +278,7 @@ impl DomainParticipantBuilder { discovery_command_receiver, spdp_liveness_receiver, self_locators, + status_sender, security_plugins_handle, ) { discovery.discovery_event_loop(); // run the event loop @@ -296,7 +317,7 @@ impl DomainParticipantBuilder { /// `u16`. Domain identifier values are application-specific, but `0` is usually /// the default. #[derive(Clone)] -// This is a smart pointer for DomainParticipantInner for easier manipulation. +// This is a smart pointer for DomainParticipant for easier manipulation. pub struct DomainParticipant { dpi: Arc>, } @@ -453,10 +474,18 @@ impl DomainParticipant { /// let domain_participant = DomainParticipant::new(0).expect("Failed to create participant"); /// domain_participant.assert_liveliness(); /// ``` - pub fn assert_liveliness(self) -> WriteResult<(), ()> { + pub fn assert_liveliness(&self) -> WriteResult<(), ()> { self.dpi.lock()?.assert_liveliness() } + /// Get a `DomainDomainParticipantStatusListener` that can be used + /// to get `DomainParticipantStatusEvent`s for this DomainParticipant. + pub fn status_listener(&self) -> DomainParticipantStatusListener { + DomainParticipantStatusListener { + dp_disc: Arc::clone(&self.dpi), + } + } + pub(crate) fn weak_clone(&self) -> DomainParticipantWeak { DomainParticipantWeak::new(self) } @@ -471,25 +500,171 @@ impl DomainParticipant { } pub(crate) fn discovery_db(&self) -> Arc> { + self.dpi.lock().unwrap().dpi.discovery_db.clone() + } + + pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId { + self.dpi.lock().unwrap().new_entity_id(entity_kind) + } + + pub(crate) fn self_locators(&self) -> HashMap> { + self.dpi.lock().unwrap().self_locators() + } +} // end impl DomainParticipant + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- + +pub struct DomainParticipantStatusListener { + dp_disc: Arc>, +} + +impl DomainParticipantStatusListener { + pub fn as_async_stream(&self) -> DomainParticipantStatusStream { + DomainParticipantStatusStream { + status_listener: self, + } + } +} + +impl StatusEvented for DomainParticipantStatusListener { + fn as_status_evented(&mut self) -> &dyn Evented { self - .dpi + } + + fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source { + self + } + + fn try_recv_status(&self) -> Option { + self + .dp_disc .lock() .unwrap() - .dpi + .status_channel_receiver() + .try_recv_status() + } +} + +impl mio_08::event::Source for DomainParticipantStatusListener { + fn register( + &mut self, + registry: &Registry, + token: mio_08::Token, + interests: Interest, + ) -> io::Result<()> { + self + .dp_disc .lock() .unwrap() - .discovery_db - .clone() + .status_channel_receiver_mut() + .register(registry, token, interests) } - pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId { - self.dpi.lock().unwrap().new_entity_id(entity_kind) + fn reregister( + &mut self, + registry: &Registry, + token: mio_08::Token, + interests: Interest, + ) -> io::Result<()> { + self + .dp_disc + .lock() + .unwrap() + .status_channel_receiver_mut() + .reregister(registry, token, interests) } - pub(crate) fn self_locators(&self) -> HashMap> { - self.dpi.lock().unwrap().self_locators() + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self + .dp_disc + .lock() + .unwrap() + .status_channel_receiver_mut() + .deregister(registry) } 
-} // end impl DomainParticipant +} + +impl mio_06::Evented for DomainParticipantStatusListener { + // We just delegate all the operations to notification_receiver, since it + // already implements Evented + fn register( + &self, + poll: &mio_06::Poll, + token: mio_06::Token, + interest: mio_06::Ready, + opts: mio_06::PollOpt, + ) -> io::Result<()> { + self + .dp_disc + .lock() + .unwrap() + .status_channel_receiver_mut() + .as_status_evented() + .register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &mio_06::Poll, + token: mio_06::Token, + interest: mio_06::Ready, + opts: mio_06::PollOpt, + ) -> io::Result<()> { + self + .dp_disc + .lock() + .unwrap() + .status_channel_receiver_mut() + .as_status_evented() + .reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> { + self + .dp_disc + .lock() + .unwrap() + .status_channel_receiver_mut() + .as_status_evented() + .deregister(poll) + } +} + +pub struct DomainParticipantStatusStream<'a> { + status_listener: &'a DomainParticipantStatusListener, +} + +impl<'a> Stream for DomainParticipantStatusStream<'a> { + type Item = DomainParticipantStatusEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let dp_lock = self.status_listener.dp_disc.lock().unwrap(); + let mut w = dp_lock.status_channel_receiver().get_waker_update_lock(); + // lock already at the beginning, before try_recv + match dp_lock.status_channel_receiver().try_recv() { + Err(std::sync::mpsc::TryRecvError::Empty) => { + // nothing available + *w = Some(cx.waker().clone()); + Poll::Pending + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + error!("DomainParticipant status channel disconnected"); + Poll::Ready(None) + } + Ok(t) => Poll::Ready(Some(t)), // got data + } + } // fn +} + +impl<'a> FusedStream for DomainParticipantStatusStream<'a> { + fn is_terminated(&self) -> bool { + false + } +} + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- impl PartialEq for DomainParticipant { fn eq(&self, other: &Self) -> bool { @@ -572,42 +747,6 @@ impl DomainParticipantWeak { }) } - // pub fn find_topic(&self, name: &str, timeout: Duration) -> - // Result> { self - // .dpi - // .upgrade() - // .ok_or(Error::LockPoisoned) - // .and_then(|dpi| dpi.lock().unwrap().find_topic(self, name, timeout)) - // } - - // pub fn domain_id(&self) -> u16 { - // self - // .dpi - // .upgrade() - // .expect("Unable to get original domain participant.") - // .lock() - // .unwrap() - // .domain_id() - // } - - // pub fn participant_id(&self) -> u16 { - // self - // .dpi - // .upgrade() - // .expect("Unable to get original domain participant.") - // .lock() - // .unwrap() - // .participant_id() - // } - - // pub fn discovered_topics(&self) -> Vec { - // self - // .dpi - // .upgrade() - // .map(|dpi| dpi.lock().unwrap().discovered_topics()) - // .unwrap_or_default() - // } - pub fn upgrade(self) -> Option { self.dpi.upgrade().map(|d| DomainParticipant { dpi: d }) } @@ -622,7 +761,7 @@ impl RTPSEntity for DomainParticipantWeak { // This struct exists only to control and stop Discovery when DomainParticipant // should be dropped pub(crate) struct DomainParticipantDisc { - dpi: Arc>, + dpi: DomainParticipantInner, // Discovery control discovery_command_sender: mio_channel::SyncSender, discovery_join_handle: mio_channel::Receiver>, @@ -640,6 +779,8 @@ impl DomainParticipantDisc { 
discovery_update_notification_receiver: mio_channel::Receiver, discovery_command_sender: mio_channel::SyncSender, spdp_liveness_sender: mio_channel::SyncSender, + status_sender: StatusChannelSender, + status_receiver: StatusChannelReceiver, security_plugins_handle: Option, ) -> CreateResult { let dpi = DomainParticipantInner::new( @@ -649,11 +790,13 @@ impl DomainParticipantDisc { discovery_update_notification_receiver, discovery_command_sender.clone(), spdp_liveness_sender, + status_sender, + status_receiver, security_plugins_handle, )?; Ok(Self { - dpi: Arc::new(Mutex::new(dpi)), + dpi, discovery_command_sender, discovery_join_handle, entity_id_generator: atomic::AtomicU32::new(0), @@ -677,7 +820,6 @@ impl DomainParticipantDisc { ) -> CreateResult { self .dpi - .lock()? .create_publisher(dp, qos, self.discovery_command_sender.clone()) } @@ -688,7 +830,6 @@ impl DomainParticipantDisc { ) -> CreateResult { self .dpi - .lock()? .create_subscriber(dp, qos, self.discovery_command_sender.clone()) } @@ -701,10 +842,7 @@ impl DomainParticipantDisc { topic_kind: TopicKind, ) -> CreateResult { // println!("Create topic disc"); - self - .dpi - .lock()? - .create_topic(dp, name, type_desc, qos, topic_kind) + self.dpi.create_topic(dp, name, type_desc, qos, topic_kind) } pub fn find_topic( @@ -713,28 +851,28 @@ impl DomainParticipantDisc { name: &str, timeout: Duration, ) -> CreateResult> { - self.dpi.lock()?.find_topic(dp, name, timeout) + self.dpi.find_topic(dp, name, timeout) } pub fn domain_id(&self) -> u16 { - self.dpi.lock().unwrap().domain_id() + self.dpi.domain_id() } pub fn participant_id(&self) -> u16 { - self.dpi.lock().unwrap().participant_id() + self.dpi.participant_id() } pub fn discovered_topics(&self) -> Vec { - self.dpi.lock().unwrap().discovered_topics() + self.dpi.discovered_topics() } pub(crate) fn dds_cache(&self) -> Arc> { - self.dpi.lock().unwrap().dds_cache() + self.dpi.dds_cache() } #[cfg(feature = "security")] // just to avoid warning pub(crate) fn qos(&self) -> QosPolicies { - self.dpi.lock().unwrap().qos() + self.dpi.qos() } // pub(crate) fn discovery_db(&self) -> Arc> { @@ -752,8 +890,19 @@ impl DomainParticipantDisc { .map_err(|_e| WriteError::WouldBlock { data: () }) } - pub(crate) fn self_locators(&self) -> HashMap> { - self.dpi.lock().unwrap().self_locators.clone() + pub(crate) fn self_locators(&self) -> HashMap> { + self.dpi.self_locators.clone() + } + + pub(crate) fn status_channel_receiver( + &self, + ) -> &StatusChannelReceiver { + self.dpi.status_channel_receiver() + } + pub(crate) fn status_channel_receiver_mut( + &mut self, + ) -> &mut StatusChannelReceiver { + self.dpi.status_channel_receiver_mut() } } @@ -764,8 +913,6 @@ impl Drop for DomainParticipantDisc { debug!("Wan dp_event_loop about stop."); if self .dpi - .lock() - .unwrap() .stop_poll_sender .send(EventLoopCommand::PrepareStop) .is_err() @@ -817,8 +964,11 @@ pub(crate) struct DomainParticipantInner { discovery_db: Arc>, discovery_db_event_receiver: mio_channel::Receiver<()>, + // status event receiver + status_receiver: StatusChannelReceiver, + // RTPS locators describing how to reach this DP - self_locators: HashMap>, + self_locators: HashMap>, security_plugins_handle: Option, } @@ -848,6 +998,7 @@ impl Drop for DomainParticipantInner { } impl DomainParticipantInner { + #[allow(clippy::too_many_arguments)] fn new( domain_id: u16, participant_guid: GUID, @@ -855,6 +1006,8 @@ impl DomainParticipantInner { discovery_update_notification_receiver: mio_channel::Receiver, discovery_command_sender: 
mio_channel::SyncSender, spdp_liveness_sender: mio_channel::SyncSender, + status_sender: StatusChannelSender, + status_receiver: StatusChannelReceiver, security_plugins_handle: Option, ) -> CreateResult { #[cfg(not(feature = "security"))] @@ -934,7 +1087,7 @@ impl DomainParticipantInner { listeners.insert(USER_TRAFFIC_LISTENER_TOKEN, user_traffic_listener); // construct our own Locators - let self_locators: HashMap> = listeners + let self_locators: HashMap> = listeners .iter() .map(|(t, l)| match l.to_locator_address() { Ok(locs) => (*t, locs), @@ -965,9 +1118,12 @@ impl DomainParticipantInner { let (discovery_db_event_sender, discovery_db_event_receiver) = mio_channel::sync_channel::<()>(1); + + // Discovert DB creation let discovery_db = Arc::new(RwLock::new(DiscoveryDB::new( participant_guid, discovery_db_event_sender, + status_sender.clone(), ))); let (stop_poll_sender, stop_poll_receiver) = mio_channel::channel(); @@ -1005,6 +1161,7 @@ impl DomainParticipantInner { discovery_update_notification_receiver, discovery_command_sender, spdp_liveness_sender, + status_sender, security_plugins_clone, ); dp_event_loop.event_loop(); @@ -1033,6 +1190,7 @@ impl DomainParticipantInner { dds_cache, discovery_db, discovery_db_event_receiver, + status_receiver, self_locators, security_plugins_handle, }) @@ -1158,7 +1316,7 @@ impl DomainParticipantInner { // event poll.register( &self.discovery_db_event_receiver, - mio::Token(0), + mio_06::Token(0), mio::Ready::readable(), mio::PollOpt::level(), )?; @@ -1257,6 +1415,16 @@ impl DomainParticipantInner { db.all_user_topics().cloned().collect() } + pub(crate) fn status_channel_receiver( + &self, + ) -> &StatusChannelReceiver { + &self.status_receiver + } + pub(crate) fn status_channel_receiver_mut( + &mut self, + ) -> &mut StatusChannelReceiver { + &mut self.status_receiver + } } // impl impl RTPSEntity for DomainParticipant { @@ -1267,7 +1435,7 @@ impl RTPSEntity for DomainParticipant { impl RTPSEntity for DomainParticipantDisc { fn guid(&self) -> GUID { - self.dpi.lock().unwrap().guid() + self.dpi.guid() } } diff --git a/src/dds/statusevents.rs b/src/dds/statusevents.rs index 1e7b89ca..4cf651d3 100644 --- a/src/dds/statusevents.rs +++ b/src/dds/statusevents.rs @@ -25,6 +25,7 @@ use crate::{ qos::QosPolicyId, result::{ReadError, ReadResult}, }, + discovery::SpdpDiscoveredParticipantData, messages::{protocol_version::ProtocolVersion, vendor_id::VendorId}, mio_source::*, read_error_poisoned, @@ -40,8 +41,8 @@ use crate::discovery::secure_discovery::AuthenticationStatus; pub trait StatusEvented { fn as_status_evented(&mut self) -> &dyn Evented; // This is for polling with mio-0.6.x fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source; // This is for polling with mio-0.8.x - // fn as_async_receiver(&self) -> dyn Stream; + // fn as_async_receiver(&self) -> dyn Stream; fn try_recv_status(&self) -> Option; } @@ -118,6 +119,7 @@ pub(crate) fn sync_status_channel( } // TODO: try to make this (and the Receiver) private types +#[derive(Clone)] pub struct StatusChannelSender { actual_sender: mio_channel::SyncSender, signal_sender: PollEventSender, @@ -169,6 +171,23 @@ impl StatusChannelReceiver { sync_receiver: self, } } + pub(crate) fn get_waker_update_lock(&self) -> std::sync::MutexGuard<'_, Option> { + self.waker.lock().unwrap() + } +} + +impl StatusEvented for StatusChannelReceiver { + fn as_status_evented(&mut self) -> &dyn Evented { + &self.actual_receiver + } + + fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source { + self + } + + fn 
try_recv_status(&self) -> Option { + self.try_recv().ok() + } } impl event::Source for StatusChannelReceiver { @@ -204,7 +223,7 @@ impl<'a, T> Stream for StatusReceiverStream<'a, T> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // debug!("poll_next"); - let mut w = self.sync_receiver.waker.lock().unwrap(); + let mut w = self.sync_receiver.get_waker_update_lock(); // lock already at the beginning, before try_recv match self.sync_receiver.try_recv() { Err(std::sync::mpsc::TryRecvError::Empty) => { @@ -215,7 +234,7 @@ impl<'a, T> Stream for StatusReceiverStream<'a, T> { Err(std::sync::mpsc::TryRecvError::Disconnected) => Poll::Ready(Some(read_error_poisoned!( "StatusReceiver channel disconnected" ))), - Ok(t) => Poll::Ready(Some(Ok(t))), // got date + Ok(t) => Poll::Ready(Some(Ok(t))), // got data } } // fn } @@ -245,7 +264,14 @@ pub enum DomainParticipantStatusEvent { discovery_from: GUID, // Who sent the Discovery data }, /// Discovery detects a new topic - TopicDetected {}, + TopicDetected { + name: String, + type_name: String, + }, + /// Topics are lost when there are no more known Readers or Writers in them. + TopicLost { + name: String, + }, /// New Reader detected (or created locally). Detection happens regardless of /// the remote being matched or not by a local Endpoint. ReaderDetected { @@ -310,6 +336,21 @@ pub struct ParticipantDescription { pub supports_security: bool, } +impl From<&SpdpDiscoveredParticipantData> for ParticipantDescription { + fn from(dpd: &SpdpDiscoveredParticipantData) -> Self { + ParticipantDescription { + updated_time: dpd.updated_time, + protocol_version: dpd.protocol_version, + vendor_id: dpd.vendor_id, + guid: dpd.participant_guid, + lease_duration: dpd.lease_duration, + entity_name: dpd.entity_name.clone(), + #[cfg(feature = "security")] + supports_security: dpd.supports_security(), + } + } +} + /// This is a summary of SubscriptionBuiltinTopicData / /// PublicationBuiltinTopicData from discovery. The original is not used to /// avoid circular dependency between participant and discovery. 
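For reference, the `status_listener()` / `as_async_stream()` API added above is consumed in the updated shapes demo via `futures::select!`; a standalone loop looks roughly like the following. This is a minimal sketch, not part of the patch: it assumes the usual `rustdds` crate-root re-export of `DomainParticipant`, uses the `smol` executor from the bundled examples, and omits error handling.

use futures::StreamExt; // for .next() on the status stream
use rustdds::DomainParticipant;

fn main() {
  let dp = DomainParticipant::new(0).expect("DomainParticipant construction failed");

  // Borrow a status listener from the participant and turn it into an async stream.
  let listener = dp.status_listener();
  let mut status_events = listener.as_async_stream();

  smol::block_on(async {
    // Print a handful of DomainParticipantStatusEvents (ParticipantDiscovered,
    // TopicDetected, ...) and then stop.
    for _ in 0..10 {
      match status_events.next().await {
        Some(event) => println!("DP status: {event:?}"),
        None => break, // status channel closed
      }
    }
  });
}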
diff --git a/src/dds/with_key/datareader.rs b/src/dds/with_key/datareader.rs index 1b0e2cc7..1d89b5a2 100644 --- a/src/dds/with_key/datareader.rs +++ b/src/dds/with_key/datareader.rs @@ -1000,7 +1000,7 @@ mod tests { topic::{TopicDescription, TopicKind}, }, messages::submessages::{ - elements::serialized_payload::SerializedPayload, submessage_flag::*, submessages::DecodedData, + elements::serialized_payload::SerializedPayload, submessage_flag::*, submessages::Data, }, mio_source, network::udp_sender::UDPSender, @@ -1110,28 +1110,34 @@ mod tests { a: 11, b: ":)))".to_string(), }; - let data_msg = DecodedData { + let data_msg = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(1), - serialized_payload: Some(SerializedPayload { - representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&test_data).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&test_data).unwrap()), + } + .into(), + ), + ..Data::default() }; - let data_msg2 = DecodedData { + let data_msg2 = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(2), - serialized_payload: Some(SerializedPayload { - representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&test_data2).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&test_data2).unwrap()), + } + .into(), + ), + ..Data::default() }; let data_flags = DATA_Flags::Endianness | DATA_Flags::Data; @@ -1282,49 +1288,61 @@ mod tests { // Create data messages from the data items // Note that sequence numbering needs to continue as expected - let data_msg = DecodedData { + let data_msg = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(1), - serialized_payload: Some(SerializedPayload { - representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&data_key1).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&data_key1).unwrap()), + } + .into(), + ), + ..Data::default() }; - let data_msg2 = DecodedData { + let data_msg2 = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(2), - serialized_payload: Some(SerializedPayload { - representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&data_key2_1).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&data_key2_1).unwrap()), + } + .into(), + ), + ..Data::default() }; - let data_msg3 = DecodedData { + let data_msg3 = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(3), - serialized_payload: Some(SerializedPayload { - 
representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&data_key2_2).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&data_key2_2).unwrap()), + } + .into(), + ), + ..Data::default() }; - let data_msg4 = DecodedData { + let data_msg4 = Data { reader_id: reader.entity_id(), writer_id: writer_guid.entity_id, writer_sn: SequenceNumber::from(4), - serialized_payload: Some(SerializedPayload { - representation_identifier: RepresentationIdentifier::CDR_LE, - representation_options: [0, 0], - value: Bytes::from(to_bytes::(&data_key2_3).unwrap()), - }), - ..DecodedData::default() + serialized_payload: Some( + SerializedPayload { + representation_identifier: RepresentationIdentifier::CDR_LE, + representation_options: [0, 0], + value: Bytes::from(to_bytes::(&data_key2_3).unwrap()), + } + .into(), + ), + ..Data::default() }; let data_flags = DATA_Flags::Endianness | DATA_Flags::Data; diff --git a/src/discovery/discovery.rs b/src/discovery/discovery.rs index d60de1fb..fc2f9272 100644 --- a/src/discovery/discovery.rs +++ b/src/discovery/discovery.rs @@ -22,6 +22,7 @@ use crate::{ }, readcondition::ReadCondition, result::{CreateError, CreateResult}, + statusevents::{DomainParticipantStatusEvent, LostReason, StatusChannelSender}, }, discovery::{ discovery_db::{discovery_db_read, discovery_db_write, DiscoveredVia, DiscoveryDB}, @@ -180,6 +181,8 @@ pub(crate) struct Discovery { liveliness_state: LivelinessState, + participant_status_sender: StatusChannelSender, + // TODO: Why is this a HashMap? Are there ever more than 2? self_locators: HashMap>, @@ -282,6 +285,7 @@ impl Discovery { discovery_command_receiver: mio_channel::Receiver, spdp_liveness_receiver: mio_channel::Receiver, self_locators: HashMap>, + participant_status_sender: StatusChannelSender, security_plugins_opt: Option, ) -> CreateResult { // helper macro to handle initialization failures. @@ -628,6 +632,7 @@ impl Discovery { discovery_updated_sender, discovery_command_receiver, spdp_liveness_receiver, + participant_status_sender, self_locators, liveliness_state: LivelinessState::new(), @@ -970,6 +975,8 @@ impl Discovery { let guid_prefix = participant_data.participant_guid.prefix; self.send_discovery_notification(DiscoveryNotificationType::ParticipantUpdated { guid_prefix }); if was_new { + let dpd = participant_data.into(); + self.send_participant_status(DomainParticipantStatusEvent::ParticipantDiscovered { dpd }); // This may be a rediscovery of a previously seen participant that // was temporarily lost due to network outage. Check if we already know // what it has (readers, writers, topics). 
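The status events sent here (and from `participant_cleanup` further down) carry structured detail that applications can match on. A hedged sketch of a consumer, using only the variants and fields introduced in this patch; the `use` path is an assumption, and any other variants fall through to the catch-all arm:

// Sketch only: the import path below is an assumption.
use rustdds::dds::statusevents::DomainParticipantStatusEvent;

fn log_status(event: &DomainParticipantStatusEvent) {
  match event {
    DomainParticipantStatusEvent::ParticipantDiscovered { dpd } => {
      println!("discovered participant {:?} ({:?})", dpd.guid, dpd.entity_name);
    }
    DomainParticipantStatusEvent::ParticipantLost { id, reason } => {
      // reason is LostReason::Disposed or LostReason::Timeout { lease, elapsed }
      println!("lost participant {id:?}: {reason:?}");
    }
    DomainParticipantStatusEvent::TopicDetected { name, type_name } => {
      println!("new topic {name} ({type_name})");
    }
    other => println!("DP status: {other:?}"),
  }
}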
@@ -986,6 +993,10 @@ impl Discovery { self.send_discovery_notification(DiscoveryNotificationType::ParticipantLost { guid_prefix: participant_guidp, }); + self.send_participant_status(DomainParticipantStatusEvent::ParticipantLost { + id: participant_guidp, + reason: LostReason::Disposed, + }); } fn send_endpoint_dispose_message(&self, endpoint_guid: GUID) { @@ -1634,10 +1645,14 @@ impl Discovery { } pub fn participant_cleanup(&self) { - let removed_guid_prefixes = discovery_db_write(&self.discovery_db).participant_cleanup(); - for guid_prefix in removed_guid_prefixes { + let removed = discovery_db_write(&self.discovery_db).participant_cleanup(); + for (guid_prefix, reason) in removed { debug!("participant cleanup - timeout for {:?}", guid_prefix); self.send_discovery_notification(DiscoveryNotificationType::ParticipantLost { guid_prefix }); + self.send_participant_status(DomainParticipantStatusEvent::ParticipantLost { + id: guid_prefix, + reason, + }); } } @@ -1840,6 +1855,13 @@ impl Discovery { Err(e) => error!("Failed to send DiscoveryNotification {e:?}"), } } + + fn send_participant_status(&self, event: DomainParticipantStatusEvent) { + self + .participant_status_sender + .try_send(event) + .unwrap_or_else(|e| error!("Cannot report participant status: {e:?}")); + } } // ----------------------------------------------------------------------- @@ -1962,11 +1984,7 @@ mod tests { match &mut submsg.body { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let mut drd: DiscoveredReaderData = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ) .unwrap(); @@ -1983,11 +2001,7 @@ mod tests { data = drd .to_pl_cdr_bytes(RepresentationIdentifier::PL_CDR_LE) .unwrap(); - d.no_crypto_decoded() - .serialized_payload - .as_mut() - .unwrap() - .value = data.clone(); + d.update_serialized_payload_value(data.clone()); } SubmessageBody::Interpreter(_) => (), _ => continue, diff --git a/src/discovery/discovery_db.rs b/src/discovery/discovery_db.rs index 7abaf59a..faeb24b9 100644 --- a/src/discovery/discovery_db.rs +++ b/src/discovery/discovery_db.rs @@ -12,6 +12,7 @@ use crate::{ dds::{ participant::DomainParticipant, qos::HasQoSPolicy, + statusevents::{DomainParticipantStatusEvent, LostReason, StatusChannelSender}, topic::{Topic, TopicDescription}, }, rtps::{ @@ -75,6 +76,8 @@ pub(crate) struct DiscoveryDB { // sender for notifying (potential) waiters in participant.find_topic() call topic_updated_sender: mio_extras::channel::SyncSender<()>, + + participant_status_sender: StatusChannelSender, } // How did we discover this topic @@ -115,7 +118,11 @@ pub(crate) fn discovery_db_write( } impl DiscoveryDB { - pub fn new(my_guid: GUID, topic_updated_sender: mio_extras::channel::SyncSender<()>) -> Self { + pub fn new( + my_guid: GUID, + topic_updated_sender: mio_extras::channel::SyncSender<()>, + participant_status_sender: StatusChannelSender, + ) -> Self { Self { my_guid, participant_proxies: BTreeMap::new(), @@ -130,9 +137,17 @@ impl DiscoveryDB { external_topic_writers_attic: BTreeMap::new(), topics: BTreeMap::new(), topic_updated_sender, + participant_status_sender, } } + fn send_participant_status(&self, event: DomainParticipantStatusEvent) { + self + .participant_status_sender + .try_send(event) + .unwrap_or_else(|e| error!("Cannot report participant status: {e:?}")); + } + // Returns if participant was previously unknown pub fn update_participant(&mut self, 
data: &SpdpDiscoveredParticipantData) -> bool { debug!("update_participant: {:?}", &data); @@ -291,7 +306,7 @@ impl DiscoveryDB { // Delete participant proxies, if we have not heard of them within // lease_duration - pub fn participant_cleanup(&mut self) -> Vec { + pub fn participant_cleanup(&mut self) -> Vec<(GuidPrefix, LostReason)> { let inow = Instant::now(); let mut to_remove = Vec::new(); @@ -314,7 +329,13 @@ impl DiscoveryDB { elapsed = {:?}", guid, lease_duration, elapsed ); - to_remove.push(guid); + to_remove.push(( + guid, + LostReason::Timeout { + lease: lease_duration, + elapsed, + }, + )); } } None => { @@ -322,9 +343,11 @@ impl DiscoveryDB { } } // match } // for - for guid in &to_remove { + + for (guid, _) in &to_remove { self.remove_participant(*guid, false); // false = removed due to timeout } + to_remove } @@ -568,6 +591,10 @@ impl DiscoveryDB { let mut b = BTreeMap::new(); b.insert(updater.prefix, (discovered_via, dtd.clone())); self.topics.insert(topic_name, b); + self.send_participant_status(DomainParticipantStatusEvent::TopicDetected { + name: dtd.topic_data.name.clone(), + type_name: dtd.topic_data.type_name.clone(), + }); }; if notify { @@ -768,8 +795,13 @@ mod tests { fn discdb_participant_operations() { let (discovery_db_event_sender, _discovery_db_event_receiver) = mio_channel::sync_channel::<()>(4); + let (status_sender, _status_receiver) = sync_status_channel(16).unwrap(); - let mut discoverydb = DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender); + let mut discoverydb = DiscoveryDB::new( + GUID::new_participant_guid(), + discovery_db_event_sender, + status_sender, + ); let mut data = spdp_participant_data().unwrap(); data.lease_duration = Some(Duration::from(StdDuration::from_secs(1))); @@ -790,7 +822,12 @@ mod tests { fn discdb_writer_proxies() { let (discovery_db_event_sender, _discovery_db_event_receiver) = mio_channel::sync_channel::<()>(4); - let _discoverydb = DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender); + let (status_sender, _status_receiver) = sync_status_channel(16).unwrap(); + let _discoverydb = DiscoveryDB::new( + GUID::new_participant_guid(), + discovery_db_event_sender, + status_sender, + ); let topic_name = String::from("some_topic"); let type_name = String::from("RandomData"); let _dreader = DiscoveredReaderData::default(topic_name, type_name); @@ -802,9 +839,13 @@ mod tests { fn discdb_subscription_operations() { let (discovery_db_event_sender, _discovery_db_event_receiver) = mio_channel::sync_channel::<()>(4); + let (status_sender, _status_receiver) = sync_status_channel(16).unwrap(); - let mut discovery_db = - DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender); + let mut discovery_db = DiscoveryDB::new( + GUID::new_participant_guid(), + discovery_db_event_sender, + status_sender, + ); let domain_participant = DomainParticipant::new(0).expect("Failed to create publisher"); let topic = domain_participant @@ -897,7 +938,12 @@ mod tests { TopicKind::WithKey, ) .unwrap(); - let mut discoverydb = DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender); + let (status_sender, _status_receiver) = sync_status_channel(16).unwrap(); + let mut discoverydb = DiscoveryDB::new( + GUID::new_participant_guid(), + discovery_db_event_sender, + status_sender, + ); // Create reader ingredients let (notification_sender1, _notification_receiver1) = mio_extras::channel::sync_channel(100); diff --git a/src/discovery/spdp_participant_data.rs 
b/src/discovery/spdp_participant_data.rs index 8d5e3475..5b942744 100644 --- a/src/discovery/spdp_participant_data.rs +++ b/src/discovery/spdp_participant_data.rs @@ -80,6 +80,16 @@ pub struct SpdpDiscoveredParticipantData { } impl SpdpDiscoveredParticipantData { + #[cfg(feature = "security")] + pub(crate) fn supports_security(&self) -> bool { + // TODO: Is this logic correct? Or maybe we could come up with a more accurate + // version? + self.identity_token.is_some() + && self.permissions_token.is_some() + && self.property.is_some() + && self.security_info.is_some() + } + pub(crate) fn as_reader_proxy( &self, is_metatraffic: bool, @@ -595,11 +605,7 @@ mod tests { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let participant_data: SpdpDiscoveredParticipantData = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ) .unwrap(); @@ -609,12 +615,7 @@ mod tests { eprintln!("message data = {:?}", &data); eprintln!( "payload = {:?}", - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .to_vec() + &d.unwrap_serialized_payload_value().to_vec() ); eprintln!("deserialized = {:?}", &participant_data); eprintln!("serialized = {:?}", &sdata); @@ -665,22 +666,13 @@ mod tests { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let participant_data: Result = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ); eprintln!("message data = {:?}", &data); eprintln!( "payload = {:?}", - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .to_vec() + &d.unwrap_serialized_payload_value().to_vec() ); eprintln!("deserialized = {:?}", &participant_data); } @@ -710,22 +702,13 @@ mod tests { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let participant_data: Result = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ); eprintln!("message data = {:?}", &data); eprintln!( "payload = {:?}", - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .to_vec() + &d.unwrap_serialized_payload_value().to_vec() ); eprintln!("deserialized = {:?}", &participant_data); } @@ -764,22 +747,13 @@ mod tests { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let participant_data: Result = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ); eprintln!("message data = {:?}", &data); eprintln!( "payload = {:?}", - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .to_vec() + &d.unwrap_serialized_payload_value().to_vec() ); eprintln!("deserialized = {:?}", &participant_data); } diff --git a/src/messages/protocol_version.rs b/src/messages/protocol_version.rs index fe182ce3..59ddc808 100644 --- a/src/messages/protocol_version.rs +++ b/src/messages/protocol_version.rs @@ -1,8 +1,10 @@ +use std::fmt; + use speedy::{Readable, Writable}; #[allow(unused_imports)] use log::{debug, error, info, trace}; -#[derive(Debug, PartialOrd, PartialEq, Ord, Eq, Readable, Writable, Clone, Copy)] +#[derive(PartialOrd, PartialEq, Ord, Eq, 
Readable, Writable, Clone, Copy)] pub struct ProtocolVersion { pub major: u8, pub minor: u8, @@ -32,6 +34,12 @@ impl Default for ProtocolVersion { } } +impl fmt::Debug for ProtocolVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}.{}", self.major, self.minor) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/messages/submessages/data.rs b/src/messages/submessages/data.rs index 3874d177..cbbe6cc6 100644 --- a/src/messages/submessages/data.rs +++ b/src/messages/submessages/data.rs @@ -8,8 +8,9 @@ use crate::{ messages::submessages::{elements::parameter_list::ParameterList, submessages::*}, structure::{guid::EntityId, sequence_number::SequenceNumber}, }; -use super::elements::serialized_payload::SerializedPayload; // use log::debug; +#[cfg(test)] +use super::elements::serialized_payload::SerializedPayload; /// This Submessage is sent from an RTPS Writer (NO_KEY or WITH_KEY) /// to an RTPS Reader (NO_KEY or WITH_KEY) @@ -38,17 +39,14 @@ pub struct Data { /// Present only if the InlineQosFlag is set in the header. pub inline_qos: Option, - /// Depending on the payload transformation kind, contains the serialized - /// CryptoHeader + CryptoContent + CryptoFooter, or - /// SerializedPayload as bytes, so that the - /// submessage can be deserialized without knowing which type to expect, - /// after which the payload can be decoded. - /// - /// If the DataFlag is set, then the SerializedPayload contains the - /// encapsulation of the new value of the data-object after the change. + /// If the DataFlag is set, then serialized_payload contains the encapsulation + /// of the new value of the data-object after the change. /// If the KeyFlag is set, then it contains the encapsulation of /// the key of the data-object the message refers to. - pub encoded_payload: Option, + /// + /// In case of submessage protection, this payload contains the + /// encoded version of the original payload + pub serialized_payload: Option, } impl Data { @@ -127,7 +125,7 @@ impl Data { None }; - let encoded_payload = if expect_data { + let serialized_payload = if expect_data { Some(buffer.clone().split_off(cursor.position() as usize)) } else { None @@ -138,7 +136,7 @@ impl Data { writer_id, writer_sn: sequence_number, inline_qos: parameter_list, - encoded_payload, + serialized_payload, }) } @@ -153,25 +151,29 @@ impl Data { 4 + // writerId 8 + // writerSN self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + // QoS ParameterList - self.encoded_payload.as_ref().map(|q| q.len()).unwrap_or(0) + self.serialized_payload.as_ref().map(|q| q.len()).unwrap_or(0) } - // Creates a DecodedData with encoded_payload replaced by the input - pub fn decoded(self, decoded_payload: Option) -> DecodedData { - let Self { - reader_id, - writer_id, - writer_sn, - inline_qos, - .. 
- } = self; - DecodedData { - reader_id, - writer_id, - writer_sn, - inline_qos, - serialized_payload: decoded_payload, - } + #[cfg(test)] + pub(crate) fn unwrap_serialized_payload(&self) -> SerializedPayload { + self + .serialized_payload + .as_ref() + .map(SerializedPayload::from_bytes) + .unwrap() + .unwrap() + } + + #[cfg(test)] + pub(crate) fn unwrap_serialized_payload_value(&self) -> Bytes { + self.unwrap_serialized_payload().value + } + + #[cfg(test)] + pub(crate) fn update_serialized_payload_value(&mut self, new_value: Bytes) { + let mut payload = self.unwrap_serialized_payload(); + payload.value = new_value; + self.serialized_payload = Some(payload.into()); } } @@ -194,8 +196,8 @@ impl Writable for Data { writer.write_value(inline_qos)?; } - if let Some(encoded_payload) = self.encoded_payload.as_ref() { - writer.write_bytes(encoded_payload)?; + if let Some(serialized_payload) = self.serialized_payload.as_ref() { + writer.write_bytes(serialized_payload)?; } Ok(()) @@ -210,42 +212,3 @@ impl HasEntityIds for Data { self.writer_id } } - -// A version of the above struct with the payload decoded -pub struct DecodedData { - /// Identifies the RTPS Reader entity that is being informed of the change - /// to the data-object. - pub reader_id: EntityId, - - /// Identifies the RTPS Writer entity that made the change to the - /// data-object. - pub writer_id: EntityId, - - /// Uniquely identifies the change and the relative order for all changes - /// made by the RTPS Writer identified by the writerGuid. Each change - /// gets a consecutive sequence number. Each RTPS Writer maintains is - /// own sequence number. - pub writer_sn: SequenceNumber, - - /// Contains QoS that may affect the interpretation of the message. - /// Present only if the InlineQosFlag is set in the header. - pub inline_qos: Option, - - /// If the DataFlag is set, then the SerializedPayload contains the - /// encapsulation of the new value of the data-object after the change. - /// If the KeyFlag is set, then it contains the encapsulation of - /// the key of the data-object the message refers to. - pub serialized_payload: Option, -} - -#[cfg(test)] -impl Data { - // Restore compatibility with some tests that require DecodedData - pub fn no_crypto_decoded(&self) -> DecodedData { - let decoded_payload = self - .clone() - .encoded_payload - .map(|payload| SerializedPayload::from_bytes(&payload).unwrap()); - self.clone().decoded(decoded_payload) - } -} diff --git a/src/messages/submessages/data_frag.rs b/src/messages/submessages/data_frag.rs index f18d418a..516a250e 100644 --- a/src/messages/submessages/data_frag.rs +++ b/src/messages/submessages/data_frag.rs @@ -54,23 +54,22 @@ pub struct DataFrag { /// Present only if the InlineQosFlag is set in the header. pub inline_qos: Option, - /// Depending on the payload transformation kind, contains the serialized - /// CryptoContent or serialized payload fragment described below as bytes, so - /// that the submessage can be deserialized without knowing which type to - /// expect, after which the fragment can be decoded. - /// /// Encapsulation of a consecutive series of fragments, starting at /// fragment_starting_num for a total of fragments_in_submessage. /// Represents part of the new value of the data-object - /// after the change. Present only if either the DataFlag or the KeyFlag are - /// set in the header. Present only if DataFlag is set in the header. + /// after the change. 
+ /// + /// If payloads are protected, contains the buffer that decodes to the series + /// of fragments. In particular, a CryptoHeader, a plaintext buffer or + /// CryptoContent depending on the transformation kind, and + /// CryptoFooter, which have been serialized and concatenated. /// /// Note: RTPS spec says the serialized_payload is of type SerializedPayload, /// but that is technically incorrect. It is a fragment of /// SerializedPayload. The headers at the beginning of SerializedPayload /// appear only at the first fragment. The fragmentation mechanism here /// should treat serialized_payload as an opaque stream of bytes. - pub encoded_payload: Bytes, + pub serialized_payload: Bytes, } impl DataFrag { @@ -89,7 +88,7 @@ impl DataFrag { 2 + // fragmentSize 4 + // sampleSize self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + // QoS ParameterList - self.encoded_payload.len() + self.serialized_payload.len() } /// Spec talks about (expected) total number of fragments. @@ -225,7 +224,7 @@ impl DataFrag { } // Payload should be always present, be it data or key fragments. - let encoded_payload = buffer.clone().split_off(cursor.position() as usize); + let serialized_payload = buffer.clone().split_off(cursor.position() as usize); let datafrag = Self { reader_id, @@ -236,7 +235,7 @@ impl DataFrag { data_size, fragment_size, inline_qos, - encoded_payload, + serialized_payload, }; // fragment_starting_num strictly positive and must not exceed total number of @@ -256,32 +255,6 @@ impl DataFrag { Ok(datafrag) } - - // Creates a DecodedDataFrag with encoded_payload replaced by the input - pub fn decoded(self, decoded_payload: Bytes) -> DecodedDataFrag { - let Self { - reader_id, - writer_id, - writer_sn, - fragment_starting_num, - fragments_in_submessage, - data_size, - fragment_size, - inline_qos, - .. - } = self; - DecodedDataFrag { - reader_id, - writer_id, - writer_sn, - fragment_starting_num, - fragments_in_submessage, - data_size, - fragment_size, - inline_qos, - serialized_payload: decoded_payload, - } - } } impl Writable for DataFrag { @@ -302,7 +275,7 @@ impl Writable for DataFrag { if self.inline_qos.is_some() && !self.inline_qos.as_ref().unwrap().parameters.is_empty() { writer.write_value(&self.inline_qos)?; } - writer.write_bytes(&self.encoded_payload)?; + writer.write_bytes(&self.serialized_payload)?; Ok(()) } } @@ -315,65 +288,3 @@ impl HasEntityIds for DataFrag { self.writer_id } } - -#[derive(Debug, PartialEq, Eq, Clone)] -#[cfg_attr(test, derive(Default))] -pub struct DecodedDataFrag { - /// Identifies the RTPS Reader entity that is being informed of the change - /// to the data-object. - pub reader_id: EntityId, - - /// Identifies the RTPS Writer entity that made the change to the - /// data-object. - pub writer_id: EntityId, - - /// Uniquely identifies the change and the relative order for all changes - /// made by the RTPS Writer identified by the writerGuid. - /// Each change gets a consecutive sequence number. - /// Each RTPS Writer maintains is own sequence number. - pub writer_sn: SequenceNumber, - - /// Indicates the starting fragment for the series of fragments in - /// serialized_data. Fragment numbering starts with number 1. - pub fragment_starting_num: FragmentNumber, - - /// The number of consecutive fragments contained in this Submessage, - /// starting at fragment_starting_num. - pub fragments_in_submessage: u16, - - /// The total size in bytes of the original data before fragmentation. 
- pub data_size: u32, - - /// The size of an individual fragment in bytes. The maximum fragment size - /// equals 64K. - pub fragment_size: u16, - - /// Contains QoS that may affect the interpretation of the message. - /// Present only if the InlineQosFlag is set in the header. - pub inline_qos: Option, - - /// Encapsulation of a consecutive series of fragments, starting at - /// fragment_starting_num for a total of fragments_in_submessage. - /// Represents part of the new value of the data-object - /// after the change. Present only if either the DataFlag or the KeyFlag are - /// set in the header. Present only if DataFlag is set in the header. - /// - /// Note: RTPS spec says the serialized_payload is of type SerializedPayload, - /// but that is technically incorrect. It is a fragment of - /// SerializedPayload. The headers at the beginning of SerializedPayload - /// appear only at the first fragment. The fragmentation mechanism here - /// should treat serialized_payload as an opaque stream of bytes. - pub serialized_payload: Bytes, -} - -impl DecodedDataFrag { - // Duplicate for the method in DataFrag - pub fn total_number_of_fragments(&self) -> FragmentNumber { - let frag_size = self.fragment_size as u32; - if frag_size < 1 { - FragmentNumber::INVALID - } else { - FragmentNumber::new((self.data_size / frag_size) + u32::from(self.data_size % frag_size > 0)) - } - } -} diff --git a/src/messages/submessages/elements/serialized_payload.rs b/src/messages/submessages/elements/serialized_payload.rs index 2bf1acea..e10d6b49 100644 --- a/src/messages/submessages/elements/serialized_payload.rs +++ b/src/messages/submessages/elements/serialized_payload.rs @@ -21,7 +21,8 @@ use crate::RepresentationIdentifier; #[derive(Debug, PartialEq, Eq, Clone)] pub struct SerializedPayload { pub representation_identifier: RepresentationIdentifier, - pub representation_options: [u8; 2], // Not used. Send as zero, ignore on receive. + // Can represent payload protection kind. Currently not used outside security. + pub representation_options: [u8; 2], pub value: Bytes, } @@ -116,7 +117,7 @@ impl Writable for SerializedPayload { } } -// TODO: Should this be fallibe try_from instead? +// TODO: Should this be fallible try_from instead? // The speedy write_to_buffer() call could in theory fail, but it is hard to see // how. impl From for Bytes { diff --git a/src/messages/vendor_id.rs b/src/messages/vendor_id.rs index 4e16316f..f3a85c1d 100644 --- a/src/messages/vendor_id.rs +++ b/src/messages/vendor_id.rs @@ -1,13 +1,15 @@ +use std::fmt; + use speedy::{Context, Readable, Reader, Writable, Writer}; -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(PartialEq, Eq, Clone, Copy)] pub struct VendorId { pub vendor_id: [u8; 2], } impl VendorId { pub const VENDOR_UNKNOWN: Self = Self { - vendor_id: [0x00; 2], + vendor_id: [0x00, 0x00], }; /// assigned by OMG DDS SIG on 2020-11-21 @@ -20,6 +22,40 @@ impl VendorId { pub fn as_bytes(&self) -> [u8; 2] { self.vendor_id } + + fn known_vendor_id_string(self) -> Option<(&'static str, &'static str)> { + match self.vendor_id { + // from https://www.dds-foundation.org/dds-rtps-vendor-and-product-ids/ + // on 2023-11-03 + [0x01, 0x01] => Some(("RTI Connext DDS", "Real-Time Innovations, Inc. (RTI)")), + [0x01, 0x02] => Some(("OpenSplice DDS", "ADLink Ltd.")), + [0x01, 0x03] => Some(("OpenDDS", "Object Computing Inc. 
(OCI)")), + [0x01, 0x04] => Some(("Mil-DDS", "MilSoft")), + [0x01, 0x05] => Some(("InterCOM", "DDS Kongsberg")), + [0x01, 0x06] => Some(("CoreDX DDS", "Twin Oaks Computing")), + [0x01, 0x07] => Some(("Not Activ", "Lakota Technical Solutions, Inc.")), + [0x01, 0x08] => Some(("Not Active", "ICOUP Consulting")), + [0x01, 0x09] => Some(( + "Diamond DDS", + "Electronics and Telecommunication Research Institute (ETRI)", + )), + [0x01, 0x0A] => Some(("RTI Connext", "DDS Micro Real-Time Innovations, Inc. (RTI)")), + [0x01, 0x0B] => Some(("Vortex Cafe", "ADLink Ltd.")), + [0x01, 0x0C] => Some(("Not Active", "PrismTech Ltd.")), + [0x01, 0x0D] => Some(("Vortex Lite", "ADLink Ltd.")), + [0x01, 0x0E] => Some(("Qeo (Not active)", "Technicolor")), + [0x01, 0x0F] => Some(("FastRTPS, FastDDS", "eProsima")), + [0x01, 0x10] => Some(("Eclipse Cyclone DDS", "Eclipse Foundation")), + [0x01, 0x11] => Some(("GurumDDS", "Gurum Networks, Inc.")), + [0x01, 0x12] => Some(("RustDDS", "Atostek")), + [0x01, 0x13] => Some(( + "Zhenrong Data Distribution Service (ZRDDS)", + "Nanjing Zhenrong Software Technology Co.", + )), + [0x01, 0x14] => Some(("Dust DDS", "S2E Software Systems B.V.")), + _ => None, + } + } } impl Default for VendorId { @@ -28,6 +64,18 @@ impl Default for VendorId { } } +impl fmt::Debug for VendorId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + Self::VENDOR_UNKNOWN => write!(f, "VENDOR_UNKNOWN"), + other => match other.known_vendor_id_string() { + Some((product, vendor)) => write!(f, "{product} / {vendor}"), + None => write!(f, "{:x?}", other.vendor_id), + }, + } + } +} + impl<'a, C: Context> Readable<'a, C> for VendorId { #[inline] fn read_from>(reader: &mut R) -> Result { diff --git a/src/mio_source.rs b/src/mio_source.rs index 5fd60b61..5e0552ad 100644 --- a/src/mio_source.rs +++ b/src/mio_source.rs @@ -1,20 +1,20 @@ use std::{ io, io::{Read, Write}, - sync::Mutex, + sync::{Arc, Mutex}, }; #[cfg(not(target_os = "windows"))] -use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; +use std::os::fd::OwnedFd; #[cfg(target_os = "windows")] use std::{thread::sleep, time::Duration}; -#[cfg(target_os = "windows")] -use mio_08::net::{TcpListener, TcpStream}; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; +#[cfg(target_os = "windows")] +use mio_08::net::TcpListener; #[cfg(not(target_os = "windows"))] use socketpair::*; -use mio_08::{self, *}; +use mio_08::{self, net::TcpStream, *}; // PollEventSource and PollEventSender are an event communication // channel. PollEventSource is a mio-0.8 event::Source for Poll, @@ -22,32 +22,14 @@ use mio_08::{self, *}; // These Events carry no data. // This is the event receiver end. It is a "Source" in the terminology of mio. -#[cfg(target_os = "windows")] pub struct PollEventSource { rec_mio_socket: Mutex, } -#[cfg(not(target_os = "windows"))] -pub struct PollEventSource { - #[allow(dead_code)] - rec_sps: SocketpairStream, // we are storing this just to keep the socket alive - rec_mio_socket: Mutex, -} - -// TODO: How to store the socket so that it is correctly tracked and dropped -// when these PolEventSource /-Sender are dropped. Can we do that without -// unsafe? - -#[cfg(target_os = "windows")] -pub struct PollEventSender { - send_mio_socket: Mutex, -} - -#[cfg(not(target_os = "windows"))] +#[derive(Clone)] pub struct PollEventSender { - #[allow(dead_code)] - send_sps: SocketpairStream, - send_mio_socket: Mutex, + send_mio_socket: Arc>, + // Sender has Arc to support Clone, whereas Receiver has not. 
} #[cfg(not(target_os = "windows"))] @@ -64,16 +46,16 @@ pub fn make_poll_channel() -> io::Result<(PollEventSource, PollEventSender)> { let (rec_sps, send_sps) = socketpair_stream()?; let rec_sps = set_non_blocking(rec_sps)?; let send_sps = set_non_blocking(send_sps)?; - let rec_mio_socket = unsafe { mio_08::net::TcpStream::from_raw_fd(rec_sps.as_raw_fd()) }; - let send_mio_socket = unsafe { mio_08::net::TcpStream::from_raw_fd(send_sps.as_raw_fd()) }; + + let rec_mio_socket = TcpStream::from_std(std::net::TcpStream::from(OwnedFd::from(rec_sps))); + let send_mio_socket = TcpStream::from_std(std::net::TcpStream::from(OwnedFd::from(send_sps))); + Ok(( PollEventSource { - rec_sps, rec_mio_socket: Mutex::new(rec_mio_socket), }, PollEventSender { - send_sps, - send_mio_socket: Mutex::new(send_mio_socket), + send_mio_socket: Arc::new(Mutex::new(send_mio_socket)), }, )) } diff --git a/src/rtps/dp_event_loop.rs b/src/rtps/dp_event_loop.rs index cbcd2712..f029e6cc 100644 --- a/src/rtps/dp_event_loop.rs +++ b/src/rtps/dp_event_loop.rs @@ -10,7 +10,11 @@ use mio_06::{Event, Events, Poll, PollOpt, Ready, Token}; use mio_extras::channel as mio_channel; use crate::{ - dds::{qos::policy, typedesc::TypeDesc}, + dds::{ + qos::policy, + statusevents::{DomainParticipantStatusEvent, StatusChannelSender}, + typedesc::TypeDesc, + }, discovery::{ discovery::DiscoveryCommand, discovery_db::{discovery_db_read, DiscoveryDB}, @@ -80,6 +84,10 @@ pub struct DPEventLoop { writers: HashMap, udp_sender: Rc, + #[cfg(feature = "security")] // Currently used only with security. + // Just remove attribute if used also without. + participant_status_sender: StatusChannelSender, + discovery_update_notification_receiver: mio_channel::Receiver, #[cfg(feature = "security")] discovery_command_sender: mio_channel::SyncSender, @@ -87,7 +95,7 @@ pub struct DPEventLoop { impl DPEventLoop { // This pub(crate) , because it should be constructed only by DomainParticipant. - #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)] pub(crate) fn new( domain_info: DomainInfo, udp_listeners: HashMap, @@ -102,6 +110,9 @@ impl DPEventLoop { discovery_update_notification_receiver: mio_channel::Receiver, _discovery_command_sender: mio_channel::SyncSender, spdp_liveness_sender: mio_channel::SyncSender, + #[allow(unused_variables)] participant_status_sender: StatusChannelSender< + DomainParticipantStatusEvent, + >, security_plugins_opt: Option, ) -> Self { #[cfg(not(feature = "security"))] @@ -214,6 +225,8 @@ impl DPEventLoop { ack_nack_receiver: acknack_receiver, discovery_update_notification_receiver, #[cfg(feature = "security")] + participant_status_sender, + #[cfg(feature = "security")] discovery_command_sender: _discovery_command_sender, } } @@ -411,6 +424,15 @@ impl DPEventLoop { } // loop } // fn + #[cfg(feature = "security")] // Currently used only with security. + // Just remove attribute if used also without. 
+ fn send_participant_status(&self, event: DomainParticipantStatusEvent) { + self + .participant_status_sender + .try_send(event) + .unwrap_or_else(|e| error!("Cannot report participant status: {e:?}")); + } + fn handle_reader_action(&mut self, event: &Event) { match event.token() { ADD_READER_TOKEN => { @@ -922,6 +944,10 @@ impl DPEventLoop { fn on_remote_participant_authentication_status_changed(&mut self, remote_guidp: GuidPrefix) { let auth_status = discovery_db_read(&self.discovery_db).get_authentication_status(remote_guidp); + auth_status.map(|status| { + self.send_participant_status(DomainParticipantStatusEvent::Authentication { status }); + }); + match auth_status { Some(AuthenticationStatus::Authenticated) => { // The participant has been authenticated @@ -1048,6 +1074,8 @@ mod tests { let (discovery_command_sender, _discovery_command_receiver) = mio_channel::sync_channel::(64); let (spdp_liveness_sender, _spdp_liveness_receiver) = mio_channel::sync_channel(8); + let (participant_status_sender, _participant_status_receiver) = + sync_status_channel(16).unwrap(); let dds_cache = Arc::new(RwLock::new(DDSCache::new())); let (discovery_db_event_sender, _discovery_db_event_receiver) = @@ -1056,6 +1084,7 @@ mod tests { let discovery_db = Arc::new(RwLock::new(DiscoveryDB::new( GUID::new_participant_guid(), discovery_db_event_sender, + participant_status_sender.clone(), ))); let domain_info = DomainInfo { @@ -1095,6 +1124,7 @@ mod tests { discovery_update_notification_receiver, discovery_command_sender, spdp_liveness_sender, + participant_status_sender, None, ); dp_event_loop diff --git a/src/rtps/fragment_assembler.rs b/src/rtps/fragment_assembler.rs index 9ee72eeb..56a01c36 100644 --- a/src/rtps/fragment_assembler.rs +++ b/src/rtps/fragment_assembler.rs @@ -10,7 +10,7 @@ use crate::{ dds::ddsdata::DDSData, messages::submessages::{ elements::serialized_payload::SerializedPayload, - submessages::{DATAFRAG_Flags, DecodedDataFrag}, + submessages::{DATAFRAG_Flags, DataFrag}, }, structure::{ cache_change::ChangeKind, @@ -32,7 +32,7 @@ struct AssemblyBuffer { } impl AssemblyBuffer { - pub fn new(datafrag: &DecodedDataFrag) -> Self { + pub fn new(datafrag: &DataFrag) -> Self { let data_size: usize = datafrag.data_size.try_into().unwrap(); // We have unwrap here, but it will succeed as long as usize >= u32. let fragment_size: u16 = datafrag.fragment_size; @@ -62,10 +62,10 @@ impl AssemblyBuffer { } } - pub fn insert_frags(&mut self, datafrag: &DecodedDataFrag, frag_size: u16) { + pub fn insert_frags(&mut self, datafrag: &DataFrag, frag_size: u16) { // TODO: Sanity checks? E.g. datafrag.fragment_size == frag_size let frag_size = usize::from(frag_size); // - payload_header; - let frags_in_subm = usize::from(datafrag.fragments_in_submessage); + let frags_in_submessage = usize::from(datafrag.fragments_in_submessage); let fragment_starting_num: usize = u32::from(datafrag.fragment_starting_num) .try_into() .unwrap(); @@ -91,22 +91,26 @@ impl AssemblyBuffer { // ends first. // And clamp to assembly buffer length to avoid buffer overrun. 
let to_before_byte = std::cmp::min( - from_byte + std::cmp::min(frags_in_subm * frag_size, datafrag.serialized_payload.len()), + from_byte + + std::cmp::min( + frags_in_submessage * frag_size, + datafrag.serialized_payload.len(), + ), self.buffer_bytes.len(), ); let payload_size = to_before_byte - from_byte; // sanity check data size - // Last fragment may be smaller than frags_in_subm * frag_size + // Last fragment may be smaller than frags_in_submessage * frag_size if fragment_starting_num < self.fragment_count - && datafrag.serialized_payload.len() < frags_in_subm * frag_size + && datafrag.serialized_payload.len() < frags_in_submessage * frag_size { error!( "Received DATAFRAG too small. fragment_starting_num={} out of fragment_count={}, \ - frags_in_subm={}, frag_size={} but payload length ={}", + frags_in_submessage={}, frag_size={} but payload length ={}", fragment_starting_num, self.fragment_count, - frags_in_subm, + frags_in_submessage, frag_size, datafrag.serialized_payload.len(), ); @@ -125,7 +129,7 @@ impl AssemblyBuffer { self.buffer_bytes.as_mut()[from_byte..to_before_byte] .copy_from_slice(&datafrag.serialized_payload[..payload_size]); - for f in 0..frags_in_subm { + for f in 0..frags_in_submessage { self.received_bitmap.set(start_frag_from_0 + f, true); } self.modified_time = Timestamp::now(); @@ -163,38 +167,38 @@ impl FragmentAssembler { // Returns completed DDSData, when complete, and disposes the assembly buffer. pub fn new_datafrag( &mut self, - datafrag: &DecodedDataFrag, + datafrag: &DataFrag, flags: BitFlags, ) -> Option { let writer_sn = datafrag.writer_sn; let frag_size = self.fragment_size; - let abuf = self + let assembly_buffer = self .assembly_buffers .entry(datafrag.writer_sn) .or_insert_with(|| AssemblyBuffer::new(datafrag)); - abuf.insert_frags(datafrag, frag_size); + assembly_buffer.insert_frags(datafrag, frag_size); - if abuf.is_complete() { + if assembly_buffer.is_complete() { debug!("new_datafrag: COMPLETED FRAGMENT"); - if let Some(abuf) = self.assembly_buffers.remove(&writer_sn) { + if let Some(assembly_buffer) = self.assembly_buffers.remove(&writer_sn) { // Return what we have assembled. 
- let ser_data_or_key = SerializedPayload::from_bytes(&abuf.buffer_bytes.freeze()) - .map_or_else( + let serialized_data_or_key = + SerializedPayload::from_bytes(&assembly_buffer.buffer_bytes.freeze()).map_or_else( |e| { error!("Deserializing SerializedPayload from DATAFRAG: {:?}", &e); None }, Some, )?; - let ddsdata = if flags.contains(DATAFRAG_Flags::Key) { - DDSData::new_disposed_by_key(ChangeKind::NotAliveDisposed, ser_data_or_key) + let dds_data = if flags.contains(DATAFRAG_Flags::Key) { + DDSData::new_disposed_by_key(ChangeKind::NotAliveDisposed, serialized_data_or_key) } else { // it is data - DDSData::new(ser_data_or_key) + DDSData::new(serialized_data_or_key) }; - Some(ddsdata) // completed data from fragments + Some(dds_data) // completed data from fragments } else { error!("Assembly buffer mysteriously lost"); None diff --git a/src/rtps/message.rs b/src/rtps/message.rs index 07a9df05..c066c830 100644 --- a/src/rtps/message.rs +++ b/src/rtps/message.rs @@ -13,9 +13,7 @@ use crate::{ protocol_id::ProtocolId, protocol_version::ProtocolVersion, submessages::{ - elements::{ - parameter::Parameter, parameter_list::ParameterList, serialized_payload::SerializedPayload, - }, + elements::{parameter::Parameter, parameter_list::ParameterList}, submessage::WriterSubmessage, submessages::{SubmessageKind, *}, }, @@ -25,12 +23,11 @@ use crate::{ structure::{ cache_change::CacheChange, entity::RTPSEntity, - guid::{EntityId, EntityKind, GuidPrefix, GUID}, + guid::{EntityId, GuidPrefix, GUID}, parameter_id::ParameterId, sequence_number::{FragmentNumber, SequenceNumber, SequenceNumberSet}, time::Timestamp, }, - RepresentationIdentifier, }; #[cfg(feature = "security")] use crate::{ @@ -161,7 +158,7 @@ impl MessageBuilder { content_length, }; - let submsg = Submessage { + let submessage = Submessage { header: submessage_header, body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoTimestamp( InfoTimestamp { timestamp }, @@ -170,7 +167,7 @@ impl MessageBuilder { original_bytes: None, }; - self.submessages.push(submsg); + self.submessages.push(submessage); self } @@ -180,8 +177,12 @@ impl MessageBuilder { reader_entity_id: EntityId, // The entity id to be included in the submessage writer_guid: GUID, endianness: Endianness, - _security_plugins: Option<&SecurityPluginsHandle>, + security_plugins: Option<&SecurityPluginsHandle>, ) -> Self { + #[cfg(not(feature = "security"))] + // Parameter not used + let _ = security_plugins; + let writer_entity_id = writer_guid.entity_id; let mut param_list = ParameterList::new(); // inline QoS goes here @@ -221,22 +222,6 @@ impl MessageBuilder { DDSData::DisposeByKey { ref key, .. } => Some(key.clone()), DDSData::DisposeByKeyHash { .. } => None, }; - // TODO: please explain this logic here: - // - // Current hypothesis: - // If we are writing to a built-in ( = Discovery) topic, then we mark the - // encoding to be PL_CDR_LE, no matter what. This works if we are compiled - // on a little-endian machine. 
- let serialized_payload = serialized_payload.map(|serialized_payload| { - if writer_entity_id.kind() == EntityKind::WRITER_WITH_KEY_BUILT_IN { - SerializedPayload { - representation_identifier: RepresentationIdentifier::PL_CDR_LE, - ..serialized_payload - } - } else { - serialized_payload - } - }); #[cfg(not(feature = "security"))] let encoded_payload = serialized_payload; @@ -250,22 +235,18 @@ impl MessageBuilder { .write_to_vec() .map_err(|e| security_error!("{e:?}")) .and_then(|serialized_payload| { - _security_plugins - .map(SecurityPluginsHandle::get_plugins) - .map_or( - // If there are no security plugins, use plaintext - Ok(serialized_payload.clone()), - // If security plugins exist, call them to encode payload - |security_plugins| { - security_plugins - .encode_serialized_payload(serialized_payload, &writer_guid) - // Add the extra qos - .map(|(encoded_payload, extra_inline_qos)| { - param_list.concat(extra_inline_qos); - encoded_payload - }) - }, - ) + match security_plugins.map(SecurityPluginsHandle::get_plugins) { + Some(security_plugins) => { + security_plugins + .encode_serialized_payload(serialized_payload, &writer_guid) + // Add the extra qos + .map(|(encoded_payload, extra_inline_qos)| { + param_list.concat(extra_inline_qos); + encoded_payload + }) + } + None => Ok(serialized_payload), + } }) }) .transpose() @@ -289,7 +270,7 @@ impl MessageBuilder { writer_id: writer_entity_id, writer_sn: cache_change.sequence_number, inline_qos, - encoded_payload: encoded_payload.map(Bytes::from), + serialized_payload: encoded_payload.map(Bytes::from), }; let flags: BitFlags = BitFlags::::from_endianness(endianness) @@ -330,8 +311,12 @@ impl MessageBuilder { fragment_size: u16, sample_size: u32, // all fragments together endianness: Endianness, - _security_plugins: Option<&SecurityPluginsHandle>, + security_plugins: Option<&SecurityPluginsHandle>, ) -> Self { + #[cfg(not(feature = "security"))] + // Parameter not used + let _ = security_plugins; + let writer_entity_id = writer_guid.entity_id; let mut param_list = ParameterList::new(); // inline QoS goes here @@ -379,13 +364,9 @@ impl MessageBuilder { let encoded_payload = serialized_payload; #[cfg(feature = "security")] - let encoded_payload = match _security_plugins - .map(SecurityPluginsHandle::get_plugins) - .map_or( - // If there are no security plugins, use plaintext - Ok(serialized_payload.clone()), - // ..else, security plugins exist, call them to encode payload - |security_plugins| { + let encoded_payload = { + let encode_result = match security_plugins.map(SecurityPluginsHandle::get_plugins) { + Some(security_plugins) => { security_plugins .encode_serialized_payload(serialized_payload, &writer_guid) // Add the extra qos @@ -393,12 +374,20 @@ impl MessageBuilder { param_list.concat(extra_inline_qos); encoded_payload }) - }, - ) { - Ok(encoded_payload) => encoded_payload, - Err(e) => { - error!("{e:?}"); - return self; + } + None => + // If there are no security plugins, use plaintext + { + Ok(serialized_payload) + } + }; + + match encode_result { + Ok(encoded_payload) => encoded_payload, + Err(e) => { + error!("{e:?}"); + return self; + } } }; // end security encoding @@ -415,24 +404,9 @@ impl MessageBuilder { } else { None }, - encoded_payload: Bytes::from(encoded_payload), + serialized_payload: Bytes::from(encoded_payload), }; - // TODO: please explain this logic here: - // - // Current hypothesis: - // If we are writing to a built-in ( = Discovery) topic, then we mark the - // encoding to be PL_CDR_LE, no matter what. 
This works if we are compiled - // on a little-endian machine. - - // Is this ever necessary for fragmented data? - - // if writer_entity_id.kind() == EntityKind::WRITER_WITH_KEY_BUILT_IN { - // if let Some(sp) = data_message.serialized_payload.as_mut() { - // sp.representation_identifier = RepresentationIdentifier::PL_CDR_LE; - // } - // } - let flags: BitFlags = // endianness flag BitFlags::::from_endianness(endianness) @@ -749,13 +723,7 @@ mod tests { wtf => panic!("Unexpected message structure {wtf:?}"), }; - let serialized_payload = data_submessage - .no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .clone(); + let serialized_payload = data_submessage.unwrap_serialized_payload_value().clone(); info!("{:x?}", serialized_payload); let serialized = Bytes::from( diff --git a/src/rtps/message_receiver.rs b/src/rtps/message_receiver.rs index 637f3e49..e0aa785f 100644 --- a/src/rtps/message_receiver.rs +++ b/src/rtps/message_receiver.rs @@ -9,10 +9,7 @@ use bytes::Bytes; use crate::{ messages::{ protocol_version::ProtocolVersion, - submessages::{ - elements::serialized_payload::SerializedPayload, - submessages::{WriterSubmessage, *}, - }, + submessages::submessages::{WriterSubmessage, *}, vendor_id::VendorId, }, rtps::{reader::Reader, Message, Submessage, SubmessageBody}, @@ -511,7 +508,7 @@ impl MessageReceiver { } other => { warn!( - "Expected SecurePostfix submessage after SecurePrefix and payload submsg. \ + "Expected SecurePostfix submessage after SecurePrefix and payload submessage. \ Discarding." ); debug!("Unexpected submessage instead: {other:?}"); @@ -635,16 +632,7 @@ impl MessageReceiver { reader: &mut Reader, mr_state: &MessageReceiverState, ) { - let encoded_payload = data.encoded_payload.clone(); - - let sp: Option = encoded_payload - .map(|encoded_payload| SerializedPayload::from_bytes(&encoded_payload)) - .transpose() - .map_err(|e| error!("{e:?}")) - .ok() - .flatten(); - - reader.handle_data_msg(data.decoded(sp), data_flags, mr_state); + reader.handle_data_msg(data, data_flags, mr_state); } #[cfg(feature = "security")] @@ -658,43 +646,36 @@ impl MessageReceiver { ) { let Data { inline_qos, - encoded_payload, + serialized_payload, .. 
} = data.clone(); - encoded_payload + serialized_payload // If there is an encoded_payload, decode it - .map(|encoded_payload| { - security_plugins - .map(SecurityPluginsHandle::get_plugins) - .map_or( - // If there are no security plugins, we expect a serialized SerializedPayload as - // Bytes - Ok(encoded_payload.clone()), - // If security plugins exist, use them to decode - |security_plugins| { - // Decode - security_plugins - .decode_serialized_payload( - Vec::from(encoded_payload), - inline_qos.unwrap_or_default(), - source_guid, - &reader.guid(), - ) - // Convert to Bytes - .map(Bytes::from) - }, - ) - .map_err(|e| error!("{e:?}")) - // Deserialize - .and_then(|serialized_payload| { - SerializedPayload::from_bytes(&serialized_payload).map_err(|e| error!("{e:?}")) - }) - }) + .map( + |encoded_payload| match security_plugins.map(SecurityPluginsHandle::get_plugins) { + Some(security_plugins) => security_plugins + .decode_serialized_payload( + encoded_payload, + inline_qos.unwrap_or_default(), + source_guid, + &reader.guid(), + ) + .map_err(|e| error!("{e:?}")), + None => Ok(encoded_payload), + }, + ) .transpose() - // If there were no errors, give DecodedData to the reader + // If there were no errors, give to the reader .map(|decoded_payload| { - reader.handle_data_msg(data.decoded(decoded_payload), data_flags, mr_state); + reader.handle_data_msg( + Data { + serialized_payload: decoded_payload, + ..data + }, + data_flags, + mr_state, + ); }) // Errors have already been printed .ok(); @@ -710,8 +691,8 @@ impl MessageReceiver { reader: &mut Reader, mr_state: &MessageReceiverState, ) { - let serialized_payload = datafrag.encoded_payload.clone(); - if serialized_payload.len() + let payload_buffer_length = datafrag.serialized_payload.len(); + if payload_buffer_length > (datafrag.fragments_in_submessage as usize) * (datafrag.fragment_size as usize) { error!( @@ -721,17 +702,16 @@ impl MessageReceiver { format!( "Invalid DataFrag. serializedData length={} should be less than or equal to \ (fragments_in_submessage={}) x (fragment_size={})", - serialized_payload.len(), - datafrag.fragments_in_submessage, - datafrag.fragment_size + payload_buffer_length, datafrag.fragments_in_submessage, datafrag.fragment_size ), ) ); // and we're done } else { - let decoded_payload = serialized_payload; - reader.handle_datafrag_msg(&datafrag.decoded(decoded_payload), datafrag_flags, mr_state); + reader.handle_datafrag_msg(&datafrag, datafrag_flags, mr_state); } + // Consume to keep the same method signature as in the security case + drop(datafrag); } #[cfg(feature = "security")] @@ -745,60 +725,60 @@ impl MessageReceiver { ) { let DataFrag { inline_qos, - encoded_payload, + serialized_payload: encoded_payload, .. 
} = datafrag.clone(); - security_plugins - .map(SecurityPluginsHandle::get_plugins) - .map_or( - // If there are no security plugins, we expect a serialized SerializedPayload as - // Bytes - Ok(encoded_payload.clone()), - // If security plugins exist, use them to decode - |security_plugins| { - // Decode - security_plugins - .decode_serialized_payload( - Vec::from(encoded_payload), - inline_qos.unwrap_or_default(), - source_guid, - &reader.guid(), - ) - // Convert to Bytes - .map(Bytes::from) + match security_plugins.map(SecurityPluginsHandle::get_plugins) { + Some(security_plugins) => { + // Decode + security_plugins + .decode_serialized_payload( + encoded_payload, + inline_qos.unwrap_or_default(), + source_guid, + &reader.guid(), + ) + .map_err(|e| error!("{e:?}")) + } + None => Ok(encoded_payload), + } + .ok() + // Deserialize + .and_then(|serialized_payload| { + // The check that used to be in DataFrag deserialization + if serialized_payload.len() + > (datafrag.fragments_in_submessage as usize) * (datafrag.fragment_size as usize) + { + error!( + "{:?}", + std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Invalid DataFrag. serializedData length={} should be less than or equal to \ + (fragments_in_submessage={}) x (fragment_size={})", + serialized_payload.len(), + datafrag.fragments_in_submessage, + datafrag.fragment_size + ), + ) + ); + None + } else { + Some(serialized_payload) + } + }) + // If there were no errors, give DecodedDataFrag to the reader + .map(|decoded_payload| { + reader.handle_datafrag_msg( + &DataFrag { + serialized_payload: decoded_payload, + ..datafrag }, - ) - .map_err(|e| error!("{e:?}")) - .ok() - // Deserialize - .and_then(|serialized_payload| { - // The check that used to be in DataFrag deserialization - if serialized_payload.len() - > (datafrag.fragments_in_submessage as usize) * (datafrag.fragment_size as usize) - { - error!( - "{:?}", - std::io::Error::new( - std::io::ErrorKind::Other, - format!( - "Invalid DataFrag. 
serializedData length={} should be less than or equal to \ - (fragments_in_submessage={}) x (fragment_size={})", - serialized_payload.len(), - datafrag.fragments_in_submessage, - datafrag.fragment_size - ), - ) - ); - None - } else { - Some(serialized_payload) - } - }) - // If there were no errors, give DecodedDataFrag to the reader - .map(|decoded_payload| { - reader.handle_datafrag_msg(&datafrag.decoded(decoded_payload), datafrag_flags, mr_state); - }); + datafrag_flags, + mr_state, + ); + }); } fn handle_reader_submessage(&self, submessage: ReaderSubmessage) { diff --git a/src/rtps/reader.rs b/src/rtps/reader.rs index 5873ae18..933eda03 100644 --- a/src/rtps/reader.rs +++ b/src/rtps/reader.rs @@ -28,7 +28,9 @@ use crate::{ protocol_id::ProtocolId, protocol_version::ProtocolVersion, submessages::{ - elements::{inline_qos::InlineQos, parameter_list::ParameterList}, + elements::{ + inline_qos::InlineQos, parameter_list::ParameterList, serialized_payload::SerializedPayload, + }, submessages::*, }, vendor_id::VendorId, @@ -485,7 +487,7 @@ impl Reader { // handles regular data message and updates history cache pub fn handle_data_msg( &mut self, - data: DecodedData, + data: Data, data_flags: BitFlags, mr_state: &MessageReceiverState, ) { @@ -529,7 +531,7 @@ impl Reader { pub fn handle_datafrag_msg( &mut self, - datafrag: &DecodedDataFrag, + datafrag: &DataFrag, datafrag_flags: BitFlags, mr_state: &MessageReceiverState, ) { @@ -557,7 +559,7 @@ impl Reader { // parse write_options out of the message // TODO: This is almost duplicate code from DATA processing let mut write_options_b = WriteOptionsBuilder::new(); - // Check if we have s source timestamp + // Check if we have a source timestamp if let Some(source_timestamp) = mr_state.source_timestamp { write_options_b = write_options_b.source_timestamp(source_timestamp); } @@ -707,7 +709,7 @@ impl Reader { fn data_to_dds_data( &self, - data: DecodedData, + data: Data, data_flags: BitFlags, ) -> Result { let representation_identifier = DATA_Flags::cdr_representation_identifier(data_flags); @@ -717,16 +719,18 @@ impl Reader { data_flags.contains(DATA_Flags::Data), data_flags.contains(DATA_Flags::Key), ) { - (Some(sp), true, false) => { + (Some(serialized_payload), true, false) => { // data - Ok(DDSData::new(sp)) + Ok(DDSData::new( + SerializedPayload::from_bytes(&serialized_payload).map_err(|e| format!("{e:?}"))?, + )) } - (Some(sp), false, true) => { + (Some(serialized_payload), false, true) => { // key Ok(DDSData::new_disposed_by_key( Self::deduce_change_kind(&data.inline_qos, false, representation_identifier), - sp, + SerializedPayload::from_bytes(&serialized_payload).map_err(|e| format!("{e:?}"))?, )) } @@ -1467,7 +1471,7 @@ mod tests { let data_flags = BitFlags::::from_flag(DATA_Flags::Data); // 4. Feed the data for the reader to handle - reader.handle_data_msg(data.no_crypto_decoded(), data_flags, &mr_state); + reader.handle_data_msg(data, data_flags, &mr_state); // 5. Verify that the reader sends a notification about the new data assert!( @@ -1552,7 +1556,7 @@ mod tests { let sequence_num = data.writer_sn; // 4. Feed the data for the reader to handle - reader.handle_data_msg(data.no_crypto_decoded(), data_flags, &mr_state); + reader.handle_data_msg(data.clone(), data_flags, &mr_state); // 5. Verify that the reader sent the data to the topic cache let topic_cache = topic_cache_handle.lock().unwrap(); @@ -1563,7 +1567,7 @@ mod tests { // 6. 
Verify that the content of the cache change is as expected // Construct a cache change with the expected content - let dds_data = DDSData::new(data.no_crypto_decoded().serialized_payload.unwrap()); + let dds_data = DDSData::new(data.unwrap_serialized_payload()); let cc_locally_built = CacheChange::new( writer_guid, sequence_num, @@ -1782,7 +1786,7 @@ mod tests { }; let data_flags = BitFlags::::from_flag(DATA_Flags::Data); - reader.handle_data_msg(data.no_crypto_decoded(), data_flags, &mr_state); + reader.handle_data_msg(data, data_flags, &mr_state); // 6. Verify that the writer proxy reports seqnums below 5 as ackable // This should be the case since reader received data with seqnum 3 and seqnum 4 diff --git a/src/security/cryptographic/cryptographic_builtin/crypto_transform.rs b/src/security/cryptographic/cryptographic_builtin/crypto_transform.rs index d1816230..e42f34e0 100644 --- a/src/security/cryptographic/cryptographic_builtin/crypto_transform.rs +++ b/src/security/cryptographic/cryptographic_builtin/crypto_transform.rs @@ -24,7 +24,7 @@ use crate::{ security_error, }; use super::{ - aes_gcm_gmac::{decrypt, validate_mac}, + aes_gcm_gmac::{compute_mac, decrypt, encrypt, validate_mac}, encode::{encode_gcm, encode_gmac}, key_material::*, validate_receiver_specific_macs::validate_receiver_specific_mac, @@ -146,14 +146,20 @@ impl CryptoTransform for CryptographicBuiltin { } BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES128_GMAC | BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES256_GMAC => { - let mac = aes_gcm_gmac::compute_mac(&session_key, initialization_vector, &plain_buffer)?; + let mac = compute_mac(&session_key, initialization_vector, &plain_buffer)?; (plain_buffer, BuiltinCryptoFooter::only_common_mac(mac)) } BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES128_GCM | BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES256_GCM => { - let (ciphertext, mac) = - aes_gcm_gmac::encrypt(&session_key, initialization_vector, &plain_buffer)?; - (ciphertext, BuiltinCryptoFooter::only_common_mac(mac)) + let (ciphertext, mac) = encrypt(&session_key, initialization_vector, &plain_buffer)?; + ( + CryptoContent { data: ciphertext } + .write_to_vec() + .map_err(|err| { + security_error!("Error converting CryptoContent to byte vector: {}", err) + })?, + BuiltinCryptoFooter::only_common_mac(mac), + ) } }; @@ -162,9 +168,7 @@ impl CryptoTransform for CryptographicBuiltin { .map_err(|err| security_error!("Error converting CryptoHeader to byte vector: {}", err))?; let footer_vec = Vec::::try_from(footer)?; Ok(( - CryptoContent::from([header_vec, encoded_data, footer_vec].concat()) - .write_to_vec() - .map_err(|e| security_error!("Error serializing CryptoContent: {e:?}"))?, + [header_vec, encoded_data, footer_vec].concat(), ParameterList::new(), // TODO: If the payload was not data but key, then construct a key_hash // and return that to be appended to the InlineQoS of the outgoing DATA Submessage. 
@@ -741,7 +745,7 @@ impl CryptoTransform for CryptographicBuiltin { fn decode_serialized_payload( &self, - crypto_header_content_footer_buffer: Vec, + encoded_buffer: Vec, _inline_qos: ParameterList, _receiving_datareader_crypto_handle: DatareaderCryptoHandle, sending_datawriter_crypto_handle: DatawriterCryptoHandle, @@ -765,16 +769,19 @@ impl CryptoTransform for CryptographicBuiltin { // check length so that following split do not panic and subtract does not // underflow - if crypto_header_content_footer_buffer.len() < head_len + foot_len + 4 { + if encoded_buffer.len() < head_len + foot_len { return Err(security_error("Encoded payload smaller than minimum size")); } - let (header_bytes, content_and_footer_bytes) = - crypto_header_content_footer_buffer.split_at(head_len); + let (header_bytes, content_and_footer_bytes) = encoded_buffer.split_at(head_len); let (content_bytes, footer_bytes) = content_and_footer_bytes.split_at(content_and_footer_bytes.len() - foot_len); - let crypto_header = CryptoHeader::read_from_buffer(header_bytes)?; // Deserialize crypto header and footer + + // .read_from_buffer() does not need endianness, because BuiltinCryptoHeader + // only contains byte-oriented data, which is insensitive to endianness. + let crypto_header = CryptoHeader::read_from_buffer(header_bytes)?; + let BuiltinCryptoHeader { transform_identifier: BuiltinCryptoTransformIdentifier { @@ -783,8 +790,6 @@ impl CryptoTransform for CryptographicBuiltin { }, builtin_crypto_header_extra: BuiltinCryptoHeaderExtra(initialization_vector), } = crypto_header.try_into()?; - // .read_from_buffer() does not need endianness, because BuiltinCryptoHeader - // only contains byte-oriented data, which is insensitive to endianness. let BuiltinCryptoFooter { common_mac, .. } = BuiltinCryptoFooter::try_from(footer_bytes)?; @@ -796,17 +801,6 @@ impl CryptoTransform for CryptographicBuiltin { initialization_vector, )?; - // Check that the key IDs match - if decode_key_material.key_id != transformation_key_id { - return Err(security_error!( - "Mismatched decode key IDs: the decoded CryptoHeader has {}, but the key associated with \ - the sending datawriter {} has {}.", - transformation_key_id, - sending_datawriter_crypto_handle, - decode_key_material.key_id - )); - } - // Check that the transformation kind stays consistent if decode_key_material.transformation_kind != transformation_kind { return Err(security_error!( @@ -821,24 +815,20 @@ impl CryptoTransform for CryptographicBuiltin { let decode_key = &decode_key_material.session_key; match transformation_kind { - BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_NONE - | BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES128_GMAC + BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_NONE => Err(security_error!( + "Transformation kind NONE found in decode_serialized_payload. If the transformation kind \ + is NONE, this method should not have been called." + )), + BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES128_GMAC | BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES256_GMAC => { - // Validate signature even if it is not requested to avoid - // unauthorized data injection attack. - if transformation_kind == BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_NONE { - warn!( - "decode_serialized_payload with crypto transformation kind = none. Does not make \ - sense, but validating MAC anyway." 
- ); - } - aes_gcm_gmac::validate_mac(decode_key, initialization_vector, content_bytes, common_mac) + validate_mac(decode_key, initialization_vector, content_bytes, common_mac) // if validate_mac succeeds, then map result to content bytes .map(|()| Vec::from(content_bytes)) } BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES128_GCM | BuiltinCryptoTransformationKind::CRYPTO_TRANSFORMATION_KIND_AES256_GCM => { - aes_gcm_gmac::decrypt(decode_key, initialization_vector, content_bytes, common_mac) + let ciphertext = CryptoContent::read_from_buffer(content_bytes)?.data; + decrypt(decode_key, initialization_vector, &ciphertext, common_mac) } } } diff --git a/src/security/cryptographic/cryptographic_plugin.rs b/src/security/cryptographic/cryptographic_plugin.rs index eb772f97..b24a55bb 100644 --- a/src/security/cryptographic/cryptographic_plugin.rs +++ b/src/security/cryptographic/cryptographic_plugin.rs @@ -167,8 +167,8 @@ pub trait CryptoTransform: Send { /// (v. 1.1) /// /// In a tuple, return the results that would be written in `encoded_buffer`, - /// which can be `CryptoContent`, or `SerializedPayload` if no encryption is - /// performed, and `extra_inline_qos`. + /// which is as bytes a serialized payload containing (a fragment of) the + /// encoded data, and `extra_inline_qos`. fn encode_serialized_payload( &self, plain_buffer: Vec, // (a fragment of) serialized `SerializedPayload` @@ -267,7 +267,7 @@ pub trait CryptoTransform: Send { /// decode_serialized_payload: section 8.5.1.9.9 of the Security specification /// (v. 1.1) /// - /// Return the (fragment of) serialized payload that would be written in + /// Return the (fragment of the) serialized payload that would be written in /// `plain_buffer` fn decode_serialized_payload( &self, diff --git a/src/security/security_plugins.rs b/src/security/security_plugins.rs index 88626cc3..a1dd92cf 100644 --- a/src/security/security_plugins.rs +++ b/src/security/security_plugins.rs @@ -4,15 +4,13 @@ use std::{ sync::{Arc, Mutex, MutexGuard}, }; +use bytes::Bytes; use log::{debug, error}; -use speedy::Readable; use crate::{ discovery::{DiscoveredReaderData, DiscoveredWriterData, TopicBuiltinTopicData}, messages::submessages::{ - elements::{ - crypto_content::CryptoContent, crypto_header::CryptoHeader, parameter_list::ParameterList, - }, + elements::{crypto_header::CryptoHeader, parameter_list::ParameterList}, secure_postfix::SecurePostfix, secure_prefix::SecurePrefix, submessage::{HasEntityIds, SecuritySubmessage}, @@ -1329,11 +1327,11 @@ impl SecurityPlugins { pub fn decode_serialized_payload( &self, - encoded_payload: Vec, + encoded_payload: Bytes, inline_qos: ParameterList, source_guid: &GUID, destination_guid: &GUID, - ) -> SecurityResult> { + ) -> SecurityResult { // TODO remove after testing, skips decoding if self.test_disable_crypto_transform { return Ok(encoded_payload); @@ -1342,12 +1340,15 @@ impl SecurityPlugins { if self.payload_not_protected(destination_guid) { Ok(encoded_payload) } else { - self.crypto.decode_serialized_payload( - CryptoContent::read_from_buffer_copying_data(&encoded_payload)?.data, //TODO needs changes for interoperability - inline_qos, - self.get_local_endpoint_crypto_handle(destination_guid)?, - self.get_remote_endpoint_crypto_handle((destination_guid, source_guid))?, - ) + self + .crypto + .decode_serialized_payload( + Vec::from(encoded_payload), + inline_qos, + self.get_local_endpoint_crypto_handle(destination_guid)?, + self.get_remote_endpoint_crypto_handle((destination_guid, source_guid))?, + ) + 
.map(Bytes::from) } } diff --git a/src/structure/duration.rs b/src/structure/duration.rs index 39aa517d..b39303b0 100644 --- a/src/structure/duration.rs +++ b/src/structure/duration.rs @@ -1,21 +1,10 @@ -use std::{convert::TryFrom, ops::Div}; +use std::{convert::TryFrom, fmt, ops::Div}; use speedy::{Readable, Writable}; use serde::{Deserialize, Serialize}; #[derive( - Debug, - PartialEq, - Eq, - Hash, - PartialOrd, - Ord, - Readable, - Writable, - Serialize, - Deserialize, - Copy, - Clone, + PartialEq, Eq, Hash, PartialOrd, Ord, Readable, Writable, Serialize, Deserialize, Copy, Clone, )] /// Duration is the DDS/RTPS representation for lengths of time, such as /// timeouts. It is very similar to [`std::time::Duration`]. See also @@ -148,6 +137,21 @@ impl std::ops::Mul for f64 { } } +impl fmt::Debug for Duration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if *self == Self::DURATION_INFINITE { + write!(f, "infinite") + } else { + write!(f, "{}", self.seconds)?; + if self.fraction > 0 { + let frac = format!("{:09}", (1_000_000_000 * (self.fraction as u64)) >> 32); + write!(f, ".{}", frac.trim_end_matches('0'))?; + } + write!(f, " sec") + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -207,4 +211,18 @@ mod tests { std::time::Duration::from_nanos(1_519_152_760 * NANOS_PER_SEC + 309_247_999) ); } + + fn fmt_check(d: Duration, s: &str) { + assert_eq!(format!("{:?}", d), s); + } + + #[test] + fn duration_format() { + fmt_check(Duration::from_frac_seconds(0.0), "0 sec"); + fmt_check(Duration::from_frac_seconds(0.5), "0.5 sec"); + fmt_check(Duration::from_frac_seconds(1.5), "1.5 sec"); + fmt_check(Duration::from_frac_seconds(20.0), "20 sec"); + fmt_check(Duration::from_frac_seconds(2.25), "2.25 sec"); + fmt_check(Duration::from_frac_seconds(10.0 / 3.0), "3.333333333 sec"); + } } diff --git a/src/structure/guid.rs b/src/structure/guid.rs index ced3bbdc..781aa873 100644 --- a/src/structure/guid.rs +++ b/src/structure/guid.rs @@ -62,7 +62,10 @@ impl AsRef<[u8]> for GuidPrefix { impl fmt::Debug for GuidPrefix { // This is so common that we skip all the introductions and just print the data. 
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.bytes.fmt(f) + for b in self.bytes.iter() { + write!(f, "{:02x}", b)?; + } + Ok(()) } } diff --git a/src/test/test_data.rs b/src/test/test_data.rs index 5b95da2a..16386e26 100644 --- a/src/test/test_data.rs +++ b/src/test/test_data.rs @@ -145,11 +145,7 @@ pub(crate) fn spdp_participant_msg_mod(port: u16) -> Message { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let mut participant_data: SpdpDiscoveredParticipantData = PlCdrDeserializerAdapter::::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ) .unwrap(); @@ -159,13 +155,7 @@ pub(crate) fn spdp_participant_msg_mod(port: u16) -> Message { participant_data.default_unicast_locators.clear(); participant_data.default_multicast_locators.clear(); - let datalen = d - .no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .len() as u16; + let datalen = d.unwrap_serialized_payload_value().len() as u16; data = participant_data .to_pl_cdr_bytes(RepresentationIdentifier::PL_CDR_LE) .unwrap(); @@ -173,20 +163,8 @@ pub(crate) fn spdp_participant_msg_mod(port: u16) -> Message { // to_bytes::(&participant_data) .unwrap(), // ); - d.no_crypto_decoded() - .serialized_payload - .as_mut() - .unwrap() - .value = data.clone(); - submsglen = submsglen - + d - .no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value - .len() as u16 - - datalen; + d.update_serialized_payload_value(data.clone()); + submsglen = submsglen + d.unwrap_serialized_payload_value().len() as u16 - datalen; } SubmessageBody::Interpreter(_) => (), @@ -208,11 +186,7 @@ pub(crate) fn spdp_participant_data() -> Option { match &submsg.body { SubmessageBody::Writer(WriterSubmessage::Data(d, _)) => { let participant_data: SpdpDiscoveredParticipantData = PlCdrDeserializerAdapter::from_bytes( - &d.no_crypto_decoded() - .serialized_payload - .as_ref() - .unwrap() - .value, + &d.unwrap_serialized_payload_value(), RepresentationIdentifier::PL_CDR_LE, ) .unwrap(); @@ -411,7 +385,7 @@ pub(crate) fn create_rtps_data_message( writer_id, writer_sn: SequenceNumber::default(), inline_qos: None, - encoded_payload: Some(serialized_payload.write_to_vec().unwrap().into()), + serialized_payload: Some(serialized_payload.write_to_vec().unwrap().into()), }; let data_size = data_message @@ -462,7 +436,7 @@ pub(crate) fn create_cdr_pl_rtps_data_message( writer_id, writer_sn: SequenceNumber::default(), inline_qos: None, - encoded_payload: Some(serialized_payload.write_to_vec().unwrap().into()), + serialized_payload: Some(serialized_payload.write_to_vec().unwrap().into()), }; let data_size = data_message diff --git a/src/test/test_properties.rs b/src/test/test_properties.rs index c84011c2..6d246840 100644 --- a/src/test/test_properties.rs +++ b/src/test/test_properties.rs @@ -3,7 +3,7 @@ use speedy::Writable; use crate::{ messages::submessages::{ elements::serialized_payload::SerializedPayload, - submessages::{Data, DecodedData, RepresentationIdentifier}, + submessages::{Data, RepresentationIdentifier}, }, structure::{guid::EntityId, sequence_number::SequenceNumber}, }; @@ -32,15 +32,10 @@ impl Default for Data { writer_id: EntityId::default(), writer_sn: SequenceNumber::default(), inline_qos: None, - encoded_payload: Some(SerializedPayload::default().write_to_vec().unwrap().into()), + serialized_payload: Some(SerializedPayload::default().write_to_vec().unwrap().into()), } } } -impl 
Default for DecodedData { - fn default() -> Self { - Data::default().no_crypto_decoded() - } -} /* trait TestingTrait {