From 346b14c38ce91fe948409e23a9415bbbcfcf0f04 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 26 Jun 2024 16:53:28 -0600 Subject: [PATCH] refactor: change insert_many to take an iterator and reduce allocations in service crate also added getters on the OrderEvents struct instead of pub(crate) fields --- service/src/event/migration.rs | 2 +- service/src/event/order_events.rs | 18 ++++-- service/src/event/ordering_task.rs | 6 +- service/src/event/service.rs | 96 ++++++++++++++++-------------- store/src/sql/access/event.rs | 10 ++-- store/src/sql/test.rs | 4 +- 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs index 4b85342ef..dfe348059 100644 --- a/service/src/event/migration.rs +++ b/service/src/event/migration.rs @@ -140,7 +140,7 @@ impl Migrator { Ok(()) } async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> { - CeramicOneEvent::insert_many(sql_pool, &self.batch).await?; + CeramicOneEvent::insert_many(sql_pool, self.batch.iter()).await?; self.event_count += self.batch.len(); self.batch.truncate(0); Ok(()) diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 13ea20d28..f2102d0c6 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -8,8 +8,18 @@ use crate::Result; use super::service::EventMetadata; pub(crate) struct OrderEvents { - pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>, - pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>, + deliverable: Vec<(EventInsertable, EventMetadata)>, + missing_history: Vec<(EventInsertable, EventMetadata)>, +} + +impl OrderEvents { + pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] { + &self.deliverable + } + + pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] { + &self.missing_history + } } impl OrderEvents { @@ -246,7 +256,7 @@ mod test { .map(|(i, _)| i.clone()) .collect::>(); let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, &to_insert[..]) + CeramicOneEvent::insert_many(&pool, to_insert.iter()) .await .unwrap(); @@ -287,7 +297,7 @@ mod test { }) .collect::>(); let mut remaining = insertable.into_iter().skip(3).collect::>(); - CeramicOneEvent::insert_many(&pool, &to_insert[..]) + CeramicOneEvent::insert_many(&pool, to_insert.iter()) .await .unwrap(); diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index ffbede66f..ddc7c9dd4 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -634,14 +634,16 @@ mod test { init.body.set_deliverable(true); let undelivered = insertable.into_iter().skip(1).collect::>(); - let new = CeramicOneEvent::insert_many(pool, &undelivered[..]) + let new = CeramicOneEvent::insert_many(pool, undelivered.iter()) .await .unwrap(); assert_eq!(9, new.inserted.len()); assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count()); - let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap(); + let new = CeramicOneEvent::insert_many(pool, [&init].into_iter()) + .await + .unwrap(); assert_eq!(1, new.inserted.len()); assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count()); } diff --git a/service/src/event/service.rs b/service/src/event/service.rs index bac3677e3..beb2f821e 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use async_trait::async_trait; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; @@ -134,7 +136,7 @@ impl CeramicEventService { } let metadata = EventMetadata::from(parsed_event); - let body = EventInsertableBody::try_from_carfile(cid, carfile).await?; + let body = EventInsertableBody::try_from_carfile(cid, carfile).await?; Ok((EventInsertable::try_new(event_id, body)?, metadata)) } @@ -195,68 +197,72 @@ impl CeramicEventService { let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; let missing_history = ordered - .missing_history + .missing_history() .iter() .map(|(e, _)| e.order_key.clone()) .collect(); - let to_insert_with_metadata = if history_required { - ordered.deliverable + // api writes shouldn't have any missed history so we don't insert those events and + // we can skip notifying the ordering task because it's impossible to be waiting on them + let store_result = if history_required { + let to_insert = ordered.deliverable().iter().map(|(e, _)| e); + CeramicOneEvent::insert_many(&self.pool, to_insert).await? } else { - ordered - .deliverable - .into_iter() - .chain(ordered.missing_history) - .collect() - }; + let to_insert = ordered + .deliverable() + .iter() + .map(|(e, _)| e) + .chain(ordered.missing_history().iter().map(|(e, _)| e)); - let to_insert = to_insert_with_metadata - .iter() - .map(|(e, _)| e.clone()) - .collect::>(); + let store_result = CeramicOneEvent::insert_many(&self.pool, to_insert).await?; + self.notify_ordering_task(&ordered, &store_result).await?; - let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + store_result + }; - // api writes shouldn't have any missed pieces that need ordering so we don't send those and return early - if history_required { - return Ok(InsertResult { - store_result: res, - missing_history, - }); - } + Ok(InsertResult { + store_result, + missing_history, + }) + } - let to_send = res + async fn notify_ordering_task( + &self, + ordered: &OrderEvents, + store_result: &ceramic_store::InsertResult, + ) -> Result<()> { + let new = store_result .inserted .iter() - .filter(|i| i.new_key) - .collect::>(); + .filter_map(|i| if i.new_key { i.order_key.cid() } else { None }) + .collect::>(); - for ev in to_send { - if let Some((ev, metadata)) = to_insert_with_metadata - .iter() - .find(|(i, _)| i.order_key == ev.order_key) - { - let discovered = DiscoveredEvent { + for (ev, metadata) in ordered + .deliverable() + .iter() + .chain(ordered.missing_history().iter()) + { + if new.contains(&ev.cid()) { + self.send_discovered_event(DiscoveredEvent { cid: ev.cid(), known_deliverable: ev.deliverable(), metadata: metadata.to_owned(), - }; - trace!(?discovered, "sending delivered to ordering task"); - if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } else { - tracing::error!(event_id=%ev.order_key, "Missing header for inserted event should be unreachable!"); - debug_assert!(false); // panic in debug mode - continue; + }) + .await?; } } - Ok(InsertResult { - store_result: res, - missing_history, - }) + Ok(()) + } + + async fn send_discovered_event(&self, discovered: DiscoveredEvent) -> Result<()> { + trace!(?discovered, "sending delivered to ordering task"); + if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await { + warn!("Delivery task closed. shutting down"); + Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))) + } else { + Ok(()) + } } } diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 6dafc2712..a650c5cc7 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -161,11 +161,11 @@ impl CeramicOneEvent { /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering /// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. - pub async fn insert_many( - pool: &SqlitePool, - to_add: &[EventInsertable], - ) -> Result { - let mut inserted = Vec::with_capacity(to_add.len()); + pub async fn insert_many<'a, I>(pool: &SqlitePool, to_add: I) -> Result + where + I: Iterator, + { + let mut inserted = Vec::new(); let mut tx = pool.begin_tx().await.map_err(Error::from)?; for item in to_add { diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index 646f0c9b7..ad905f7d7 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -41,7 +41,7 @@ async fn hash_range_query() { let first = random_event("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u"); let second = random_event("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny"); - let x = CeramicOneEvent::insert_many(&pool, &[first, second]) + let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter()) .await .unwrap(); @@ -62,7 +62,7 @@ async fn range_query() { let first = random_event("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524"); let second = random_event("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty"); let pool = SqlitePool::connect_in_memory().await.unwrap(); - let x = CeramicOneEvent::insert_many(&pool, &[first, second]) + let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter()) .await .unwrap();