Skip to content

Commit

Permalink
get EXT_TIME from FDT
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Oct 31, 2024
1 parent 178f4cb commit 200aed2
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder {
__meta: &ObjectMetadata,
_transfer_duration: std::time::Duration,
_now: std::time::SystemTime,
_ext_time: Option<std::time::SystemTime>,
) {
}
}
Expand Down
111 changes: 66 additions & 45 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct Receiver {
last_activity: Instant,
closed_is_imminent: bool,
endpoint: UDPEndpoint,
last_timestamp: Option<SystemTime>
last_timestamp: Option<SystemTime>,
}

impl Receiver {
Expand All @@ -91,7 +91,7 @@ impl Receiver {
/// # Returns
///
/// A new `Receiver` instance.
///
///
pub fn new(
endpoint: &UDPEndpoint,
tsi: u64,
Expand All @@ -110,7 +110,7 @@ impl Receiver {
last_activity: Instant::now(),
closed_is_imminent: false,
endpoint: endpoint.clone(),
last_timestamp: None
last_timestamp: None,
}
}

Expand Down Expand Up @@ -343,7 +343,6 @@ impl Receiver {
fdtreceiver::State::Complete => {}
fdtreceiver::State::Error => return Err(FluteError::new("Fail to decode FDT")),
fdtreceiver::State::Expired => {

let expiration = fdt_receiver.get_expiration_time().unwrap_or(now);
let server_time = fdt_receiver.get_server_time(now);

Expand All @@ -354,7 +353,7 @@ impl Receiver {
"TSI={} FDT has been received but is already expired expiration time={} server time={}",
self.tsi,
expiration.to_rfc3339(),
server_time.to_rfc3339()
server_time.to_rfc3339()
);
return Ok(());
}
Expand All @@ -381,16 +380,26 @@ impl Receiver {
.unwrap_or(now);

let meta = fdt_current.fdt_meta().unwrap();
let transfer_duration = now.duration_since(fdt_current.reception_start_time).unwrap_or(std::time::Duration::new(0, 0));
let transfer_duration = now
.duration_since(fdt_current.reception_start_time)
.unwrap_or(std::time::Duration::new(0, 0));

self.writer
.fdt_received(&self.endpoint, &self.tsi, &xml, expiration_date, meta, transfer_duration, now);
let ext_time = alc::get_sender_current_time(alc_pkt).unwrap_or(None);
self.writer.fdt_received(
&self.endpoint,
&self.tsi,
&xml,
expiration_date,
meta,
transfer_duration,
now,
ext_time,
);
}
self.fdt_current.push_front(fdt_current);
self.attach_latest_fdt_to_objects(now);
self.gc_object_completed();
self.update_expiration_date_of_completed_objects_using_latest_fdt(now);


if self.fdt_current.len() > 10 {
self.fdt_current.pop_back();
Expand Down Expand Up @@ -432,32 +441,43 @@ impl Receiver {
let expiration_date = fdt_instance.get_expiration_date();

if let Some(true) = fdt_instance.full_fdt {
let files_toi: std::collections::HashMap<u128, Option<&String>> = files.iter().map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref())).collect();
let remove_candidates: std::collections::HashMap<u128, ObjectCompletedMeta> = self.objects_completed.iter().filter_map(|(toi, meta)| match files_toi.contains_key(toi) {
true => None,
false => Some((*toi, meta.clone()))
}).collect();

let files_toi: std::collections::HashMap<u128, Option<&String>> = files
.iter()
.map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref()))
.collect();
let remove_candidates: std::collections::HashMap<u128, ObjectCompletedMeta> = self
.objects_completed
.iter()
.filter_map(|(toi, meta)| match files_toi.contains_key(toi) {
true => None,
false => Some((*toi, meta.clone())),
})
.collect();

if !remove_candidates.is_empty() {
let content_locations: std::collections::HashSet<&str> = files.iter().map(|f| f.content_location.as_str()).collect();
let content_locations: std::collections::HashSet<&str> =
files.iter().map(|f| f.content_location.as_str()).collect();
let duration = std::time::Duration::from_secs(4);
for (toi, meta) in &remove_candidates {
let content_location = meta.content_location.to_string();
if !content_locations.contains(content_location.as_str()) && meta.expiration_date > now + duration {
if !content_locations.contains(content_location.as_str())
&& meta.expiration_date > now + duration
{
self.writer.set_cache_duration(
&self.endpoint,
&self.tsi,
toi,
&meta.content_location,
&duration,
now
now,
);
}
}
self.objects_completed.retain(|f, _| !remove_candidates.contains_key(f));
self.objects_completed
.retain(|f, _| !remove_candidates.contains_key(f));
}
}

for file in files {
let toi: u128 = file.toi.parse().unwrap_or_default();
let cache_duration = file.get_cache_duration(expiration_date, server_time);
Expand All @@ -481,7 +501,7 @@ impl Receiver {
&toi,
&obj.content_location,
&cache_duration,
now
now,
);
}
}
Expand All @@ -498,15 +518,18 @@ impl Receiver {
}

let payload_id = alc::get_fec_inline_payload_id(pkt)?;
log::warn!("Object already received tsi={} toi={}", payload_id.sbn, payload_id.esi);
log::warn!(
"Object already received tsi={} toi={}",
payload_id.sbn,
payload_id.esi
);
if payload_id.sbn == 0 && payload_id.esi == 0 {
self.objects_completed.remove(&pkt.lct.toi);
} else {
return Ok(());
}
}
if self.objects_error.contains(&pkt.lct.toi) {

let payload_id = alc::get_fec_inline_payload_id(pkt)?;
if payload_id.sbn == 0 && payload_id.esi == 0 {
log::warn!("Re-download object after errors");
Expand Down Expand Up @@ -588,39 +611,38 @@ impl Receiver {
self.tsi,
toi
);
let _success = self.objects.remove(&toi);
debug_assert!(_success.is_some());
self.objects.remove(&toi);
}
}

