Skip to content

Commit

Permalink
Option to enable / disable the reception of objects only once
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Sep 9, 2024
1 parent d9fcf38 commit da7bb83
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub struct Config {
pub object_max_cache_size: Option<usize>,
/// Enable MD5 check of the received objects. Default `true`
pub enable_md5_check: bool,
/// When set to `true`, the receiver will only reconstruct each object once.
/// If the same object is transferred again, it will be automatically discarded.
pub object_receive_once: bool,
}

impl Default for Config {
Expand All @@ -42,6 +45,7 @@ impl Default for Config {
object_timeout: Some(Duration::from_secs(10)),
object_max_cache_size: None,
enable_md5_check: true,
object_receive_once: true
}
}
}
Expand Down Expand Up @@ -295,7 +299,7 @@ impl Receiver {
.map(|f| f.fdt_instance_id)
.unwrap();

if self.is_fdt_received(fdt_instance_id) {
if self.config.object_receive_once && self.is_fdt_received(fdt_instance_id) {
return Ok(());
}

Expand Down Expand Up @@ -475,7 +479,17 @@ impl Receiver {

fn push_obj(&mut self, pkt: &alc::AlcPkt, now: SystemTime) -> Result<()> {
if self.objects_completed.contains_key(&pkt.lct.toi) {
if self.config.object_receive_once {
return Ok(());
}

let payload_id = alc::get_fec_inline_payload_id(pkt)?;
log::warn!("Object already received tsi={} toi={}", payload_id.sbn, payload_id.esi);
if payload_id.sbn == 0 && payload_id.esi == 0 {
self.objects_completed.remove(&pkt.lct.toi);
} else {
return Ok(());
}
}
if self.objects_error.contains(&pkt.lct.toi) {

Expand Down
54 changes: 54 additions & 0 deletions tests/flute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,4 +644,58 @@ mod tests {
let toi_result = sender.add_object(0, obj).unwrap();
assert!(toi_value == toi_result);
}

#[test]
pub fn test_receiver_disable_received_once() {
crate::tests::init();

let max_transfert_count = 5usize;
let oti: sender::Oti = Default::default();
let content_type = "application/octet-stream";

let (mut obj, _) = create_object(100000, content_type, sender::Cenc::Null, true, None);
obj.max_transfer_count = max_transfert_count as u32;
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new());
let mut receiver_config = receiver::Config::default();
receiver_config.object_receive_once = false;
let mut receiver =
receiver::MultiReceiver::new(output.clone(), Some(receiver_config), false);

let mut sender = create_sender(vec![obj], &oti, sender::Cenc::Null, None);

let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 5000);

loop {
let now_sender = std::time::SystemTime::now();
let data = sender.read(now_sender);
if data.is_none() {
break;
}

// Simulate reception 60s later -> FDT should be expired
let now_receiver = std::time::SystemTime::now() + std::time::Duration::from_secs(60);
receiver
.push(&endpoint, data.as_ref().unwrap(), now_receiver)
.unwrap();
receiver.cleanup(now_receiver);
}

let nb_complete_objects = output
.as_ref()
.objects
.borrow()
.iter()
.filter(|&obj| obj.borrow().complete)
.count();

let nb_error_objects = output
.as_ref()
.objects
.borrow()
.iter()
.filter(|&obj| obj.borrow().error)
.count();
assert!(nb_complete_objects == max_transfert_count);
assert!(nb_error_objects == 0);
}
}

0 comments on commit da7bb83

Please sign in to comment.