Skip to content

Commit

Permalink
refactor: change insert_many to take an iterator and reduce allocatio…
Browse files Browse the repository at this point in the history
…ns in service crate

also added getters on the OrderEvents struct instead of pub(crate) fields
  • Loading branch information
dav1do committed Jun 26, 2024
1 parent eae284d commit 346b14c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 59 deletions.
2 changes: 1 addition & 1 deletion service/src/event/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Migrator {
Ok(())
}
async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> {
CeramicOneEvent::insert_many(sql_pool, &self.batch).await?;
CeramicOneEvent::insert_many(sql_pool, self.batch.iter()).await?;
self.event_count += self.batch.len();
self.batch.truncate(0);
Ok(())
Expand Down
18 changes: 14 additions & 4 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ use crate::Result;
use super::service::EventMetadata;

pub(crate) struct OrderEvents {
pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>,
pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>,
deliverable: Vec<(EventInsertable, EventMetadata)>,
missing_history: Vec<(EventInsertable, EventMetadata)>,
}

impl OrderEvents {
pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] {
&self.deliverable
}

pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] {
&self.missing_history
}
}

impl OrderEvents {
Expand Down Expand Up @@ -246,7 +256,7 @@ mod test {
.map(|(i, _)| i.clone())
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
CeramicOneEvent::insert_many(&pool, &to_insert[..])
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

Expand Down Expand Up @@ -287,7 +297,7 @@ mod test {
})
.collect::<Vec<_>>();
let mut remaining = insertable.into_iter().skip(3).collect::<Vec<_>>();
CeramicOneEvent::insert_many(&pool, &to_insert[..])
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

Expand Down
6 changes: 4 additions & 2 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,14 +634,16 @@ mod test {
init.body.set_deliverable(true);
let undelivered = insertable.into_iter().skip(1).collect::<Vec<_>>();

let new = CeramicOneEvent::insert_many(pool, &undelivered[..])
let new = CeramicOneEvent::insert_many(pool, undelivered.iter())
.await
.unwrap();

assert_eq!(9, new.inserted.len());
assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count());

let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap();
let new = CeramicOneEvent::insert_many(pool, [&init].into_iter())
.await
.unwrap();
assert_eq!(1, new.inserted.len());
assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count());
}
Expand Down
96 changes: 51 additions & 45 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use async_trait::async_trait;
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated;
Expand Down Expand Up @@ -134,7 +136,7 @@ impl CeramicEventService {
}

let metadata = EventMetadata::from(parsed_event);
let body = EventInsertableBody::try_from_carfile(cid, carfile).await?;
let body = EventInsertableBody::try_from_carfile(cid, carfile).await?;

Ok((EventInsertable::try_new(event_id, body)?, metadata))
}
Expand Down Expand Up @@ -195,68 +197,72 @@ impl CeramicEventService {
let ordered = OrderEvents::try_new(&self.pool, to_insert).await?;

let missing_history = ordered
.missing_history
.missing_history()
.iter()
.map(|(e, _)| e.order_key.clone())
.collect();

let to_insert_with_metadata = if history_required {
ordered.deliverable
// api writes shouldn't have any missed history so we don't insert those events and
// we can skip notifying the ordering task because it's impossible to be waiting on them
let store_result = if history_required {
let to_insert = ordered.deliverable().iter().map(|(e, _)| e);
CeramicOneEvent::insert_many(&self.pool, to_insert).await?
} else {
ordered
.deliverable
.into_iter()
.chain(ordered.missing_history)
.collect()
};
let to_insert = ordered
.deliverable()
.iter()
.map(|(e, _)| e)
.chain(ordered.missing_history().iter().map(|(e, _)| e));

let to_insert = to_insert_with_metadata
.iter()
.map(|(e, _)| e.clone())
.collect::<Vec<_>>();
let store_result = CeramicOneEvent::insert_many(&self.pool, to_insert).await?;
self.notify_ordering_task(&ordered, &store_result).await?;

let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?;
store_result
};

// api writes shouldn't have any missed pieces that need ordering so we don't send those and return early
if history_required {
return Ok(InsertResult {
store_result: res,
missing_history,
});
}
Ok(InsertResult {
store_result,
missing_history,
})
}

let to_send = res
async fn notify_ordering_task(
&self,
ordered: &OrderEvents,
store_result: &ceramic_store::InsertResult,
) -> Result<()> {
let new = store_result
.inserted
.iter()
.filter(|i| i.new_key)
.collect::<Vec<_>>();
.filter_map(|i| if i.new_key { i.order_key.cid() } else { None })
.collect::<HashSet<_>>();

for ev in to_send {
if let Some((ev, metadata)) = to_insert_with_metadata
.iter()
.find(|(i, _)| i.order_key == ev.order_key)
{
let discovered = DiscoveredEvent {
for (ev, metadata) in ordered
.deliverable()
.iter()
.chain(ordered.missing_history().iter())
{
if new.contains(&ev.cid()) {
self.send_discovered_event(DiscoveredEvent {
cid: ev.cid(),
known_deliverable: ev.deliverable(),
metadata: metadata.to_owned(),
};
trace!(?discovered, "sending delivered to ordering task");
if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await {
warn!("Delivery task closed. shutting down");
return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed")));
}
} else {
tracing::error!(event_id=%ev.order_key, "Missing header for inserted event should be unreachable!");
debug_assert!(false); // panic in debug mode
continue;
})
.await?;
}
}

Ok(InsertResult {
store_result: res,
missing_history,
})
Ok(())
}

async fn send_discovered_event(&self, discovered: DiscoveredEvent) -> Result<()> {
trace!(?discovered, "sending delivered to ordering task");
if let Err(_e) = self.delivery_task.tx_inserted.send(discovered).await {
warn!("Delivery task closed. shutting down");
Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed")))
} else {
Ok(())
}
}
}

Expand Down
10 changes: 5 additions & 5 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ impl CeramicOneEvent {
/// That is, events will be processed in the order they are given so earlier events are given a lower global ordering
/// and will be returned earlier in the feed. Events can be intereaved with different streams, but if two events
/// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
pub async fn insert_many(
pool: &SqlitePool,
to_add: &[EventInsertable],
) -> Result<InsertResult> {
let mut inserted = Vec::with_capacity(to_add.len());
pub async fn insert_many<'a, I>(pool: &SqlitePool, to_add: I) -> Result<InsertResult>
where
I: Iterator<Item = &'a EventInsertable>,
{
let mut inserted = Vec::new();
let mut tx = pool.begin_tx().await.map_err(Error::from)?;

for item in to_add {
Expand Down
4 changes: 2 additions & 2 deletions store/src/sql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn hash_range_query() {
let first = random_event("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u");
let second = random_event("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny");

let x = CeramicOneEvent::insert_many(&pool, &[first, second])
let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter())
.await
.unwrap();

Expand All @@ -62,7 +62,7 @@ async fn range_query() {
let first = random_event("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524");
let second = random_event("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty");
let pool = SqlitePool::connect_in_memory().await.unwrap();
let x = CeramicOneEvent::insert_many(&pool, &[first, second])
let x = CeramicOneEvent::insert_many(&pool, [&first, &second].into_iter())
.await
.unwrap();

Expand Down

0 comments on commit 346b14c

Please sign in to comment.