diff --git a/src/receiver/fdtreceiver.rs b/src/receiver/fdtreceiver.rs index 494a725..8cee422 100644 --- a/src/receiver/fdtreceiver.rs +++ b/src/receiver/fdtreceiver.rs @@ -224,6 +224,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder { __meta: &ObjectMetadata, _transfer_duration: std::time::Duration, _now: std::time::SystemTime, + _ext_time: Option, ) { } } diff --git a/src/receiver/receiver.rs b/src/receiver/receiver.rs index 2797477..814ff52 100644 --- a/src/receiver/receiver.rs +++ b/src/receiver/receiver.rs @@ -75,7 +75,7 @@ pub struct Receiver { last_activity: Instant, closed_is_imminent: bool, endpoint: UDPEndpoint, - last_timestamp: Option + last_timestamp: Option, } impl Receiver { @@ -91,7 +91,7 @@ impl Receiver { /// # Returns /// /// A new `Receiver` instance. - /// + /// pub fn new( endpoint: &UDPEndpoint, tsi: u64, @@ -110,7 +110,7 @@ impl Receiver { last_activity: Instant::now(), closed_is_imminent: false, endpoint: endpoint.clone(), - last_timestamp: None + last_timestamp: None, } } @@ -343,7 +343,6 @@ impl Receiver { fdtreceiver::State::Complete => {} fdtreceiver::State::Error => return Err(FluteError::new("Fail to decode FDT")), fdtreceiver::State::Expired => { - let expiration = fdt_receiver.get_expiration_time().unwrap_or(now); let server_time = fdt_receiver.get_server_time(now); @@ -354,7 +353,7 @@ impl Receiver { "TSI={} FDT has been received but is already expired expiration time={} server time={}", self.tsi, expiration.to_rfc3339(), - server_time.to_rfc3339() + server_time.to_rfc3339() ); return Ok(()); } @@ -381,16 +380,26 @@ impl Receiver { .unwrap_or(now); let meta = fdt_current.fdt_meta().unwrap(); - let transfer_duration = now.duration_since(fdt_current.reception_start_time).unwrap_or(std::time::Duration::new(0, 0)); + let transfer_duration = now + .duration_since(fdt_current.reception_start_time) + .unwrap_or(std::time::Duration::new(0, 0)); - self.writer - .fdt_received(&self.endpoint, &self.tsi, &xml, expiration_date, meta, transfer_duration, now); + let ext_time = alc::get_sender_current_time(alc_pkt).unwrap_or(None); + self.writer.fdt_received( + &self.endpoint, + &self.tsi, + &xml, + expiration_date, + meta, + transfer_duration, + now, + ext_time, + ); } self.fdt_current.push_front(fdt_current); self.attach_latest_fdt_to_objects(now); self.gc_object_completed(); self.update_expiration_date_of_completed_objects_using_latest_fdt(now); - if self.fdt_current.len() > 10 { self.fdt_current.pop_back(); @@ -432,32 +441,43 @@ impl Receiver { let expiration_date = fdt_instance.get_expiration_date(); if let Some(true) = fdt_instance.full_fdt { - let files_toi: std::collections::HashMap> = files.iter().map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref())).collect(); - let remove_candidates: std::collections::HashMap = self.objects_completed.iter().filter_map(|(toi, meta)| match files_toi.contains_key(toi) { - true => None, - false => Some((*toi, meta.clone())) - }).collect(); - + let files_toi: std::collections::HashMap> = files + .iter() + .map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref())) + .collect(); + let remove_candidates: std::collections::HashMap = self + .objects_completed + .iter() + .filter_map(|(toi, meta)| match files_toi.contains_key(toi) { + true => None, + false => Some((*toi, meta.clone())), + }) + .collect(); + if !remove_candidates.is_empty() { - let content_locations: std::collections::HashSet<&str> = files.iter().map(|f| f.content_location.as_str()).collect(); + let content_locations: std::collections::HashSet<&str> = + files.iter().map(|f| f.content_location.as_str()).collect(); let duration = std::time::Duration::from_secs(4); for (toi, meta) in &remove_candidates { let content_location = meta.content_location.to_string(); - if !content_locations.contains(content_location.as_str()) && meta.expiration_date > now + duration { + if !content_locations.contains(content_location.as_str()) + && meta.expiration_date > now + duration + { self.writer.set_cache_duration( &self.endpoint, &self.tsi, toi, &meta.content_location, &duration, - now + now, ); } } - self.objects_completed.retain(|f, _| !remove_candidates.contains_key(f)); + self.objects_completed + .retain(|f, _| !remove_candidates.contains_key(f)); } } - + for file in files { let toi: u128 = file.toi.parse().unwrap_or_default(); let cache_duration = file.get_cache_duration(expiration_date, server_time); @@ -481,7 +501,7 @@ impl Receiver { &toi, &obj.content_location, &cache_duration, - now + now, ); } } @@ -498,7 +518,11 @@ impl Receiver { } let payload_id = alc::get_fec_inline_payload_id(pkt)?; - log::warn!("Object already received tsi={} toi={}", payload_id.sbn, payload_id.esi); + log::warn!( + "Object already received tsi={} toi={}", + payload_id.sbn, + payload_id.esi + ); if payload_id.sbn == 0 && payload_id.esi == 0 { self.objects_completed.remove(&pkt.lct.toi); } else { @@ -506,7 +530,6 @@ impl Receiver { } } if self.objects_error.contains(&pkt.lct.toi) { - let payload_id = alc::get_fec_inline_payload_id(pkt)?; if payload_id.sbn == 0 && payload_id.esi == 0 { log::warn!("Re-download object after errors"); @@ -588,39 +611,38 @@ impl Receiver { self.tsi, toi ); - let _success = self.objects.remove(&toi); - debug_assert!(_success.is_some()); + self.objects.remove(&toi); } } fn gc_object_completed(&mut self) { - let current_fdt = match self.fdt_current.front_mut() { Some(fdt) => fdt, - None => return + None => return, }; let instance = match current_fdt.fdt_instance() { Some(instance) => instance, - None => return + None => return, }; - + if let Some(true) = instance.full_fdt { return; } - - + let before = self.objects_completed.len(); if let Some(files) = instance.file.as_ref() { - let current_tois: std::collections::HashSet = files.iter().map(|file| file.toi.parse().unwrap_or(0)).collect(); + let current_tois: std::collections::HashSet = files + .iter() + .map(|file| file.toi.parse().unwrap_or(0)) + .collect(); self.objects_completed - .retain(|toi, _meta| current_tois.contains(toi) ); + .retain(|toi, _meta| current_tois.contains(toi)); } let after = self.objects_completed.len(); if before != after { log::debug!("GC remove {} / {} objects", before - after, before); } - } fn gc_object_error(&mut self) { @@ -638,11 +660,12 @@ impl Receiver { None, self.writer.clone(), self.config.enable_md5_check, - self.config.object_max_cache_size.unwrap_or(10 * 1024 * 1024), - now + self.config + .object_max_cache_size + .unwrap_or(10 * 1024 * 1024), + now, )); - let mut is_attached = false; for (fdt_index, fdt) in (&mut self.fdt_current.iter_mut()).enumerate() { let fdt_id = fdt.fdt_id; @@ -670,22 +693,22 @@ impl Receiver { } if !is_attached { - log::warn!("Object received before the FDT TSI={} TOI={}", self.tsi, toi); + log::warn!( + "Object received before the FDT TSI={} TOI={}", + self.tsi, + toi + ); } self.objects.insert(*toi, obj); } } - - impl Drop for Receiver { fn drop(&mut self) { - log::info!("Drop Flute Receiver"); if let Some(fdt) = self.fdt_current.front_mut() { - if let Some(instance) = fdt.fdt_instance() { if instance.full_fdt == Some(true) { let duration = std::time::Duration::from_secs(0); @@ -697,13 +720,11 @@ impl Drop for Receiver { obj.0, &obj.1.content_location, &duration, - self.last_timestamp.unwrap_or_else(|| SystemTime::now()) + self.last_timestamp.unwrap_or_else(|| SystemTime::now()), ); } - } } - } } } diff --git a/src/receiver/writer/mod.rs b/src/receiver/writer/mod.rs index 63642a0..45cd5e1 100644 --- a/src/receiver/writer/mod.rs +++ b/src/receiver/writer/mod.rs @@ -81,6 +81,7 @@ pub trait ObjectWriterBuilder { meta: &ObjectMetadata, transfer_duration: Duration, now: std::time::SystemTime, + ext_time: Option, ); } diff --git a/src/receiver/writer/objectwriterbuffer.rs b/src/receiver/writer/objectwriterbuffer.rs index aed446c..5039d62 100644 --- a/src/receiver/writer/objectwriterbuffer.rs +++ b/src/receiver/writer/objectwriterbuffer.rs @@ -88,6 +88,7 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder { _meta: &ObjectMetadata, _transfer_duration: std::time::Duration, _now: std::time::SystemTime, + _ext_time: Option, ) { } } diff --git a/src/receiver/writer/objectwriterfs.rs b/src/receiver/writer/objectwriterfs.rs index 416b8d3..4af5ab7 100644 --- a/src/receiver/writer/objectwriterfs.rs +++ b/src/receiver/writer/objectwriterfs.rs @@ -65,6 +65,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder { _meta: &ObjectMetadata, _transfer_duration: std::time::Duration, _now: std::time::SystemTime, + _ext_time: Option, ) { } }