fn gc_object_completed(&mut self) {

let current_fdt = match self.fdt_current.front_mut() {
Some(fdt) => fdt,
None => return
None => return,
};

let instance = match current_fdt.fdt_instance() {
Some(instance) => instance,
None => return
None => return,
};

if let Some(true) = instance.full_fdt {
return;
}



let before = self.objects_completed.len();
if let Some(files) = instance.file.as_ref() {
let current_tois: std::collections::HashSet<u128> = files.iter().map(|file| file.toi.parse().unwrap_or(0)).collect();
let current_tois: std::collections::HashSet<u128> = files
.iter()
.map(|file| file.toi.parse().unwrap_or(0))
.collect();
self.objects_completed
.retain(|toi, _meta| current_tois.contains(toi) );
.retain(|toi, _meta| current_tois.contains(toi));
}
let after = self.objects_completed.len();
if before != after {
log::debug!("GC remove {} / {} objects", before - after, before);
}

}

fn gc_object_error(&mut self) {
Expand All @@ -638,11 +660,12 @@ impl Receiver {
None,
self.writer.clone(),
self.config.enable_md5_check,
self.config.object_max_cache_size.unwrap_or(10 * 1024 * 1024),
now
self.config
.object_max_cache_size
.unwrap_or(10 * 1024 * 1024),
now,
));


let mut is_attached = false;
for (fdt_index, fdt) in (&mut self.fdt_current.iter_mut()).enumerate() {
let fdt_id = fdt.fdt_id;
Expand Down Expand Up @@ -670,22 +693,22 @@ impl Receiver {
}

if !is_attached {
log::warn!("Object received before the FDT TSI={} TOI={}", self.tsi, toi);
log::warn!(
"Object received before the FDT TSI={} TOI={}",
self.tsi,
toi
);
}

self.objects.insert(*toi, obj);
}
}



impl Drop for Receiver {
fn drop(&mut self) {

log::info!("Drop Flute Receiver");

if let Some(fdt) = self.fdt_current.front_mut() {

if let Some(instance) = fdt.fdt_instance() {
if instance.full_fdt == Some(true) {
let duration = std::time::Duration::from_secs(0);
Expand All @@ -697,13 +720,11 @@ impl Drop for Receiver {
obj.0,
&obj.1.content_location,
&duration,
self.last_timestamp.unwrap_or_else(|| SystemTime::now())
self.last_timestamp.unwrap_or_else(|| SystemTime::now()),
);
}

}
}

}
}
}
1 change: 1 addition & 0 deletions src/receiver/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub trait ObjectWriterBuilder {
meta: &ObjectMetadata,
transfer_duration: Duration,
now: std::time::SystemTime,
ext_time: Option<std::time::SystemTime>,
);
}

Expand Down
1 change: 1 addition & 0 deletions src/receiver/writer/objectwriterbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder {
_meta: &ObjectMetadata,
_transfer_duration: std::time::Duration,
_now: std::time::SystemTime,
_ext_time: Option<std::time::SystemTime>,
) {
}
}
Expand Down
1 change: 1 addition & 0 deletions src/receiver/writer/objectwriterfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder {
_meta: &ObjectMetadata,
_transfer_duration: std::time::Duration,
_now: std::time::SystemTime,
_ext_time: Option<std::time::SystemTime>,
) {
}
}
Expand Down

0 comments on commit 200aed2

Please sign in to comment.