diff --git a/src/receiver/receiver.rs b/src/receiver/receiver.rs index 83d6fa0..e29d8ea 100644 --- a/src/receiver/receiver.rs +++ b/src/receiver/receiver.rs @@ -32,6 +32,9 @@ pub struct Config { pub object_max_cache_size: Option, /// Enable MD5 check of the received objects. Default `true` pub enable_md5_check: bool, + /// When set to `true`, the receiver will only reconstruct each object once. + /// If the same object is transferred again, it will be automatically discarded. + pub object_receive_once: bool, } impl Default for Config { @@ -42,6 +45,7 @@ impl Default for Config { object_timeout: Some(Duration::from_secs(10)), object_max_cache_size: None, enable_md5_check: true, + object_receive_once: true } } } @@ -295,7 +299,7 @@ impl Receiver { .map(|f| f.fdt_instance_id) .unwrap(); - if self.is_fdt_received(fdt_instance_id) { + if self.config.object_receive_once && self.is_fdt_received(fdt_instance_id) { return Ok(()); } @@ -475,7 +479,17 @@ impl Receiver { fn push_obj(&mut self, pkt: &alc::AlcPkt, now: SystemTime) -> Result<()> { if self.objects_completed.contains_key(&pkt.lct.toi) { + if self.config.object_receive_once { return Ok(()); + } + + let payload_id = alc::get_fec_inline_payload_id(pkt)?; + log::warn!("Object already received tsi={} toi={}", payload_id.sbn, payload_id.esi); + if payload_id.sbn == 0 && payload_id.esi == 0 { + self.objects_completed.remove(&pkt.lct.toi); + } else { + return Ok(()); + } } if self.objects_error.contains(&pkt.lct.toi) { diff --git a/tests/flute.rs b/tests/flute.rs index 930e3a5..ced0d50 100644 --- a/tests/flute.rs +++ b/tests/flute.rs @@ -644,4 +644,58 @@ mod tests { let toi_result = sender.add_object(0, obj).unwrap(); assert!(toi_value == toi_result); } + + #[test] + pub fn test_receiver_disable_received_once() { + crate::tests::init(); + + let max_transfert_count = 5usize; + let oti: sender::Oti = Default::default(); + let content_type = "application/octet-stream"; + + let (mut obj, _) = create_object(100000, content_type, sender::Cenc::Null, true, None); + obj.max_transfer_count = max_transfert_count as u32; + let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new()); + let mut receiver_config = receiver::Config::default(); + receiver_config.object_receive_once = false; + let mut receiver = + receiver::MultiReceiver::new(output.clone(), Some(receiver_config), false); + + let mut sender = create_sender(vec![obj], &oti, sender::Cenc::Null, None); + + let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 5000); + + loop { + let now_sender = std::time::SystemTime::now(); + let data = sender.read(now_sender); + if data.is_none() { + break; + } + + // Simulate reception 60s later -> FDT should be expired + let now_receiver = std::time::SystemTime::now() + std::time::Duration::from_secs(60); + receiver + .push(&endpoint, data.as_ref().unwrap(), now_receiver) + .unwrap(); + receiver.cleanup(now_receiver); + } + + let nb_complete_objects = output + .as_ref() + .objects + .borrow() + .iter() + .filter(|&obj| obj.borrow().complete) + .count(); + + let nb_error_objects = output + .as_ref() + .objects + .borrow() + .iter() + .filter(|&obj| obj.borrow().error) + .count(); + assert!(nb_complete_objects == max_transfert_count); + assert!(nb_error_objects == 0); + } }