From 1cd9f15c166889ad88ebff0e58db2f2945af31ba Mon Sep 17 00:00:00 2001
From: David Estes
Date: Tue, 18 Jun 2024 16:01:45 -0600
Subject: [PATCH] fix: correct/better IOD

- now correct: handles long streams that arrive out of order in all cases
  (as far as I can tell), and should be more performant since it only does
  work when necessary
- includes tests of long streams
---
 service/src/event/mod.rs           |    1 +
 service/src/event/order_events.rs  |  100 +++
 service/src/event/ordering_task.rs | 1008 +++++++++++++---------------
 service/src/event/service.rs       |  431 +++++-------
 service/src/tests/mod.rs           |   87 +--
 service/src/tests/ordering.rs      |  151 ++++-
 store/Cargo.toml                   |    6 +-
 store/benches/sqlite_store.rs      |    2 +-
 store/src/sql/access/event.rs      |   39 +-
 store/src/sql/entities/event.rs    |   64 +-
 store/src/sql/query.rs             |   11 +-
 store/src/sql/test.rs              |    3 +-
 12 files changed, 1005 insertions(+), 898 deletions(-)
 create mode 100644 service/src/event/order_events.rs

diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs
index f7dce573a..69e12b26c 100644
--- a/service/src/event/mod.rs
+++ b/service/src/event/mod.rs
@@ -1,3 +1,4 @@
+mod order_events;
 mod ordering_task;
 mod service;
 mod store;
diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs
new file mode 100644
index 000000000..88111ea0f
--- /dev/null
+++ b/service/src/event/order_events.rs
@@ -0,0 +1,100 @@
+use std::collections::HashSet;
+
+use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
+use cid::Cid;
+
+use crate::Result;
+
+use super::service::EventHeader;
+
+pub(crate) struct OrderEvents {
+    pub(crate) deliverable: Vec<(EventInsertable, EventHeader)>,
+    pub(crate) missing_history: Vec<(EventInsertable, EventHeader)>,
+}
+
+impl OrderEvents {
+    /// Groups the events into lists of those with a delivered prev and those without. Callers can use `missing_history` to return an error when events are required to have history.
+    /// Deliverable events are marked as such so they can be passed directly to the store to be persisted.
+    /// Will look up the prev from the database if needed to check whether it's deliverable (this could possibly change for recon, letting the ordering task handle it).
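+    ///
+    /// A rough sketch of the intended call pattern (hypothetical caller code, not
+    /// part of this patch), showing how the two buckets are meant to be used:
+    ///
+    /// ```ignore
+    /// let ordered = OrderEvents::try_new(&pool, candidates).await?;
+    /// // API writes can reject events with missing history outright, while recon
+    /// // writes can persist them undelivered for the ordering task to pick up.
+    /// for (event, _header) in ordered.deliverable {
+    ///     debug_assert!(event.deliverable());
+    /// }
+    /// ```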
+    pub async fn try_new(
+        pool: &SqlitePool,
+        mut candidate_events: Vec<(EventInsertable, EventHeader)>,
+    ) -> Result<Self> {
+        let new_cids: HashSet<Cid> =
+            HashSet::from_iter(candidate_events.iter().map(|(e, _)| e.cid()));
+        let mut deliverable = Vec::with_capacity(candidate_events.len());
+        candidate_events.retain(|(e, h)| {
+            if e.deliverable() {
+                deliverable.push((e.clone(), h.clone()));
+                false
+            } else {
+                true
+            }
+        });
+        if candidate_events.is_empty() {
+            return Ok(OrderEvents {
+                deliverable,
+                missing_history: Vec::new(),
+            });
+        }
+
+        let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
+        let mut missing_history = Vec::with_capacity(candidate_events.len());
+
+        while let Some((mut event, header)) = candidate_events.pop() {
+            match header.prev() {
+                None => {
+                    unreachable!("Init events should have been filtered out since they're always deliverable");
+                }
+                Some(prev) => {
+                    if new_cids.contains(&prev) {
+                        prevs_in_memory.push((event, header));
+                        continue;
+                    } else {
+                        let (_exists, prev_deliverable) =
+                            CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
+                        if prev_deliverable {
+                            event.body.set_deliverable(true);
+                            deliverable.push((event, header));
+                        } else {
+                            // technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed,
+                            // but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have been able to
+                            // construct this event. We consider it missing history, which callers can treat as an error when events are required to have history.
+                            missing_history.push((event, header));
+                        }
+                    }
+                }
+            }
+        }
+
+        // We keep adding events to the deliverable list until nothing changes
+        while let Some((mut event, header)) = prevs_in_memory.pop() {
+            let mut made_changes = false;
+            match header.prev() {
+                None => {
+                    unreachable!("Init events should have been filtered out of the in memory set");
+                }
+                Some(prev) => {
+                    // a HashSet would be a faster lookup, but we're not going to have that many events, so hashing
+                    // for a handful of lookups and then converting back to a Vec probably isn't worth it.
+                    if deliverable.iter().any(|(e, _)| e.cid() == prev) {
+                        event.body.set_deliverable(true);
+                        deliverable.push((event, header));
+                        made_changes = true;
+                    } else {
+                        prevs_in_memory.push((event, header));
+                    }
+                }
+            }
+            if !made_changes {
+                missing_history.extend(prevs_in_memory);
+                break;
+            }
+        }
+
+        Ok(OrderEvents {
+            deliverable,
+            missing_history,
+        })
+    }
+}
diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs
index 9956e44f9..d3ce40f72 100644
--- a/service/src/event/ordering_task.rs
+++ b/service/src/event/ordering_task.rs
@@ -1,73 +1,33 @@
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, VecDeque};
 
-use anyhow::anyhow;
+use ceramic_core::EventId;
 use ceramic_store::{CeramicOneEvent, SqlitePool};
 use cid::Cid;
 use tracing::{debug, error, info, trace, warn};
 
 use crate::{CeramicEventService, Error, Result};
 
+use super::service::{EventHeader, InsertableBodyWithHeader};
+
 /// How many events to select at once to see if they've become deliverable when we have downtime
 /// Used at startup and occasionally in case we ever dropped something
 /// We keep the number small for now as we may need to traverse many prevs for each one of these and load them into memory.
-const DELIVERABLE_EVENTS_BATCH_SIZE: usize = 1000;
+const DELIVERABLE_EVENTS_BATCH_SIZE: u32 = 1000;
+
 /// How many batches of undelivered events are we willing to process on start up,
 /// to avoid an infinite loop? It's going to take a long time to process `DELIVERABLE_EVENTS_BATCH_SIZE * MAX_ITERATIONS` events
 const MAX_ITERATIONS: usize = 100_000_000;
 
-/// How often should we try to process all undelivered events in case we missed something
-const CHECK_ALL_INTERVAL_SECONDS: u64 = 60 * 10; // 10 minutes
-
-type InitCid = cid::Cid;
-type PrevCid = cid::Cid;
-type EventCid = cid::Cid;
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct DeliveredEvent {
-    pub(crate) cid: Cid,
-    pub(crate) init_cid: InitCid,
-}
-
-impl DeliveredEvent {
-    pub fn new(cid: Cid, init_cid: InitCid) -> Self {
-        Self { cid, init_cid }
-    }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub(crate) struct DeliverableMetadata {
-    pub(crate) init_cid: InitCid,
-    pub(crate) prev: PrevCid,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct DeliverableEvent {
-    pub(crate) cid: EventCid,
-    pub(crate) meta: DeliverableMetadata,
-    attempts: usize,
-    last_attempt: std::time::Instant,
-    started: std::time::Instant,
-    expires: Option<std::time::Instant>,
-}
-
-impl DeliverableEvent {
-    pub fn new(cid: Cid, meta: DeliverableMetadata, expires: Option<std::time::Instant>) -> Self {
-        Self {
-            cid,
-            meta,
-            attempts: 0,
-            last_attempt: std::time::Instant::now(),
-            started: std::time::Instant::now(),
-            expires,
-        }
-    }
-}
+type StreamCid = Cid;
+type EventCid = Cid;
+type PrevCid = Cid;
 
 #[derive(Debug)]
 pub struct DeliverableTask {
     pub(crate) _handle: tokio::task::JoinHandle<()>,
-    pub(crate) tx: tokio::sync::mpsc::Sender<DeliverableEvent>,
-    pub(crate) tx_new: tokio::sync::mpsc::Sender<DeliveredEvent>,
+    /// Carries events discovered over recon that arrive out of order and need to be marked ready (deliverable)
+    /// once their prev chain is discovered and complete (i.e. if my prev is deliverable then I am deliverable).
+    pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertableBodyWithHeader>,
 }
 
 #[derive(Debug)]
@@ -75,30 +35,33 @@ pub struct OrderingTask {}
 
 impl OrderingTask {
     pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask {
-        let (tx, rx) = tokio::sync::mpsc::channel::<DeliverableEvent>(q_depth);
-        let (tx_new, rx_new) = tokio::sync::mpsc::channel::<DeliveredEvent>(q_depth);
+        let (tx_inserted, rx_inserted) =
+            tokio::sync::mpsc::channel::<InsertableBodyWithHeader>(q_depth);
 
         let handle =
-            tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx, rx_new).await });
+            tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_inserted).await });
 
         DeliverableTask {
             _handle: handle,
-            tx,
-            tx_new,
+            tx_inserted,
         }
     }
 
     async fn run_loop(
         pool: SqlitePool,
         load_undelivered: bool,
-        mut rx: tokio::sync::mpsc::Receiver<DeliverableEvent>,
-        mut rx_new: tokio::sync::mpsc::Receiver<DeliveredEvent>,
+        mut rx_inserted: tokio::sync::mpsc::Receiver<InsertableBodyWithHeader>,
     ) {
         // before starting, make sure we've updated any events in the database we missed
+        // this could take a long time.
+        // possibly we want to put it in another task so we can start processing events immediately
         let mut state = OrderingState::new();
         if load_undelivered
             && state
-                .process_all_undelivered_events(&pool, MAX_ITERATIONS)
+                .process_all_undelivered_events(
+                    &pool,
+                    MAX_ITERATIONS,
+                    DELIVERABLE_EVENTS_BATCH_SIZE,
+                )
                 .await
                 .map_err(Self::log_error)
                 .is_err()
@@ -106,53 +69,37 @@ impl OrderingTask {
             return;
        }
 
-        let mut last_processed = std::time::Instant::now();
         loop {
-            let mut modified: Option<HashSet<Cid>> = None;
-            let mut need_prev_buf = Vec::with_capacity(100);
-            let mut newly_added_buf = Vec::with_capacity(100);
-
-            tokio::select! {
-                incoming = rx.recv_many(&mut need_prev_buf, 100) => {
-                    if incoming > 0 {
-                        modified = Some(state.add_incoming_batch(need_prev_buf));
-                    }
-                }
-                new = rx_new.recv_many(&mut newly_added_buf, 100) => {
-                    if new > 0 {
-                        modified = Some(newly_added_buf.into_iter().map(|ev| ev.init_cid).collect::<HashSet<_>>());
-                    }
-                }
-                else => {
-                    info!(stream_count=%state.pending_by_stream.len(), "Server dropped the ordering task. Processing once more before exiting...");
-                    let _ = state
-                        .process_events(&pool, None)
-                        .await
-                        .map_err(Self::log_error);
+            let mut recon_events = Vec::with_capacity(100);
+            // consider trying to recv in a loop until X events arrive or ~10ms passes, whichever comes first, and then processing;
+            // the more events we get in memory, the fewer queries we need to run.
+            if rx_inserted.recv_many(&mut recon_events, 100).await > 0 {
+                trace!(?recon_events, "new events discovered!");
+                state.add_inserted_events(recon_events);
+
+                if state
+                    .process_streams(&pool)
+                    .await
+                    .map_err(Self::log_error)
+                    .is_err()
+                {
                     return;
                 }
-            };
-            // Given the math on OrderingState and the generally low number of updates to streams, we are going
-            // to ignore pruning until there's more of an indication that it's necessary. Just log some stats.
-            if last_processed.elapsed().as_secs() > CHECK_ALL_INTERVAL_SECONDS {
-                let stream_count = state.pending_by_stream.len();
-                if stream_count > 1000 {
-                    info!(%stream_count, "Over 1000 pending streams without recent updates.");
-                } else {
-                    debug!(%stream_count, "Fewer than 1000 streams pending without recent updates.");
-                }
-            }
+            } else if rx_inserted.is_closed() {
+                debug!(
+                    "Server dropped the delivered events channel. Attempting to process streams in memory once more before exiting."
+                );
 
-            if modified.is_some()
-                && state
-                    .process_events(&pool, modified)
+                if state
+                    .process_streams(&pool)
                     .await
                    .map_err(Self::log_error)
                    .is_err()
-                {
-                    return;
+                {
+                    return;
+                }
+                break;
             }
-            last_processed = std::time::Instant::now();
         }
     }
 
@@ -175,188 +122,310 @@ impl OrderingTask {
         }
     }
 }
 
-#[derive(Debug)]
-/// Rough size estimate:
-/// pending_by_stream: 96 * stream_cnt + 540 * event_cnt
-/// ready_events: 96 * ready_event_cnt
-/// so for stream_cnt = 1000, event_cnt = 2, ready_event_cnt = 1000
-/// we get about 1 MB of memory used.
-pub struct OrderingState {
-    /// Map of undelivered events by init CID (i.e. the stream CID).
-    pending_by_stream: HashMap<InitCid, StreamEvents>,
-    /// Queue of events that can be marked ready to deliver.
-    /// Can be added as long as their prev is stored or in this list ahead of them.
-    ready_events: VecDeque<EventCid>,
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum StreamEvent {
+    InitEvent(EventCid),
+    /// An event that is known to be deliverable from the database
+    KnownDeliverable(StreamEventMetadata),
+    /// An event that needs more history to be deliverable
+    Undelivered(StreamEventMetadata),
+}
+
+impl StreamEvent {
+    /// Builds a stream event from the database if it exists.
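+    /// Reads the event by CID and classifies it (summarizing the three outcomes of
+    /// the match below): init events come back as `InitEvent` (and are repaired to
+    /// deliverable if they weren't already marked), delivered data/time events as
+    /// `KnownDeliverable`, and the rest as `Undelivered`. `None` means the CID is
+    /// not in the database yet.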
+    async fn load_by_cid(pool: &SqlitePool, cid: EventCid) -> Result<Option<Self>> {
+        // TODO: one query
+        let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(pool, &cid).await?;
+        if exists {
+            let parsed_body =
+                if let Some(body) = CeramicEventService::load_by_cid(pool, cid).await? {
+                    body
+                } else {
+                    warn!(%cid, "No event body found for event that should exist");
+                    return Ok(None);
+                };
+
+            let known_prev = match &parsed_body.header {
+                EventHeader::Init { cid, .. } => {
+                    if !deliverable {
+                        warn!(%cid, "Found init event in database that wasn't previously marked as deliverable. Updating now...");
+                        let mut tx = pool.begin_tx().await?;
+                        CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?;
+                        tx.commit().await?;
+                    }
+                    StreamEvent::InitEvent(*cid)
+                }
+                EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => {
+                    if deliverable {
+                        trace!(%cid, "Found deliverable event in database");
+                        StreamEvent::KnownDeliverable(StreamEventMetadata::new(cid, *prev))
+                    } else {
+                        trace!(%cid, "Found undelivered event in database");
+                        StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev))
+                    }
+                }
+            };
+            Ok(Some(known_prev))
+        } else {
+            trace!(%cid, "Missing event in database");
+            Ok(None)
+        }
+    }
+}
+
+impl From<InsertableBodyWithHeader> for StreamEvent {
+    fn from(ev: InsertableBodyWithHeader) -> Self {
+        match ev.header {
+            EventHeader::Init { cid, .. } => StreamEvent::InitEvent(cid),
+            EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => {
+                let meta = StreamEventMetadata::new(cid, prev);
+                if ev.body.deliverable() {
+                    StreamEvent::KnownDeliverable(meta)
+                } else {
+                    StreamEvent::Undelivered(meta)
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct StreamEventMetadata {
+    cid: EventCid,
+    prev: PrevCid,
+}
+
+impl StreamEventMetadata {
+    fn new(cid: EventCid, prev: PrevCid) -> Self {
+        Self { cid, prev }
+    }
+}
 
 #[derive(Debug, Clone, Default)]
 /// ~540 bytes per event in this struct
 pub(crate) struct StreamEvents {
+    /// Map of `event.prev` to `event.cid` to find the previous event easily.
     prev_map: HashMap<PrevCid, EventCid>,
-    cid_map: HashMap<EventCid, DeliverableEvent>,
+    /// Map of `event.cid` to `metadata` for quick lookup of the event metadata.
+    cid_map: HashMap<EventCid, StreamEvent>,
+    /// Whether we can skip processing this stream for now, i.e. no new events
+    /// have arrived that could make anything here deliverable.
+    skip_processing: bool,
+    /// The newly discovered events that are deliverable and should be processed.
+    new_deliverable: VecDeque<EventCid>,
 }
 
-impl FromIterator<DeliverableEvent> for StreamEvents {
-    fn from_iter<T: IntoIterator<Item = DeliverableEvent>>(iter: T) -> Self {
-        let mut stream = Self::new();
-        for item in iter {
-            stream.add_event(item);
-        }
-        stream
+impl StreamEvents {
+    fn new(event: StreamEvent) -> Self {
+        let mut new = Self::default();
+        new.add_event(event);
+        new
     }
-}
 
-impl StreamEvents {
-    pub fn new() -> Self {
-        Self::default()
+    // we'll need to be processed if something in memory depends on this event
+    fn update_should_process_for_new_delivered(&mut self, new_cid: &EventCid) {
+        // don't reset a true flag to false
+        if self.skip_processing {
+            self.skip_processing = !self.prev_map.contains_key(new_cid);
+        }
     }
 
-    /// returns Some(Stream Init CID) if this is a new event, else None.
-    pub fn add_event(&mut self, event: DeliverableEvent) -> Option<InitCid> {
-        let res = if self.prev_map.insert(event.meta.prev, event.cid).is_none() {
-            Some(event.meta.init_cid)
-        } else {
-            None
+    /// Returns true if this is a new event.
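+    /// Also maintains `skip_processing`: a stream only needs to be re-processed
+    /// when the new event could unlock something, i.e. an event already in memory
+    /// depends on it, or the new event is itself undelivered.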
+    fn add_event(&mut self, event: StreamEvent) -> bool {
+        let cid = match &event {
+            StreamEvent::InitEvent(cid) => {
+                self.update_should_process_for_new_delivered(cid);
+                *cid
+            }
+            StreamEvent::KnownDeliverable(meta) => {
+                self.prev_map.insert(meta.prev, meta.cid);
+                self.update_should_process_for_new_delivered(&meta.cid);
+                meta.cid
+            }
+            StreamEvent::Undelivered(meta) => {
+                self.prev_map.insert(meta.prev, meta.cid);
+                if self.skip_processing {
+                    // we depend on something in memory
+                    self.skip_processing = !self.prev_map.contains_key(&meta.prev)
+                }
+                meta.cid
+            }
         };
-        self.cid_map.insert(event.cid, event);
-        res
-    }
 
-    pub fn is_empty(&self) -> bool {
-        // these should always match
-        self.prev_map.is_empty() && self.cid_map.is_empty()
+        self.cid_map.insert(cid, event).is_none()
     }
 
-    fn remove_by_event_cid(&mut self, cid: &Cid) -> Option<DeliverableEvent> {
-        if let Some(cid) = self.cid_map.remove(cid) {
-            self.prev_map.remove(&cid.meta.prev);
+    fn remove_by_prev_cid(&mut self, prev: &Cid) -> Option<EventCid> {
+        if let Some(cid) = self.prev_map.remove(prev) {
+            self.cid_map.remove(&cid);
             Some(cid)
         } else {
             None
         }
     }
 
-    fn remove_by_prev_cid(&mut self, cid: &Cid) -> Option<EventCid> {
-        if let Some(cid) = self.prev_map.remove(cid) {
-            self.cid_map.remove(&cid);
-            Some(cid)
-        } else {
-            None
+    /// Called when we've persisted the deliverable events to the database and can clean up our state.
+    /// Returns true if we should be retained for future processing (i.e. we have more we need to discover)
+    /// and false if we can be dropped from memory.
+    fn completed_processing(&mut self) -> bool {
+        self.skip_processing = true;
+
+        for cid in self.new_deliverable.iter() {
+            if let Some(ev) = self.cid_map.get_mut(cid) {
+                match ev {
+                    StreamEvent::InitEvent(_) => {}
+                    StreamEvent::KnownDeliverable(_) => {
+                        warn!(
+                            ?ev,
+                            "Found event in deliverable queue that was already marked as deliverable."
+                        )
+                    }
+                    StreamEvent::Undelivered(meta) => {
+                        // we're delivered now
+                        *ev = StreamEvent::KnownDeliverable(meta.clone());
+                    }
+                }
+            }
         }
+        self.new_deliverable.clear();
+        self.cid_map
+            .iter()
+            .any(|(_, ev)| matches!(ev, StreamEvent::Undelivered(_)))
+    }
+
+    async fn order_events(&mut self, pool: &SqlitePool) -> Result<()> {
+        // We collect everything we can into memory and then order things.
+        // If our prev is the init event or has already been delivered, we can mark ourselves as deliverable.
+        // If our prev wasn't deliverable yet, we track it and repeat (i.e. follow its prev if we don't have it)
+
+        let mut deliverable_queue = VecDeque::new();
+        let mut undelivered =
+            VecDeque::from_iter(self.cid_map.iter().filter_map(|(cid, ev)| match ev {
+                StreamEvent::Undelivered(meta) => {
+                    assert_eq!(meta.cid, *cid);
+                    Some((meta.cid, meta.prev))
+                }
+                _ => None,
+            }));
+
+        while let Some((cid, prev)) = undelivered.pop_front() {
+            if let Some(prev_event) = self.cid_map.get(&prev) {
+                match prev_event {
+                    StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
+                        trace!(
+                            %prev,
+                            %cid,
+                            "Found event whose prev is already in memory and IS deliverable!"
+                        );
+                        deliverable_queue.push_back(cid)
+                    }
+                    StreamEvent::Undelivered(_) => {
+                        trace!(
+                            %prev,
+                            %cid,
+                            "Found event whose prev is already in memory but NOT deliverable."
+                        );
+                        // nothing to do until it arrives on the channel
+                    }
+                }
+
+                continue;
+            }
+
+            let prev_event = StreamEvent::load_by_cid(pool, prev).await?;
+            if let Some(known_prev) = prev_event {
+                match &known_prev {
+                    StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
+                        deliverable_queue.push_back(cid);
+                    }
+                    StreamEvent::Undelivered(undelivered_ev) => {
+                        // we'll try to follow this back to something deliverable
+                        undelivered.push_back((undelivered_ev.cid, undelivered_ev.prev));
+                    }
+                }
+                self.add_event(known_prev);
+            } else {
+                trace!("Found event that depends on another event we haven't discovered yet");
+            }
+        }
+
+        let mut newly_ready = deliverable_queue.clone();
+        while let Some(cid) = newly_ready.pop_front() {
+            if let Some(now_ready_ev) = self.remove_by_prev_cid(&cid) {
+                if let Some(ev) = self.cid_map.get(&now_ready_ev) {
+                    match ev {
+                        StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
+                            warn!(?ev, "should not have found a deliverable event when we expected only undelivered events!");
+                        }
+                        StreamEvent::Undelivered(_) => {
+                            newly_ready.push_back(now_ready_ev);
+                        }
+                    }
+                }
+                deliverable_queue.push_back(now_ready_ev);
+                newly_ready.push_back(now_ready_ev);
+            }
+        }
+        self.new_deliverable = deliverable_queue;
+        debug!(count=%self.new_deliverable.len(), "deliverable events discovered");
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct OrderingState {
+    pending_by_stream: HashMap<StreamCid, StreamEvents>,
+    deliverable: VecDeque<EventCid>,
+}
+
 impl OrderingState {
-    pub fn new() -> Self {
+    fn new() -> Self {
         Self {
             pending_by_stream: HashMap::new(),
-            ready_events: VecDeque::new(),
+            deliverable: VecDeque::new(),
         }
     }
 
-    /// This will review all the events for any streams known to have undelivered events and see if any of them are now deliverable.
-    /// If `streams_to_process` is None, all streams will be processed, otherwise only the streams in the set will be processed.
-    /// Processing all streams could take a long time and not necessarily do anything productive (if we're missing a key event, we're still blocked).
-    /// However, passing a value for `streams_to_process` when we know something has changed is likely to have positive results and be much faster.
-    pub(crate) async fn process_events(
-        &mut self,
-        pool: &SqlitePool,
-        streams_to_process: Option<HashSet<Cid>>,
-    ) -> Result<()> {
-        self.persist_ready_events(pool).await?;
-        for (cid, stream_events) in self.pending_by_stream.iter_mut() {
-            if streams_to_process
-                .as_ref()
-                .map_or(false, |to_do| !to_do.contains(cid))
-            {
-                continue;
-            }
-            let deliverable = Self::discover_deliverable_events(pool, stream_events).await?;
-            if !deliverable.is_empty() {
-                self.ready_events.extend(deliverable)
-            }
-        }
-        if !self.ready_events.is_empty() {
-            self.persist_ready_events(pool).await?;
+    /// Add a stream to the list of streams to process.
+    /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them.
+    /// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time.
+    fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithHeader>) {
+        for ev in events {
+            let stream_cid = ev.header.stream_cid();
+            let event = ev.into();
+            self.add_stream_event(stream_cid, event);
        }
-
-        Ok(())
    }
 
-    /// Removes deliverable events from the `prev_map` and returns them. This means prev is already delivered or in the
-    /// list to be marked as delivered. The input is expected to be a list of CIDs for a given stream that are waiting
-    /// to be processed.
It will still work if it's intermixed for multiple streams, but it's not the most efficient way to use it. - /// The returned CIDs in the VeqDeque are for events that are expected to be updated FIFO i.e. vec.pop_front() - /// - /// This breaks with multi-prev as we expect a single prev for each event. The input map is expected to contain the - /// (prev <- event) relationship (that is, the value is the event that depends on the key). - pub(crate) async fn discover_deliverable_events( - pool: &SqlitePool, - stream_map: &mut StreamEvents, - ) -> Result> { - if stream_map.is_empty() { - return Ok(VecDeque::new()); + fn add_stream_event(&mut self, stream_cid: StreamCid, event: StreamEvent) { + if let Some(stream) = self.pending_by_stream.get_mut(&stream_cid) { + stream.add_event(event); + } else if matches!(event, StreamEvent::Undelivered(_)) { + let stream = StreamEvents::new(event); + self.pending_by_stream.insert(stream_cid, stream); } + } - let mut deliverable = VecDeque::new(); - let prev_map_cln = stream_map.prev_map.clone(); - for (prev, ev_cid) in prev_map_cln { - if stream_map.cid_map.contains_key(&prev) { - trace!( - ?prev, - cid=?ev_cid, - "Found event that depends on another event in memory" - ); - // we have it in memory so we need to order it related to others to insert correctly - // although it may not be possible if the chain just goes back to some unknown event - // once we find the first event that's deliverable, we can go back through and find the rest + /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches, + /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried. + async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> { + for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() { + if stream_events.skip_processing { continue; - } else { - let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?; - if delivered { - trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list"); - deliverable.push_back(ev_cid); - stream_map.remove_by_event_cid(&ev_cid); - } else if exists { - trace!("Found undelivered prev in database. Building data to check for deliverable."); - // if it's not in memory, we need to read it from the db and parse it for the prev value to add it to our set - let data = CeramicOneEvent::value_by_cid(pool, &prev) - .await? - .ok_or_else(|| { - Error::new_app(anyhow!( - "Missing data for event that exists should be impossible" - )) - })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(prev, &data).await?; - - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - trace!(cid=%event.cid, "Adding event discovered in database to stream pending list"); - stream_map.add_event(event); - } else { - warn!(event_cid=%insertable_body.cid,"Found undelivered event with no prev while processing pending. Should not happen."); - deliverable.push_back(insertable_body.cid); - stream_map.remove_by_event_cid(&ev_cid); - } - } else { - trace!( - ?ev_cid, - "Found event that depends on unknown event. Will check later." 
- ); - } } + stream_events.order_events(pool).await?; + self.deliverable + .extend(stream_events.new_deliverable.iter()); } - let mut newly_ready = deliverable.clone(); - while let Some(cid) = newly_ready.pop_front() { - if let Some(now_ready_ev) = stream_map.remove_by_prev_cid(&cid) { - deliverable.push_back(now_ready_ev); - newly_ready.push_back(now_ready_ev); - } - } - debug!(?deliverable, "deliverable events discovered"); - Ok(deliverable) + self.persist_ready_events(pool).await?; + // keep things that still have missing history but don't process them again until we get something new + self.pending_by_stream + .retain(|_, stream_events| stream_events.completed_processing()); + + debug!(remaining_streams=%self.pending_by_stream.len(), "Finished processing streams"); + trace!(stream_state=?self, "Finished processing streams"); + + Ok(()) } /// Process all undelivered events in the database. This is a blocking operation that could take a long time. @@ -365,112 +434,96 @@ impl OrderingState { &mut self, pool: &SqlitePool, max_iterations: usize, - ) -> Result<()> { - let mut cnt = 0; - let mut offset: usize = 0; - while cnt < max_iterations { - cnt += 1; - let (new, found) = self - .add_undelivered_batch(pool, offset, DELIVERABLE_EVENTS_BATCH_SIZE) - .await?; - if new == 0 { + batch_size: u32, + ) -> Result { + let mut iter_cnt = 0; + let mut event_cnt = 0; + let mut highwater = 0; + while iter_cnt < max_iterations { + iter_cnt += 1; + let (undelivered, new_hw) = + CeramicOneEvent::undelivered_with_values(pool, batch_size.into(), highwater) + .await?; + highwater = new_hw; + if undelivered.is_empty() { break; } else { // We can start processing and we'll follow the stream history if we have it. In that case, we either arrive // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory. // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them // or otherwise mark them ignored somehow. - self.process_events(pool, None).await?; - if new < DELIVERABLE_EVENTS_BATCH_SIZE { + let found_all = undelivered.len() < batch_size as usize; + event_cnt += self + .process_undelivered_events_batch(pool, undelivered) + .await?; + if found_all { break; } - offset = offset.saturating_add(found); - } - if cnt >= max_iterations { - warn!(batch_size=DELIVERABLE_EVENTS_BATCH_SIZE, iterations=%max_iterations, "Exceeded max iterations for finding undelivered events!"); - break; } } - if self.ready_events.is_empty() { - Ok(()) - } else { - self.persist_ready_events(pool).await?; - Ok(()) + if iter_cnt > max_iterations { + info!(%batch_size, iterations=%iter_cnt, "Exceeded max iterations for finding undelivered events!"); } + + Ok(event_cnt) } - /// Add a batch of events from the database to the pending list to be processed. 
- /// Returns the (#events new events found , #events returned by query) - async fn add_undelivered_batch( + async fn process_undelivered_events_batch( &mut self, pool: &SqlitePool, - offset: usize, - limit: usize, - ) -> Result<(usize, usize)> { - let undelivered = CeramicOneEvent::undelivered_with_values(pool, offset, limit).await?; - trace!(count=%undelivered.len(), "Found undelivered events to process"); - if undelivered.is_empty() { - return Ok((0, 0)); - } - let found = undelivered.len(); - let mut new = 0; - for (key, data) in undelivered { - let event_cid = key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", key)) + event_data: Vec<(EventId, Vec)>, + ) -> Result { + trace!(cnt=%event_data.len(), "Processing undelivered events batch"); + let mut to_store_asap = Vec::new(); + let mut event_cnt = 0; + for (event_id, carfile) in event_data { + let event_cid = event_id.cid().ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", event_id)) })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(event_cid, &data).await?; - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - if self.track_pending(event).is_some() { - new += 1; + + let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?; + + let event = match &loaded.header { + EventHeader::Init { cid, .. } => { + warn!(%cid,"Found init event in database that wasn't previously marked as deliverable. Updating now..."); + to_store_asap.push(*cid); + StreamEvent::InitEvent(*cid) } - } else { - // safe to ignore in tests, shows up because when we mark init events as undelivered even though they don't have a prev - info!(event_cid=%insertable_body.cid, "Found undelivered event with no prev while processing undelivered. Should not happen. Likely means events were dropped before."); - self.ready_events.push_back(insertable_body.cid); - new += 1; // we treat this as new since it might unlock something else but it's not actually going in our queue is it's a bit odd - } + EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => { + StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev)) + } + }; + + event_cnt += 1; + self.add_stream_event(loaded.header.stream_cid(), event); } - trace!(%new, %found, "Adding undelivered events to pending set"); - Ok((new, found)) - } - fn add_incoming_batch(&mut self, events: Vec) -> HashSet { - let mut updated_streams = HashSet::with_capacity(events.len()); - for event in events { - if let Some(updated_stream) = self.track_pending(event) { - updated_streams.insert(updated_stream); + if !to_store_asap.is_empty() { + info!("storing init events that were somehow missed previously"); + let mut tx = pool.begin_tx().await?; + for cid in to_store_asap { + CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?; } + tx.commit().await?; } - updated_streams - } + self.process_streams(pool).await?; - /// returns the init event CID (stream CID) if this is a new event - fn track_pending(&mut self, event: DeliverableEvent) -> Option { - self.pending_by_stream - .entry(event.meta.init_cid) - .or_default() - .add_event(event) + Ok(event_cnt) } - /// Modify all the events that are ready to be marked as delivered. - /// We should improve the error handling and likely add some batching if the number of ready events is very high. /// We copy the events up front to avoid losing any events if the task is cancelled. 
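+    /// The batch is written in a single transaction and `deliverable` is only
+    /// cleared after the commit succeeds, so an error or cancellation beforehand
+    /// leaves the queue intact and the whole batch can simply be retried
+    /// (marking events ready to deliver is idempotent).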
async fn persist_ready_events(&mut self, pool: &SqlitePool) -> Result<()> { - if !self.ready_events.is_empty() { - let mut to_process = self.ready_events.clone(); // to avoid cancel loss - tracing::debug!(count=%self.ready_events.len(), "Marking events as ready to deliver"); + if !self.deliverable.is_empty() { + tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver"); let mut tx = pool.begin_tx().await?; - - // We process the ready events as a FIFO queue so they are marked delivered before events - // that were added after and depend on them. - while let Some(cid) = to_process.pop_front() { - CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?; + // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them. + // Could use `pop_front` but we want to make sure we commit and then clear everything at once. + for cid in &self.deliverable { + CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; } tx.commit().await?; - self.ready_events.clear(); // safe to clear since we are past any await points and hold exclusive access + self.deliverable.clear(); } Ok(()) } @@ -479,289 +532,166 @@ impl OrderingState { #[cfg(test)] mod test { use ceramic_store::EventInsertable; - use multihash_codetable::{Code, MultihashDigest}; - use recon::ReconItem; use test_log::test; - use crate::tests::{build_event, check_deliverable, random_block, TestEventInfo}; + use crate::tests::get_n_events; use super::*; - /// these events are init events so they should have been delivered - /// need to build with data events that have the prev stored already - async fn build_insertable_undelivered() -> EventInsertable { - let TestEventInfo { - event_id: id, car, .. - } = build_event().await; - let cid = id.cid().unwrap(); + async fn get_n_insertable_events(n: usize) -> Vec { + let mut res = Vec::with_capacity(n); + let events = get_n_events(n).await; + for event in events { + let (event, _) = + CeramicEventService::parse_event_carfile_order_key(event.0.to_owned(), &event.1) + .await + .unwrap(); + res.push(event); + } + res + } - let (body, _meta) = CeramicEventService::parse_event_carfile(cid, &car) + #[test(tokio::test)] + async fn test_undelivered_batch_empty() { + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let processed = OrderingState::new() + .process_all_undelivered_events(&pool, 10, 100) .await .unwrap(); - assert!(!body.deliverable); - EventInsertable::try_new(id, body).unwrap() + assert_eq!(0, processed); } - fn assert_stream_map_elems(map: &StreamEvents, size: usize) { - assert_eq!(size, map.cid_map.len(), "{:?}", map); - assert_eq!(size, map.prev_map.len(), "{:?}", map); - } + async fn insert_10_with_9_undelivered(pool: &SqlitePool) { + let insertable = get_n_insertable_events(10).await; + let init = insertable.first().unwrap().to_owned(); + let undelivered = insertable.into_iter().skip(1).collect::>(); - fn build_linked_events( - number: usize, - stream_cid: Cid, - first_prev: Cid, - ) -> Vec { - let mut events = Vec::with_capacity(number); - - let first_cid = random_block().cid; - events.push(DeliverableEvent::new( - first_cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: first_prev, - }, - None, - )); - - for i in 1..number { - let random = random_block(); - let ev = DeliverableEvent::new( - random.cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: events[i - 1].cid, - }, - None, - ); - events.push(ev); - } + let new = CeramicOneEvent::insert_many(pool, 
&undelivered[..]) + .await + .unwrap(); - events + assert_eq!(9, new.inserted.len()); + assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count()); + + let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap(); + assert_eq!(1, new.inserted.len()); + assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count()); } #[test(tokio::test)] - async fn test_none_deliverable_without_first() { - // they events all point to the one before but A has never been delivered so we can't do anything - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - let events = build_linked_events(4, stream_cid, missing); - let mut prev_map = StreamEvents::from_iter(events); - + async fn test_undelivered_batch_offset() { let pool = SqlitePool::connect_in_memory().await.unwrap(); - let deliverable = super::OrderingState::discover_deliverable_events(&pool, &mut prev_map) + insert_10_with_9_undelivered(&pool).await; + let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100) .await .unwrap(); + assert_eq!(1, events.len()); - assert_eq!(0, deliverable.len()); - } - - #[test(tokio::test)] - async fn test_all_deliverable_one_stream() { - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) + let processed = OrderingState::new() + .process_all_undelivered_events(&pool, 1, 5) .await .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) + assert_eq!(5, processed); + let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100) .await .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - - let events = build_linked_events(4, stream_cid, one_cid); - let expected = VecDeque::from_iter(events.iter().map(|ev| ev.cid)); - let mut prev_map = StreamEvents::from_iter(events); - - assert_stream_map_elems(&prev_map, 4); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) - .await - .unwrap(); - - assert_eq!(4, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 0); - } - - #[test(tokio::test)] - async fn test_some_deliverable_one_stream() { - let TestEventInfo { - event_id: one_id, - car: one_car, - .. 
- } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) + assert_eq!(6, events.len()); + // the last 5 are processed and we have 10 delivered + let processed = OrderingState::new() + .process_all_undelivered_events(&pool, 1, 5) .await .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) + assert_eq!(4, processed); + let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100) .await .unwrap(); + assert_eq!(10, events.len()); - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - - let mut deliverable_events = build_linked_events(6, stream_cid, one_cid); - let stuck_events = build_linked_events(8, stream_cid, missing); - let expected = VecDeque::from_iter(deliverable_events.iter().map(|ev| ev.cid)); - deliverable_events.extend(stuck_events); - let mut prev_map = StreamEvents::from_iter(deliverable_events); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) - .await - .unwrap(); - - assert_eq!(6, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 8); - } - - #[test(tokio::test)] - // expected to be per stream but all events are combined for the history required version currently so - // this needs to work as well - async fn test_all_deliverable_multiple_streams() { - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let TestEventInfo { - event_id: two_id, - car: two_car, - .. - } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let two_cid = two_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) + // nothing left + let processed = OrderingState::new() + .process_all_undelivered_events(&pool, 1, 100) .await .unwrap(); - recon::Store::insert_many( - &store, - &[ - ReconItem::new(&one_id, &one_car), - ReconItem::new(&two_id, &two_car), - ], - ) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - check_deliverable(&store.pool, &two_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-one")); - let stream_cid_2 = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-two")); - - let mut events_a = build_linked_events(4, stream_cid, one_cid); - let mut events_b = build_linked_events(10, stream_cid_2, two_cid); - let expected_a = VecDeque::from_iter(events_a.iter().map(|ev| ev.cid)); - let expected_b = VecDeque::from_iter(events_b.iter().map(|ev| ev.cid)); - // we expect the events to be in the prev chain order, but they can be intervleaved across streams - // we reverse the items in the input to proov this (it's a hashmap internally so there is no order, but still) - events_a.reverse(); - events_b.reverse(); - events_a.extend(events_b); - assert_eq!(14, events_a.len()); - let mut prev_map = StreamEvents::from_iter(events_a); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) - .await - .unwrap(); - - assert_eq!(14, deliverable.len()); - assert_eq!(0, prev_map.cid_map.len(), "{:?}", prev_map); - assert_eq!(0, prev_map.prev_map.len(), "{:?}", prev_map); - - let mut split_a = VecDeque::new(); - let mut split_b = 
VecDeque::new(); - for cid in deliverable { - if expected_a.contains(&cid) { - split_a.push_back(cid); - } else if expected_b.contains(&cid) { - split_b.push_back(cid); - } else { - panic!("Unexpected CID in deliverable list: {:?}", cid); - } - } - assert_eq!(expected_a, split_a); - assert_eq!(expected_b, split_b); + assert_eq!(0, processed); } #[test(tokio::test)] - async fn test_undelivered_batch_empty() { + async fn test_undelivered_batch_iterations_ends_early() { let pool = SqlitePool::connect_in_memory().await.unwrap(); - let (new, found) = OrderingState::new() - .add_undelivered_batch(&pool, 0, 10) + // create 5 streams with 9 undelivered events each + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + + let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) + .await + .unwrap(); + assert_eq!(5, event.len()); + let mut state = OrderingState::new(); + state + .process_all_undelivered_events(&pool, 4, 10) .await .unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); + + let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) + .await + .unwrap(); + assert_eq!(45, event.len()); } #[test(tokio::test)] - async fn test_undelivered_batch_offset() { + async fn test_undelivered_batch_iterations_ends_when_all_found() { let pool = SqlitePool::connect_in_memory().await.unwrap(); - let insertable = build_insertable_undelivered().await; + // create 5 streams with 9 undelivered events each + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; - let _new = CeramicOneEvent::insert_many(&pool, &[insertable]) + let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) .await .unwrap(); - + assert_eq!(5, event.len()); let mut state = OrderingState::new(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(1, found); - assert_eq!(1, new); - let (new, found) = state.add_undelivered_batch(&pool, 10, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - state.persist_ready_events(&pool).await.unwrap(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); + state + .process_all_undelivered_events(&pool, 100_000_000, 5) + .await + .unwrap(); + + let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) + .await + .unwrap(); + assert_eq!(50, event.len()); } #[test(tokio::test)] - async fn test_undelivered_batch_all() { + async fn test_process_all_undelivered_one_batch() { let pool = SqlitePool::connect_in_memory().await.unwrap(); - let mut undelivered = Vec::with_capacity(10); - for _ in 0..10 { - let insertable = build_insertable_undelivered().await; - undelivered.push(insertable); - } + // create 5 streams with 9 undelivered events each + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; + insert_10_with_9_undelivered(&pool).await; - let (hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) - .await - .unwrap(); - assert_eq!(0, hw); - assert!(event.is_empty()); - - let _new = CeramicOneEvent::insert_many(&pool, &undelivered[..]) + 
let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) .await .unwrap(); - + assert_eq!(5, event.len()); let mut state = OrderingState::new(); state - .process_all_undelivered_events(&pool, 1) + .process_all_undelivered_events(&pool, 1, 100) .await .unwrap(); let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) .await .unwrap(); - assert_eq!(event.len(), 10); + assert_eq!(50, event.len()); } } diff --git a/service/src/event/service.rs b/service/src/event/service.rs index ddb864fb5..d6682fb0d 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,16 +1,12 @@ -use std::collections::{HashMap, HashSet}; - use ceramic_core::EventId; use ceramic_event::unvalidated; use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; -use cid::Cid; use ipld_core::ipld::Ipld; -use recon::ReconItem; use tracing::{trace, warn}; -use super::ordering_task::{ - DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState, - OrderingTask, StreamEvents, +use super::{ + order_events::OrderEvents, + ordering_task::{DeliverableTask, OrderingTask}, }; use crate::{Error, Result}; @@ -41,7 +37,7 @@ impl CeramicEventService { } /// Skip loading all undelivered events from the database on startup (for testing) - #[allow(dead_code)] // used in tests + #[cfg(test)] pub(crate) async fn new_without_undelivered(pool: SqlitePool) -> Result { CeramicOneEvent::init_delivered_order(&pool).await?; @@ -71,17 +67,33 @@ impl CeramicEventService { .await?; Ok(()) } - /// This function is used to parse the event from the carfile and return the insertable event and the previous cid pointer. /// Probably belongs in the event crate. - pub(crate) async fn parse_event_carfile( - event_cid: cid::Cid, + pub(crate) async fn parse_event_carfile_order_key( + event_id: EventId, carfile: &[u8], - ) -> Result<(EventInsertableBody, Option)> { - let insertable = EventInsertableBody::try_from_carfile(event_cid, carfile).await?; - let ev_block = insertable.block_for_cid(&insertable.cid)?; + ) -> Result<(EventInsertable, EventHeader)> { + let mut insertable = EventInsertable::try_from_carfile(event_id, carfile).await?; + + let header = Self::parse_event_body(&mut insertable.body).await?; + Ok((insertable, header)) + } - trace!(count=%insertable.blocks.len(), cid=%event_cid, "parsing event blocks"); + pub(crate) async fn parse_event_carfile_cid( + cid: ceramic_core::Cid, + carfile: &[u8], + ) -> Result { + let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?; + + let header = Self::parse_event_body(&mut body).await?; + Ok(InsertableBodyWithHeader { body, header }) + } + + pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result { + let cid = body.cid(); // purely for convenience writing out the match + let ev_block = body.block_for_cid(&cid)?; + + trace!(count=%body.blocks().len(), %cid, "parsing event blocks"); let event_ipld: unvalidated::RawEvent = serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| { Error::new_invalid_arg( @@ -89,17 +101,26 @@ impl CeramicEventService { ) })?; - let maybe_init_prev = match event_ipld { - unvalidated::RawEvent::Time(t) => Some((t.id(), t.prev())), + let (deliverable, header) = match event_ipld { + unvalidated::RawEvent::Time(t) => ( + false, + EventHeader::Time { + cid, + stream_cid: t.id(), + prev: t.prev(), + }, + ), unvalidated::RawEvent::Signed(signed) => { let link = signed.link().ok_or_else(|| { Error::new_invalid_arg(anyhow::anyhow!("event 
should have a link")) })?; - let link = insertable.block_for_cid(&link).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("prev CID missing from carfile"), - ) - })?; + let link = body + .blocks() + .iter() + .find(|b| b.cid() == link) + .ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("prev CID missing from carfile")) + })?; let payload: unvalidated::Payload = serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| { Error::new_invalid_arg( @@ -108,17 +129,21 @@ impl CeramicEventService { })?; match payload { - unvalidated::Payload::Data(d) => Some((*d.id(), *d.prev())), - unvalidated::Payload::Init(_init) => None, + unvalidated::Payload::Data(d) => ( + false, + EventHeader::Data { + cid, + stream_cid: *d.id(), + prev: *d.prev(), + }, + ), + unvalidated::Payload::Init(_init) => (true, EventHeader::Init { cid }), } } - unvalidated::RawEvent::Unsigned(_init) => None, + unvalidated::RawEvent::Unsigned(_init) => (true, EventHeader::Init { cid }), }; - let meta = maybe_init_prev.map(|(cid, prev)| DeliverableMetadata { - init_cid: cid, - prev, - }); - Ok((insertable, meta)) + body.set_deliverable(deliverable); + Ok(header) } #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] @@ -130,13 +155,7 @@ impl CeramicEventService { &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { - if items.is_empty() { - return Ok(InsertResult::default()); - } - - let ordering = - InsertEventOrdering::discover_deliverable_local_history(items, &self.pool).await?; - self.process_events(ordering).await + self.insert_events(items, true).await } #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] @@ -148,14 +167,9 @@ impl CeramicEventService { &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { - if items.is_empty() { - return Ok(recon::InsertResult::default()); - } - - let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?; - let res = self.process_events(ordering).await?; - // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + let res = self.insert_events(items, false).await?; let mut keys = vec![false; items.len()]; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result for (i, item) in items.iter().enumerate() { let new_key = res .store_result @@ -168,225 +182,108 @@ impl CeramicEventService { Ok(recon::InsertResult::new(keys)) } - async fn process_events(&self, ordering: InsertEventOrdering) -> Result { - let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?; - - for ev in ordering.background_task_deliverable { - trace!(cid=%ev.0, prev=%ev.1.prev, init=%ev.1.init_cid, "sending to delivery task"); - if let Err(e) = self - .delivery_task - .tx - .try_send(DeliverableEvent::new(ev.0, ev.1, None)) - { - match e { - tokio::sync::mpsc::error::TrySendError::Full(e) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(cid=%e.cid, meta=?e.meta, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. 
shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - for new in ordering.notify_task_new { - if let Err(e) = self.delivery_task.tx_new.try_send(new) { - match e { - tokio::sync::mpsc::error::TrySendError::Full(ev) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(attempt=?ev, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Notify new task full"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } + async fn insert_events<'a>( + &self, + items: &[recon::ReconItem<'a, EventId>], + history_required: bool, + ) -> Result { + if items.is_empty() { + return Ok(InsertResult::default()); } - Ok(InsertResult { - store_result: res, - missing_history: vec![], - }) - } -} - -struct InsertEventOrdering { - insert_now: Vec, - notify_task_new: Vec, - background_task_deliverable: HashMap, -} -impl InsertEventOrdering { - /// This will mark events as deliverable if their prev exists locally. Otherwise they will be - /// sorted into the bucket for the background task to process. - pub(crate) async fn discover_deliverable_remote_history<'a>( - items: &[ReconItem<'a, EventId>], - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; + let mut to_insert = Vec::with_capacity(items.len()); - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - result.mark_event_deliverable_later(insertable, meta); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } + for event in items { + let insertable = + Self::parse_event_carfile_order_key(event.key.to_owned(), event.value).await?; + to_insert.push(insertable); } - Ok(result) - } - - /// This will error if any of the events doesn't have its prev on the local node (in the database/memory or in this batch). 
- pub(crate) async fn discover_deliverable_local_history<'a>( - items: &[ReconItem<'a, EventId>], - pool: &SqlitePool, - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; - - let mut insert_after_history_check: Vec<(DeliverableMetadata, EventInsertable)> = - Vec::with_capacity(items.len()); - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - insert_after_history_check.push((meta, insertable)); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } - } + let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; - trace!(local_events_checking=%insert_after_history_check.len(), "checking local history"); - result - .verify_history_inline(pool, insert_after_history_check) - .await?; - Ok(result) - } + let missing_history = ordered + .missing_history + .iter() + .map(|(e, _)| e.order_key.clone()) + .collect(); - async fn parse_item<'a>( - item: &ReconItem<'a, EventId>, - ) -> Result<(EventInsertable, Option)> { - let cid = item.key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", item.key)) - })?; - // we want to end a conversation if any of the events aren't ceramic events and not store them - // this includes making sure the key matched the body cid - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(cid, item.value).await?; - let insertable = EventInsertable::try_new(item.key.to_owned(), insertable_body)?; - Ok((insertable, maybe_prev)) - } + let to_insert_with_header = if history_required { + ordered.deliverable + } else { + ordered + .deliverable + .into_iter() + .chain(ordered.missing_history) + .collect() + }; - fn mark_event_deliverable_later( - &mut self, - insertable: EventInsertable, - meta: DeliverableMetadata, - ) { - self.background_task_deliverable - .insert(insertable.body.cid, meta); - self.insert_now.push(insertable); - } + let to_insert = to_insert_with_header + .iter() + .map(|(e, _)| e.clone()) + .collect::>(); - fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.set_deliverable(true); - self.notify_task_new - .push(DeliveredEvent::new(ev.body.cid, init_cid)); - self.insert_now.push(ev); - } + // need to make a better interface around events. 
+        // need to make a better interface around events. needing to know about the stream in some places but
+        // not in the store makes it inconvenient to map back and forth
+        let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?;

-    async fn verify_history_inline(
-        &mut self,
-        pool: &SqlitePool,
-        to_check: Vec<(DeliverableMetadata, EventInsertable)>,
-    ) -> Result<()> {
-        if to_check.is_empty() {
-            return Ok(());
-        }
+        // api writes shouldn't have any missed pieces that need ordering so we don't send those
+        if !history_required {
+            let to_send = res
+                .inserted
+                .iter()
+                .filter(|i| i.new_key)
+                .collect::<Vec<_>>();

-        let incoming_deliverable_cids: HashSet<Cid> = self
-            .insert_now
-            .iter()
-            .filter_map(|e| {
-                if e.body.deliverable {
-                    Some(e.body.cid)
+            for ev in to_send {
+                if let Some((ev, header)) = to_insert_with_header
+                    .iter()
+                    .find(|(i, _)| i.order_key == ev.order_key)
+                {
+                    let new = InsertableBodyWithHeader {
+                        body: ev.body.clone(),
+                        header: header.to_owned(),
+                    };
+                    trace!(event=?ev, "sending delivered to ordering task");
+                    if let Err(e) = self.delivery_task.tx_inserted.try_send(new) {
+                        match e {
+                            tokio::sync::mpsc::error::TrySendError::Full(e) => {
+                                warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event; it cannot be marked deliverable until a new stream event arrives or the process is restarted");
+                            }
+                            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
+                                warn!("Delivery task closed. shutting down");
+                                return Err(Error::new_fatal(anyhow::anyhow!(
+                                    "Delivery task closed"
+                                )));
+                            }
+                        }
+                    }
                 } else {
-                    None
+                    tracing::error!(event_id=%ev.order_key, "Missing header for inserted event; this should be unreachable!");
+                    debug_assert!(false); // panic in debug mode
+                    continue;
                 }
-            })
-            .collect();
-
-        // ideally, this map would be per stream, but we are just processing all of them together for now
-        let mut to_check_map = StreamEvents::new();
-
-        let required_to_find = to_check.len();
-        let mut found_in_batch = 0;
-        let mut insert_if_greenlit = HashMap::with_capacity(required_to_find);
-
-        for (meta, ev) in to_check {
-            if incoming_deliverable_cids.contains(&meta.prev) {
-                trace!(new=%ev.body.cid, prev=%meta.prev, "prev event being added in same batch");
-                found_in_batch += 1;
-                self.mark_event_deliverable_now(ev, meta.init_cid);
-            } else {
-                trace!(new=%ev.body.cid, prev=%meta.prev, "will check for prev event in db");
-
-                let _new = to_check_map.add_event(DeliverableEvent::new(
-                    ev.body.cid,
-                    meta.to_owned(),
-                    None,
-                ));
-                insert_if_greenlit.insert(ev.body.cid, (meta, ev));
-            }
             }
         }

-        if to_check_map.is_empty() {
-            return Ok(());
-        }
-
-        let deliverable =
-            OrderingState::discover_deliverable_events(pool, &mut to_check_map).await?;
-        if deliverable.len() != required_to_find - found_in_batch {
-            let missing = insert_if_greenlit
-                .values()
-                .filter_map(|(_, ev)| {
-                    if !deliverable.contains(&ev.body.cid) {
-                        Some(ev.body.cid)
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<_>>();
-
-            tracing::info!(?missing, ?deliverable, "Missing required `prev` event CIDs");
+        Ok(InsertResult {
+            store_result: res,
+            missing_history,
+        })
+    }

-            Err(Error::new_invalid_arg(anyhow::anyhow!(
-                "Missing required `prev` event CIDs: {:?}",
-                missing
-            )))
+    pub(crate) async fn load_by_cid(
+        pool: &SqlitePool,
+        cid: ceramic_core::Cid,
+    ) -> Result<Option<InsertableBodyWithHeader>> {
+        let data = if let Some(ev) = CeramicOneEvent::value_by_cid(pool, &cid).await? {
+            ev
         } else {
-            // we need to use the deliverable list's order because we might depend on something in the same batch, and that function will
-            // ensure we have a queue in the correct order. So we follow the order and use our insert_if_greenlit map to get the details.
-            for cid in deliverable {
-                if let Some((meta, insertable)) = insert_if_greenlit.remove(&cid) {
-                    self.mark_event_deliverable_now(insertable, meta.init_cid);
-                } else {
-                    warn!(%cid, "Didn't find event to insert in memory when it was expected");
-                }
-            }
-            Ok(())
-        }
+            return Ok(None);
+        };
+
+        let mut body = EventInsertableBody::try_from_carfile(cid, &data).await?;
+        let header = Self::parse_event_body(&mut body).await?;
+        Ok(Some(InsertableBodyWithHeader { body, header }))
     }
 }

@@ -412,3 +309,47 @@ impl From for Vec {
     api_res
 }
 }
+
+#[derive(Debug, Clone)]
+pub(crate) struct InsertableBodyWithHeader {
+    pub(crate) body: EventInsertableBody,
+    pub(crate) header: EventHeader,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// An event header wrapper for use in the store crate.
+/// TODO: replace this with something from the event crate
+pub(crate) enum EventHeader {
+    Init {
+        cid: ceramic_core::Cid,
+    },
+    Data {
+        cid: ceramic_core::Cid,
+        stream_cid: ceramic_core::Cid,
+        prev: ceramic_core::Cid,
+    },
+    Time {
+        cid: ceramic_core::Cid,
+        stream_cid: ceramic_core::Cid,
+        prev: ceramic_core::Cid,
+    },
+}
+
+impl EventHeader {
+    /// Returns the stream CID of the event
+    pub(crate) fn stream_cid(&self) -> ceramic_core::Cid {
+        match self {
+            EventHeader::Init { cid, .. } => *cid,
+            EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. } => {
+                *stream_cid
+            }
+        }
+    }
+
+    pub(crate) fn prev(&self) -> Option<ceramic_core::Cid> {
+        match self {
+            EventHeader::Init { .. } => None,
+            EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev),
+        }
+    }
+}
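The `EventHeader` accessors above are what the ordering task keys on: `stream_cid()` returns the init CID for init events, so an entire stream groups under one key. A small illustrative use (hypothetical helper, not part of the patch):

    use std::collections::HashMap;

    // Group freshly parsed events by stream so each stream's prev-chain can
    // be walked independently (sketch; assumes the EventHeader defined above).
    fn group_by_stream(
        events: Vec<InsertableBodyWithHeader>,
    ) -> HashMap<ceramic_core::Cid, Vec<InsertableBodyWithHeader>> {
        let mut by_stream: HashMap<_, Vec<_>> = HashMap::new();
        for ev in events {
            by_stream.entry(ev.header.stream_cid()).or_default().push(ev);
        }
        by_stream
    }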
diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs
index 644d3081b..25a29e424 100644
--- a/service/src/tests/mod.rs
+++ b/service/src/tests/mod.rs
@@ -114,16 +114,6 @@ fn gen_rand_bytes<const SIZE: usize>() -> [u8; SIZE] {
     arr
 }

-pub(crate) fn random_block() -> Block {
-    let mut data = [0u8; 1024];
-    rand::Rng::fill(&mut ::rand::thread_rng(), &mut data);
-    let hash = Code::Sha2_256.digest(&data);
-    Block {
-        cid: Cid::new_v1(0x00, hash),
-        data: data.to_vec().into(),
-    }
-}
-
 pub(crate) async fn check_deliverable(
     pool: &ceramic_store::SqlitePool,
     cid: &Cid,
@@ -172,28 +162,13 @@ async fn data_event(
     signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap()
 }

-async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec<u8>); 3] {
+// returns init + N events
+async fn get_init_plus_n_events_with_model(
+    model: &StreamId,
+    number: usize,
+) -> Vec<(EventId, Vec<u8>)> {
     let signer = Box::new(signer().await);
-    let data = gen_rand_bytes::<50>();
-    let data2 = gen_rand_bytes::<50>();
-
-    let data = ipld!({
-        "radius": 1,
-        "red": 2,
-        "green": 3,
-        "blue": 4,
-        "raw": data.as_slice(),
-    });
-
-    let data2 = ipld!({
-        "radius": 1,
-        "red": 2,
-        "green": 3,
-        "blue": 4,
-        "raw": data2.as_slice(),
-    });
-
     let init = init_event(model, &signer).await;
     let init_cid = init.envelope_cid();
     let (event_id, car) = (
@@ -202,33 +177,45 @@ async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec<u8>); 3] {
     );
     let init_cid = event_id.cid().unwrap();

-    let data = data_event(init_cid, init_cid, data, &signer).await;
-    let cid = data.envelope_cid();
-    let (data_id, data_car) = (
-        build_event_id(&data.envelope_cid(), &init_cid, model),
-        data.encode_car().await.unwrap(),
-    );
-    let data2 = data_event(init_cid, cid, data2, &signer).await;
-    let (data_id_2, data_car_2) = (
-        build_event_id(&data2.envelope_cid(), &init_cid, model),
-        data2.encode_car().await.unwrap(),
-    );
-    [
-        (event_id, car),
-        (data_id, data_car),
-        (data_id_2, data_car_2),
-    ]
+    let mut events = Vec::with_capacity(number);
+    events.push((event_id, car));
+    let mut prev = init_cid;
+    for _ in 0..number {
+        let data = gen_rand_bytes::<50>();
+        let data = ipld!({
+            "radius": 1,
+            "red": 2,
+            "green": 3,
+            "blue": 4,
+            "raw": data.as_slice(),
+        });
+
+        let data = data_event(init_cid, prev, data, &signer).await;
+        let (data_id, data_car) = (
+            build_event_id(&data.envelope_cid(), &init_cid, model),
+            data.encode_car().await.unwrap(),
+        );
+        prev = data_id.cid().unwrap();
+        events.push((data_id, data_car));
+    }
+    events
 }

-pub(crate) async fn get_events_return_model() -> (StreamId, [(EventId, Vec<u8>); 3]) {
+pub(crate) async fn get_events_return_model() -> (StreamId, Vec<(EventId, Vec<u8>)>) {
     let model = StreamId::document(random_cid());
-    let events = get_events_with_model(&model).await;
+    let events = get_init_plus_n_events_with_model(&model, 3).await;
     (model, events)
 }

 // builds init -> data -> data that are a stream (will be a different stream each call)
-pub(crate) async fn get_events() -> [(EventId, Vec<u8>); 3] {
+pub(crate) async fn get_events() -> Vec<(EventId, Vec<u8>)> {
     let model = StreamId::document(random_cid());
-    get_events_with_model(&model).await
+    get_init_plus_n_events_with_model(&model, 3).await
+}
+
+// Get N events with the same model (init + N-1 data events)
+pub(crate) async fn get_n_events(number: usize) -> Vec<(EventId, Vec<u8>)> {
+    let model = &StreamId::document(random_cid());
+    get_init_plus_n_events_with_model(model, number - 1).await
 }
 diff --git 
a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index 976e9b1cd..1cad39fc9 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -1,5 +1,9 @@ +use std::collections::HashMap; + use ceramic_api::EventStore; use ceramic_core::EventId; +use rand::seq::SliceRandom; +use rand::thread_rng; use recon::ReconItem; use test_log::test; @@ -12,12 +16,14 @@ async fn setup_service() -> CeramicEventService { let conn = ceramic_store::SqlitePool::connect_in_memory() .await .unwrap(); + CeramicEventService::new_without_undelivered(conn) .await .unwrap() } async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { + tracing::trace!("inserted event: {}", item.key.cid().unwrap()); let new = store .insert_events_from_carfiles_recon(&[item]) .await @@ -45,30 +51,17 @@ async fn test_init_event_delivered() { } #[test(tokio::test)] -async fn test_missing_prev_error_history_required() { +async fn test_missing_prev_history_required_not_inserted() { let store = setup_service().await; let events = get_events().await; let data = &events[1]; let new = store .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) - .await; - match new { - Ok(v) => panic!("should have errored: {:?}", v), - Err(e) => { - match e { - crate::Error::InvalidArgument { error } => { - // yes fragile, but we want to make sure it's not a parsing error or something unexpected - assert!(error - .to_string() - .contains("Missing required `prev` event CIDs")); - } - e => { - panic!("unexpected error: {:?}", e); - } - }; - } - }; + .await + .unwrap(); + assert!(new.store_result.inserted.is_empty()); + assert_eq!(1, new.missing_history.len()); } #[test(tokio::test)] @@ -150,7 +143,7 @@ async fn test_missing_prev_pending_recon() { check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; // This happens out of band, so give it a moment to make sure everything is updated - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) @@ -178,9 +171,8 @@ async fn missing_prev_pending_recon_should_deliver_without_stream_update() { // now we add the second event, it should quickly become deliverable let data = &events[1]; add_and_assert_new_recon_event(&store, ReconItem::new(&data.0, &data.1)).await; - check_deliverable(&store.pool, &data.0.cid().unwrap(), false).await; // This happens out of band, so give it a moment to make sure everything is updated - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) @@ -234,7 +226,7 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat // this _could_ be deliverable immediately if we checked but for now we just send to the other task, // so `check_deliverable` could return true or false depending on timing (but probably false). 
// as this is an implementation detail and we'd prefer true, we just use HW ordering to make sure it's been delivered
-    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     let (_, delivered) = store
         .events_since_highwater_mark(0, i64::MAX)
         .await
@@ -266,3 +258,118 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat
     ];
     assert_eq!(expected, delivered);
 }
+
+async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: usize) {
+    loop {
+        let (_, delivered) = store
+            .events_since_highwater_mark(0, i64::MAX)
+            .await
+            .unwrap();
+        let total = delivered.len();
+        if total < expected_delivered {
+            tracing::trace!(
+                "found {} delivered, waiting for {}",
+                total,
+                expected_delivered
+            );
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+        } else {
+            break;
+        }
+    }
+}
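Because `validate_all_delivered` loops until the expected count is reached, callers must bound it with a deadline; the long-stream test below does exactly that. The pattern as a standalone sketch (hypothetical helper mirroring the call in `recon_lots_of_streams`):

    // Bound the open-ended polling helper with a deadline so a stuck
    // ordering task fails the test instead of hanging it.
    async fn wait_for_delivery(store: &CeramicEventService, expected: usize) {
        tokio::time::timeout(
            std::time::Duration::from_secs(5),
            validate_all_delivered(store, expected),
        )
        .await
        .expect("events did not become deliverable in time");
    }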
missing cid: {}", i, cid); + } + + assert_eq!(expected, total_added); + tokio::time::timeout( + std::time::Duration::from_secs(5), + validate_all_delivered(&store, expected), + ) + .await + .unwrap(); + + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + + assert_eq!(expected, delivered.len()); + let mut streams_at_the_end = Vec::new(); + for _ in 0..num_streams { + streams_at_the_end.push(Vec::with_capacity(per_stream)); + } + for cid in delivered { + let stream = cid_to_stream_map.get(&cid).unwrap(); + let stream = streams_at_the_end.get_mut(*stream).unwrap(); + stream.push(cid); + } + // now we check that all the events are deliverable + for cid in all_cids.iter() { + check_deliverable(&store.pool, cid, true).await; + } + // and make sure the events were delivered for each stream streams in the same order as they were at the start + for (i, stream) in expected_stream_order.iter().enumerate() { + assert_eq!(*stream, streams_at_the_end[i]); + } +} diff --git a/store/Cargo.toml b/store/Cargo.toml index 3fa78aefb..d9482a1a4 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -13,19 +13,21 @@ anyhow.workspace = true async-trait.workspace = true ceramic-api.workspace = true ceramic-core.workspace = true +ceramic-event.workspace = true ceramic-metrics.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true +ipld-core.workspace = true iroh-bitswap.workspace = true iroh-car.workspace = true itertools = "0.12.0" -multihash.workspace = true multihash-codetable.workspace = true +multihash.workspace = true prometheus-client.workspace = true -thiserror.workspace = true recon.workspace = true sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true [dev-dependencies] diff --git a/store/benches/sqlite_store.rs b/store/benches/sqlite_store.rs index b028df40e..31dda9ee8 100644 --- a/store/benches/sqlite_store.rs +++ b/store/benches/sqlite_store.rs @@ -75,7 +75,7 @@ async fn model_setup(tpe: ModelType, cnt: usize) -> ModelSetup { let body = EventInsertableBody::try_from_carfile(init.0.cid().unwrap(), &init.1) .await .unwrap(); - events.push(EventInsertable::try_new(init.0, body).unwrap()); + events.push(EventInsertable::try_from_carfile(init.0, body).unwrap()); } let pool = SqlitePool::connect_in_memory().await.unwrap(); diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index eaa0a9e33..2e9cbb622 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -175,7 +175,7 @@ impl CeramicOneEvent { item.deliverable(), )); if new_key { - for block in item.blocks().iter() { + for block in item.body.blocks().iter() { CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?; CeramicOneEventBlock::insert(&mut tx, block).await?; } @@ -191,21 +191,40 @@ impl CeramicOneEvent { Ok(res) } - /// Find events that haven't been delivered to the client and may be ready + /// Find events that haven't been delivered to the client and may be ready. + /// Returns the events and their values, and the highwater mark of the last event. + /// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done. 
diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs
index eaa0a9e33..2e9cbb622 100644
--- a/store/src/sql/access/event.rs
+++ b/store/src/sql/access/event.rs
@@ -175,7 +175,7 @@ impl CeramicOneEvent {
             item.deliverable(),
         ));
         if new_key {
-            for block in item.blocks().iter() {
+            for block in item.body.blocks().iter() {
                 CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
                 CeramicOneEventBlock::insert(&mut tx, block).await?;
             }
@@ -191,21 +191,40 @@ impl CeramicOneEvent {
         Ok(res)
     }

-    /// Find events that haven't been delivered to the client and may be ready
+    /// Find events that haven't been delivered to the client and may be ready.
+    /// Returns the events and their values, and the highwater mark of the last event.
+    /// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done.
     pub async fn undelivered_with_values(
         pool: &SqlitePool,
-        offset: usize,
-        limit: usize,
-    ) -> Result<Vec<(EventId, Vec<u8>)>> {
-        let all_blocks: Vec<ReconEventBlockRaw> =
+        limit: i64,
+        highwater_mark: i64,
+    ) -> Result<(Vec<(EventId, Vec<u8>)>, i64)> {
+        struct UndeliveredEventBlockRow {
+            block: ReconEventBlockRaw,
+            row_id: i64,
+        }
+
+        use sqlx::Row as _;
+
+        impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for UndeliveredEventBlockRow {
+            fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+                let row_id = row.try_get("rowid")?;
+                let block = ReconEventBlockRaw::from_row(row)?;
+                Ok(Self { block, row_id })
+            }
+        }
+
+        let all_blocks: Vec<UndeliveredEventBlockRow> =
             sqlx::query_as(EventQuery::undelivered_with_values())
-                .bind(limit as i64)
-                .bind(offset as i64)
+                .bind(limit)
+                .bind(highwater_mark)
                 .fetch_all(pool.reader())
                 .await?;

-        let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
-        Ok(values)
+        let max_highwater = all_blocks.iter().map(|row| row.row_id).max().unwrap_or(0); // if there's nothing in the list we just return 0
+        let blocks = all_blocks.into_iter().map(|b| b.block).collect();
+        let values = ReconEventBlockRaw::into_carfiles(blocks).await?;
+        Ok((values, max_highwater))
     }

     /// Calculate the hash of a range of events
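The entities change below moves `EventInsertableBody`'s fields behind accessors, so deliverability can only be flipped through `set_deliverable`. A short sketch of the resulting call pattern (hypothetical helper; the names come from the diff that follows):

    use anyhow::Result;
    use ceramic_core::EventId;
    use ceramic_store::EventInsertable;

    // Build an insertable event from a CAR file and mark it deliverable when
    // its prev is already delivered. try_from_carfile checks that the CID in
    // the EventId matches the CAR root.
    async fn build_insertable(
        order_key: EventId,
        car: &[u8],
        prev_delivered: bool,
    ) -> Result<EventInsertable> {
        let mut ev = EventInsertable::try_from_carfile(order_key, car).await?;
        if prev_delivered {
            ev.body.set_deliverable(true); // fields are private now; use the accessor
        }
        Ok(ev)
    }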
diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs
index fb8ae1692..9d3d41035 100644
--- a/store/src/sql/entities/event.rs
+++ b/store/src/sql/entities/event.rs
@@ -50,14 +50,22 @@ pub struct EventInsertable {
 }

 impl EventInsertable {
-    /// Try to build the EventInsertable struct. Will error if the key and body don't match.
+    /// Try to build the EventInsertable struct from a carfile.
+    pub async fn try_from_carfile(order_key: EventId, body: &[u8]) -> Result<Self> {
+        let cid = order_key.cid().ok_or_else(|| {
+            Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
+        })?;
+        let body = EventInsertableBody::try_from_carfile(cid, body).await?;
+        Ok(Self { order_key, body })
+    }
+
+    /// Build the EventInsertable struct from an EventID and EventInsertableBody.
+    /// Will error if the CID in the EventID doesn't match the CID in the EventInsertableBody.
     pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result<Self> {
-        if order_key.cid().as_ref() != Some(&body.cid) {
-            return Err(Error::new_app(anyhow!(
-                "Event ID and body CID do not match: {:?} != {:?}",
-                order_key.cid(),
-                body.cid
-            )))?;
+        if order_key.cid() != Some(body.cid()) {
+            return Err(Error::new_invalid_arg(anyhow!(
+                "EventID CID does not match the body CID"
+            )));
         }
         Ok(Self { order_key, body })
     }
@@ -69,18 +77,7 @@ impl EventInsertable {

     /// Whether this event is deliverable currently
     pub fn deliverable(&self) -> bool {
-        self.body.deliverable
-    }
-
-    /// Whether this event is deliverable currently
-    pub fn blocks(&self) -> &Vec<EventBlockRaw> {
-        &self.body.blocks
-    }
-
-    /// Mark the event as deliverable.
-    /// This will be used when inserting the event to make sure the field is updated accordingly.
-    pub fn set_deliverable(&mut self, deliverable: bool) {
-        self.body.deliverable = deliverable;
+        self.body.deliverable()
     }
 }

@@ -88,12 +85,12 @@ impl EventInsertable {
 /// The type we use to insert events into the database
 pub struct EventInsertableBody {
     /// The event CID i.e. the root CID from the car file
-    pub cid: Cid,
-    /// Whether this event is deliverable to clients or is waiting for more data
-    pub deliverable: bool,
+    cid: Cid,
+    /// Whether the event is deliverable i.e. its prev has been delivered and the chain is continuous to an init event
+    deliverable: bool,
     /// The blocks of the event
     // could use a map but there aren't that many blocks per event (right?)
-    pub blocks: Vec<EventBlockRaw>,
+    blocks: Vec<EventBlockRaw>,
 }

 impl EventInsertableBody {
@@ -106,6 +103,27 @@
         }
     }

+    /// Get the CID of the event
+    pub fn cid(&self) -> Cid {
+        self.cid
+    }
+
+    /// The blocks of the event
+    pub fn blocks(&self) -> &Vec<EventBlockRaw> {
+        &self.blocks
+    }
+
+    /// Whether this event is deliverable currently
+    pub fn deliverable(&self) -> bool {
+        self.deliverable
+    }
+
+    /// Mark the event as deliverable.
+    /// This will be used when inserting the event to make sure the field is updated accordingly.
+    pub fn set_deliverable(&mut self, deliverable: bool) {
+        self.deliverable = deliverable;
+    }
+
     /// Find a block from the carfile for a given CID if it's included
     pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> {
         self.blocks
diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs
index fc652dfcf..2969a3244 100644
--- a/store/src/sql/query.rs
+++ b/store/src/sql/query.rs
@@ -81,20 +81,21 @@ impl EventQuery {

     /// Find event CIDs that have not yet been delivered to the client
     /// Useful after a restart, or if the task managing delivery has availability to try old events
+    /// Requires binding two parameters:
+    ///     $1: limit (i64)
+    ///     $2: rowid (i64)
     pub fn undelivered_with_values() -> &'static str {
         r#"SELECT
-            key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
+            key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
         FROM (
             SELECT
-                e.cid as event_cid, e.order_key
+                e.cid as event_cid, e.order_key, e.rowid
             FROM ceramic_one_event e
             WHERE
                 EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
-                AND e.delivered IS NULL
+                AND e.delivered IS NULL and e.rowid > $2
             LIMIT
                 $1
-            OFFSET
-                $2
         ) key
         JOIN
             ceramic_one_event_block eb ON key.event_cid = eb.event_cid
diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs
index c8beaeaf1..646f0c9b7 100644
--- a/store/src/sql/test.rs
+++ b/store/src/sql/test.rs
@@ -123,10 +123,11 @@ async fn range_query() {

 #[test(tokio::test)]
 async fn undelivered_with_values() {
     let pool = SqlitePool::connect_in_memory().await.unwrap();
-    let res = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000)
+    let (res, hw) = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000)
         .await
         .unwrap();
     assert_eq!(res.len(), 0);
+    assert_eq!(hw, 0);
 }

 #[test(tokio::test)]