Skip to content

Commit

Permalink
Possibility to limit the amount of memory allocated to receive an object
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Mar 11, 2024
1 parent 98b5db7 commit 43127dd
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pyo3 = { version = "0.20", features = ["extension-module"], optional = true }
pyo3-log = { version = "0.9", optional = true }
raptorq = "1.7"
raptor-code = "1.0.5"
opentelemetry = { version = "0.21", optional = true }
opentelemetry = { version = "0.22", optional = true }
rand = "0.8"

[dev-dependencies]
Expand Down
4 changes: 4 additions & 0 deletions src/receiver/blockdecoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::tools::error::Result;
pub struct BlockDecoder {
pub completed: bool,
pub initialized: bool,
pub block_size: usize,
decoder: Option<Box<dyn FecDecoder>>,
#[cfg(feature = "opentelemetry")]
telemetry_ctx: Option<opentelemetry::Context>,
Expand All @@ -30,6 +31,7 @@ impl BlockDecoder {
completed: false,
initialized: false,
decoder: None,
block_size: 0,
#[cfg(feature = "opentelemetry")]
telemetry_ctx: None,
}
Expand Down Expand Up @@ -113,6 +115,7 @@ impl BlockDecoder {
}

self.initialized = true;
self.block_size = block_size;
Ok(())
}

