diff --git a/Cargo.toml b/Cargo.toml index 80a6bb86..c889aae0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ test-case = "3.1.0" env_logger = "0.10" test-log = "0.2" hex-literal = "0.4" +anyhow = "1.0" # for test cases # ros_visualizer crossterm = "0.26" diff --git a/src/rtps/writer.rs b/src/rtps/writer.rs index 7a5f30ad..0dd7dbca 100644 --- a/src/rtps/writer.rs +++ b/src/rtps/writer.rs @@ -1013,11 +1013,7 @@ impl Writer { // Note: The reader_proxy is now removed from readers map let reader_guid = reader_proxy.remote_reader_guid; let mut partial_message = MessageBuilder::new() - .dst_submessage(self.endianness, reader_guid.prefix) - .ts_msg(self.endianness, Some(Timestamp::now())); - // TODO: This timestamp should probably not be - // the current (retransmit) time, but the initial sample production timestamp, - // i.e. should be read from DDSCache (and be implemented there) + .dst_submessage(self.endianness, reader_guid.prefix); debug!( "Repair data send due to ACKNACK. ReaderProxy Unsent changes: {:?}", reader_proxy.unsent_changes_debug() @@ -1035,6 +1031,14 @@ impl Writer { if let Some(cache_change) = self.acquire_the_topic_cache_guard().get_change(×tamp) { // CacheChange found, check if we can send it in one piece (i.e. DATA) if cache_change.data_value.payload_size() <= self.data_max_size_serialized { + // if there was a source timestamp, add that + let ts = cache_change.write_options.source_timestamp(); + partial_message = + if ts.is_some() { + partial_message.ts_msg(self.endianness, ts) + } else { + partial_message + }; // construct DATA submessage partial_message = partial_message.data_msg( cache_change, diff --git a/src/test/mod.rs b/src/test/mod.rs index 1ea92277..a6db14e2 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -4,3 +4,59 @@ pub(crate) mod random_data; pub(crate) mod shape_type; pub(crate) mod test_data; pub(crate) mod test_properties; + +use anyhow::Result; +use crate::{ + policy::{Durability, History, Reliability}, + DomainParticipant, QosPolicyBuilder, Timestamp, TopicKind, +}; +use std::{thread, time::Duration}; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +struct TestType; + +#[test] +fn timestamp() -> Result<()> { + let participant = DomainParticipant::new(0)?; + let qos = QosPolicyBuilder::new() + .history(History::KeepAll) + .reliability(Reliability::Reliable { + max_blocking_time: Duration::from_secs(0).into(), + }) + .durability(Durability::TransientLocal) + .build(); + let writer = participant + .create_publisher(&qos)? + .create_datawriter_no_key_cdr::( + &participant.create_topic( + "test".to_string(), + "TestType".to_string(), + &qos, + TopicKind::NoKey, + )?, + None, + )?; + let participant2 = DomainParticipant::new(0)?; + let mut reader = participant2 + .create_subscriber(&qos)? + .create_datareader_no_key_cdr::( + &participant.create_topic( + "test".to_string(), + "TestType".to_string(), + &qos, + TopicKind::NoKey, + )?, + None, + )?; + let timestamp = Timestamp::now(); + writer.write(TestType, Some(timestamp))?; + thread::sleep(Duration::from_secs(3)); + loop { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(timestamp, sample.sample_info().source_timestamp().unwrap()); + break; + } + thread::sleep(Duration::from_millis(100)); + } + Ok(()) +} \ No newline at end of file