diff --git a/src/dds/participant.rs b/src/dds/participant.rs index fbe60af5..a0ebdc09 100644 --- a/src/dds/participant.rs +++ b/src/dds/participant.rs @@ -992,7 +992,10 @@ impl DomainParticipantInner { info!( "New DomainParticipantInner: domain_id={:?} participant_id={:?} GUID={:?} security={}", - domain_id, participant_id, participant_guid, cfg!(security) + domain_id, + participant_id, + participant_guid, + cfg!(security) ); Ok(Self { diff --git a/src/dds/pubsub.rs b/src/dds/pubsub.rs index a3a2266e..e88201ac 100644 --- a/src/dds/pubsub.rs +++ b/src/dds/pubsub.rs @@ -49,16 +49,10 @@ use super::{ no_key::wrappers::{DAWrapper, NoKeyWrapper, SAWrapper}, with_key::simpledatareader::ReaderCommand, }; - #[cfg(feature = "security")] -use crate::{ - security::security_plugins::SecurityPluginsHandle, -}; - +use crate::security::security_plugins::SecurityPluginsHandle; #[cfg(not(feature = "security"))] -use crate::{ - no_security::security_plugins::SecurityPluginsHandle, -}; +use crate::no_security::security_plugins::SecurityPluginsHandle; // ------------------------------------------------------------------- @@ -257,7 +251,7 @@ impl Publisher { .create_datawriter(self, Some(entity_id), topic, qos, writer_like_stateless) } - #[cfg(feature="security")] // to avoid "never used" warning + #[cfg(feature = "security")] // to avoid "never used" warning pub(crate) fn create_datawriter_with_entity_id_no_key( &self, entity_id: EntityId, @@ -798,7 +792,7 @@ impl Subscriber { .create_datareader(self, topic, Some(entity_id), qos, reader_like_stateless) } - #[cfg(feature="security")] // to avoid "never used" warning + #[cfg(feature = "security")] // to avoid "never used" warning pub(crate) fn create_datareader_with_entity_id_no_key( &self, topic: &Topic, diff --git a/src/discovery/discovery.rs b/src/discovery/discovery.rs index 47efd730..8fdd4d94 100644 --- a/src/discovery/discovery.rs +++ b/src/discovery/discovery.rs @@ -681,7 +681,10 @@ impl Discovery { loop { let mut events = Events::with_capacity(32); // Should this be outside of the loop? - match self.poll.poll(&mut events, Some(std::time::Duration::from_millis(5000))) { + match self + .poll + .poll(&mut events, Some(std::time::Duration::from_millis(5000))) + { Ok(_) => (), Err(e) => { error!("Failed in waiting of poll in discovery. {e:?}"); @@ -692,7 +695,6 @@ impl Discovery { debug!("Discovery event loop idling."); } - for event in events.into_iter() { match event.token() { DISCOVERY_COMMAND_TOKEN => { diff --git a/src/rtps/dp_event_loop.rs b/src/rtps/dp_event_loop.rs index 6b910e02..040712c7 100644 --- a/src/rtps/dp_event_loop.rs +++ b/src/rtps/dp_event_loop.rs @@ -36,11 +36,9 @@ use crate::{ use crate::discovery::secure_discovery::AuthenticationStatus; #[cfg(feature = "security")] use crate::security::security_plugins::SecurityPluginsHandle; - #[cfg(not(feature = "security"))] use crate::no_security::security_plugins::SecurityPluginsHandle; - pub struct DomainInfo { pub domain_participant_guid: GUID, pub domain_id: u16, diff --git a/src/rtps/writer.rs b/src/rtps/writer.rs index 0dd7dbca..ef9bfbc4 100644 --- a/src/rtps/writer.rs +++ b/src/rtps/writer.rs @@ -1012,8 +1012,8 @@ impl Writer { fn handle_repair_data_send_worker(&mut self, reader_proxy: &mut RtpsReaderProxy) { // Note: The reader_proxy is now removed from readers map let reader_guid = reader_proxy.remote_reader_guid; - let mut partial_message = MessageBuilder::new() - .dst_submessage(self.endianness, reader_guid.prefix); + let mut partial_message = + MessageBuilder::new().dst_submessage(self.endianness, reader_guid.prefix); debug!( "Repair data send due to ACKNACK. ReaderProxy Unsent changes: {:?}", reader_proxy.unsent_changes_debug() @@ -1033,12 +1033,11 @@ impl Writer { if cache_change.data_value.payload_size() <= self.data_max_size_serialized { // if there was a source timestamp, add that let ts = cache_change.write_options.source_timestamp(); - partial_message = - if ts.is_some() { - partial_message.ts_msg(self.endianness, ts) - } else { - partial_message - }; + partial_message = if ts.is_some() { + partial_message.ts_msg(self.endianness, ts) + } else { + partial_message + }; // construct DATA submessage partial_message = partial_message.data_msg( cache_change, diff --git a/src/structure/dds_cache.rs b/src/structure/dds_cache.rs index f137e35b..01202632 100644 --- a/src/structure/dds_cache.rs +++ b/src/structure/dds_cache.rs @@ -277,7 +277,7 @@ impl TopicCache { .unwrap_or(SequenceNumber::zero()); let upper_bound_exc = self.reliable_before(*guid); // make sure lower < upper, so that `.range()` does not panic. - let upper_bound_exc = max(upper_bound_exc, lower_bound_exc.plus_1() ); + let upper_bound_exc = max(upper_bound_exc, lower_bound_exc.plus_1()); sn_map.range((Excluded(lower_bound_exc), Excluded(upper_bound_exc))) }) // we get iterator of Timestamp .filter_map(|(_sn, t)| self.get_change(t).map(|cc| (*t, cc))), diff --git a/src/structure/sequence_number.rs b/src/structure/sequence_number.rs index 6bb5cfa4..123f3c03 100644 --- a/src/structure/sequence_number.rs +++ b/src/structure/sequence_number.rs @@ -45,7 +45,7 @@ impl SequenceNumber { } pub const fn plus_1(&self) -> Self { - SequenceNumber( self.0 + 1 ) + SequenceNumber(self.0 + 1) } } @@ -79,7 +79,6 @@ impl From for i64 { } } - // --------------------------------------- #[derive(Clone, Copy, Debug)] @@ -404,7 +403,7 @@ where end }; // sanity ok. Now do the actual work. - let num_bits = i64::from( end - base + N::from(1) ); + let num_bits = i64::from(end - base + N::from(1)); let mut sns = Self::new(base, num_bits as u32); for s in set.iter().filter(|s| base <= **s && **s <= end) { sns.insert(*s); diff --git a/src/test/mod.rs b/src/test/mod.rs index a6db14e2..dcd98fa5 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -5,58 +5,60 @@ pub(crate) mod shape_type; pub(crate) mod test_data; pub(crate) mod test_properties; +use std::{thread, time::Duration}; + use anyhow::Result; + use crate::{ - policy::{Durability, History, Reliability}, - DomainParticipant, QosPolicyBuilder, Timestamp, TopicKind, + policy::{Durability, History, Reliability}, + DomainParticipant, QosPolicyBuilder, Timestamp, TopicKind, }; -use std::{thread, time::Duration}; #[derive(serde::Serialize, serde::Deserialize, Debug)] struct TestType; #[test] fn timestamp() -> Result<()> { - let participant = DomainParticipant::new(0)?; - let qos = QosPolicyBuilder::new() - .history(History::KeepAll) - .reliability(Reliability::Reliable { - max_blocking_time: Duration::from_secs(0).into(), - }) - .durability(Durability::TransientLocal) - .build(); - let writer = participant - .create_publisher(&qos)? - .create_datawriter_no_key_cdr::( - &participant.create_topic( - "test".to_string(), - "TestType".to_string(), - &qos, - TopicKind::NoKey, - )?, - None, - )?; - let participant2 = DomainParticipant::new(0)?; - let mut reader = participant2 - .create_subscriber(&qos)? - .create_datareader_no_key_cdr::( - &participant.create_topic( - "test".to_string(), - "TestType".to_string(), - &qos, - TopicKind::NoKey, - )?, - None, - )?; - let timestamp = Timestamp::now(); - writer.write(TestType, Some(timestamp))?; - thread::sleep(Duration::from_secs(3)); - loop { - if let Ok(Some(sample)) = reader.take_next_sample() { - assert_eq!(timestamp, sample.sample_info().source_timestamp().unwrap()); - break; - } - thread::sleep(Duration::from_millis(100)); + let participant = DomainParticipant::new(0)?; + let qos = QosPolicyBuilder::new() + .history(History::KeepAll) + .reliability(Reliability::Reliable { + max_blocking_time: Duration::from_secs(0).into(), + }) + .durability(Durability::TransientLocal) + .build(); + let writer = participant + .create_publisher(&qos)? + .create_datawriter_no_key_cdr::( + &participant.create_topic( + "test".to_string(), + "TestType".to_string(), + &qos, + TopicKind::NoKey, + )?, + None, + )?; + let participant2 = DomainParticipant::new(0)?; + let mut reader = participant2 + .create_subscriber(&qos)? + .create_datareader_no_key_cdr::( + &participant.create_topic( + "test".to_string(), + "TestType".to_string(), + &qos, + TopicKind::NoKey, + )?, + None, + )?; + let timestamp = Timestamp::now(); + writer.write(TestType, Some(timestamp))?; + thread::sleep(Duration::from_secs(3)); + loop { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(timestamp, sample.sample_info().source_timestamp().unwrap()); + break; } - Ok(()) -} \ No newline at end of file + thread::sleep(Duration::from_millis(100)); + } + Ok(()) +}