Expand All @@ -126,6 +129,7 @@ impl BlockDecoder {

pub fn deallocate(&mut self) {
self.decoder = None;
self.block_size = 0;
}

pub fn push(&mut self, pkt: &alc::AlcPkt, payload_id: &alc::PayloadID) {
Expand Down
1 change: 1 addition & 0 deletions src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl FdtReceiver {
Some(fdt_id),
fdt_builder,
true,
1024 * 1024,
now,
))),
inner: inner.clone(),
Expand Down
56 changes: 52 additions & 4 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct ObjectReceiver {
oti: Option<oti::Oti>,
cache: Vec<Box<alc::AlcPktCache>>,
cache_size: usize,
max_size_allocated: usize,
blocks: Vec<BlockDecoder>,
blocks_variable_size: bool,
pub transfer_length: Option<u64>,
Expand All @@ -59,6 +60,8 @@ pub struct ObjectReceiver {
last_activity: Instant,
pub cache_expiration_date: Option<SystemTime>,
pub content_location: Option<url::Url>,
nb_allocated_blocks: usize,
total_allocated_blocks_size: usize,

#[cfg(feature = "opentelemetry")]
logger: ObjectReceiverLogger,
Expand All @@ -72,6 +75,7 @@ impl ObjectReceiver {
_fdt_instance_id: Option<u32>,
object_writer_builder: Rc<dyn ObjectWriterBuilder>,
enable_md5_check: bool,
max_size_allocated: usize,
_now: SystemTime,
) -> ObjectReceiver {
log::debug!("Create new Object Receiver with toi {}", toi);
Expand All @@ -80,6 +84,7 @@ impl ObjectReceiver {
oti: None,
cache: Vec::new(),
cache_size: 0,
max_size_allocated,
blocks: Vec::new(),
transfer_length: None,
cenc: None,
Expand All @@ -100,6 +105,8 @@ impl ObjectReceiver {
last_activity: Instant::now(),
cache_expiration_date: None,
content_location: None,
nb_allocated_blocks: 0,
total_allocated_blocks_size: 0,
#[cfg(feature = "opentelemetry")]
logger: ObjectReceiverLogger::new(endpoint, tsi, toi.clone(), _fdt_instance_id, _now),
}
Expand Down Expand Up @@ -132,7 +139,8 @@ impl ObjectReceiver {
self.push_from_cache(now);

if self.oti.is_none() {
self.cache(pkt);
self.cache(pkt)
.unwrap_or_else(|_| self.error("Fail to push pkt to cache"));
return;
}

Expand All @@ -144,6 +152,7 @@ impl ObjectReceiver {
debug_assert!(self.oti.is_some());
debug_assert!(self.transfer_length.is_some());
let payload_id = alc::parse_payload_id(pkt, self.oti.as_ref().unwrap())?;
let nb_blocks = self.blocks.len();
log::debug!(
"toi={} sbn={} esi={} meta={:?}",
self.toi,
Expand Down Expand Up @@ -197,6 +206,24 @@ impl ObjectReceiver {
) as usize,
};

if self.nb_allocated_blocks >= 2
&& self.total_allocated_blocks_size + block_length > self.max_size_allocated
{
log::error!(
"NB Allocated blocks={}/{} total_allocated={}/{} block_length={}",
self.nb_allocated_blocks,
nb_blocks,
self.total_allocated_blocks_size,
self.max_size_allocated,
block_length,
);

self.state = State::Error;
return Err(FluteError::new(
"Maximum number of blocks allocated is reached",
));
}

log::debug!("Init block {} with length {}", payload_id.sbn, block_length);
match block.init(oti, source_block_length, block_length, payload_id.sbn) {
Ok(_) => {}
Expand All @@ -205,6 +232,8 @@ impl ObjectReceiver {
return Err(FluteError::new("Fail to init source block decoder"));
}
}
self.nb_allocated_blocks += 1;
self.total_allocated_blocks_size += block_length;

#[cfg(feature = "opentelemetry")]
block.op_init(
Expand Down Expand Up @@ -408,6 +437,9 @@ impl ObjectReceiver {
break;
}
sbn += 1;
debug_assert!(self.total_allocated_blocks_size >= block.block_size);
self.total_allocated_blocks_size -= block.block_size;
self.nb_allocated_blocks -= 1;
block.deallocate();

if writer.is_completed() {
Expand Down Expand Up @@ -537,17 +569,25 @@ impl ObjectReceiver {
}
}

fn cache(&mut self, pkt: &alc::AlcPkt) {
fn cache(&mut self, pkt: &alc::AlcPkt) -> Result<()> {
if self.cache_size == 0 {
log::warn!(
"TSI={} TOI={} Packet with FTI received before the FDT",
"TSI={} TOI={} Packet without FTI received before the FDT",
self.tsi,
self.toi
);
}

if self.cache_size >= self.max_size_allocated {
return Err(FluteError::new("Pkt cache is full"));
}

match self.cache_size.checked_add(pkt.data.len()) {
Some(_) => Ok(()),
None => Err(FluteError::new("add overflow")),
}?;
self.cache.push(Box::new(pkt.to_cache()));
self.cache_size += pkt.data.len()
Ok(())
}

/// Block Partitioning Algorithm
Expand Down Expand Up @@ -617,6 +657,14 @@ impl Drop for ObjectReceiver {
self.content_location.as_ref().map(|u| u.to_string())
);
self.error("Drop object in open state, pkt missing ?");
} else if object_writer.state == ObjectWriterSessionState::Error {
log::error!(
"Drop object received with state {:?} TOI={} Endpoint={:?} Content-Location={:?}",
object_writer.state,
self.toi,
self.endpoint,
self.content_location.as_ref().map(|u| u.to_string())
);
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct Config {
/// Objects expire if no data has been received before this timeout
/// `None` Objects never expires, not recommended as object that are not fully reconstructed might continue to consume memory for an finite amount of time.
pub object_timeout: Option<Duration>,
/// Maximum cache size that can be allocated to received an object. Default is 10MB.
pub object_max_cache_size: Option<usize>,
/// Enable MD5 check of the received objects. Default `true`
pub enable_md5_check: bool,
/// CHeck if FDT is already received
Expand All @@ -40,6 +42,7 @@ impl Default for Config {
max_objects_error: 0,
session_timeout: None,
object_timeout: Some(Duration::from_secs(10)),
object_max_cache_size: None,
enable_md5_check: true,
check_fdt_received: true
}
Expand Down Expand Up @@ -503,7 +506,8 @@ impl Receiver {
self.tsi,
toi
);
self.objects.remove(&toi);
let _success = self.objects.remove(&toi);
debug_assert!(_success.is_some());
}
}

Expand Down Expand Up @@ -552,6 +556,7 @@ impl Receiver {
None,
self.writer.clone(),
self.config.enable_md5_check,
self.config.object_max_cache_size.unwrap_or(10 * 1024 * 1024),
now
));

Expand Down Expand Up @@ -585,7 +590,7 @@ impl Receiver {
}

if is_attached == false {
log::warn!("Object received before the FDT");
log::warn!("Object received before the FDT TSI={} TOI={}", self.tsi, toi);
}

self.objects.insert(toi.clone(), obj);
Expand Down
2 changes: 1 addition & 1 deletion src/sender/blockencoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl BlockEncoder {
}

fn read_block(&mut self) -> Result<()> {
assert!(self.read_end == false);
debug_assert!(self.read_end == false);

if self.fd.is_some() {
return self.read_fd_block();
Expand Down
2 changes: 1 addition & 1 deletion src/sender/fdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ mod tests {

let output_print = std::str::from_utf8(&output.stderr).expect("ascii to text went wrong ");

debug_assert!(
assert!(
output.status.success(),
"\n\nValidation failed\n\n{}\n\n",
output_print
Expand Down

0 comments on commit 43127dd

Please sign in to comment.