Skip to content

Commit

Permalink
Fix missing source timestamp when doing DATA retransmit. #299
Browse files Browse the repository at this point in the history
  • Loading branch information
jhelovuo committed Oct 8, 2023
1 parent d956303 commit 81691db
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ test-case = "3.1.0"
env_logger = "0.10"
test-log = "0.2"
hex-literal = "0.4"
anyhow = "1.0" # for test cases

# ros_visualizer
crossterm = "0.26"
Expand Down
14 changes: 9 additions & 5 deletions src/rtps/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,11 +1013,7 @@ impl Writer {
// Note: The reader_proxy is now removed from readers map
let reader_guid = reader_proxy.remote_reader_guid;
let mut partial_message = MessageBuilder::new()
.dst_submessage(self.endianness, reader_guid.prefix)
.ts_msg(self.endianness, Some(Timestamp::now()));
// TODO: This timestamp should probably not be
// the current (retransmit) time, but the initial sample production timestamp,
// i.e. should be read from DDSCache (and be implemented there)
.dst_submessage(self.endianness, reader_guid.prefix);
debug!(
"Repair data send due to ACKNACK. ReaderProxy Unsent changes: {:?}",
reader_proxy.unsent_changes_debug()
Expand All @@ -1035,6 +1031,14 @@ impl Writer {
if let Some(cache_change) = self.acquire_the_topic_cache_guard().get_change(&timestamp) {
// CacheChange found, check if we can send it in one piece (i.e. DATA)
if cache_change.data_value.payload_size() <= self.data_max_size_serialized {
// if there was a source timestamp, add that
let ts = cache_change.write_options.source_timestamp();
partial_message =
if ts.is_some() {
partial_message.ts_msg(self.endianness, ts)
} else {
partial_message
};
// construct DATA submessage
partial_message = partial_message.data_msg(
cache_change,
Expand Down
56 changes: 56 additions & 0 deletions src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,59 @@ pub(crate) mod random_data;
pub(crate) mod shape_type;
pub(crate) mod test_data;
pub(crate) mod test_properties;

use anyhow::Result;
use crate::{
policy::{Durability, History, Reliability},
DomainParticipant, QosPolicyBuilder, Timestamp, TopicKind,
};
use std::{thread, time::Duration};

#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct TestType;

#[test]
fn timestamp() -> Result<()> {
let participant = DomainParticipant::new(0)?;
let qos = QosPolicyBuilder::new()
.history(History::KeepAll)
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_secs(0).into(),
})
.durability(Durability::TransientLocal)
.build();
let writer = participant
.create_publisher(&qos)?
.create_datawriter_no_key_cdr::<TestType>(
&participant.create_topic(
"test".to_string(),
"TestType".to_string(),
&qos,
TopicKind::NoKey,
)?,
None,
)?;
let participant2 = DomainParticipant::new(0)?;
let mut reader = participant2
.create_subscriber(&qos)?
.create_datareader_no_key_cdr::<TestType>(
&participant.create_topic(
"test".to_string(),
"TestType".to_string(),
&qos,
TopicKind::NoKey,
)?,
None,
)?;
let timestamp = Timestamp::now();
writer.write(TestType, Some(timestamp))?;
thread::sleep(Duration::from_secs(3));
loop {
if let Ok(Some(sample)) = reader.take_next_sample() {
assert_eq!(timestamp, sample.sample_info().source_timestamp().unwrap());
break;
}
thread::sleep(Duration::from_millis(100));
}
Ok(())
}

0 comments on commit 81691db

Please sign in to comment.