diff --git a/src/receiver/blockwriter.rs b/src/receiver/blockwriter.rs index 663113f..084ae4f 100644 --- a/src/receiver/blockwriter.rs +++ b/src/receiver/blockwriter.rs @@ -1,3 +1,5 @@ +use std::time::SystemTime; + use base64::Engine; use crate::error::{FluteError, Result}; @@ -71,6 +73,7 @@ impl BlockWriter { sbn: u32, block: &BlockDecoder, writer: &dyn ObjectWriter, + now: SystemTime, ) -> Result { if self.sbn != sbn { return Ok(false); @@ -85,9 +88,9 @@ impl BlockWriter { }; if self.cenc == lct::Cenc::Null { - self.write_pkt_cenc_null(data, writer); + self.write_pkt_cenc_null(data, writer, now); } else { - self.decode_write_pkt(data, writer)?; + self.decode_write_pkt(data, writer, now)?; } debug_assert!(data.len() <= self.bytes_left); @@ -99,7 +102,7 @@ impl BlockWriter { // All blocks have been received -> flush the decoder if self.decoder.is_some() { self.decoder.as_mut().unwrap().finish(); - self.decoder_read(writer)?; + self.decoder_read(writer, now)?; } let output = self.md5_context.take().map(|ctx| ctx.compute().0); @@ -121,24 +124,29 @@ impl BlockWriter { self.buffer.resize(data.len(), 0); } - fn write_pkt_cenc_null(&mut self, data: &[u8], writer: &dyn ObjectWriter) { + fn write_pkt_cenc_null(&mut self, data: &[u8], writer: &dyn ObjectWriter, now: SystemTime) { if let Some(ctx) = self.md5_context.as_mut() { ctx.consume(data) } - writer.write(data); + writer.write(data, now); } - fn decode_write_pkt(&mut self, pkt: &[u8], writer: &dyn ObjectWriter) -> Result<()> { + fn decode_write_pkt( + &mut self, + pkt: &[u8], + writer: &dyn ObjectWriter, + now: SystemTime, + ) -> Result<()> { if self.decoder.is_none() { self.init_decoder(pkt); - self.decoder_read(writer)?; + self.decoder_read(writer, now)?; return Ok(()); } let mut offset: usize = 0; loop { let size = self.decoder.as_mut().unwrap().write(&pkt[offset..])?; - self.decoder_read(writer)?; + self.decoder_read(writer, now)?; offset += size; if offset == pkt.len() { break; @@ -147,7 +155,7 @@ impl BlockWriter { Ok(()) } - fn decoder_read(&mut self, writer: &dyn ObjectWriter) -> Result<()> { + fn decoder_read(&mut self, writer: &dyn ObjectWriter, now: SystemTime) -> Result<()> { let decoder = self.decoder.as_mut().unwrap(); if self.content_length_left == Some(0) { @@ -169,7 +177,7 @@ impl BlockWriter { ctx.consume(&self.buffer[..size]) } - writer.write(&self.buffer[..size]); + writer.write(&self.buffer[..size], now); if let Some(content_length_left) = self.content_length_left.as_mut() { *content_length_left = content_length_left.saturating_sub(size); diff --git a/src/receiver/fdtreceiver.rs b/src/receiver/fdtreceiver.rs index 9f875e2..494a725 100644 --- a/src/receiver/fdtreceiver.rs +++ b/src/receiver/fdtreceiver.rs @@ -211,6 +211,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder { _toi: &u128, _content_location: &url::Url, _duration: &std::time::Duration, + _now: std::time::SystemTime, ) { } @@ -228,16 +229,16 @@ impl ObjectWriterBuilder for FdtWriterBuilder { } impl ObjectWriter for FdtWriter { - fn open(&self) -> Result<()> { + fn open(&self, _now: SystemTime) -> Result<()> { Ok(()) } - fn write(&self, data: &[u8]) { + fn write(&self, data: &[u8], _now: SystemTime) { let mut inner = self.inner.borrow_mut(); inner.data.extend(data); } - fn complete(&self) { + fn complete(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); match FdtInstance::parse(&inner.data) { Ok(inst) => { @@ -252,7 +253,7 @@ impl ObjectWriter for FdtWriter { }; } - fn error(&self) { + fn error(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); inner.state = State::Error; } diff --git a/src/receiver/objectreceiver.rs b/src/receiver/objectreceiver.rs index 87d3900..2f7a370 100644 --- a/src/receiver/objectreceiver.rs +++ b/src/receiver/objectreceiver.rs @@ -67,6 +67,7 @@ pub struct ObjectReceiver { content_type: Option, cache_duration: Option, groups: Vec, + last_timestamp: SystemTime, } impl ObjectReceiver { @@ -78,7 +79,7 @@ impl ObjectReceiver { object_writer_builder: Rc, enable_md5_check: bool, max_size_allocated: usize, - _now: SystemTime, + now: SystemTime, ) -> ObjectReceiver { log::debug!("Create new Object Receiver with toi {}", toi); ObjectReceiver { @@ -117,6 +118,7 @@ impl ObjectReceiver { content_type: None, cache_duration: None, groups: Vec::new(), + last_timestamp: now, } } @@ -133,6 +135,7 @@ impl ObjectReceiver { } pub fn push(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) { + self.last_timestamp = now; if self.state != State::Receiving { return; } @@ -140,7 +143,7 @@ impl ObjectReceiver { self.last_activity = Instant::now(); self.set_fdt_id_from_pkt(pkt); self.set_cenc_from_pkt(pkt); - self.set_oti_from_pkt(pkt); + self.set_oti_from_pkt(pkt, now); self.init_blocks_partitioning(); self.init_object_writer(now); @@ -148,12 +151,12 @@ impl ObjectReceiver { if self.oti.is_none() { self.cache(pkt) - .unwrap_or_else(|_| self.error("Fail to push pkt to cache")); + .unwrap_or_else(|_| self.error("Fail to push pkt to cache", now)); return; } self.push_to_block(pkt, now) - .unwrap_or_else(|_| self.error("Fail to push pkt to block")); + .unwrap_or_else(|_| self.error("Fail to push pkt to block", now)); } fn push_to_block(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> { @@ -268,6 +271,7 @@ impl ObjectReceiver { server_time: std::time::SystemTime, ) -> bool { debug_assert!(self.toi != lct::TOI_FDT); + self.last_timestamp = now; if self.fdt_instance_id.is_some() { return false; } @@ -323,10 +327,13 @@ impl ObjectReceiver { "Fail to parse content-location {} to URL", file.content_location ); - self.error(&format!( - "Fail to parse content-location {} to URL", - file.content_location - )); + self.error( + &format!( + "Fail to parse content-location {} to URL", + file.content_location + ), + now, + ); return false; } } @@ -364,7 +371,7 @@ impl ObjectReceiver { self.init_object_writer(now); self.push_from_cache(now); self.write_blocks(0, now) - .unwrap_or_else(|_| self.error("Fail to write blocks to storage")); + .unwrap_or_else(|_| self.error("Fail to write blocks to storage", now)); self.push_from_cache(now); true } @@ -429,8 +436,8 @@ impl ObjectReceiver { let object_writer = self.object_writer.as_mut().unwrap(); - if object_writer.writer.open().is_err() { - self.error("Fail to create destination on storage"); + if object_writer.writer.open(now).is_err() { + self.error("Fail to create destination on storage", now); return; }; @@ -473,6 +480,7 @@ impl ObjectReceiver { sbn as u32, block, self.object_writer.as_ref().unwrap().writer.as_ref(), + now, )?; if !success { break; @@ -501,10 +509,13 @@ impl ObjectReceiver { self.content_location ); - self.error(&format!( - "MD5 does not match expects {:?} received {:?}", - self.content_md5, &md5 - )); + self.error( + &format!( + "MD5 does not match expects {:?} received {:?}", + self.content_md5, &md5 + ), + now, + ); } break; } @@ -512,7 +523,7 @@ impl ObjectReceiver { Ok(()) } - fn complete(&mut self, _now: std::time::SystemTime) { + fn complete(&mut self, now: std::time::SystemTime) { #[cfg(feature = "opentelemetry")] let _span = self.logger.as_mut().map(|l| l.complete()); @@ -520,7 +531,7 @@ impl ObjectReceiver { if let Some(object_writer) = self.object_writer.as_mut() { object_writer.state = ObjectWriterSessionState::Closed; - object_writer.writer.complete(); + object_writer.writer.complete(now); } // Free space by removing blocks @@ -529,7 +540,7 @@ impl ObjectReceiver { self.cache_size = 0; } - fn error(&mut self, _description: &str) { + fn error(&mut self, _description: &str, now: SystemTime) { #[cfg(feature = "opentelemetry")] self.init_logger(None); @@ -540,7 +551,7 @@ impl ObjectReceiver { if let Some(object_writer) = self.object_writer.as_mut() { object_writer.state = ObjectWriterSessionState::Error; - object_writer.writer.error(); + object_writer.writer.error(now); } self.blocks.clear(); @@ -556,7 +567,7 @@ impl ObjectReceiver { while let Some(item) = self.cache.pop() { let pkt = item.to_pkt(); if self.push_to_block(&pkt, now).is_err() { - self.error("Fail to push block"); + self.error("Fail to push block", now); break; } } @@ -583,7 +594,7 @@ impl ObjectReceiver { self.fdt_instance_id = pkt.fdt_info.as_ref().map(|info| info.fdt_instance_id); } - fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt) { + fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt, now: SystemTime) { if self.oti.is_some() { return; } @@ -609,7 +620,7 @@ impl ObjectReceiver { if pkt.transfer_length.is_none() { log::warn!("Bug? Pkt contains OTI without transfer length"); - self.error("Bug? Pkt contains OTI without transfer length"); + self.error("Bug? Pkt contains OTI without transfer length", now); return; } @@ -706,7 +717,10 @@ impl Drop for ObjectReceiver { self.endpoint, self.content_location.as_ref().map(|u| u.to_string()) ); - self.error("Drop object in open state, pkt missing ?"); + self.error( + "Drop object in open state, pkt missing ?", + self.last_timestamp, + ); } else if object_writer.state == ObjectWriterSessionState::Error { log::error!( "Drop object received with state {:?} TOI={} Endpoint={:?} Content-Location={:?}", diff --git a/src/receiver/receiver.rs b/src/receiver/receiver.rs index be2554b..2797477 100644 --- a/src/receiver/receiver.rs +++ b/src/receiver/receiver.rs @@ -75,6 +75,7 @@ pub struct Receiver { last_activity: Instant, closed_is_imminent: bool, endpoint: UDPEndpoint, + last_timestamp: Option } impl Receiver { @@ -109,6 +110,7 @@ impl Receiver { last_activity: Instant::now(), closed_is_imminent: false, endpoint: endpoint.clone(), + last_timestamp: None } } @@ -167,6 +169,7 @@ impl Receiver { /// * `now` - The current `SystemTime` to use for time-related operations. /// pub fn cleanup(&mut self, now: std::time::SystemTime) { + self.last_timestamp = Some(now); self.cleanup_objects(); self.cleanup_fdt(now); } @@ -242,6 +245,7 @@ impl Receiver { /// Returns as error if the packet is not a valid /// pub fn push_data(&mut self, data: &[u8], now: std::time::SystemTime) -> Result<()> { + self.last_timestamp = Some(now); let alc = alc::parse_alc_pkt(data)?; if alc.lct.tsi != self.tsi { return Ok(()); @@ -266,6 +270,7 @@ impl Receiver { pub fn push(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> { debug_assert!(self.tsi == alc_pkt.lct.tsi); self.last_activity = Instant::now(); + self.last_timestamp = Some(now); if alc_pkt.lct.close_session { log::info!("Close session"); @@ -445,6 +450,7 @@ impl Receiver { toi, &meta.content_location, &duration, + now ); } } @@ -475,6 +481,7 @@ impl Receiver { &toi, &obj.content_location, &cache_duration, + now ); } } @@ -690,6 +697,7 @@ impl Drop for Receiver { obj.0, &obj.1.content_location, &duration, + self.last_timestamp.unwrap_or_else(|| SystemTime::now()) ); } diff --git a/src/receiver/writer/mod.rs b/src/receiver/writer/mod.rs index 713780e..63642a0 100644 --- a/src/receiver/writer/mod.rs +++ b/src/receiver/writer/mod.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use std::time::Duration; +use std::time::SystemTime; use crate::common::udpendpoint::UDPEndpoint; use crate::core::lct::Cenc; @@ -68,6 +69,7 @@ pub trait ObjectWriterBuilder { toi: &u128, content_location: &url::Url, duration: &Duration, + now: std::time::SystemTime, ); /// Called when an FDT is received fn fdt_received( @@ -87,13 +89,13 @@ pub trait ObjectWriterBuilder { /// pub trait ObjectWriter { /// Open the destination - fn open(&self) -> Result<()>; + fn open(&self, now: SystemTime) -> Result<()>; /// Write data - fn write(&self, data: &[u8]); + fn write(&self, data: &[u8], now: SystemTime); /// Called when all the data has been written - fn complete(&self); + fn complete(&self, now: SystemTime); /// Called when an error occurred during the reception of this object - fn error(&self); + fn error(&self, now: SystemTime); } impl std::fmt::Debug for dyn ObjectWriterBuilder { diff --git a/src/receiver/writer/objectwriterbuffer.rs b/src/receiver/writer/objectwriterbuffer.rs index 25abcf7..aed446c 100644 --- a/src/receiver/writer/objectwriterbuffer.rs +++ b/src/receiver/writer/objectwriterbuffer.rs @@ -1,6 +1,6 @@ use super::{ObjectMetadata, ObjectWriter, ObjectWriterBuilder}; use crate::{common::udpendpoint::UDPEndpoint, tools::error::Result}; -use std::{cell::RefCell, rc::Rc}; +use std::{cell::RefCell, rc::Rc, time::SystemTime}; /// /// Write objects received by the `receiver` to a buffers @@ -75,6 +75,7 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder { _toi: &u128, _content_location: &url::Url, _duration: &std::time::Duration, + _now: std::time::SystemTime, ) { } @@ -92,22 +93,22 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder { } impl ObjectWriter for ObjectWriterBufferWrapper { - fn open(&self) -> Result<()> { + fn open(&self, _now: SystemTime) -> Result<()> { Ok(()) } - fn write(&self, data: &[u8]) { + fn write(&self, data: &[u8], _now: SystemTime) { let mut inner = self.inner.borrow_mut(); inner.data.extend(data); } - fn complete(&self) { + fn complete(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); log::info!("Object complete !"); inner.complete = true; } - fn error(&self) { + fn error(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); log::error!("Object received with error"); inner.error = true; diff --git a/src/receiver/writer/objectwriterfs.rs b/src/receiver/writer/objectwriterfs.rs index a8ba23b..416b8d3 100644 --- a/src/receiver/writer/objectwriterfs.rs +++ b/src/receiver/writer/objectwriterfs.rs @@ -3,7 +3,7 @@ use crate::{ common::udpendpoint::UDPEndpoint, error::{FluteError, Result}, }; -use std::{cell::RefCell, io::Write}; +use std::{cell::RefCell, io::Write, time::SystemTime}; /// /// Write objects received by the `receiver` to a filesystem @@ -52,6 +52,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder { _toi: &u128, _content_location: &url::Url, _duration: &std::time::Duration, + _now: std::time::SystemTime, ) { } @@ -91,7 +92,7 @@ pub struct ObjectWriterFSInner { } impl ObjectWriter for ObjectWriterFS { - fn open(&self) -> Result<()> { + fn open(&self, _now: SystemTime) -> Result<()> { let content_location_path = self.meta.content_location.path(); let relative_path = content_location_path .strip_prefix('/') @@ -118,7 +119,7 @@ impl ObjectWriter for ObjectWriterFS { Ok(()) } - fn write(&self, data: &[u8]) { + fn write(&self, data: &[u8], _now: SystemTime) { let mut inner = self.inner.borrow_mut(); if inner.writer.is_none() { return; @@ -129,7 +130,7 @@ impl ObjectWriter for ObjectWriterFS { }; } - fn complete(&self) { + fn complete(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); if inner.writer.is_none() { return; @@ -141,7 +142,7 @@ impl ObjectWriter for ObjectWriterFS { inner.destination = None } - fn error(&self) { + fn error(&self, _now: SystemTime) { let mut inner = self.inner.borrow_mut(); inner.writer = None; if inner.destination.is_some() {