Skip to content

Commit

Permalink
Add now time parameter to API
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Oct 28, 2024
1 parent 74b5980 commit 1bf31cb
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 51 deletions.
28 changes: 18 additions & 10 deletions src/receiver/blockwriter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::SystemTime;

use base64::Engine;

use crate::error::{FluteError, Result};
Expand Down Expand Up @@ -71,6 +73,7 @@ impl BlockWriter {
sbn: u32,
block: &BlockDecoder,
writer: &dyn ObjectWriter,
now: SystemTime,
) -> Result<bool> {
if self.sbn != sbn {
return Ok(false);
Expand All @@ -85,9 +88,9 @@ impl BlockWriter {
};

if self.cenc == lct::Cenc::Null {
self.write_pkt_cenc_null(data, writer);
self.write_pkt_cenc_null(data, writer, now);
} else {
self.decode_write_pkt(data, writer)?;
self.decode_write_pkt(data, writer, now)?;
}

debug_assert!(data.len() <= self.bytes_left);
Expand All @@ -99,7 +102,7 @@ impl BlockWriter {
// All blocks have been received -> flush the decoder
if self.decoder.is_some() {
self.decoder.as_mut().unwrap().finish();
self.decoder_read(writer)?;
self.decoder_read(writer, now)?;
}

let output = self.md5_context.take().map(|ctx| ctx.compute().0);
Expand All @@ -121,24 +124,29 @@ impl BlockWriter {
self.buffer.resize(data.len(), 0);
}

fn write_pkt_cenc_null(&mut self, data: &[u8], writer: &dyn ObjectWriter) {
fn write_pkt_cenc_null(&mut self, data: &[u8], writer: &dyn ObjectWriter, now: SystemTime) {
if let Some(ctx) = self.md5_context.as_mut() {
ctx.consume(data)
}
writer.write(data);
writer.write(data, now);
}

fn decode_write_pkt(&mut self, pkt: &[u8], writer: &dyn ObjectWriter) -> Result<()> {
fn decode_write_pkt(
&mut self,
pkt: &[u8],
writer: &dyn ObjectWriter,
now: SystemTime,
) -> Result<()> {
if self.decoder.is_none() {
self.init_decoder(pkt);
self.decoder_read(writer)?;
self.decoder_read(writer, now)?;
return Ok(());
}

let mut offset: usize = 0;
loop {
let size = self.decoder.as_mut().unwrap().write(&pkt[offset..])?;
self.decoder_read(writer)?;
self.decoder_read(writer, now)?;
offset += size;
if offset == pkt.len() {
break;
Expand All @@ -147,7 +155,7 @@ impl BlockWriter {
Ok(())
}

fn decoder_read(&mut self, writer: &dyn ObjectWriter) -> Result<()> {
fn decoder_read(&mut self, writer: &dyn ObjectWriter, now: SystemTime) -> Result<()> {
let decoder = self.decoder.as_mut().unwrap();

if self.content_length_left == Some(0) {
Expand All @@ -169,7 +177,7 @@ impl BlockWriter {
ctx.consume(&self.buffer[..size])
}

writer.write(&self.buffer[..size]);
writer.write(&self.buffer[..size], now);

if let Some(content_length_left) = self.content_length_left.as_mut() {
*content_length_left = content_length_left.saturating_sub(size);
Expand Down
9 changes: 5 additions & 4 deletions src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder {
_toi: &u128,
_content_location: &url::Url,
_duration: &std::time::Duration,
_now: std::time::SystemTime,
) {
}

Expand All @@ -228,16 +229,16 @@ impl ObjectWriterBuilder for FdtWriterBuilder {
}

impl ObjectWriter for FdtWriter {
fn open(&self) -> Result<()> {
fn open(&self, _now: SystemTime) -> Result<()> {
Ok(())
}

fn write(&self, data: &[u8]) {
fn write(&self, data: &[u8], _now: SystemTime) {
let mut inner = self.inner.borrow_mut();
inner.data.extend(data);
}

fn complete(&self) {
fn complete(&self, _now: SystemTime) {
let mut inner = self.inner.borrow_mut();
match FdtInstance::parse(&inner.data) {
Ok(inst) => {
Expand All @@ -252,7 +253,7 @@ impl ObjectWriter for FdtWriter {
};
}

fn error(&self) {
fn error(&self, _now: SystemTime) {
let mut inner = self.inner.borrow_mut();
inner.state = State::Error;
}
Expand Down
60 changes: 37 additions & 23 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct ObjectReceiver {
content_type: Option<String>,
cache_duration: Option<Duration>,
groups: Vec<String>,
last_timestamp: SystemTime,
}

impl ObjectReceiver {
Expand All @@ -78,7 +79,7 @@ impl ObjectReceiver {
object_writer_builder: Rc<dyn ObjectWriterBuilder>,
enable_md5_check: bool,
max_size_allocated: usize,
_now: SystemTime,
now: SystemTime,
) -> ObjectReceiver {
log::debug!("Create new Object Receiver with toi {}", toi);
ObjectReceiver {
Expand Down Expand Up @@ -117,6 +118,7 @@ impl ObjectReceiver {
content_type: None,
cache_duration: None,
groups: Vec::new(),
last_timestamp: now,
}
}

Expand All @@ -133,27 +135,28 @@ impl ObjectReceiver {
}

pub fn push(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) {
self.last_timestamp = now;
if self.state != State::Receiving {
return;
}

self.last_activity = Instant::now();
self.set_fdt_id_from_pkt(pkt);
self.set_cenc_from_pkt(pkt);
self.set_oti_from_pkt(pkt);
self.set_oti_from_pkt(pkt, now);

self.init_blocks_partitioning();
self.init_object_writer(now);
self.push_from_cache(now);

if self.oti.is_none() {
self.cache(pkt)
.unwrap_or_else(|_| self.error("Fail to push pkt to cache"));
.unwrap_or_else(|_| self.error("Fail to push pkt to cache", now));
return;
}

self.push_to_block(pkt, now)
.unwrap_or_else(|_| self.error("Fail to push pkt to block"));
.unwrap_or_else(|_| self.error("Fail to push pkt to block", now));
}

fn push_to_block(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
Expand Down Expand Up @@ -268,6 +271,7 @@ impl ObjectReceiver {
server_time: std::time::SystemTime,
) -> bool {
debug_assert!(self.toi != lct::TOI_FDT);
self.last_timestamp = now;
if self.fdt_instance_id.is_some() {
return false;
}
Expand Down Expand Up @@ -323,10 +327,13 @@ impl ObjectReceiver {
"Fail to parse content-location {} to URL",
file.content_location
);
self.error(&format!(
"Fail to parse content-location {} to URL",
file.content_location
));
self.error(
&format!(
"Fail to parse content-location {} to URL",
file.content_location
),
now,
);
return false;
}
}
Expand Down Expand Up @@ -364,7 +371,7 @@ impl ObjectReceiver {
self.init_object_writer(now);
self.push_from_cache(now);
self.write_blocks(0, now)
.unwrap_or_else(|_| self.error("Fail to write blocks to storage"));
.unwrap_or_else(|_| self.error("Fail to write blocks to storage", now));
self.push_from_cache(now);
true
}
Expand Down Expand Up @@ -429,8 +436,8 @@ impl ObjectReceiver {

let object_writer = self.object_writer.as_mut().unwrap();

if object_writer.writer.open().is_err() {
self.error("Fail to create destination on storage");
if object_writer.writer.open(now).is_err() {
self.error("Fail to create destination on storage", now);
return;
};

Expand Down Expand Up @@ -473,6 +480,7 @@ impl ObjectReceiver {
sbn as u32,
block,
self.object_writer.as_ref().unwrap().writer.as_ref(),
now,
)?;
if !success {
break;
Expand Down Expand Up @@ -501,26 +509,29 @@ impl ObjectReceiver {
self.content_location
);

self.error(&format!(
"MD5 does not match expects {:?} received {:?}",
self.content_md5, &md5
));
self.error(
&format!(
"MD5 does not match expects {:?} received {:?}",
self.content_md5, &md5
),
now,
);
}
break;
}
}
Ok(())
}

fn complete(&mut self, _now: std::time::SystemTime) {
fn complete(&mut self, now: std::time::SystemTime) {
#[cfg(feature = "opentelemetry")]
let _span = self.logger.as_mut().map(|l| l.complete());

self.state = State::Completed;

if let Some(object_writer) = self.object_writer.as_mut() {
object_writer.state = ObjectWriterSessionState::Closed;
object_writer.writer.complete();
object_writer.writer.complete(now);
}

// Free space by removing blocks
Expand All @@ -529,7 +540,7 @@ impl ObjectReceiver {
self.cache_size = 0;
}

fn error(&mut self, _description: &str) {
fn error(&mut self, _description: &str, now: SystemTime) {
#[cfg(feature = "opentelemetry")]
self.init_logger(None);

Expand All @@ -540,7 +551,7 @@ impl ObjectReceiver {

if let Some(object_writer) = self.object_writer.as_mut() {
object_writer.state = ObjectWriterSessionState::Error;
object_writer.writer.error();
object_writer.writer.error(now);
}

self.blocks.clear();
Expand All @@ -556,7 +567,7 @@ impl ObjectReceiver {
while let Some(item) = self.cache.pop() {
let pkt = item.to_pkt();
if self.push_to_block(&pkt, now).is_err() {
self.error("Fail to push block");
self.error("Fail to push block", now);
break;
}
}
Expand All @@ -583,7 +594,7 @@ impl ObjectReceiver {
self.fdt_instance_id = pkt.fdt_info.as_ref().map(|info| info.fdt_instance_id);
}

fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt) {
fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt, now: SystemTime) {
if self.oti.is_some() {
return;
}
Expand All @@ -609,7 +620,7 @@ impl ObjectReceiver {

if pkt.transfer_length.is_none() {
log::warn!("Bug? Pkt contains OTI without transfer length");
self.error("Bug? Pkt contains OTI without transfer length");
self.error("Bug? Pkt contains OTI without transfer length", now);
return;
}

Expand Down Expand Up @@ -706,7 +717,10 @@ impl Drop for ObjectReceiver {
self.endpoint,
self.content_location.as_ref().map(|u| u.to_string())
);
self.error("Drop object in open state, pkt missing ?");
self.error(
"Drop object in open state, pkt missing ?",
self.last_timestamp,
);
} else if object_writer.state == ObjectWriterSessionState::Error {
log::error!(
"Drop object received with state {:?} TOI={} Endpoint={:?} Content-Location={:?}",
Expand Down
8 changes: 8 additions & 0 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct Receiver {
last_activity: Instant,
closed_is_imminent: bool,
endpoint: UDPEndpoint,
last_timestamp: Option<SystemTime>
}

impl Receiver {
Expand Down Expand Up @@ -109,6 +110,7 @@ impl Receiver {
last_activity: Instant::now(),
closed_is_imminent: false,
endpoint: endpoint.clone(),
last_timestamp: None
}
}

Expand Down Expand Up @@ -167,6 +169,7 @@ impl Receiver {
/// * `now` - The current `SystemTime` to use for time-related operations.
///
pub fn cleanup(&mut self, now: std::time::SystemTime) {
self.last_timestamp = Some(now);
self.cleanup_objects();
self.cleanup_fdt(now);
}
Expand Down Expand Up @@ -242,6 +245,7 @@ impl Receiver {
/// Returns as error if the packet is not a valid
///
pub fn push_data(&mut self, data: &[u8], now: std::time::SystemTime) -> Result<()> {
self.last_timestamp = Some(now);
let alc = alc::parse_alc_pkt(data)?;
if alc.lct.tsi != self.tsi {
return Ok(());
Expand All @@ -266,6 +270,7 @@ impl Receiver {
pub fn push(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
debug_assert!(self.tsi == alc_pkt.lct.tsi);
self.last_activity = Instant::now();
self.last_timestamp = Some(now);

if alc_pkt.lct.close_session {
log::info!("Close session");
Expand Down Expand Up @@ -445,6 +450,7 @@ impl Receiver {
toi,
&meta.content_location,
&duration,
now
);
}
}
Expand Down Expand Up @@ -475,6 +481,7 @@ impl Receiver {
&toi,
&obj.content_location,
&cache_duration,
now
);
}
}
Expand Down Expand Up @@ -690,6 +697,7 @@ impl Drop for Receiver {
obj.0,
&obj.1.content_location,
&duration,
self.last_timestamp.unwrap_or_else(|| SystemTime::now())
);
}

Expand Down
Loading

0 comments on commit 1bf31cb

Please sign in to comment.