From 1f198b13bb6103f357bd1c240874abf8f1042b0d Mon Sep 17 00:00:00 2001 From: Juhana Helovuo Date: Thu, 14 Mar 2024 13:14:04 +0200 Subject: [PATCH] Fixes to memory leaks identified in #325 (#327) * Implement reallocation of SerializedData in DDSCache. This closes memory leak of receive buffers. * Add timed garbage collection to whole DDSCache. This is to force release of receive buffers from topics that rarely get new samples. * Reduce Timer memory consumption. The default config of a single mio-0.6 Timer consumes over a megabyte of memory. Allocate new Timers with much smaller event buffers. * fmt * Add missing datafrag AssemblyBuffer garbage collection. --- examples/no_key_async_usage_example/main.rs | 33 +++++++++++ src/dds/ddsdata.rs | 8 +++ src/dds/participant.rs | 2 + src/discovery/discovery.rs | 9 +-- src/lib.rs | 1 + .../elements/serialized_payload.rs | 4 ++ src/polling.rs | 20 +++++++ src/rtps/constant.rs | 3 + src/rtps/dp_event_loop.rs | 31 +++++++++- src/rtps/fragment_assembler.rs | 10 ++++ src/rtps/message_receiver.rs | 2 +- src/rtps/reader.rs | 27 ++++++++- src/rtps/rtps_writer_proxy.rs | 3 - .../authentication_builtin/types.rs | 2 +- src/security/certificate.rs | 3 +- src/structure/cache_change.rs | 7 +++ src/structure/dds_cache.rs | 56 ++++++++++++++++--- 17 files changed, 196 insertions(+), 25 deletions(-) create mode 100644 src/polling.rs diff --git a/examples/no_key_async_usage_example/main.rs b/examples/no_key_async_usage_example/main.rs index ca6046ba..1430433a 100644 --- a/examples/no_key_async_usage_example/main.rs +++ b/examples/no_key_async_usage_example/main.rs @@ -1,5 +1,11 @@ use std::{io, time}; +use log4rs::{ + append::console::ConsoleAppender, + config::{Appender, Root}, + Config, +}; +use log::LevelFilter; use futures::StreamExt; use rustdds::*; use serde::{Deserialize, Serialize}; @@ -8,6 +14,8 @@ use smol::Timer; const SECOND: time::Duration = time::Duration::from_millis(1000); fn main() { + configure_logging(); + // 
DomainParticipant is always necessary let domain_participant = DomainParticipant::new(0).unwrap(); @@ -113,3 +121,28 @@ fn main() { }) } } + +fn configure_logging() { + // initialize logging, preferably from config file + log4rs::init_file( + "logging-config.yaml", + log4rs::config::Deserializers::default(), + ) + .unwrap_or_else(|e| { + match e.downcast_ref::() { + // Config file did not work. If it is a simple "No such file or directory", then + // substitute some default config. + Some(os_err) if os_err.kind() == io::ErrorKind::NotFound => { + println!("No config file found in current working directory."); + let stdout = ConsoleAppender::builder().build(); + let conf = Config::builder() + .appender(Appender::builder().build("stdout", Box::new(stdout))) + .build(Root::builder().appender("stdout").build(LevelFilter::Error)) + .unwrap(); + log4rs::init_config(conf).unwrap(); + } + // Give up. + other_error => panic!("Config problem: {other_error:?}"), + } + }); +} diff --git a/src/dds/ddsdata.rs b/src/dds/ddsdata.rs index 1cecf315..c7df0a6e 100644 --- a/src/dds/ddsdata.rs +++ b/src/dds/ddsdata.rs @@ -60,6 +60,14 @@ impl DDSData { } } + pub(crate) fn reallocate(&mut self) { + match self { + DDSData::Data { serialized_payload } => serialized_payload.reallocate(), + DDSData::DisposeByKey { key, .. } => key.reallocate(), + DDSData::DisposeByKeyHash { .. 
} => {} + } + } + #[cfg(test)] pub fn data(&self) -> Bytes { self.payload_bytes() diff --git a/src/dds/participant.rs b/src/dds/participant.rs index 2a519bb4..086584e5 100644 --- a/src/dds/participant.rs +++ b/src/dds/participant.rs @@ -1097,6 +1097,7 @@ impl DomainParticipantInner { }; let dds_cache = Arc::new(RwLock::new(DDSCache::new())); + let dds_cache_clone = Arc::clone(&dds_cache); let (discovery_db_event_sender, discovery_db_event_receiver) = mio_channel::sync_channel::<()>(1); @@ -1118,6 +1119,7 @@ impl DomainParticipantInner { .spawn(move || { let dp_event_loop = DPEventLoop::new( domain_info, + dds_cache_clone, listeners, disc_db_clone, participant_guid.prefix, diff --git a/src/discovery/discovery.rs b/src/discovery/discovery.rs index 7a833225..e61332be 100644 --- a/src/discovery/discovery.rs +++ b/src/discovery/discovery.rs @@ -32,6 +32,7 @@ use crate::{ }, spdp_participant_data::{Participant_GUID, SpdpDiscoveredParticipantData}, }, + polling::new_simple_timer, rtps::constant::*, serialization::{ cdr_deserializer::CDRDeserializerAdapter, cdr_serializer::CDRSerializerAdapter, @@ -383,7 +384,7 @@ impl Discovery { .register(&reader, $reader_token, Ready::readable(), PollOpt::edge()) .expect("Failed to register a discovery reader to poll."); - let mut timer: Timer<()> = Timer::default(); + let mut timer: Timer<()> = new_simple_timer(); if let Some((timeout_value, timer_token)) = $timeout_and_timer_token_opt { timer.set_timeout(timeout_value, ()); poll @@ -433,7 +434,7 @@ impl Discovery { ); // create lease duration check timer - let mut participant_cleanup_timer: Timer<()> = Timer::default(); + let mut participant_cleanup_timer: Timer<()> = new_simple_timer(); participant_cleanup_timer.set_timeout(Self::PARTICIPANT_CLEANUP_PERIOD, ()); try_construct!( poll.register( @@ -492,7 +493,7 @@ impl Discovery { ); // create lease duration check timer - let mut topic_cleanup_timer: Timer<()> = Timer::default(); + let mut topic_cleanup_timer: Timer<()> = 
new_simple_timer(); topic_cleanup_timer.set_timeout(Self::TOPIC_CLEANUP_PERIOD, ()); try_construct!( poll.register( @@ -626,7 +627,7 @@ impl Discovery { // (authentication, key exchange) messages #[cfg(feature = "security")] let secure_message_resend_timer = { - let mut secure_message_resend_timer: Timer<()> = Timer::default(); + let mut secure_message_resend_timer: Timer<()> = new_simple_timer(); secure_message_resend_timer .set_timeout(Self::CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD, ()); try_construct!( diff --git a/src/lib.rs b/src/lib.rs index 5a926fb1..cbe3fde3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,6 +162,7 @@ clippy::option_map_unit_fn, )] +mod polling; #[macro_use] mod serialization_test; #[macro_use] diff --git a/src/messages/submessages/elements/serialized_payload.rs b/src/messages/submessages/elements/serialized_payload.rs index e10d6b49..58b1885b 100644 --- a/src/messages/submessages/elements/serialized_payload.rs +++ b/src/messages/submessages/elements/serialized_payload.rs @@ -49,6 +49,10 @@ impl SerializedPayload { } } + pub(crate) fn reallocate(&mut self) { + self.value = Bytes::copy_from_slice(&self.value); + } + /// serialized size in bytes pub fn len_serialized(&self) -> usize { H_LEN + self.value.len() diff --git a/src/polling.rs b/src/polling.rs new file mode 100644 index 00000000..dfa2e0f5 --- /dev/null +++ b/src/polling.rs @@ -0,0 +1,20 @@ +// Currently, just helpers for mio library + +// TODO: Expand this to become an internal API for polling operations: +// * sockets (send/recv) +// * timers +// * inter-thread channels +// +// Then we could implement them either on top of mio-0.6, mio-0.8 or something +// else + +use mio_extras::{timer, timer::Timer}; + +// The default timer has 256 timer wheel slots and capacity +// ("Max number of timeouts that can be in flight at a given time") of 65536. +// This seems to take up a lot of memory.
+// Additionally, each timer creates its own background thread, which also takes +// up resources, but changing that would be a major change. +pub fn new_simple_timer() -> Timer { + timer::Builder::default().num_slots(4).capacity(2).build() +} diff --git a/src/rtps/constant.rs b/src/rtps/constant.rs index 1ffe5357..d090708a 100644 --- a/src/rtps/constant.rs +++ b/src/rtps/constant.rs @@ -13,6 +13,8 @@ use crate::{ pub const PREEMPTIVE_ACKNACK_PERIOD: Duration = Duration::from_secs(5); +pub const CACHE_CLEAN_PERIOD: Duration = Duration::from_secs(4); + // RTPS spec Section 8.4.7.1.1 "Default Timing-Related Values" pub const NACK_RESPONSE_DELAY: Duration = Duration::from_millis(200); pub const NACK_SUPPRESSION_DURATION: Duration = Duration::from_millis(0); @@ -234,6 +236,7 @@ pub const DISCOVERY_PARTICIPANT_MESSAGE_TOKEN: Token = Token(40 + PTB); pub const DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN: Token = Token(41 + PTB); pub const DPEV_ACKNACK_TIMER_TOKEN: Token = Token(45 + PTB); +pub const DPEV_CACHE_CLEAN_TIMER_TOKEN: Token = Token(46 + PTB); pub const SECURE_DISCOVERY_PARTICIPANT_DATA_TOKEN: Token = Token(50 + PTB); // pub const DISCOVERY_PARTICIPANT_CLEANUP_TOKEN: Token = Token(51 + PTB); diff --git a/src/rtps/dp_event_loop.rs b/src/rtps/dp_event_loop.rs index 61d48431..f1944b7b 100644 --- a/src/rtps/dp_event_loop.rs +++ b/src/rtps/dp_event_loop.rs @@ -21,6 +21,7 @@ use crate::{ }, messages::submessages::submessages::AckSubmessage, network::{udp_listener::UDPListener, udp_sender::UDPSender}, + polling::new_simple_timer, qos::HasQoSPolicy, rtps::{ constant::*, @@ -31,6 +32,7 @@ use crate::{ writer::{Writer, WriterIngredients}, }, structure::{ + dds_cache::DDSCache, entity::RTPSEntity, guid::{EntityId, GuidPrefix, TokenDecode, GUID}, }, @@ -58,6 +60,7 @@ pub(crate) enum EventLoopCommand { pub struct DPEventLoop { domain_info: DomainInfo, poll: Poll, + dds_cache: Arc>, discovery_db: Arc>, udp_listeners: HashMap, message_receiver: MessageReceiver, // This contains 
our Readers @@ -93,6 +96,7 @@ impl DPEventLoop { #[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)] pub(crate) fn new( domain_info: DomainInfo, + dds_cache: Arc>, udp_listeners: HashMap, discovery_db: Arc>, participant_guid_prefix: GuidPrefix, @@ -196,6 +200,7 @@ impl DPEventLoop { Self { domain_info, poll, + dds_cache, discovery_db, udp_listeners, udp_sender: Rc::new(udp_sender), @@ -223,9 +228,13 @@ impl DPEventLoop { pub fn event_loop(self) { let mut events = Events::with_capacity(16); // too small capacity just delays events to next poll - let mut acknack_timer = mio_extras::timer::Timer::default(); + + let mut acknack_timer = new_simple_timer(); acknack_timer.set_timeout(PREEMPTIVE_ACKNACK_PERIOD, ()); + let mut cache_gc_timer = new_simple_timer(); + cache_gc_timer.set_timeout(CACHE_CLEAN_PERIOD, ()); + self .poll .register( @@ -235,6 +244,15 @@ impl DPEventLoop { PollOpt::edge(), ) .unwrap(); + self + .poll + .register( + &cache_gc_timer, + DPEV_CACHE_CLEAN_TIMER_TOKEN, + Ready::readable(), + PollOpt::edge(), + ) + .unwrap(); let mut poll_alive = Instant::now(); let mut ev_wrapper = self; let mut preparing_to_stop = false; @@ -363,6 +381,11 @@ impl DPEventLoop { ev_wrapper.message_receiver.send_preemptive_acknacks(); acknack_timer.set_timeout(PREEMPTIVE_ACKNACK_PERIOD, ()); } + DPEV_CACHE_CLEAN_TIMER_TOKEN => { + debug!("Clean DDSCache on timer"); + ev_wrapper.dds_cache.write().unwrap().garbage_collect(); + cache_gc_timer.set_timeout(CACHE_CLEAN_PERIOD, ()); + } fixed_unknown => { error!( @@ -820,7 +843,7 @@ impl DPEventLoop { } fn add_local_reader(&mut self, reader_ing: ReaderIngredients) { - let timer = mio_extras::timer::Builder::default().num_slots(8).build(); + let timer = new_simple_timer(); self .poll .register( @@ -883,7 +906,7 @@ impl DPEventLoop { } fn add_local_writer(&mut self, writer_ing: WriterIngredients) { - let timer = mio_extras::timer::Builder::default().num_slots(8).build(); + let timer = new_simple_timer(); self .poll 
.register( @@ -1079,6 +1102,7 @@ mod tests { sync_status_channel(16).unwrap(); let dds_cache = Arc::new(RwLock::new(DDSCache::new())); + let dds_cache_clone = Arc::clone(&dds_cache); let (discovery_db_event_sender, _discovery_db_event_receiver) = mio_channel::sync_channel::<()>(4); @@ -1100,6 +1124,7 @@ mod tests { let child = thread::spawn(move || { let dp_event_loop = DPEventLoop::new( domain_info, + dds_cache_clone, HashMap::new(), discovery_db, GuidPrefix::default(), diff --git a/src/rtps/fragment_assembler.rs b/src/rtps/fragment_assembler.rs index 3281579c..7a877842 100644 --- a/src/rtps/fragment_assembler.rs +++ b/src/rtps/fragment_assembler.rs @@ -209,6 +209,16 @@ impl FragmentAssembler { } } + pub fn garbage_collect_before(&mut self, expire_before: Timestamp) { + self.assembly_buffers.retain(|sn, ab| { + let retain = ab.modified_time >= expire_before; + if !retain { + info!("AssemblyBuffer dropping {sn:?}"); + } + retain + }); + } + // pub fn partially_received_sequence_numbers_iterator(&self) -> Box> { // Since we should only know about SNs // via DATAFRAG messages // and AssemblyBuffers are removed immediately on diff --git a/src/rtps/message_receiver.rs b/src/rtps/message_receiver.rs index 00217338..7c9f8167 100644 --- a/src/rtps/message_receiver.rs +++ b/src/rtps/message_receiver.rs @@ -115,7 +115,7 @@ pub(crate) struct MessageReceiver { pub multicast_reply_locator_list: Vec, pub source_timestamp: Option, - submessage_count: usize, // Used in tests only? + submessage_count: usize, // Used in tests and error messages only? 
secure_receiver_state: Option, #[cfg(feature = "security")] secure_rtps_wrapped: Option, diff --git a/src/rtps/reader.rs b/src/rtps/reader.rs index b33db864..aad469bd 100644 --- a/src/rtps/reader.rs +++ b/src/rtps/reader.rs @@ -46,6 +46,7 @@ use crate::{ structure::{ cache_change::{CacheChange, ChangeKind}, dds_cache::TopicCache, + duration::Duration, entity::RTPSEntity, guid::{EntityId, GuidPrefix, GUID}, locator::Locator, @@ -134,6 +135,7 @@ pub(crate) struct Reader { received_heartbeat_count: i32, fragment_assemblers: BTreeMap, + last_fragment_garbage_collect: Timestamp, matched_writers: BTreeMap, writer_match_count_total: i32, // total count, never decreases @@ -151,6 +153,12 @@ pub(crate) struct Reader { security_plugins: Option, } +// If we are assembling a fragment, but it does not receive any updates +// for this time, the AssemblyBuffer is just dropped. +const FRAGMENT_ASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10); +// minimum interval (max frequency) of AssemblyBuffer GC +const MIN_FRAGMENT_GC_INTERVAL: Duration = Duration::from_secs(2); + impl Reader { pub(crate) fn new( i: ReaderIngredients, @@ -193,6 +201,7 @@ impl Reader { heartbeat_suppression_duration: StdDuration::new(0, 0), received_heartbeat_count: 0, fragment_assemblers: BTreeMap::new(), + last_fragment_garbage_collect: Timestamp::now(), matched_writers: BTreeMap::new(), writer_match_count_total: 0, requested_deadline_missed_count: 0, @@ -644,12 +653,26 @@ impl Reader { } fn garbage_collect_fragments(&mut self) { - // TODO: On most calls, do nothing. - // // If GC time/packet limit has been exceeded, iterate through // fragment assemblers and discard those assembly buffers whose // creation / modification timestamps look like it is no longer receiving // data and can therefore be discarded. 
+ let now = Timestamp::now(); + if now - self.last_fragment_garbage_collect > MIN_FRAGMENT_GC_INTERVAL { + self.last_fragment_garbage_collect = now; + + let expire_before = now - FRAGMENT_ASSEMBLY_TIMEOUT; + + self + .fragment_assemblers + .iter_mut() + .for_each(|(writer, fa)| { + debug!("AssemblyBuffer GC writer {:?}", writer); + fa.garbage_collect_before(expire_before); + }); + } else { + trace!("Not yet AssemblyBuffer GC time."); + } } fn missing_frags_for( diff --git a/src/rtps/rtps_writer_proxy.rs b/src/rtps/rtps_writer_proxy.rs index ba1fd7ec..55ba5e07 100644 --- a/src/rtps/rtps_writer_proxy.rs +++ b/src/rtps/rtps_writer_proxy.rs @@ -74,7 +74,6 @@ pub(crate) struct RtpsWriterProxy { // These are used for quick tracking of last_received_sequence_number: SequenceNumber, last_received_timestamp: Timestamp, - //fragment_assembler: Option, } impl RtpsWriterProxy { @@ -97,7 +96,6 @@ impl RtpsWriterProxy { ack_base: SequenceNumber::new(1), last_received_sequence_number: SequenceNumber::new(0), last_received_timestamp: Timestamp::INVALID, - //fragment_assembler: None, } } @@ -330,7 +328,6 @@ impl RtpsWriterProxy { ack_base: SequenceNumber::default(), last_received_sequence_number: SequenceNumber::new(0), last_received_timestamp: Timestamp::INVALID, - //fragment_assembler: None, } } // fn diff --git a/src/security/authentication/authentication_builtin/types.rs b/src/security/authentication/authentication_builtin/types.rs index 715f9fea..cec9dc90 100644 --- a/src/security/authentication/authentication_builtin/types.rs +++ b/src/security/authentication/authentication_builtin/types.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use log::debug; -use x509_certificate::{self, KeyAlgorithm}; +use x509_certificate::KeyAlgorithm; use crate::{ create_security_error, diff --git a/src/security/certificate.rs b/src/security/certificate.rs index f2eac656..d7633afa 100644 --- a/src/security/certificate.rs +++ b/src/security/certificate.rs @@ -21,7 +21,7 @@ use log::{debug, error, info, 
trace, warn}; use bytes::Bytes; use x509_certificate::{ - self, certificate::CapturedX509Certificate, EcdsaCurve, KeyAlgorithm, SignatureAlgorithm, + certificate::CapturedX509Certificate, EcdsaCurve, KeyAlgorithm, SignatureAlgorithm, }; use der::Decode; use bcder::{encode::Values, Mode}; @@ -33,7 +33,6 @@ use crate::security::{ config::{to_config_error_other, to_config_error_parse, ConfigError}, types::{security_error, SecurityResult}, }; -//use crate::security_error; // This is mostly a wrapper around // x509_certificate::certificate::CapturedX509Certificate diff --git a/src/structure/cache_change.rs b/src/structure/cache_change.rs index 66f0db61..ad9ebeff 100644 --- a/src/structure/cache_change.rs +++ b/src/structure/cache_change.rs @@ -1,3 +1,6 @@ +#[allow(unused_imports)] +use log::{debug, error, info, trace, warn}; + use crate::{ dds::{ddsdata::DDSData, with_key::datawriter::WriteOptions}, structure::{guid::GUID, sequence_number::SequenceNumber}, @@ -43,6 +46,10 @@ impl CacheChange { } } + pub(crate) fn reallocate(&mut self) { + self.data_value.reallocate(); + } + // Not needed? 
// pub fn change_kind(&self) -> ChangeKind { // self.data_value.change_kind() diff --git a/src/structure/dds_cache.rs b/src/structure/dds_cache.rs index 67b6f040..b15ec6fb 100644 --- a/src/structure/dds_cache.rs +++ b/src/structure/dds_cache.rs @@ -6,7 +6,7 @@ use std::{ }; #[allow(unused_imports)] -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use crate::{ create_error_internal, @@ -86,6 +86,17 @@ impl DDSCache { self.topic_caches.remove(topic_name); } } + + pub fn garbage_collect(&mut self) { + for tc in self.topic_caches.values_mut() { + let mut tc = tc.lock().unwrap(); + if let Some((last_timestamp, _)) = tc.changes.iter().next_back() { + if *last_timestamp > tc.changes_reallocated_up_to { + tc.remove_changes_before(Timestamp::ZERO); + } + } + } + } } #[derive(Debug)] @@ -100,10 +111,16 @@ pub(crate) struct TopicCache { max_keep_samples: i32, // from QoS, for quick, repeated access // TODO: Change this to Option, where None means "no limit". - // Tha main content of the cache is in this map. + // The main content of the cache is in this map. // Timestamp is assumed to be unique id over all the CacheChanges. changes: BTreeMap, + // The underlying Bytes buffers are reallocated after some time (once for each) in + // order to release the original receive buffer. The idea behind this is that if a CacheChange + // persists in the TopicCache for some time, it should no longer hold onto the receive buffer, + // so we can recycle the buffers. Otherwise, we (practically) leak memory.
+ changes_reallocated_up_to: Timestamp, + // sequence_numbers is an index to "changes" by GUID and SN sequence_numbers: BTreeMap>, @@ -125,6 +142,7 @@ impl TopicCache { * this */ max_keep_samples: 1, // dummy value, next call will overwrite this changes: BTreeMap::new(), + changes_reallocated_up_to: Timestamp::ZERO, sequence_numbers: BTreeMap::new(), received_reliably_before: BTreeMap::new(), }; @@ -180,7 +198,7 @@ impl TopicCache { self .add_change_internal(instant, cache_change) .map(|cc_back| { - debug!( + warn!( "DDSCache insert failed topic={:?} cache_change={:?}", self.topic_name, cc_back ); @@ -195,10 +213,10 @@ impl TopicCache { // First, do garbage collection. // But not at every insert, just to save time and effort. // Some heuristic to decide if we should collect now. - let payload_size = max(1, cache_change.data_value.payload_size()); + // let payload_size = max(1, cache_change.data_value.payload_size()); let semi_random_number = i64::from(cache_change.sequence_number) as usize; - let fairly_large_constant = 0xffff; - let modulus = fairly_large_constant / payload_size; + // let fairly_large_constant = 0xffff; + let modulus = 64; if modulus == 0 || semi_random_number % modulus == 0 { debug!("Garbage collecting topic {}", self.topic_name); self.remove_changes_before(Timestamp::ZERO); @@ -209,11 +227,16 @@ impl TopicCache { // Now to the actual adding business. if let Some(old_instant) = self.find_by_sn(&cache_change) { // Got duplicate DATA for a SN that we already have. It should be discarded. - debug!( + trace!( "add_change: discarding duplicate {:?} from {:?}. old timestamp = {:?}, new = {:?}", - cache_change.sequence_number, cache_change.writer_guid, old_instant, instant, + cache_change.sequence_number, + cache_change.writer_guid, + old_instant, + instant, ); - Some(cache_change) + // We are keeping this quiet, because e.g. FastDDS Discovery keeps sending the + // same SequenceNumber in periodic updates.
+ None } else { // This is a new (to us) SequenceNumber, this is the default processing path. self.insert_sn(*instant, &cache_change); @@ -372,6 +395,21 @@ impl TopicCache { // update also SequenceNumber map to_remove.values().for_each(|r| self.remove_sn(r)); + + // Now, reallocate old cache changes + let reallocate_timeout = crate::Duration::from_secs(5); + let now = Timestamp::now(); + let reallocate_limit = now - reallocate_timeout; + + self + .changes + .range_mut(( + Excluded(self.changes_reallocated_up_to), + Included(reallocate_limit), + )) + .for_each(|(_, cc)| cc.reallocate()); + + self.changes_reallocated_up_to = reallocate_limit; } pub fn topic_name(&self) -> String {