Fixes to memory leaks identified in #325 (#327)
* Implement reallocation of SerializedData in DDSCache. This closes the memory leak of receive buffers (see the sketch below).

* Add timed garbage collection to the whole DDSCache. This forces the release of receive buffers held by topics that rarely receive new samples.

* Reduce Timer memory consumption. The default configuration of a single mio-0.6 Timer consumes over a megabyte of memory. Allocate new Timers with much smaller event buffers.

* fmt

* Add missing datafrag AssemblyBuffer garbage collection.
jhelovuo authored Mar 14, 2024
1 parent 4d89280 commit 1f198b1
Showing 17 changed files with 196 additions and 25 deletions.
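
For context on the first fix: `bytes::Bytes` slices share the allocation they were cut from, so a small deserialized payload can pin an entire UDP receive buffer in memory. Below is a minimal sketch of the mechanism, assuming only the `bytes` crate; it is illustrative, not code from this commit.

```rust
use bytes::Bytes;

fn main() {
  // A large receive buffer, e.g. 64 KiB read from a UDP socket.
  let receive_buffer = Bytes::from(vec![0u8; 64 * 1024]);

  // Slicing is zero-copy: the 100-byte payload keeps the whole
  // 64 KiB allocation alive for as long as the slice exists.
  let payload = receive_buffer.slice(0..100);
  drop(receive_buffer); // the 64 KiB block is still referenced by `payload`

  // "Reallocation": copy the payload into a fresh, right-sized
  // allocation, releasing the reference to the large buffer.
  let detached = Bytes::copy_from_slice(&payload);
  drop(payload); // now the 64 KiB block can actually be freed
  assert_eq!(detached.len(), 100);
}
```

The `SerializedPayload::reallocate()` change in this diff applies the same copy-to-detach step.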
33 changes: 33 additions & 0 deletions examples/no_key_async_usage_example/main.rs
@@ -1,5 +1,11 @@
use std::{io, time};

use log4rs::{
append::console::ConsoleAppender,
config::{Appender, Root},
Config,
};
use log::LevelFilter;
use futures::StreamExt;
use rustdds::*;
use serde::{Deserialize, Serialize};
@@ -8,6 +14,8 @@ use smol::Timer;
const SECOND: time::Duration = time::Duration::from_millis(1000);

fn main() {
configure_logging();

// DomainParticipant is always necessary
let domain_participant = DomainParticipant::new(0).unwrap();

@@ -113,3 +121,28 @@ fn main() {
})
}
}

fn configure_logging() {
// initialize logging, preferably from config file
log4rs::init_file(
"logging-config.yaml",
log4rs::config::Deserializers::default(),
)
.unwrap_or_else(|e| {
match e.downcast_ref::<io::Error>() {
// Config file did not work. If it is a simple "No such file or directory", then
// substitute some default config.
Some(os_err) if os_err.kind() == io::ErrorKind::NotFound => {
println!("No config file found in current working directory.");
let stdout = ConsoleAppender::builder().build();
let conf = Config::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
.build(Root::builder().appender("stdout").build(LevelFilter::Error))
.unwrap();
log4rs::init_config(conf).unwrap();
}
// Give up.
other_error => panic!("Config problem: {other_error:?}"),
}
});
}
8 changes: 8 additions & 0 deletions src/dds/ddsdata.rs
@@ -60,6 +60,14 @@ impl DDSData {
}
}

pub(crate) fn reallocate(&mut self) {
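// Copy the contained Bytes into fresh allocations so that they no
// longer keep a large, shared receive buffer alive.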
match self {
DDSData::Data { serialized_payload } => serialized_payload.reallocate(),
DDSData::DisposeByKey { key, .. } => key.reallocate(),
DDSData::DisposeByKeyHash { .. } => {}
}
}

#[cfg(test)]
pub fn data(&self) -> Bytes {
self.payload_bytes()
2 changes: 2 additions & 0 deletions src/dds/participant.rs
@@ -1097,6 +1097,7 @@ impl DomainParticipantInner {
};

let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let dds_cache_clone = Arc::clone(&dds_cache);

let (discovery_db_event_sender, discovery_db_event_receiver) =
mio_channel::sync_channel::<()>(1);
@@ -1118,6 +1119,7 @@
.spawn(move || {
let dp_event_loop = DPEventLoop::new(
domain_info,
dds_cache_clone,
listeners,
disc_db_clone,
participant_guid.prefix,
9 changes: 5 additions & 4 deletions src/discovery/discovery.rs
@@ -32,6 +32,7 @@ use crate::{
},
spdp_participant_data::{Participant_GUID, SpdpDiscoveredParticipantData},
},
polling::new_simple_timer,
rtps::constant::*,
serialization::{
cdr_deserializer::CDRDeserializerAdapter, cdr_serializer::CDRSerializerAdapter,
@@ -383,7 +384,7 @@ impl Discovery {
.register(&reader, $reader_token, Ready::readable(), PollOpt::edge())
.expect("Failed to register a discovery reader to poll.");

let mut timer: Timer<()> = Timer::default();
let mut timer: Timer<()> = new_simple_timer();
if let Some((timeout_value, timer_token)) = $timeout_and_timer_token_opt {
timer.set_timeout(timeout_value, ());
poll
@@ -433,7 +434,7 @@ impl Discovery {
);

// create lease duration check timer
let mut participant_cleanup_timer: Timer<()> = Timer::default();
let mut participant_cleanup_timer: Timer<()> = new_simple_timer();
participant_cleanup_timer.set_timeout(Self::PARTICIPANT_CLEANUP_PERIOD, ());
try_construct!(
poll.register(
@@ -492,7 +493,7 @@ impl Discovery {
);

// create lease duration check timer
let mut topic_cleanup_timer: Timer<()> = Timer::default();
let mut topic_cleanup_timer: Timer<()> = new_simple_timer();
topic_cleanup_timer.set_timeout(Self::TOPIC_CLEANUP_PERIOD, ());
try_construct!(
poll.register(
@@ -626,7 +627,7 @@ impl Discovery {
// (authentication, key exchange) messages
#[cfg(feature = "security")]
let secure_message_resend_timer = {
let mut secure_message_resend_timer: Timer<()> = Timer::default();
let mut secure_message_resend_timer: Timer<()> = new_simple_timer();
secure_message_resend_timer
.set_timeout(Self::CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD, ());
try_construct!(
1 change: 1 addition & 0 deletions src/lib.rs
@@ -162,6 +162,7 @@
clippy::option_map_unit_fn,
)]

mod polling;
#[macro_use]
mod serialization_test;
#[macro_use]
4 changes: 4 additions & 0 deletions src/messages/submessages/elements/serialized_payload.rs
@@ -49,6 +49,10 @@ impl SerializedPayload {
}
}

pub(crate) fn reallocate(&mut self) {
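// `value` may be a zero-copy slice of a large receive buffer;
// copying it into a fresh allocation lets that buffer be freed.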
self.value = Bytes::copy_from_slice(&self.value);
}

/// serialized size in bytes
pub fn len_serialized(&self) -> usize {
H_LEN + self.value.len()
20 changes: 20 additions & 0 deletions src/polling.rs
@@ -0,0 +1,20 @@
// Currently, just helpers for the mio library

// TODO: Expand this to become an internal API for polling operations:
// * sockets (send/recv)
// * timers
// * inter-thread channels
//
// Then we could implement them on top of either mio-0.6, mio-0.8, or
// something else

use mio_extras::{timer, timer::Timer};

// The default timer has 256 timer wheel slots and capacity
// ("Max number of timeouts that can be in flight at a given time") of 65536.
// This seems to take up a lot of memory.
// Additionally, each timer creates its own background thread, which also takes
// up resources, but changing that would be a major change.
pub fn new_simple_timer<E>() -> Timer<E> {
timer::Builder::default().num_slots(4).capacity(2).build()
}
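
A rough comparison of the two configurations (a sketch assuming the mio-extras API used above; the megabyte-plus figure is the commit message's and presumably comes from the 65 536-entry timeout storage, not from a measurement here):

```rust
use std::time::Duration;
use mio_extras::timer;

fn timer_footprint() {
  // Default configuration: 256 wheel slots and capacity for 65_536
  // in-flight timeouts.
  let _big: timer::Timer<()> = timer::Timer::default();

  // The trimmed variant built by new_simple_timer(): 4 slots and room
  // for 2 timeouts -- enough for the single periodic timeout each
  // RustDDS timer actually carries.
  let mut small: timer::Timer<()> = timer::Builder::default()
    .num_slots(4)
    .capacity(2)
    .build();
  small.set_timeout(Duration::from_secs(5), ());
}
```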
3 changes: 3 additions & 0 deletions src/rtps/constant.rs
@@ -13,6 +13,8 @@ use crate::{

pub const PREEMPTIVE_ACKNACK_PERIOD: Duration = Duration::from_secs(5);

pub const CACHE_CLEAN_PERIOD: Duration = Duration::from_secs(4);

// RTPS spec Section 8.4.7.1.1 "Default Timing-Related Values"
pub const NACK_RESPONSE_DELAY: Duration = Duration::from_millis(200);
pub const NACK_SUPPRESSION_DURATION: Duration = Duration::from_millis(0);
@@ -234,6 +236,7 @@ pub const DISCOVERY_PARTICIPANT_MESSAGE_TOKEN: Token = Token(40 + PTB);
pub const DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN: Token = Token(41 + PTB);

pub const DPEV_ACKNACK_TIMER_TOKEN: Token = Token(45 + PTB);
pub const DPEV_CACHE_CLEAN_TIMER_TOKEN: Token = Token(46 + PTB);

pub const SECURE_DISCOVERY_PARTICIPANT_DATA_TOKEN: Token = Token(50 + PTB);
// pub const DISCOVERY_PARTICIPANT_CLEANUP_TOKEN: Token = Token(51 + PTB);
31 changes: 28 additions & 3 deletions src/rtps/dp_event_loop.rs
@@ -21,6 +21,7 @@ use crate::{
},
messages::submessages::submessages::AckSubmessage,
network::{udp_listener::UDPListener, udp_sender::UDPSender},
polling::new_simple_timer,
qos::HasQoSPolicy,
rtps::{
constant::*,
@@ -31,6 +32,7 @@ use crate::{
writer::{Writer, WriterIngredients},
},
structure::{
dds_cache::DDSCache,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, TokenDecode, GUID},
},
@@ -58,6 +60,7 @@
pub struct DPEventLoop {
domain_info: DomainInfo,
poll: Poll,
dds_cache: Arc<RwLock<DDSCache>>,
discovery_db: Arc<RwLock<DiscoveryDB>>,
udp_listeners: HashMap<Token, UDPListener>,
message_receiver: MessageReceiver, // This contains our Readers
@@ -93,6 +96,7 @@ impl DPEventLoop {
#[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
pub(crate) fn new(
domain_info: DomainInfo,
dds_cache: Arc<RwLock<DDSCache>>,
udp_listeners: HashMap<Token, UDPListener>,
discovery_db: Arc<RwLock<DiscoveryDB>>,
participant_guid_prefix: GuidPrefix,
@@ -196,6 +200,7 @@ impl DPEventLoop {
Self {
domain_info,
poll,
dds_cache,
discovery_db,
udp_listeners,
udp_sender: Rc::new(udp_sender),
@@ -223,9 +228,13 @@

pub fn event_loop(self) {
let mut events = Events::with_capacity(16); // too small capacity just delays events to next poll
let mut acknack_timer = mio_extras::timer::Timer::default();

let mut acknack_timer = new_simple_timer();
acknack_timer.set_timeout(PREEMPTIVE_ACKNACK_PERIOD, ());

let mut cache_gc_timer = new_simple_timer();
cache_gc_timer.set_timeout(CACHE_CLEAN_PERIOD, ());

self
.poll
.register(
@@ -235,6 +244,15 @@
PollOpt::edge(),
)
.unwrap();
self
.poll
.register(
&cache_gc_timer,
DPEV_CACHE_CLEAN_TIMER_TOKEN,
Ready::readable(),
PollOpt::edge(),
)
.unwrap();
let mut poll_alive = Instant::now();
let mut ev_wrapper = self;
let mut preparing_to_stop = false;
@@ -363,6 +381,11 @@
ev_wrapper.message_receiver.send_preemptive_acknacks();
acknack_timer.set_timeout(PREEMPTIVE_ACKNACK_PERIOD, ());
}
DPEV_CACHE_CLEAN_TIMER_TOKEN => {
debug!("Clean DDSCache on timer");
ev_wrapper.dds_cache.write().unwrap().garbage_collect();
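// mio-extras timeouts fire only once, so re-arm the timer for the next period.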
cache_gc_timer.set_timeout(CACHE_CLEAN_PERIOD, ());
}

fixed_unknown => {
error!(
@@ -820,7 +843,7 @@ impl DPEventLoop {
}

fn add_local_reader(&mut self, reader_ing: ReaderIngredients) {
let timer = mio_extras::timer::Builder::default().num_slots(8).build();
let timer = new_simple_timer();
self
.poll
.register(
@@ -883,7 +906,7 @@ impl DPEventLoop {
}

fn add_local_writer(&mut self, writer_ing: WriterIngredients) {
let timer = mio_extras::timer::Builder::default().num_slots(8).build();
let timer = new_simple_timer();
self
.poll
.register(
@@ -1079,6 +1102,7 @@ mod tests {
sync_status_channel(16).unwrap();

let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let dds_cache_clone = Arc::clone(&dds_cache);
let (discovery_db_event_sender, _discovery_db_event_receiver) =
mio_channel::sync_channel::<()>(4);

@@ -1100,6 +1124,7 @@
let child = thread::spawn(move || {
let dp_event_loop = DPEventLoop::new(
domain_info,
dds_cache_clone,
HashMap::new(),
discovery_db,
GuidPrefix::default(),
10 changes: 10 additions & 0 deletions src/rtps/fragment_assembler.rs
@@ -209,6 +209,16 @@ impl FragmentAssembler {
}
}

pub fn garbage_collect_before(&mut self, expire_before: Timestamp) {
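// Retain only the assembly buffers that were modified at or after `expire_before`.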
self.assembly_buffers.retain(|sn, ab| {
let retain = ab.modified_time >= expire_before;
if !retain {
info!("AssemblyBuffer dropping {sn:?}");
}
retain
});
}

// pub fn partially_received_sequence_numbers_iterator(&self) -> Box<dyn
// Iterator<Item=SequenceNumber>> { // Since we should only know about SNs
// via DATAFRAG messages // and AssemblyBuffers are removed immediately on
2 changes: 1 addition & 1 deletion src/rtps/message_receiver.rs
@@ -115,7 +115,7 @@ pub(crate) struct MessageReceiver {
pub multicast_reply_locator_list: Vec<Locator>,
pub source_timestamp: Option<Timestamp>,

submessage_count: usize, // Used in tests only?
submessage_count: usize, // Used in tests and error messages only?
secure_receiver_state: Option<SecureReceiverState>,
#[cfg(feature = "security")]
secure_rtps_wrapped: Option<SecureWrapping>,
27 changes: 25 additions & 2 deletions src/rtps/reader.rs
@@ -46,6 +46,7 @@ use crate::{
structure::{
cache_change::{CacheChange, ChangeKind},
dds_cache::TopicCache,
duration::Duration,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, GUID},
locator::Locator,
@@ -134,6 +135,7 @@ pub(crate) struct Reader {
received_heartbeat_count: i32,

fragment_assemblers: BTreeMap<GUID, FragmentAssembler>,
last_fragment_garbage_collect: Timestamp,
matched_writers: BTreeMap<GUID, RtpsWriterProxy>,
writer_match_count_total: i32, // total count, never decreases

@@ -151,6 +153,12 @@
security_plugins: Option<SecurityPluginsHandle>,
}

// If a sample is being assembled but its AssemblyBuffer receives no
// updates for this long, the buffer is simply dropped.
const FRAGMENT_ASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10);
// Minimum interval (i.e. maximum frequency) of AssemblyBuffer GC
const MIN_FRAGMENT_GC_INTERVAL: Duration = Duration::from_secs(2);

impl Reader {
pub(crate) fn new(
i: ReaderIngredients,
@@ -193,6 +201,7 @@ impl Reader {
heartbeat_suppression_duration: StdDuration::new(0, 0),
received_heartbeat_count: 0,
fragment_assemblers: BTreeMap::new(),
last_fragment_garbage_collect: Timestamp::now(),
matched_writers: BTreeMap::new(),
writer_match_count_total: 0,
requested_deadline_missed_count: 0,
@@ -644,12 +653,26 @@ impl Reader {
}

fn garbage_collect_fragments(&mut self) {
// TODO: On most calls, do nothing.
//
// If the GC time/packet limit has been exceeded, iterate through the
// fragment assemblers and discard those assembly buffers whose
// creation/modification timestamps indicate that they are no longer
// receiving data and can therefore be dropped.
let now = Timestamp::now();
if now - self.last_fragment_garbage_collect > MIN_FRAGMENT_GC_INTERVAL {
self.last_fragment_garbage_collect = now;

let expire_before = now - FRAGMENT_ASSEMBLY_TIMEOUT;

self
.fragment_assemblers
.iter_mut()
.for_each(|(writer, fa)| {
debug!("AssemblyBuffer GC writer {:?}", writer);
fa.garbage_collect_before(expire_before);
});
} else {
trace!("Not yet AssemblyBuffer GC time.");
}
}

fn missing_frags_for(