From 62ad2c421fb8588890815389176ce98e03f0845f Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 15:58:41 -0600 Subject: [PATCH 1/5] refactor: add explicit lifetime on recon store trait --- recon/src/libp2p/tests.rs | 4 ++-- recon/src/recon.rs | 8 ++++---- recon/src/recon/btreestore.rs | 4 ++-- service/src/event/store.rs | 6 +++--- service/src/interest/store.rs | 7 +++++-- store/src/metrics.rs | 7 +++++-- 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 0711fc143..66b50d7f3 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -75,13 +75,13 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { self.as_error()?; self.inner.insert(item).await } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> ReconResult { self.as_error()?; self.inner.insert_many(items).await diff --git a/recon/src/recon.rs b/recon/src/recon.rs index 956afc13e..255179390 100644 --- a/recon/src/recon.rs +++ b/recon/src/recon.rs @@ -462,12 +462,12 @@ pub trait Store { /// Insert a new key into the key space. Returns true if the key did not exist. /// The value will be updated if included - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result; + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result; /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result; + async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result; /// Return the hash of all keys in the range between left_fencepost and right_fencepost. /// The upper range bound is exclusive. 
@@ -575,11 +575,11 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result { self.as_ref().insert(item).await } - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> Result { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, Self::Key>]) -> Result { self.as_ref().insert_many(items).await } diff --git a/recon/src/recon/btreestore.rs b/recon/src/recon/btreestore.rs index 565b34609..11ea93bf7 100644 --- a/recon/src/recon/btreestore.rs +++ b/recon/src/recon/btreestore.rs @@ -132,7 +132,7 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> Result { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> Result { let mut inner = self.inner.lock().await; let new = inner .keys @@ -143,7 +143,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> Result { + async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> Result { let mut new = vec![false; items.len()]; for (idx, item) in items.iter().enumerate() { new[idx] = self.insert(item).await?; diff --git a/service/src/event/store.rs b/service/src/event/store.rs index fef6f4661..b33c142e1 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -14,7 +14,7 @@ impl recon::Store for CeramicEventService { type Key = EventId; type Hash = Sha256a; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { let res = self .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; @@ -25,9 +25,9 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many( + async fn insert_many<'a>( &self, - items: &[ReconItem<'_, Self::Key>], + items: &[ReconItem<'a, Self::Key>], ) -> ReconResult { let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) diff --git a/service/src/interest/store.rs b/service/src/interest/store.rs index 5643ac76e..a63e4bd6b 100644 --- a/service/src/interest/store.rs +++ b/service/src/interest/store.rs @@ -13,7 +13,7 @@ impl recon::Store for CeramicInterestService { type Hash = Sha256a; #[instrument(skip(self))] - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { Ok(CeramicOneInterest::insert(&self.pool, item.key).await?) } @@ -21,7 +21,10 @@ impl recon::Store for CeramicInterestService { /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. #[instrument(skip(self))] - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { + async fn insert_many<'a>( + &self, + items: &[ReconItem<'a, Self::Key>], + ) -> ReconResult { let keys = items.iter().map(|item| item.key).collect::>(); Ok(CeramicOneInterest::insert_many(&self.pool, &keys).await?) 
} diff --git a/store/src/metrics.rs b/store/src/metrics.rs index f6db4292a..4870816cd 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -248,7 +248,7 @@ where type Key = K; type Hash = H; - async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { + async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult { let new = StoreMetricsMiddleware::::record(&self.metrics, "insert", self.store.insert(item)) .await?; @@ -256,7 +256,10 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many<'a>( + &self, + items: &[ReconItem<'a, K>], + ) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", From 7850c90ba6122029b656a2a7089ee8f0d4654f38 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 16:43:09 -0600 Subject: [PATCH 2/5] refactor: move where we mark init events deliverable into ordering --- service/src/event/order_events.rs | 13 ++++++++----- service/src/event/ordering_task.rs | 3 ++- service/src/event/service.rs | 6 ++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 79acef087..13ea20d28 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -28,11 +28,14 @@ impl OrderEvents { pool: &SqlitePool, mut candidate_events: Vec<(EventInsertable, EventMetadata)>, ) -> Result { - let mut new_cids: HashMap = HashMap::from_iter( - candidate_events - .iter() - .map(|(e, _)| (e.cid(), e.deliverable())), - ); + let mut new_cids: HashMap = + HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| { + // all init events are deliverable so we mark them as such before we do anything else + if matches!(meta, EventMetadata::Init { .. }) { + e.body.set_deliverable(true); + } + (e.cid(), e.deliverable()) + })); let mut deliverable = Vec::with_capacity(candidate_events.len()); candidate_events.retain(|(e, h)| { if e.deliverable() { diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index 7afe35441..499800618 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -636,7 +636,8 @@ mod test { async fn insert_10_with_9_undelivered(pool: &SqlitePool) { let insertable = get_n_insertable_events(10).await; - let init = insertable.first().unwrap().to_owned(); + let mut init = insertable.first().unwrap().to_owned(); + init.body.set_deliverable(true); let undelivered = insertable.into_iter().skip(1).collect::>(); let new = CeramicOneEvent::insert_many(pool, &undelivered[..]) diff --git a/service/src/event/service.rs b/service/src/event/service.rs index 61f3fa2ce..bac3677e3 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -111,8 +111,7 @@ impl CeramicEventService { Ok(()) } - /// Currently only verifies that the event parses into a valid ceramic event, determining whether it's - /// immediately deliverable because it's an init event or it needs review. + /// Currently only verifies that the event parses into a valid ceramic event. /// In the future, we will need to do more event validation (verify all EventID pieces, hashes, signatures, etc). 
pub(crate) async fn validate_discovered_event( event_id: ceramic_core::EventId, @@ -135,8 +134,7 @@ impl CeramicEventService { } let metadata = EventMetadata::from(parsed_event); - let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?; - body.set_deliverable(matches!(metadata, EventMetadata::Init { .. })); + let body = EventInsertableBody::try_from_carfile(cid, carfile).await?; Ok((EventInsertable::try_new(event_id, body)?, metadata)) } From 92d3469de767e812097d105931cda06af80069d7 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 16:53:28 -0600 Subject: [PATCH 3/5] refactor: change insert_many to take an iterator and reduce allocations in service crate also added getters on the OrderEvents struct instead of pub(crate) fields --- service/src/event/migration.rs | 2 +- service/src/event/order_events.rs | 18 ++++-- service/src/event/ordering_task.rs | 6 +- service/src/event/service.rs | 96 ++++++++++++++++-------------- store/src/sql/access/event.rs | 10 ++-- store/src/sql/test.rs | 4 +- 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs index 10b94da67..11e2e691d 100644 --- a/service/src/event/migration.rs +++ b/service/src/event/migration.rs @@ -140,7 +140,7 @@ impl Migrator { Ok(()) } async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> { - CeramicOneEvent::insert_many(sql_pool, &self.batch).await?; + CeramicOneEvent::insert_many(sql_pool, self.batch.iter()).await?; self.event_count += self.batch.len(); self.batch.truncate(0); Ok(()) diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 13ea20d28..f2102d0c6 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -8,8 +8,18 @@ use crate::Result; use super::service::EventMetadata; pub(crate) struct OrderEvents { - pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>, - pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>, + deliverable: Vec<(EventInsertable, EventMetadata)>, + missing_history: Vec<(EventInsertable, EventMetadata)>, +} + +impl OrderEvents { + pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] { + &self.deliverable + } + + pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] { + &self.missing_history + } } impl OrderEvents { @@ -246,7 +256,7 @@ mod test { .map(|(i, _)| i.clone()) .collect::>(); let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, &to_insert[..]) + CeramicOneEvent::insert_many(&pool, to_insert.iter()) .await .unwrap(); @@ -287,7 +297,7 @@ mod test { }) .collect::>(); let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, &to_insert[..]) + CeramicOneEvent::insert_many(&pool, to_insert.iter()) .await .unwrap(); diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index 499800618..070cf0501 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -640,14 +640,16 @@ mod test { init.body.set_deliverable(true); let undelivered = insertable.into_iter().skip(1).collect::>(); - let new = CeramicOneEvent::insert_many(pool, &undelivered[..]) + let new = CeramicOneEvent::insert_many(pool, undelivered.iter()) .await .unwrap(); assert_eq!(9, new.inserted.len()); assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count()); - let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap(); + let new = 
CeramicOneEvent::insert_many(pool, [&init].into_iter()) + .await + .unwrap(); assert_eq!(1, new.inserted.len()); assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count()); } diff --git a/service/src/event/service.rs b/service/src/event/service.rs index bac3677e3..beb2f821e 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use async_trait::async_trait; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; @@ -134,7 +136,7 @@ impl CeramicEventService { } let metadata = EventMetadata::from(parsed_event); - let body = EventInsertableBody::try_from_carfile(cid, carfile).await?; + let body = EventInsertableBody::try_from_carfile(cid, carfile).await?; Ok((EventInsertable::try_new(event_id, body)?, metadata)) } @@ -195,68 +197,72 @@ impl CeramicEventService { let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; let missing_history = ordered - .missing_history + .missing_history() .iter() .map(|(e, _)| e.order_key.clone()) .collect(); - let to_insert_with_metadata = if history_required { - ordered.deliverable + // api writes shouldn't have any missed history so we don't insert those events and + // we can skip notifying the ordering task because it's impossible to be waiting on them + let store_result = if history_required { + let to_insert = ordered.deliverable().iter().map(|(e, _)| e); + CeramicOneEvent::insert_many(&self.pool, to_insert).await? } else { - ordered - .deliverable - .into_iter() - .chain(ordered.missing_history) - .collect() - }; + let to_insert = ordered + .deliverable() + .iter() + .map(|(e, _)| e) + .chain(ordered.missing_history().iter().map(|(e, _)| e)); - let to_insert = to_insert_with_metadata - .iter() - .map(|(e, _)| e.clone()) - .collect::>(); + let store_result = CeramicOneEvent::insert_many(&self.pool, to_insert).await?; + self.notify_ordering_task(&ordered, &store_result).await?; - let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + store_result + }; - // api writes shouldn't have any missed pieces that need ordering so we don't send those and return early - if history_required { - return Ok(InsertResult { - store_result: res, - missing_history, - }); - } + Ok(InsertResult { + store_result, + missing_history, + }) + } - let to_send = res + async fn notify_ordering_task( + &self, + ordered: &OrderEvents, + store_result: &ceramic_store::InsertResult, + ) -> Result<()> { + let new = store_result .inserted .iter() - .filter(|i| i.new_key) - .collect::>(); + .filter_map(|i| if i.new_key { i.order_key.cid() } else { None }) + .collect::>(); - for ev in to_send { - if let Some((ev, metadata)) = to_insert_with_metadata - .iter() - .find(|(i, _)| i.order_key == ev.order_key) - { - let discovered = DiscoveredEvent { + for (ev, metadata) in ordered + .deliverable() + .iter() + .chain(ordered.missing_history().iter()) + { + if new.contains(&ev.cid()) { + self.send_discovered_event(DiscoveredEvent { cid: ev.cid(), known_deliverable: ev.deliverable(), metadata: metadata.to_owned(), - }; - trace!(?discovered, "sending delivered to ordering task"); - if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await { - warn!("Delivery task closed. 
shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } else { - tracing::error!(event_id=%ev.order_key, "Missing header for inserted event should be unreachable!"); - debug_assert!(false); // panic in debug mode - continue; + }) + .await?; } } - Ok(InsertResult { - store_result: res, - missing_history, - }) + Ok(()) + } + + async fn send_discovered_event(&self, discovered: DiscoveredEvent) -> Result<()> { + trace!(?discovered, "sending delivered to ordering task"); + if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await { + warn!("Delivery task closed. shutting down"); + Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))) + } else { + Ok(()) + } } } diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 2e9cbb622..646d3f4b9 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -160,11 +160,11 @@ impl CeramicOneEvent { /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering /// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. - pub async fn insert_many( - pool: &SqlitePool, - to_add: &[EventInsertable], - ) -> Result { - let mut inserted = Vec::with_capacity(to_add.len()); + pub async fn insert_many<'a, I>(pool: &SqlitePool, to_add: I) -> Result + where + I: Iterator, + { + let mut inserted = Vec::new(); let mut tx = pool.begin_tx().await.map_err(Error::from)?; for item in to_add { diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index 646f0c9b7..ad905f7d7 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -41,7 +41,7 @@ async fn hash_range_query() { let first = random_event("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u"); let second = random_event("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny"); - let x = CeramicOneEvent::insert_many(&pool, &[first, second]) + let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter()) .await .unwrap(); @@ -62,7 +62,7 @@ async fn range_query() { let first = random_event("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"); let second = random_event("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"); let pool = SqlitePool::connect_in_memory().await.unwrap(); - let x = CeramicOneEvent::insert_many(&pool, &[first, second]) + let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter()) .await .unwrap(); From 8a17c76f46a32c8e5226879e437569e890e5ea35 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 17:13:37 -0600 Subject: [PATCH 4/5] refactor: extract some test code into functions for clarity --- service/src/event/order_events.rs | 83 ++++++++++++++++--------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index f2102d0c6..c28c4f2f7 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -163,6 +163,43 @@ mod test { (stream_1, stream_2, to_insert) } + /// Asserts the events are deliverable and returns IDs for events in stream_1 as the first value and things in stream_2 as the second + fn split_deliverable_order_by_stream( + stream_1: &[(EventId, Vec)], + stream_2: &[(EventId, Vec)], + events: &[(EventInsertable, EventMetadata)], + ) -> (Vec, Vec) { + let 
mut after_1 = Vec::with_capacity(stream_1.len()); + let mut after_2 = Vec::with_capacity(stream_2.len()); + for (event, _) in events { + assert!(event.deliverable()); + if stream_1.iter().any(|e| e.0 == event.order_key) { + after_1.push(event.order_key.clone()); + } else { + after_2.push(event.order_key.clone()); + } + } + + (after_1, after_2) + } + + async fn get_insertable_events( + events: &[(EventId, Vec)], + ) -> Vec<(EventInsertable, EventMetadata)> { + let mut insertable = Vec::with_capacity(events.len()); + for event in events { + let new = CeramicEventService::validate_discovered_event( + event.0.to_owned(), + event.1.as_slice(), + ) + .await + .unwrap(); + insertable.push(new); + } + + insertable + } + #[test(tokio::test)] async fn out_of_order_streams_valid() { let pool = SqlitePool::connect_in_memory().await.unwrap(); @@ -177,16 +214,8 @@ mod test { ordered.missing_history.len(), ordered.missing_history ); - let mut after_1 = Vec::with_capacity(10); - let mut after_2 = Vec::with_capacity(10); - for (event, _) in ordered.deliverable { - assert!(event.deliverable()); - if stream_1.iter().any(|e| e.0 == event.order_key) { - after_1.push(event.order_key.clone()); - } else { - after_2.push(event.order_key.clone()); - } - } + let (after_1, after_2) = + split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable()); assert_eq!( stream_1.into_iter().map(|e| e.0).collect::>(), @@ -214,16 +243,8 @@ mod test { "Missing history: {:?}", ordered.missing_history ); - let mut after_1 = Vec::with_capacity(10); - let mut after_2 = Vec::with_capacity(10); - for (event, _) in ordered.deliverable { - assert!(event.deliverable()); - if stream_1.iter().any(|e| e.0 == event.order_key) { - after_1.push(event.order_key.clone()); - } else { - after_2.push(event.order_key.clone()); - } - } + let (after_1, after_2) = + split_deliverable_order_by_stream(&stream_1, &stream_2, ordered.deliverable()); assert_eq!(vec![stream_1[0].0.clone()], after_1); assert_eq!( @@ -240,16 +261,7 @@ mod test { let pool = SqlitePool::connect_in_memory().await.unwrap(); let stream_1 = get_n_events(10).await; - let mut insertable = Vec::with_capacity(10); - for event in stream_1.iter() { - let new = CeramicEventService::validate_discovered_event( - event.0.to_owned(), - event.1.as_slice(), - ) - .await - .unwrap(); - insertable.push(new); - } + let insertable = get_insertable_events(&stream_1).await; let to_insert = insertable .iter() .take(3) @@ -278,16 +290,7 @@ mod test { let pool = SqlitePool::connect_in_memory().await.unwrap(); let stream_1 = get_n_events(10).await; - let mut insertable = Vec::with_capacity(10); - for event in stream_1.iter() { - let new = CeramicEventService::validate_discovered_event( - event.0.to_owned(), - event.1.as_slice(), - ) - .await - .unwrap(); - insertable.push(new); - } + let mut insertable = get_insertable_events(&stream_1).await; let to_insert = insertable .iter_mut() .take(3) From c12f526a45d4e814a5f6b4e5af0a8f87a25d72de Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 17:13:58 -0600 Subject: [PATCH 5/5] refactor: flip !any() to all(!) 
---
 service/src/event/ordering_task.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs
index 070cf0501..5cfc239a6 100644
--- a/service/src/event/ordering_task.rs
+++ b/service/src/event/ordering_task.rs
@@ -268,10 +268,10 @@ impl StreamEvents {
     /// Returns `false` if we have more work to do and should be retained for future processing
     fn processing_completed(&mut self) -> bool {
         // if we're done, we don't need to bother cleaning up since we get dropped
-        if !self
+        if self
             .cid_map
             .iter()
-            .any(|(_, ev)| matches!(ev, StreamEvent::Undelivered(_)))
+            .all(|(_, ev)| !matches!(ev, StreamEvent::Undelivered(_)))
         {
             true
         } else {
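
Illustrative aside, not part of the patch series: the final commit relies on the logical equivalence `!iter.any(p) == iter.all(!p)` (De Morgan), so flipping the call changes only readability, never the result of `processing_completed`. A minimal standalone sketch, using a simplified stand-in enum in place of the real `StreamEvent`:

use std::collections::HashMap;

// Simplified stand-in for the StreamEvent enum in ordering_task.rs.
enum Event {
    Delivered,
    Undelivered,
}

fn main() {
    let cid_map: HashMap<u32, Event> =
        HashMap::from([(1, Event::Delivered), (2, Event::Undelivered)]);

    // Original form: "no undelivered event remains".
    let done_any = !cid_map
        .iter()
        .any(|(_, ev)| matches!(ev, Event::Undelivered));

    // Flipped form: "every event is not undelivered".
    let done_all = cid_map
        .iter()
        .all(|(_, ev)| !matches!(ev, Event::Undelivered));

    // By De Morgan's law the two expressions always agree.
    assert_eq!(done_any, done_all);
    println!("processing completed: {done_all}");
}

Either form short-circuits on the first `Undelivered` entry it encounters, so the flip does not change the amount of work done either.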