diff --git a/service/Cargo.toml b/service/Cargo.toml
index bb65f422f..b68033ba9 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -18,7 +18,6 @@ ceramic-store.workspace = true
 cid.workspace = true
 hex.workspace = true
 ipld-core.workspace = true
-serde_ipld_dagcbor.workspace = true
 iroh-bitswap.workspace = true
 multihash-codetable.workspace = true
 recon.workspace = true
@@ -32,8 +31,8 @@ ipld-core.workspace = true
 multibase.workspace = true
 paste = "1.0"
 rand.workspace = true
-serde.workspace = true
 serde_ipld_dagcbor.workspace = true
+serde.workspace = true
 test-log.workspace = true
 tmpdir.workspace = true
 tokio.workspace = true
diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs
index f7dce573a..69e12b26c 100644
--- a/service/src/event/mod.rs
+++ b/service/src/event/mod.rs
@@ -1,3 +1,4 @@
+mod order_events;
 mod ordering_task;
 mod service;
 mod store;
diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs
new file mode 100644
index 000000000..4ed7820b1
--- /dev/null
+++ b/service/src/event/order_events.rs
@@ -0,0 +1,102 @@
+use std::collections::HashSet;
+
+use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
+use cid::Cid;
+
+use crate::Result;
+
+pub(crate) struct OrderEvents {
+    pub(crate) deliverable: Vec<EventInsertable>,
+    pub(crate) missing_history: Vec<EventInsertable>,
+}
+
+impl OrderEvents {
+    /// Groups the events into lists of those with a delivered prev and those without. This can be used
+    /// to return an error if the event is required to have history.
+    /// The events will be marked as deliverable so that they can be passed directly to the store to be persisted.
+    /// Will look up the prev from the database if needed to check whether it's deliverable (we could possibly
+    /// change this for recon and allow the ordering task to handle it).
+    pub async fn try_new(
+        pool: &SqlitePool,
+        mut candidate_events: Vec<EventInsertable>,
+    ) -> Result<Self> {
+        let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid()));
+        let mut deliverable = Vec::with_capacity(candidate_events.len());
+        candidate_events.retain(|e| {
+            if e.deliverable() {
+                deliverable.push(e.clone());
+                false
+            } else {
+                true
+            }
+        });
+        if candidate_events.is_empty() {
+            return Ok(OrderEvents {
+                deliverable,
+                missing_history: Vec::new(),
+            });
+        }
+
+        let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
+        let mut missing_history = Vec::with_capacity(candidate_events.len());
+
+        while let Some(mut event) = candidate_events.pop() {
+            match &event.prev() {
+                None => {
+                    unreachable!("Init events should have been filtered out since they're always deliverable");
+                }
+                Some(prev) => {
+                    if new_cids.contains(prev) {
+                        prevs_in_memory.push(event.clone());
+                        continue;
+                    } else {
+                        let (_exists, prev_deliverable) =
+                            CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
+                        if prev_deliverable {
+                            event.set_deliverable(true);
+                            deliverable.push(event);
+                        } else {
+                            // Technically, we may have the "rosetta stone" event in memory that could unlock this
+                            // chain if we loaded everything and recursed, but the immediate prev is not in this set
+                            // and has not been delivered to the client yet, so they shouldn't have been able to
+                            // construct this event. We consider it missing history, which can be used to return an
+                            // error if the event is required to have history.
+                            missing_history.push(event);
+                        }
+                    }
+                }
+            }
+        }
+
+        // We add events to the deliverable list until nothing changes.
+        // It should be a small set and it will shrink each loop, so continually looping is acceptable.
+        loop {
+            let mut made_changes = false;
+            let mut still_undelivered = Vec::with_capacity(prevs_in_memory.len());
+            while let Some(mut event) = prevs_in_memory.pop() {
+                match &event.prev() {
+                    None => {
+                        unreachable!(
+                            "Init events should have been filtered out of the in memory set"
+                        );
+                    }
+                    Some(prev) => {
+                        // A hashset would be a faster lookup, but we won't have many events, so hashing
+                        // for a handful of lookups and then converting back to a vec probably isn't worth it.
+                        if deliverable.iter().any(|e| e.cid() == *prev) {
+                            event.set_deliverable(true);
+                            deliverable.push(event);
+                            made_changes = true;
+                        } else {
+                            still_undelivered.push(event);
+                        }
+                    }
+                }
+            }
+            if !made_changes {
+                missing_history.extend(still_undelivered);
+                break;
+            }
+            // retry the leftovers; something promoted this pass may have unlocked them
+            prevs_in_memory = still_undelivered;
+        }
+
+        Ok(OrderEvents {
+            deliverable,
+            missing_history,
+        })
+    }
+}
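The fixed-point pass above is easier to see on a toy model. A minimal, self-contained sketch (invented `Ev` type, `u64`s standing in for CIDs; not part of the service code):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Ev {
    cid: u64,
    prev: u64,
}

/// Repeatedly sweep `pending`, promoting any event whose prev is already in
/// `deliverable`, until a full sweep makes no progress.
fn settle(mut pending: Vec<Ev>, deliverable: &mut Vec<u64>) -> Vec<Ev> {
    loop {
        let mut made_changes = false;
        let mut still_pending = Vec::with_capacity(pending.len());
        for ev in pending {
            if deliverable.contains(&ev.prev) {
                deliverable.push(ev.cid);
                made_changes = true;
            } else {
                still_pending.push(ev);
            }
        }
        pending = still_pending;
        if !made_changes {
            // whatever is left has no deliverable ancestor in this batch
            return pending;
        }
    }
}

fn main() {
    // C depends on B, B depends on A; only A is deliverable up front.
    let pending = vec![Ev { cid: 3, prev: 2 }, Ev { cid: 2, prev: 1 }];
    let mut deliverable = vec![1];
    let missing = settle(pending, &mut deliverable);
    assert_eq!(deliverable, vec![1, 2, 3]); // pass 1 promotes B, pass 2 promotes C
    assert!(missing.is_empty());
}
```

The set shrinks on every productive pass, so the loop terminates in at most `pending.len()` sweeps.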
diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs
index 9c7d4a0a7..8695a688e 100644
--- a/service/src/event/ordering_task.rs
+++ b/service/src/event/ordering_task.rs
@@ -1,7 +1,7 @@
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::{HashMap, VecDeque};
 
-use anyhow::anyhow;
-use ceramic_store::{CeramicOneEvent, SqlitePool};
+use ceramic_core::EventId;
+use ceramic_store::{CeramicOneEvent, EventInsertableBody, InsertedEvent, SqlitePool};
 use cid::Cid;
 use tracing::{debug, error, info, trace, warn};
 
@@ -10,64 +10,22 @@ use crate::{CeramicEventService, Error, Result};
 /// How many events to select at once to see if they've become deliverable when we have downtime.
 /// Used at startup and occasionally in case we ever dropped something.
 /// We keep the number small for now as we may need to traverse many prevs for each one of these and load them into memory.
-const DELIVERABLE_EVENTS_BATCH_SIZE: usize = 1000;
+const DELIVERABLE_EVENTS_BATCH_SIZE: u32 = 1000;
+
 /// How many batches of undelivered events are we willing to process on start up?
 /// To avoid an infinite loop. It would take a very long time to process
 /// `DELIVERABLE_EVENTS_BATCH_SIZE * MAX_ITERATIONS` events.
 const MAX_ITERATIONS: usize = 100_000_000;
 
-/// How often should we try to process all undelivered events in case we missed something
-const CHECK_ALL_INTERVAL_SECONDS: u64 = 60 * 10; // 10 minutes
-
-type InitCid = cid::Cid;
-type PrevCid = cid::Cid;
-type EventCid = cid::Cid;
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct DeliveredEvent {
-    pub(crate) cid: Cid,
-    pub(crate) init_cid: InitCid,
-}
-
-impl DeliveredEvent {
-    pub fn new(cid: Cid, init_cid: InitCid) -> Self {
-        Self { cid, init_cid }
-    }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub(crate) struct DeliverableMetadata {
-    pub(crate) init_cid: InitCid,
-    pub(crate) prev: PrevCid,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct DeliverableEvent {
-    pub(crate) cid: EventCid,
-    pub(crate) meta: DeliverableMetadata,
-    attempts: usize,
-    last_attempt: std::time::Instant,
-    started: std::time::Instant,
-    expires: Option<std::time::Instant>,
-}
-
-impl DeliverableEvent {
-    pub fn new(cid: Cid, meta: DeliverableMetadata, expires: Option<std::time::Instant>) -> Self {
-        Self {
-            cid,
-            meta,
-            attempts: 0,
-            last_attempt: std::time::Instant::now(),
-            started: std::time::Instant::now(),
-            expires,
-        }
-    }
-}
+type StreamCid = Cid;
+type EventCid = Cid;
+type PrevCid = Cid;
 
 #[derive(Debug)]
 pub struct DeliverableTask {
     pub(crate) _handle: tokio::task::JoinHandle<()>,
-    pub(crate) tx: tokio::sync::mpsc::Sender<DeliverableEvent>,
-    pub(crate) tx_new: tokio::sync::mpsc::Sender<DeliveredEvent>,
+    /// Events discovered over recon that are currently out of order and need to be marked ready (deliverable)
+    /// once their prev chain is discovered and complete (i.e. if my prev is deliverable then I am deliverable).
+    pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertedEvent>,
 }
 
 #[derive(Debug)]
@@ -75,30 +33,32 @@ pub struct OrderingTask {}
 
 impl OrderingTask {
     pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask {
-        let (tx, rx) = tokio::sync::mpsc::channel::<DeliverableEvent>(q_depth);
-        let (tx_new, rx_new) = tokio::sync::mpsc::channel::<DeliveredEvent>(q_depth);
+        let (tx_inserted, rx_inserted) = tokio::sync::mpsc::channel::<InsertedEvent>(q_depth);
 
         let handle =
-            tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx, rx_new).await });
+            tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_inserted).await });
 
         DeliverableTask {
             _handle: handle,
-            tx,
-            tx_new,
+            tx_inserted,
         }
     }
 
     async fn run_loop(
         pool: SqlitePool,
        load_undelivered: bool,
-        mut rx: tokio::sync::mpsc::Receiver<DeliverableEvent>,
-        mut rx_new: tokio::sync::mpsc::Receiver<DeliveredEvent>,
+        mut rx_inserted: tokio::sync::mpsc::Receiver<InsertedEvent>,
     ) {
         // before starting, make sure we've updated any events in the database we missed
+        // this could take a long time; possibly we want to put it in another task so we can start processing events immediately
         let mut state = OrderingState::new();
         if load_undelivered
             && state
-                .process_all_undelivered_events(&pool, MAX_ITERATIONS)
+                .process_all_undelivered_events(
+                    &pool,
+                    MAX_ITERATIONS,
+                    DELIVERABLE_EVENTS_BATCH_SIZE,
+                )
                .await
                .map_err(Self::log_error)
                .is_err()
@@ -106,53 +66,37 @@ impl OrderingTask {
             return;
         }
 
-        let mut last_processed = std::time::Instant::now();
         loop {
-            let mut modified: Option<HashSet<Cid>> = None;
-            let mut need_prev_buf = Vec::with_capacity(100);
-            let mut newly_added_buf = Vec::with_capacity(100);
-
-            tokio::select! {
-                incoming = rx.recv_many(&mut need_prev_buf, 100) => {
-                    if incoming > 0 {
-                        modified = Some(state.add_incoming_batch(need_prev_buf));
-                    }
-                }
-                new = rx_new.recv_many(&mut newly_added_buf, 100) => {
-                    if new > 0 {
-                        modified = Some(newly_added_buf.into_iter().map(|ev| ev.init_cid).collect::<HashSet<_>>());
-                    }
-                }
-                else => {
-                    info!(stream_count=%state.pending_by_stream.len(), "Server dropped the ordering task. Processing once more before exiting...");
-                    let _ = state
-                        .process_events(&pool, None)
-                        .await
-                        .map_err(Self::log_error);
+            let mut recon_events = Vec::with_capacity(100);
+            // consider trying to recv in a loop until we have X events or 10ms have passed, whichever comes
+            // first, and then process: the more events we get in memory, the fewer queries we need to run.
+            if rx_inserted.recv_many(&mut recon_events, 100).await > 0 {
+                trace!(?recon_events, "new events discovered!");
+                state.add_inserted_events(recon_events);
+
+                if state
+                    .process_streams(&pool)
+                    .await
+                    .map_err(Self::log_error)
+                    .is_err()
+                {
                     return;
                 }
-            };
 
-            // Given the math on OrderingState and the generally low number of updates to streams, we are going
-            // to ignore pruning until there's more of an indication that it's necessary. Just log some stats.
-            if last_processed.elapsed().as_secs() > CHECK_ALL_INTERVAL_SECONDS {
-                let stream_count = state.pending_by_stream.len();
-                if stream_count > 1000 {
-                    info!(%stream_count, "Over 1000 pending streams without recent updates.");
-                } else {
-                    debug!(%stream_count, "Fewer than 1000 streams pending without recent updates.");
-                }
-            }
+            } else if rx_inserted.is_closed() {
+                debug!(
+                    "Server dropped the delivered events channel. Attempting to process streams in memory once more before exiting."
+                );
 
-            if modified.is_some()
-                && state
-                    .process_events(&pool, modified)
+                if state
+                    .process_streams(&pool)
                    .await
                    .map_err(Self::log_error)
                    .is_err()
                {
                    return;
                }
-            last_processed = std::time::Instant::now();
+                break;
+            }
        }
    }
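The rewritten loop leans on `tokio::sync::mpsc::Receiver::recv_many` semantics: it waits until at least one message is available, drains up to the limit into the buffer, and returns 0 only once the channel is closed and empty, which is what makes the `is_closed` shutdown path reachable. A small stand-alone demonstration (a hypothetical program, not part of this change):

```rust
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(16);
    for i in 0..5 {
        tx.send(i).await.unwrap();
    }
    drop(tx); // close the channel, like the server dropping the sender

    let mut buf = Vec::with_capacity(100);
    // First call drains everything queued (up to the limit of 100)...
    assert_eq!(rx.recv_many(&mut buf, 100).await, 5);
    // ...and once the channel is closed and drained, recv_many returns 0.
    assert_eq!(rx.recv_many(&mut buf, 100).await, 0);
    assert!(rx.is_closed());
}
```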
Updating now..."); + let mut tx = pool.begin_tx().await?; + CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; + tx.commit().await?; + } + StreamEvent::InitEvent(*cid) + } + ceramic_store::EventHeader::Data { prev, .. } + | ceramic_store::EventHeader::Time { prev, .. } => { + if deliverable { + trace!(%cid, "Found deliverable event in database"); + StreamEvent::KnownDeliverable(StreamEventMetadata::new(cid, *prev)) + } else { + trace!(%cid, "Found undelivered event in database"); + StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)) + } + } + }; + Ok(Some(known_prev)) + } else { + trace!(%cid, "Missing event in database"); + Ok(None) + } + } +} + +impl From for StreamEvent { + fn from(ev: InsertedEvent) -> Self { + match ev.header { + ceramic_store::EventHeader::Init { cid, .. } => StreamEvent::InitEvent(cid), + ceramic_store::EventHeader::Data { cid, prev, .. } + | ceramic_store::EventHeader::Time { cid, prev, .. } => { + let meta = StreamEventMetadata::new(cid, prev); + if ev.deliverable { + StreamEvent::KnownDeliverable(meta) + } else { + StreamEvent::Undelivered(meta) + } + } + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct StreamEventMetadata { + cid: EventCid, + prev: PrevCid, +} + +impl StreamEventMetadata { + fn new(cid: EventCid, prev: PrevCid) -> Self { + Self { cid, prev } + } } #[derive(Debug, Clone, Default)] /// ~540 bytes per event in this struct pub(crate) struct StreamEvents { + /// Map of `event.prev` to `event.cid` to find the previous event easily. prev_map: HashMap, - cid_map: HashMap, + /// Map of `event.cid` to `metadata` for quick lookup of the event metadata. + cid_map: HashMap, + /// whether we should process this stream because new events have been added + skip_processing: bool, + /// The newly discovered events that are deliverable and should be processed. + new_deliverable: VecDeque, } -impl FromIterator for StreamEvents { - fn from_iter>(iter: T) -> Self { - let mut stream = Self::new(); - for item in iter { - stream.add_event(item); - } - stream +impl StreamEvents { + fn new(event: StreamEvent) -> Self { + let mut new = Self::default(); + new.add_event(event); + new } -} -impl StreamEvents { - pub fn new() -> Self { - Self::default() + // we'll be processed if something in memory depends on this event + fn update_should_process_for_new_delivered(&mut self, new_cid: &EventCid) { + // don't reset a true flag to false + if self.skip_processing { + self.skip_processing = !self.prev_map.contains_key(new_cid); + } } - /// returns Some(Stream Init CID) if this is a new event, else None. - pub fn add_event(&mut self, event: DeliverableEvent) -> Option { - let res = if self.prev_map.insert(event.meta.prev, event.cid).is_none() { - Some(event.meta.init_cid) - } else { - None + /// returns true if this is a new event. 
+ fn add_event(&mut self, event: StreamEvent) -> bool { + let cid = match &event { + StreamEvent::InitEvent(cid) => { + self.update_should_process_for_new_delivered(cid); + *cid + } + StreamEvent::KnownDeliverable(meta) => { + self.prev_map.insert(meta.prev, meta.cid); + self.update_should_process_for_new_delivered(&meta.cid); + meta.cid + } + StreamEvent::Undelivered(meta) => { + self.prev_map.insert(meta.prev, meta.cid); + if self.skip_processing { + // we depend on something in memory + self.skip_processing = !self.prev_map.contains_key(&meta.prev) + } + meta.cid + } }; - self.cid_map.insert(event.cid, event); - res - } - pub fn is_empty(&self) -> bool { - // these should always match - self.prev_map.is_empty() && self.cid_map.is_empty() + self.cid_map.insert(cid, event).is_none() } - fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.cid_map.remove(cid) { - self.prev_map.remove(&cid.meta.prev); + fn remove_by_prev_cid(&mut self, prev: &Cid) -> Option { + if let Some(cid) = self.prev_map.remove(prev) { + self.cid_map.remove(&cid); Some(cid) } else { None } } - fn remove_by_prev_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.prev_map.remove(cid) { - self.cid_map.remove(&cid); - Some(cid) - } else { - None + /// Called when we've persisted the deliverable events to the database and can clean up our state. + /// Returns true if we should be retained for future processing (i.e we have more we need to discover) + /// and false if we can be dropped from memory. + fn completed_processing(&mut self) -> bool { + self.skip_processing = true; + + for cid in self.new_deliverable.iter() { + if let Some(ev) = self.cid_map.get_mut(cid) { + match ev { + StreamEvent::InitEvent(_) => {} + StreamEvent::KnownDeliverable(_) => { + warn!( + ?ev, + "Found event in deliverable queue that was already marked as deliverable." + ) + } + StreamEvent::Undelivered(meta) => { + // we're delivered now + *ev = StreamEvent::KnownDeliverable(meta.clone()); + } + } + } } + self.new_deliverable.clear(); + self.cid_map + .iter() + .any(|(_, ev)| matches!(ev, StreamEvent::Undelivered(_))) } + + async fn order_events(&mut self, pool: &SqlitePool) -> Result<()> { + // We collect everything we can into memory and then order things. + // If our prev is the init event or already been delivered, we can mark ourselves as deliverable. + // If our prev wasn't deliverable yet, we track it and repeat (i.e. follow its prev if we don't have it) + + let mut deliverable_queue = VecDeque::new(); + let mut undelivered = + VecDeque::from_iter(self.cid_map.iter().filter_map(|(cid, ev)| match ev { + StreamEvent::Undelivered(meta) => { + assert_eq!(meta.cid, *cid); + Some((meta.cid, meta.prev)) + } + _ => None, + })); + + while let Some((cid, prev)) = undelivered.pop_front() { + if let Some(prev_event) = self.cid_map.get(&prev) { + match prev_event { + StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => { + trace!( + %prev, + %cid, + "Found event whose prev is already in memory and IS deliverable!" + ); + deliverable_queue.push_back(cid) + } + StreamEvent::Undelivered(_) => { + trace!( + %prev, + %cid, + "Found event whose prev is already in memory but NOT deliverable." 
+ ); + // nothing to do until it arrives on the channel + } + } + + continue; + } + + let prev_event = StreamEvent::load_by_cid(pool, prev).await?; + if let Some(known_prev) = prev_event { + match &known_prev { + StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => { + deliverable_queue.push_back(cid); + } + StreamEvent::Undelivered(undelivered_ev) => { + // we'll try to follow this back to something deliverable + undelivered.push_back((undelivered_ev.cid, undelivered_ev.prev)); + } + } + self.add_event(known_prev); + } else { + trace!("Found event that depends on another event we haven't discovered yet"); + } + } + + let mut newly_ready = deliverable_queue.clone(); + while let Some(cid) = newly_ready.pop_front() { + if let Some(now_ready_ev) = self.remove_by_prev_cid(&cid) { + if let Some(ev) = self.cid_map.get(&now_ready_ev) { + match ev { + StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => { + warn!(?ev, "should not have found a deliverable event when we expected only undelivered events!"); + } + StreamEvent::Undelivered(_) => { + newly_ready.push_back(now_ready_ev); + } + } + } + deliverable_queue.push_back(now_ready_ev); + newly_ready.push_back(now_ready_ev); + } + } + self.new_deliverable = deliverable_queue; + debug!(count=%self.new_deliverable.len(), "deliverable events discovered"); + Ok(()) + } +} + +#[derive(Debug)] +pub struct OrderingState { + pending_by_stream: HashMap, + deliverable: VecDeque, } impl OrderingState { - pub fn new() -> Self { + fn new() -> Self { Self { pending_by_stream: HashMap::new(), - ready_events: VecDeque::new(), + deliverable: VecDeque::new(), } } - /// This will review all the events for any streams known to have undelivered events and see if any of them are now deliverable. - /// If `streams_to_process` is None, all streams will be processed, otherwise only the streams in the set will be processed. - /// Processing all streams could take a long time and not necessarily do anything productive (if we're missing a key event, we're still blocked). - /// However, passing a value for `streams_to_process` when we know something has changed is likely to have positive results and be much faster. - pub(crate) async fn process_events( - &mut self, - pool: &SqlitePool, - streams_to_process: Option>, - ) -> Result<()> { - self.persist_ready_events(pool).await?; - for (cid, stream_events) in self.pending_by_stream.iter_mut() { - if streams_to_process - .as_ref() - .map_or(false, |to_do| !to_do.contains(cid)) - { - continue; - } - let deliverable = Self::discover_deliverable_events(pool, stream_events).await?; - if !deliverable.is_empty() { - self.ready_events.extend(deliverable) - } - } - if !self.ready_events.is_empty() { - self.persist_ready_events(pool).await?; + /// Add a stream to the list of streams to process. + /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them. + /// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time. + fn add_inserted_events(&mut self, events: Vec) { + for ev in events { + let stream_cid = ev.header.stream_cid(); + let event = ev.into(); + self.add_stream_event(stream_cid, event); } - - Ok(()) } - /// Removes deliverable events from the `prev_map` and returns them. This means prev is already delivered or in the - /// list to be marked as delivered. The input is expected to be a list of CIDs for a given stream that are waiting - /// to be processed. 
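The `prev_map`/`cid_map` pair is a reverse index: `prev_map` answers "which event is waiting on this CID?" so `remove_by_prev_cid` can walk a chain one link at a time (single prev per event). A toy sketch of that walk with `u64`s in place of CIDs:

```rust
use std::collections::{HashMap, VecDeque};

fn main() {
    // chain: 1 <- 2 <- 3, stored as prev -> dependent (value depends on key)
    let mut prev_map: HashMap<u64, u64> = HashMap::from([(1, 2), (2, 3)]);

    let mut ready = VecDeque::from([1u64]); // 1 just became deliverable
    let mut delivered = Vec::new();
    while let Some(cid) = ready.pop_front() {
        delivered.push(cid);
        // the same lookup remove_by_prev_cid does: my dependent is ready next
        if let Some(next) = prev_map.remove(&cid) {
            ready.push_back(next);
        }
    }
    assert_eq!(delivered, vec![1, 2, 3]);
    assert!(prev_map.is_empty());
}
```

Each link is resolved with one `HashMap` removal, so ordering a chain of `n` events costs O(n) once the head is known to be deliverable.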
+
+#[derive(Debug)]
+pub struct OrderingState {
+    pending_by_stream: HashMap<StreamCid, StreamEvents>,
+    deliverable: VecDeque<EventCid>,
+}
 
 impl OrderingState {
-    pub fn new() -> Self {
+    fn new() -> Self {
         Self {
             pending_by_stream: HashMap::new(),
-            ready_events: VecDeque::new(),
+            deliverable: VecDeque::new(),
         }
     }
 
-    /// This will review all the events for any streams known to have undelivered events and see if any of them are now deliverable.
-    /// If `streams_to_process` is None, all streams will be processed, otherwise only the streams in the set will be processed.
-    /// Processing all streams could take a long time and not necessarily do anything productive (if we're missing a key event, we're still blocked).
-    /// However, passing a value for `streams_to_process` when we know something has changed is likely to have positive results and be much faster.
-    pub(crate) async fn process_events(
-        &mut self,
-        pool: &SqlitePool,
-        streams_to_process: Option<HashSet<Cid>>,
-    ) -> Result<()> {
-        self.persist_ready_events(pool).await?;
-        for (cid, stream_events) in self.pending_by_stream.iter_mut() {
-            if streams_to_process
-                .as_ref()
-                .map_or(false, |to_do| !to_do.contains(cid))
-            {
-                continue;
-            }
-            let deliverable = Self::discover_deliverable_events(pool, stream_events).await?;
-            if !deliverable.is_empty() {
-                self.ready_events.extend(deliverable)
-            }
-        }
-        if !self.ready_events.is_empty() {
-            self.persist_ready_events(pool).await?;
+    /// Add a stream to the list of streams to process.
+    /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them.
+    /// We will get lots of init events we can ignore unless we need them; otherwise they'd be stuck in memory for a long time.
+    fn add_inserted_events(&mut self, events: Vec<InsertedEvent>) {
+        for ev in events {
+            let stream_cid = ev.header.stream_cid();
+            let event = ev.into();
+            self.add_stream_event(stream_cid, event);
         }
-
-        Ok(())
     }
 
-    /// Removes deliverable events from the `prev_map` and returns them. This means prev is already delivered or in the
-    /// list to be marked as delivered. The input is expected to be a list of CIDs for a given stream that are waiting
-    /// to be processed. It will still work if it's intermixed for multiple streams, but it's not the most efficient way to use it.
-    /// The returned CIDs in the VecDeque are for events that are expected to be updated FIFO, i.e. vec.pop_front().
-    ///
-    /// This breaks with multi-prev as we expect a single prev for each event. The input map is expected to contain the
-    /// (prev <- event) relationship (that is, the value is the event that depends on the key).
-    pub(crate) async fn discover_deliverable_events(
-        pool: &SqlitePool,
-        stream_map: &mut StreamEvents,
-    ) -> Result<VecDeque<EventCid>> {
-        if stream_map.is_empty() {
-            return Ok(VecDeque::new());
+    fn add_stream_event(&mut self, stream_cid: StreamCid, event: StreamEvent) {
+        if let Some(stream) = self.pending_by_stream.get_mut(&stream_cid) {
+            stream.add_event(event);
+        } else if matches!(event, StreamEvent::Undelivered(_)) {
+            let stream = StreamEvents::new(event);
+            self.pending_by_stream.insert(stream_cid, stream);
         }
+    }
 
-        let mut deliverable = VecDeque::new();
-        let prev_map_cln = stream_map.prev_map.clone();
-        for (prev, ev_cid) in prev_map_cln {
-            if stream_map.cid_map.contains_key(&prev) {
-                trace!(
-                    ?prev,
-                    cid=?ev_cid,
-                    "Found event that depends on another event in memory"
-                );
-                // we have it in memory so we need to order it relative to others to insert correctly,
-                // although it may not be possible if the chain just goes back to some unknown event.
-                // once we find the first event that's deliverable, we can go back through and find the rest
+    /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to commit things in batches,
+    /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
+    async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> {
+        for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
+            if stream_events.skip_processing {
                 continue;
-            } else {
-                let (exists, delivered) = CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
-                if delivered {
-                    trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list");
-                    deliverable.push_back(ev_cid);
-                    stream_map.remove_by_event_cid(&ev_cid);
-                } else if exists {
-                    trace!("Found undelivered prev in database. Building data to check for deliverable.");
-                    // if it's not in memory, we need to read it from the db and parse it for the prev value to add it to our set
-                    let data = CeramicOneEvent::value_by_cid(pool, &prev)
-                        .await?
-                        .ok_or_else(|| {
-                            Error::new_app(anyhow!(
-                                "Missing data for event that exists should be impossible"
-                            ))
-                        })?;
-                    let (insertable_body, maybe_prev) =
-                        CeramicEventService::parse_event_carfile(prev, &data).await?;
-
-                    if let Some(prev) = maybe_prev {
-                        let event = DeliverableEvent::new(insertable_body.cid, prev, None);
-                        trace!(cid=%event.cid, "Adding event discovered in database to stream pending list");
-                        stream_map.add_event(event);
-                    } else {
-                        warn!(event_cid=%insertable_body.cid, "Found undelivered event with no prev while processing pending. Should not happen.");
-                        deliverable.push_back(insertable_body.cid);
-                        stream_map.remove_by_event_cid(&ev_cid);
-                    }
-                } else {
-                    trace!(
-                        ?ev_cid,
-                        "Found event that depends on unknown event. Will check later."
-                    );
-                }
             }
+            stream_events.order_events(pool).await?;
+            self.deliverable
+                .extend(stream_events.new_deliverable.iter());
         }
-        let mut newly_ready = deliverable.clone();
-        while let Some(cid) = newly_ready.pop_front() {
-            if let Some(now_ready_ev) = stream_map.remove_by_prev_cid(&cid) {
-                deliverable.push_back(now_ready_ev);
-                newly_ready.push_back(now_ready_ev);
-            }
-        }
-        debug!(?deliverable, "deliverable events discovered");
-        Ok(deliverable)
+        self.persist_ready_events(pool).await?;
+        // keep things that still have missing history but don't process them again until we get something new
+        self.pending_by_stream
+            .retain(|_, stream_events| stream_events.completed_processing());
+
+        debug!(remaining_streams=%self.pending_by_stream.len(), "Finished processing streams");
+        trace!(stream_state=?self, "Finished processing streams");
+
+        Ok(())
     }
 
     /// Process all undelivered events in the database. This is a blocking operation that could take a long time.
@@ -365,112 +433,97 @@ pub(crate) async fn process_all_undelivered_events(
         &mut self,
         pool: &SqlitePool,
         max_iterations: usize,
-    ) -> Result<()> {
-        let mut cnt = 0;
-        let mut offset: usize = 0;
-        while cnt < max_iterations {
-            cnt += 1;
-            let (new, found) = self
-                .add_undelivered_batch(pool, offset, DELIVERABLE_EVENTS_BATCH_SIZE)
-                .await?;
-            if new == 0 {
+        batch_size: u32,
+    ) -> Result<usize> {
+        let mut iter_cnt = 0;
+        let mut event_cnt = 0;
+        let mut highwater = 0;
+        while iter_cnt < max_iterations {
+            iter_cnt += 1;
+            let (undelivered, new_hw) =
+                CeramicOneEvent::undelivered_with_values(pool, batch_size.into(), highwater)
+                    .await?;
+            highwater = new_hw;
+            if undelivered.is_empty() {
                 break;
             } else {
                 // We can start processing and we'll follow the stream history if we have it. In that case, we either arrive
                 // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
                 // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
                 // or otherwise mark them ignored somehow.
-                self.process_events(pool, None).await?;
-                if new < DELIVERABLE_EVENTS_BATCH_SIZE {
+                let found_all = undelivered.len() < batch_size as usize;
+                event_cnt += self
+                    .process_undelivered_events_batch(pool, undelivered)
+                    .await?;
+                if found_all {
                     break;
                 }
-                offset = offset.saturating_add(found);
-            }
-            if cnt >= max_iterations {
-                warn!(batch_size=DELIVERABLE_EVENTS_BATCH_SIZE, iterations=%max_iterations, "Exceeded max iterations for finding undelivered events!");
-                break;
             }
         }
-        if self.ready_events.is_empty() {
-            Ok(())
-        } else {
-            self.persist_ready_events(pool).await?;
-            Ok(())
+        if iter_cnt >= max_iterations {
+            info!(%batch_size, iterations=%iter_cnt, "Exceeded max iterations for finding undelivered events!");
         }
+
+        Ok(event_cnt)
     }
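The startup scan now pages with a highwater mark rather than an `OFFSET`, so rows already seen are never rescanned, and a short page signals completion. A sketch of the pattern, where `fetch_page` is a hypothetical in-memory stand-in for `CeramicOneEvent::undelivered_with_values`:

```rust
// Return up to `limit` rows with id > highwater, plus the new highwater mark.
fn fetch_page(rows: &[(i64, &str)], highwater: i64, limit: usize) -> (Vec<(i64, String)>, i64) {
    let page: Vec<(i64, String)> = rows
        .iter()
        .filter(|(id, _)| *id > highwater)
        .take(limit)
        .map(|(id, v)| (*id, v.to_string()))
        .collect();
    let new_hw = page.last().map(|(id, _)| *id).unwrap_or(highwater);
    (page, new_hw)
}

fn main() {
    let rows = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")];
    let (mut highwater, mut seen) = (0i64, Vec::new());
    loop {
        let (page, new_hw) = fetch_page(&rows, highwater, 2);
        highwater = new_hw;
        if page.is_empty() {
            break;
        }
        let done = page.len() < 2; // a short page means we've found everything
        seen.extend(page.into_iter().map(|(_, v)| v));
        if done {
            break;
        }
    }
    assert_eq!(seen, ["a", "b", "c", "d", "e"]);
}
```

Unlike an offset, the highwater cursor also stays correct if new rows are inserted between batches, since resumption is keyed on the last row id seen.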
-    /// Add a batch of events from the database to the pending list to be processed.
-    /// Returns the (# new events found, # events returned by the query).
-    async fn add_undelivered_batch(
+    async fn process_undelivered_events_batch(
         &mut self,
         pool: &SqlitePool,
-        offset: usize,
-        limit: usize,
-    ) -> Result<(usize, usize)> {
-        let undelivered = CeramicOneEvent::undelivered_with_values(pool, offset, limit).await?;
-        trace!(count=%undelivered.len(), "Found undelivered events to process");
-        if undelivered.is_empty() {
-            return Ok((0, 0));
-        }
-        let found = undelivered.len();
-        let mut new = 0;
-        for (key, data) in undelivered {
-            let event_cid = key.cid().ok_or_else(|| {
-                Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", key))
+        event_data: Vec<(EventId, Vec<u8>)>,
+    ) -> Result<usize> {
+        trace!(cnt=%event_data.len(), "Processing undelivered events batch");
+        let mut to_store_asap = Vec::new();
+        let mut event_cnt = 0;
+        for (event_id, carfile) in event_data {
+            let event_cid = event_id.cid().ok_or_else(|| {
+                Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", event_id))
             })?;
-            let (insertable_body, maybe_prev) =
-                CeramicEventService::parse_event_carfile(event_cid, &data).await?;
-            if let Some(prev) = maybe_prev {
-                let event = DeliverableEvent::new(insertable_body.cid, prev, None);
-                if self.track_pending(event).is_some() {
-                    new += 1;
+
+            let parsed_body = EventInsertableBody::try_from_carfile(event_cid, &carfile).await?;
+
+            let event = match parsed_body.header() {
+                ceramic_store::EventHeader::Init { cid, .. } => {
+                    warn!(%cid, "Found init event in database that wasn't previously marked as deliverable. Updating now...");
+                    to_store_asap.push(*cid);
+                    StreamEvent::InitEvent(*cid)
                 }
-            } else {
-                // safe to ignore in tests; shows up because we mark init events as undelivered even though they don't have a prev
-                info!(event_cid=%insertable_body.cid, "Found undelivered event with no prev while processing undelivered. Should not happen. Likely means events were dropped before.");
-                self.ready_events.push_back(insertable_body.cid);
-                new += 1; // we treat this as new since it might unlock something else, but it's not actually going in our queue, as it's a bit odd
-            }
+                ceramic_store::EventHeader::Data { cid, prev, .. }
+                | ceramic_store::EventHeader::Time { cid, prev, .. } => {
+                    StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev))
+                }
+            };
+
+            event_cnt += 1;
+            self.add_stream_event(parsed_body.header().stream_cid(), event)
         }
-        trace!(%new, %found, "Adding undelivered events to pending set");
-        Ok((new, found))
-    }
 
-    fn add_incoming_batch(&mut self, events: Vec<DeliverableEvent>) -> HashSet<InitCid> {
-        let mut updated_streams = HashSet::with_capacity(events.len());
-        for event in events {
-            if let Some(updated_stream) = self.track_pending(event) {
-                updated_streams.insert(updated_stream);
+        if !to_store_asap.is_empty() {
+            info!("storing init events that were somehow missed previously");
+            let mut tx = pool.begin_tx().await?;
+            for cid in to_store_asap {
+                CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?;
             }
+            tx.commit().await?;
         }
-        updated_streams
-    }
+        self.process_streams(pool).await?;
 
-    /// returns the init event CID (stream CID) if this is a new event
-    fn track_pending(&mut self, event: DeliverableEvent) -> Option<InitCid> {
-        self.pending_by_stream
-            .entry(event.meta.init_cid)
-            .or_default()
-            .add_event(event)
+        Ok(event_cnt)
     }
 
-    /// Modify all the events that are ready to be marked as delivered.
-    /// We should improve the error handling and likely add some batching if the number of ready events is very high.
     /// We copy the events up front to avoid losing any events if the task is cancelled.
     async fn persist_ready_events(&mut self, pool: &SqlitePool) -> Result<()> {
-        if !self.ready_events.is_empty() {
-            let mut to_process = self.ready_events.clone(); // to avoid cancel loss
-            tracing::debug!(count=%self.ready_events.len(), "Marking events as ready to deliver");
+        if !self.deliverable.is_empty() {
+            tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver");
             let mut tx = pool.begin_tx().await?;
-
-            // We process the ready events as a FIFO queue so they are marked delivered before events
-            // that were added after and depend on them.
-            while let Some(cid) = to_process.pop_front() {
-                CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?;
+            // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them.
+            // We could use `pop_front`, but we want to make sure we commit and then clear everything at once.
+            for cid in &self.deliverable {
+                CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?;
             }
             tx.commit().await?;
-            self.ready_events.clear(); // safe to clear since we are past any await points and hold exclusive access
+            self.deliverable.clear();
         }
         Ok(())
     }
 }
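Marking in FIFO order inside a single transaction is what keeps the delivered set consistent: a prev is always recorded before anything that depends on it, and the in-memory queue is cleared only after the commit succeeds. A toy model of that invariant (a `HashSet` standing in for the transaction):

```rust
use std::collections::{HashSet, VecDeque};

fn main() {
    // deliverable was built parent-first: 1, then 2 (prev = 1), then 3 (prev = 2)
    let deliverable = VecDeque::from([1u64, 2, 3]);
    let prev_of = |cid: u64| if cid > 1 { Some(cid - 1) } else { None };

    let mut delivered: HashSet<u64> = HashSet::new(); // stands in for the open tx
    for cid in &deliverable {
        if let Some(prev) = prev_of(*cid) {
            // FIFO order guarantees the prev was marked earlier in this same pass
            assert!(delivered.contains(&prev));
        }
        delivered.insert(*cid);
    }
    // only after the whole batch "commits" would the queue be cleared
    assert_eq!(delivered.len(), 3);
}
```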
@@ -479,295 +532,172 @@ impl OrderingState {
 #[cfg(test)]
 mod test {
     use ceramic_store::EventInsertable;
-    use multihash_codetable::{Code, MultihashDigest};
-    use recon::ReconItem;
 
-    use crate::tests::{build_event, check_deliverable, random_block, TestEventInfo};
+    use crate::tests::get_n_events;
 
     use super::*;
 
-    /// these events are init events so they should have been delivered
-    /// need to build with data events that have the prev stored already
-    async fn build_insertable_undelivered() -> EventInsertable {
-        let TestEventInfo {
-            event_id: id, car, ..
-        } = build_event().await;
-        let cid = id.cid().unwrap();
-
-        let (body, _meta) = CeramicEventService::parse_event_carfile(cid, &car)
-            .await
-            .unwrap();
-        assert!(!body.deliverable);
-        EventInsertable::try_new(id, body).unwrap()
-    }
-
-    fn assert_stream_map_elems(map: &StreamEvents, size: usize) {
-        assert_eq!(size, map.cid_map.len(), "{:?}", map);
-        assert_eq!(size, map.prev_map.len(), "{:?}", map);
-    }
-
-    fn build_linked_events(
-        number: usize,
-        stream_cid: Cid,
-        first_prev: Cid,
-    ) -> Vec<DeliverableEvent> {
-        let mut events = Vec::with_capacity(number);
-
-        let first_cid = random_block().cid;
-        events.push(DeliverableEvent::new(
-            first_cid,
-            DeliverableMetadata {
-                init_cid: stream_cid,
-                prev: first_prev,
-            },
-            None,
-        ));
-
-        for i in 1..number {
-            let random = random_block();
-            let ev = DeliverableEvent::new(
-                random.cid,
-                DeliverableMetadata {
-                    init_cid: stream_cid,
-                    prev: events[i - 1].cid,
-                },
-                None,
-            );
-            events.push(ev);
-        }
-
-        events
-    }
+    async fn get_n_insertable_events(n: usize) -> Vec<EventInsertable> {
+        let mut res = Vec::with_capacity(n);
+        let events = get_n_events(n).await;
+        for event in events {
+            let event = EventInsertable::try_new(event.0.to_owned(), &event.1)
+                .await
+                .unwrap();
+            res.push(event);
+        }
+        res
+    }
 
     #[tokio::test]
-    async fn test_none_deliverable_without_first() {
-        // the events all point to the one before, but A has never been delivered so we can't do anything
-        let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary"));
-        let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing"));
-        let events = build_linked_events(4, stream_cid, missing);
-        let mut prev_map = StreamEvents::from_iter(events);
-
+    async fn test_undelivered_batch_empty() {
+        let _ = ceramic_metrics::init_local_tracing();
         let pool = SqlitePool::connect_in_memory().await.unwrap();
-        let deliverable = super::OrderingState::discover_deliverable_events(&pool, &mut prev_map)
+        let processed = OrderingState::new()
+            .process_all_undelivered_events(&pool, 10, 100)
             .await
             .unwrap();
-
-        assert_eq!(0, deliverable.len());
+        assert_eq!(0, processed);
     }
 
-    #[tokio::test]
-    async fn test_all_deliverable_one_stream() {
-        let _ = ceramic_metrics::init_local_tracing();
-        let TestEventInfo {
-            event_id: one_id,
-            car: one_car,
-            ..
-        } = build_event().await;
-        let one_cid = one_id.cid().unwrap();
-        let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap())
-            .await
-            .unwrap();
-        recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car))
-            .await
-            .unwrap();
-
-        check_deliverable(&store.pool, &one_cid, true).await;
-
-        let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary"));
-
-        let events = build_linked_events(4, stream_cid, one_cid);
-        let expected = VecDeque::from_iter(events.iter().map(|ev| ev.cid));
-        let mut prev_map = StreamEvents::from_iter(events);
-
-        assert_stream_map_elems(&prev_map, 4);
-        let deliverable =
-            super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map)
-                .await
-                .unwrap();
-
-        assert_eq!(4, deliverable.len());
-        assert_eq!(expected, deliverable);
-        assert_stream_map_elems(&prev_map, 0);
-    }
+    async fn insert_10_with_9_undelivered(pool: &SqlitePool) {
+        let insertable = get_n_insertable_events(10).await;
+        let init = insertable.first().unwrap().to_owned();
+        let undelivered = insertable.into_iter().skip(1).collect::<Vec<_>>();
+
+        let new = CeramicOneEvent::insert_many(pool, &undelivered[..])
+            .await
+            .unwrap();
+
+        assert_eq!(9, new.inserted.len());
+        assert_eq!(0, new.inserted.iter().filter(|e| e.deliverable).count());
+
+        let new = CeramicOneEvent::insert_many(pool, &[init]).await.unwrap();
+        assert_eq!(1, new.inserted.len());
+        assert_eq!(1, new.inserted.iter().filter(|e| e.deliverable).count());
+    }
 
     #[tokio::test]
-    async fn test_some_deliverable_one_stream() {
+    async fn test_undelivered_batch_offset() {
         let _ = ceramic_metrics::init_local_tracing();
-        let TestEventInfo {
-            event_id: one_id,
-            car: one_car,
-            ..
-        } = build_event().await;
-        let one_cid = one_id.cid().unwrap();
-        let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap())
+        let pool = SqlitePool::connect_in_memory().await.unwrap();
+        insert_10_with_9_undelivered(&pool).await;
+        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
             .await
             .unwrap();
-        recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car))
+        assert_eq!(1, events.len());
+
+        let processed = OrderingState::new()
+            .process_all_undelivered_events(&pool, 1, 5)
             .await
             .unwrap();
-
-        check_deliverable(&store.pool, &one_cid, true).await;
-
-        let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary"));
-        let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing"));
-
-        let mut deliverable_events = build_linked_events(6, stream_cid, one_cid);
-        let stuck_events = build_linked_events(8, stream_cid, missing);
-        let expected = VecDeque::from_iter(deliverable_events.iter().map(|ev| ev.cid));
-        deliverable_events.extend(stuck_events);
-        let mut prev_map = StreamEvents::from_iter(deliverable_events);
-
-        assert_stream_map_elems(&prev_map, 14);
-        let deliverable =
-            super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map)
-                .await
-                .unwrap();
-
-        assert_eq!(6, deliverable.len());
-        assert_eq!(expected, deliverable);
-        assert_stream_map_elems(&prev_map, 8);
-    }
-
-    #[tokio::test]
-    // expected to be per stream, but all events are combined for the history-required version currently, so
-    // this needs to work as well
-    async fn test_all_deliverable_multiple_streams() {
-        let _ = ceramic_metrics::init_local_tracing();
-        let TestEventInfo {
-            event_id: one_id,
-            car: one_car,
-            ..
-        } = build_event().await;
-        let TestEventInfo {
-            event_id: two_id,
-            car: two_car,
-            ..
-        } = build_event().await;
-        let one_cid = one_id.cid().unwrap();
-        let two_cid = two_id.cid().unwrap();
-        let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap())
+        assert_eq!(5, processed);
+        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
             .await
             .unwrap();
-        recon::Store::insert_many(
-            &store,
-            &[
-                ReconItem::new(&one_id, &one_car),
-                ReconItem::new(&two_id, &two_car),
-            ],
-        )
-        .await
-        .unwrap();
-
-        check_deliverable(&store.pool, &one_cid, true).await;
-        check_deliverable(&store.pool, &two_cid, true).await;
-
-        let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-one"));
-        let stream_cid_2 = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-two"));
-
-        let mut events_a = build_linked_events(4, stream_cid, one_cid);
-        let mut events_b = build_linked_events(10, stream_cid_2, two_cid);
-        let expected_a = VecDeque::from_iter(events_a.iter().map(|ev| ev.cid));
-        let expected_b = VecDeque::from_iter(events_b.iter().map(|ev| ev.cid));
-        // we expect the events to be in the prev chain order, but they can be interleaved across streams.
-        // we reverse the items in the input to prove this (it's a hashmap internally so there is no order, but still)
-        events_a.reverse();
-        events_b.reverse();
-        events_a.extend(events_b);
-        assert_eq!(14, events_a.len());
-        let mut prev_map = StreamEvents::from_iter(events_a);
-
-        assert_stream_map_elems(&prev_map, 14);
-        let deliverable =
-            super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map)
-                .await
-                .unwrap();
-
-        assert_eq!(14, deliverable.len());
-        assert_eq!(0, prev_map.cid_map.len(), "{:?}", prev_map);
-        assert_eq!(0, prev_map.prev_map.len(), "{:?}", prev_map);
-
-        let mut split_a = VecDeque::new();
-        let mut split_b = VecDeque::new();
-        for cid in deliverable {
-            if expected_a.contains(&cid) {
-                split_a.push_back(cid);
-            } else if expected_b.contains(&cid) {
-                split_b.push_back(cid);
-            } else {
-                panic!("Unexpected CID in deliverable list: {:?}", cid);
-            }
-        }
-        assert_eq!(expected_a, split_a);
-        assert_eq!(expected_b, split_b);
+        assert_eq!(6, events.len());
+        // the last 5 are processed and we have 10 delivered
+        let processed = OrderingState::new()
+            .process_all_undelivered_events(&pool, 1, 5)
+            .await
+            .unwrap();
+        assert_eq!(4, processed);
+        let (_, events) = CeramicOneEvent::new_events_since_value(&pool, 0, 100)
+            .await
+            .unwrap();
+        assert_eq!(10, events.len());
+
+        // nothing left
+        let processed = OrderingState::new()
+            .process_all_undelivered_events(&pool, 1, 100)
+            .await
+            .unwrap();
+        assert_eq!(0, processed);
     }
 
     #[tokio::test]
-    async fn test_undelivered_batch_empty() {
+    async fn test_undelivered_batch_iterations_ends_early() {
         let _ = ceramic_metrics::init_local_tracing();
+
         let pool = SqlitePool::connect_in_memory().await.unwrap();
-        let (new, found) = OrderingState::new()
-            .add_undelivered_batch(&pool, 0, 10)
+        // create 5 streams with 9 undelivered events each
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+
+        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
+            .await
+            .unwrap();
+        assert_eq!(5, event.len());
+        let mut state = OrderingState::new();
+        state
+            .process_all_undelivered_events(&pool, 4, 10)
             .await
             .unwrap();
-        assert_eq!(0, new);
-        assert_eq!(0, found);
+
+        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
+            .await
+            .unwrap();
+        assert_eq!(45, event.len());
     }
 
     #[tokio::test]
-    async fn test_undelivered_batch_offset() {
+    async fn test_undelivered_batch_iterations_ends_when_all_found() {
         let _ = ceramic_metrics::init_local_tracing();
+
         let pool = SqlitePool::connect_in_memory().await.unwrap();
-        let insertable = build_insertable_undelivered().await;
+        // create 5 streams with 9 undelivered events each
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
 
-        let _new = CeramicOneEvent::insert_many(&pool, &[insertable])
+        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
             .await
             .unwrap();
-
+        assert_eq!(5, event.len());
         let mut state = OrderingState::new();
-        let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap();
-        assert_eq!(1, found);
-        assert_eq!(1, new);
-        let (new, found) = state.add_undelivered_batch(&pool, 10, 10).await.unwrap();
-        assert_eq!(0, new);
-        assert_eq!(0, found);
-        state.persist_ready_events(&pool).await.unwrap();
-        let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap();
-        assert_eq!(0, new);
-        assert_eq!(0, found);
+        state
+            .process_all_undelivered_events(&pool, 100_000_000, 5)
+            .await
+            .unwrap();
+
+        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
+            .await
+            .unwrap();
+        assert_eq!(50, event.len());
     }
 
     #[tokio::test]
-    async fn test_undelivered_batch_all() {
+    async fn test_process_all_undelivered_one_batch() {
         let _ = ceramic_metrics::init_local_tracing();
 
         let pool = SqlitePool::connect_in_memory().await.unwrap();
-        let mut undelivered = Vec::with_capacity(10);
-        for _ in 0..10 {
-            let insertable = build_insertable_undelivered().await;
-            undelivered.push(insertable);
-        }
+        // create 5 streams with 9 undelivered events each
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
+        insert_10_with_9_undelivered(&pool).await;
 
-        let (hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
-            .await
-            .unwrap();
-        assert_eq!(0, hw);
-        assert!(event.is_empty());
-
-        let _new = CeramicOneEvent::insert_many(&pool, &undelivered[..])
+        let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
             .await
             .unwrap();
-
+        assert_eq!(5, event.len());
         let mut state = OrderingState::new();
         state
-            .process_all_undelivered_events(&pool, 1)
+            .process_all_undelivered_events(&pool, 1, 100)
             .await
             .unwrap();
 
         let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000)
            .await
            .unwrap();
-        assert_eq!(event.len(), 10);
+        assert_eq!(50, event.len());
     }
 }
diff --git a/service/src/event/service.rs b/service/src/event/service.rs
index 973efd4ed..83d5bf000 100644
--- a/service/src/event/service.rs
+++ b/service/src/event/service.rs
@@ -1,16 +1,10 @@
-use std::collections::{HashMap, HashSet};
-
 use ceramic_core::EventId;
-use ceramic_event::unvalidated;
 use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
-use cid::Cid;
-use ipld_core::ipld::Ipld;
-use recon::{InsertResult, ReconItem};
 use tracing::{trace, warn};
 
-use super::ordering_task::{
-    DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState,
-    OrderingTask, StreamEvents,
+use super::{
+    order_events::OrderEvents,
+    ordering_task::{DeliverableTask, OrderingTask},
 };
 use crate::{Error, Result};
 
@@ -41,7 +35,7 @@ impl CeramicEventService {
     }
 
     /// Skip loading all undelivered events from the database on startup (for testing)
-    #[allow(dead_code)] // used in tests
+    #[cfg(test)]
     pub(crate) async fn new_without_undelivered(pool: SqlitePool) -> Result<Self> {
         CeramicOneEvent::init_delivered_order(&pool).await?;
 
@@ -72,71 +66,16 @@ impl CeramicEventService {
         Ok(())
     }
 
-    /// This function is used to parse the event from the carfile and return the insertable event and the previous cid pointer.
-    /// Probably belongs in the event crate.
-    pub(crate) async fn parse_event_carfile(
-        event_cid: cid::Cid,
-        carfile: &[u8],
-    ) -> Result<(EventInsertableBody, Option<DeliverableMetadata>)> {
-        let insertable = EventInsertableBody::try_from_carfile(event_cid, carfile).await?;
-        let ev_block = insertable.block_for_cid(&insertable.cid)?;
-
-        trace!(count=%insertable.blocks.len(), cid=%event_cid, "parsing event blocks");
-        let event_ipld: unvalidated::RawEvent<Ipld> =
-            serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| {
-                Error::new_invalid_arg(
-                    anyhow::anyhow!(e).context("event block is not valid event format"),
-                )
-            })?;
-
-        let maybe_init_prev = match event_ipld {
-            unvalidated::RawEvent::Time(t) => Some((t.id(), t.prev())),
-            unvalidated::RawEvent::Signed(signed) => {
-                let link = signed.link().ok_or_else(|| {
-                    Error::new_invalid_arg(anyhow::anyhow!("event should have a link"))
-                })?;
-                let link = insertable.block_for_cid(&link).map_err(|e| {
-                    Error::new_invalid_arg(
-                        anyhow::anyhow!(e).context("prev CID missing from carfile"),
-                    )
-                })?;
-                let payload: unvalidated::Payload<Ipld> =
-                    serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| {
-                        Error::new_invalid_arg(
-                            anyhow::anyhow!(e).context("Failed to follow event link"),
-                        )
-                    })?;
-
-                match payload {
-                    unvalidated::Payload::Data(d) => Some((*d.id(), *d.prev())),
-                    unvalidated::Payload::Init(_init) => None,
-                }
-            }
-            unvalidated::RawEvent::Unsigned(_init) => None,
-        };
-        let meta = maybe_init_prev.map(|(cid, prev)| DeliverableMetadata {
-            init_cid: cid,
-            prev,
-        });
-        Ok((insertable, meta))
-    }
-
     #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))]
     /// This function is used to insert events from a carfile requiring that the history is local to the node.
     /// This is likely used in API contexts when a user is trying to insert events. Events discovered from
     /// peers can come in any order and we will discover the prev chain over time. Use
-    /// `insert_events_from_carfiles_remote_history` for that case.
-    pub(crate) async fn insert_events_from_carfiles_local_history<'a>(
+    /// `insert_events_from_carfiles_recon` for that case.
+    pub(crate) async fn insert_events_from_carfiles_local_api<'a>(
         &self,
         items: &[recon::ReconItem<'a, EventId>],
-    ) -> Result<InsertResult> {
-        if items.is_empty() {
-            return Ok(InsertResult::default());
-        }
-
-        let ordering =
-            InsertEventOrdering::discover_deliverable_local_history(items, &self.pool).await?;
-        self.process_events(ordering).await
+    ) -> Result<InsertResult> {
+        self.insert_events(items, true).await
     }
 
     #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))]
     /// This function is used to insert events from a carfile WITHOUT requiring that the history is local to the node.
     /// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and
     /// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use
-    /// `insert_events_from_carfiles_local_history`.
-    pub(crate) async fn insert_events_from_carfiles_remote_history<'a>(
+    /// `insert_events_from_carfiles_local_api`.
+    pub(crate) async fn insert_events_from_carfiles_recon<'a>(
         &self,
         items: &[recon::ReconItem<'a, EventId>],
-    ) -> Result<InsertResult> {
+    ) -> Result<recon::InsertResult> {
+        let res = self.insert_events(items, false).await?;
+        let mut keys = vec![false; items.len()];
+        // we need to put things back in the order the recon trait expects, even though we don't really care about the result
+        for (i, item) in items.iter().enumerate() {
+            let new_key = res
+                .store_result
+                .inserted
+                .iter()
+                .find(|e| e.order_key == *item.key)
+                .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set?
+            keys[i] = new_key;
+        }
+        Ok(recon::InsertResult::new(keys))
+    }
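The re-alignment loop above matters because the store reports inserts in whatever order they landed, while the recon trait expects one boolean per input item, in input order. The same mapping on plain strings (illustrative only, `&str` keys in place of `EventId`s):

```rust
fn main() {
    let items = ["a", "b", "c"];
    // (order_key, new_key) pairs as the store might return them, unordered
    let inserted = [("c", true), ("a", false)];

    let keys: Vec<bool> = items
        .iter()
        .map(|item| {
            inserted
                .iter()
                .find(|(key, _)| key == item)
                .map_or(false, |(_, new)| *new)
        })
        .collect();

    // "b" was absent from the store result, so it maps to false
    assert_eq!(keys, [false, false, true]);
}
```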
- pub(crate) async fn discover_deliverable_remote_history<'a>( - items: &[ReconItem<'a, EventId>], - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), + let missing_history = ordered + .missing_history + .iter() + .map(|e| e.order_key.clone()) + .collect(); + + let to_insert = if history_required { + ordered.deliverable + } else { + ordered + .deliverable + .into_iter() + .chain(ordered.missing_history) + .collect() }; - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - result.mark_event_deliverable_later(insertable, meta); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); + let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + // api writes shouldn't have any missed pieces that need ordering so we don't send those + if !history_required { + for ev in &res.inserted { + trace!(event=?ev, "sending delivered to ordering task"); + if let Err(e) = self.delivery_task.tx_inserted.try_send(ev.clone()) { + match e { + tokio::sync::mpsc::error::TrySendError::Full(e) => { + // we should only be doing this during recon, in which case we can rediscover events. + // the delivery task will start picking up these events once it's drained since they are stored in the db + warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + warn!("Delivery task closed. shutting down"); + return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); + } + } + } } } - Ok(result) + Ok(InsertResult { + store_result: res, + missing_history, + }) } - /// This will error if any of the events doesn't have its prev on the local node (in the database/memory or in this batch). - pub(crate) async fn discover_deliverable_local_history<'a>( - items: &[ReconItem<'a, EventId>], + pub(crate) async fn load_by_cid( pool: &SqlitePool, - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), + cid: ceramic_core::Cid, + ) -> Result> { + let data = if let Some(ev) = CeramicOneEvent::value_by_cid(pool, &cid).await? 
{ + ev + } else { + return Ok(None); }; - let mut insert_after_history_check: Vec<(DeliverableMetadata, EventInsertable)> = - Vec::with_capacity(items.len()); - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - insert_after_history_check.push((meta, insertable)); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } - } - - trace!(local_events_checking=%insert_after_history_check.len(), "checking local history"); - result - .verify_history_inline(pool, insert_after_history_check) - .await?; - Ok(result) - } - - async fn parse_item<'a>( - item: &ReconItem<'a, EventId>, - ) -> Result<(EventInsertable, Option)> { - let cid = item.key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", item.key)) - })?; - // we want to end a conversation if any of the events aren't ceramic events and not store them - // this includes making sure the key matched the body cid - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(cid, item.value).await?; - let insertable = EventInsertable::try_new(item.key.to_owned(), insertable_body)?; - Ok((insertable, maybe_prev)) + Ok(Some( + ceramic_store::EventInsertableBody::try_from_carfile(cid, &data).await?, + )) } +} - fn mark_event_deliverable_later( - &mut self, - insertable: EventInsertable, - meta: DeliverableMetadata, - ) { - self.background_task_deliverable - .insert(insertable.body.cid, meta); - self.insert_now.push(insertable); - } - - fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.set_deliverable(true); - self.notify_task_new - .push(DeliveredEvent::new(ev.body.cid, init_cid)); - self.insert_now.push(ev); - } - - async fn verify_history_inline( - &mut self, - pool: &SqlitePool, - to_check: Vec<(DeliverableMetadata, EventInsertable)>, - ) -> Result<()> { - if to_check.is_empty() { - return Ok(()); - } - - let incoming_deliverable_cids: HashSet = self - .insert_now - .iter() - .filter_map(|e| { - if e.body.deliverable { - Some(e.body.cid) - } else { - None - } - }) - .collect(); - - // ideally, this map would be per stream, but we are just processing all of them together for now - let mut to_check_map = StreamEvents::new(); - - let required_to_find = to_check.len(); - let mut found_in_batch = 0; - let mut insert_if_greenlit = HashMap::with_capacity(required_to_find); - - for (meta, ev) in to_check { - if incoming_deliverable_cids.contains(&meta.prev) { - trace!(new=%ev.body.cid, prev=%meta.prev, "prev event being added in same batch"); - found_in_batch += 1; - self.mark_event_deliverable_now(ev, meta.init_cid); - } else { - trace!(new=%ev.body.cid, prev=%meta.prev, "will check for prev event in db"); - - let _new = to_check_map.add_event(DeliverableEvent::new( - ev.body.cid, - meta.to_owned(), - None, - )); - insert_if_greenlit.insert(ev.body.cid, (meta, ev)); - } - } - - if to_check_map.is_empty() { - return Ok(()); - } - - let deliverable = - OrderingState::discover_deliverable_events(pool, &mut to_check_map).await?; - if deliverable.len() != required_to_find - found_in_batch { - let missing = insert_if_greenlit - .values() - .filter_map(|(_, ev)| { - if !deliverable.contains(&ev.body.cid) { - Some(ev.body.cid) - } else { - None - } - }) - .collect::>(); - - tracing::info!(?missing, ?deliverable, "Missing required `prev` event CIDs"); - - Err(Error::new_invalid_arg(anyhow::anyhow!( - "Missing required `prev` event CIDs: 
{:?}", - missing - ))) - } else { - // we need to use the deliverable list's order because we might depend on something in the same batch, and that function will - // ensure we have a queue in the correct order. So we follow the order and use our insert_if_greenlit map to get the details. - for cid in deliverable { - if let Some((meta, insertable)) = insert_if_greenlit.remove(&cid) { - self.mark_event_deliverable_now(insertable, meta.init_cid); - } else { - warn!(%cid, "Didn't find event to insert in memory when it was expected"); - } - } - Ok(()) - } - } +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, } diff --git a/service/src/event/store.rs b/service/src/event/store.rs index e61506bcf..51ae5ac22 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService { async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { let res = self - .insert_events_from_carfiles_remote_history(&[item.to_owned()]) + .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; Ok(res.keys.first().copied().unwrap_or(false)) @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { - let res = self - .insert_events_from_carfiles_remote_history(items) - .await?; + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { + let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -112,9 +113,14 @@ impl ceramic_api::EventStore for CeramicEventService { .map(|(key, val)| ReconItem::new(key, val.as_slice())) .collect::>(); let res = self - .insert_events_from_carfiles_local_history(&items[..]) + .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + Ok(res + .store_result + .inserted + .iter() + .map(|r| r.new_key) + .collect()) } async fn range_with_values( diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index 644d3081b..25a29e424 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -114,16 +114,6 @@ fn gen_rand_bytes() -> [u8; SIZE] { arr } -pub(crate) fn random_block() -> Block { - let mut data = [0u8; 1024]; - rand::Rng::fill(&mut ::rand::thread_rng(), &mut data); - let hash = Code::Sha2_256.digest(&data); - Block { - cid: Cid::new_v1(0x00, hash), - data: data.to_vec().into(), - } -} - pub(crate) async fn check_deliverable( pool: &ceramic_store::SqlitePool, cid: &Cid, @@ -172,28 +162,13 @@ async fn data_event( signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap() } -async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec); 3] { +// returns init + N events +async fn get_init_plus_n_events_with_model( + model: &StreamId, + number: usize, +) -> Vec<(EventId, Vec)> { let signer = Box::new(signer().await); - let data = gen_rand_bytes::<50>(); - let data2 = gen_rand_bytes::<50>(); - - let data = ipld!({ - 
"radius": 1, - "red": 2, - "green": 3, - "blue": 4, - "raw": data.as_slice(), - }); - - let data2 = ipld!({ - "radius": 1, - "red": 2, - "green": 3, - "blue": 4, - "raw": data2.as_slice(), - }); - let init = init_event(model, &signer).await; let init_cid = init.envelope_cid(); let (event_id, car) = ( @@ -202,33 +177,45 @@ async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec); 3] { ); let init_cid = event_id.cid().unwrap(); - let data = data_event(init_cid, init_cid, data, &signer).await; - let cid = data.envelope_cid(); - let (data_id, data_car) = ( - build_event_id(&data.envelope_cid(), &init_cid, model), - data.encode_car().await.unwrap(), - ); - let data2 = data_event(init_cid, cid, data2, &signer).await; - let (data_id_2, data_car_2) = ( - build_event_id(&data2.envelope_cid(), &init_cid, model), - data2.encode_car().await.unwrap(), - ); - [ - (event_id, car), - (data_id, data_car), - (data_id_2, data_car_2), - ] + let mut events = Vec::with_capacity(number); + events.push((event_id, car)); + let mut prev = init_cid; + for _ in 0..number { + let data = gen_rand_bytes::<50>(); + let data = ipld!({ + "radius": 1, + "red": 2, + "green": 3, + "blue": 4, + "raw": data.as_slice(), + }); + + let data = data_event(init_cid, prev, data, &signer).await; + let (data_id, data_car) = ( + build_event_id(&data.envelope_cid(), &init_cid, model), + data.encode_car().await.unwrap(), + ); + prev = data_id.cid().unwrap(); + events.push((data_id, data_car)); + } + events } -pub(crate) async fn get_events_return_model() -> (StreamId, [(EventId, Vec); 3]) { +pub(crate) async fn get_events_return_model() -> (StreamId, Vec<(EventId, Vec)>) { let model = StreamId::document(random_cid()); - let events = get_events_with_model(&model).await; + let events = get_init_plus_n_events_with_model(&model, 3).await; (model, events) } // builds init -> data -> data that are a stream (will be a different stream each call) -pub(crate) async fn get_events() -> [(EventId, Vec); 3] { +pub(crate) async fn get_events() -> Vec<(EventId, Vec)> { let model = StreamId::document(random_cid()); - get_events_with_model(&model).await + get_init_plus_n_events_with_model(&model, 3).await +} + +// Get N events with the same model (init + N-1 data events) +pub(crate) async fn get_n_events(number: usize) -> Vec<(EventId, Vec)> { + let model = &StreamId::document(random_cid()); + get_init_plus_n_events_with_model(model, number - 1).await } diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index ddcbc7f23..8ed97dc53 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -1,5 +1,9 @@ +use std::collections::HashMap; + use ceramic_api::EventStore; use ceramic_core::EventId; +use rand::seq::SliceRandom; +use rand::thread_rng; use recon::ReconItem; use crate::{ @@ -12,14 +16,16 @@ async fn setup_service() -> CeramicEventService { let conn = ceramic_store::SqlitePool::connect_in_memory() .await .unwrap(); + CeramicEventService::new_without_undelivered(conn) .await .unwrap() } async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { + tracing::trace!("inserted event: {}", item.key.cid().unwrap()); let new = store - .insert_events_from_carfiles_remote_history(&[item]) + .insert_events_from_carfiles_recon(&[item]) .await .unwrap(); let new = new.keys.into_iter().filter(|k| *k).count(); @@ -28,10 +34,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon async fn add_and_assert_new_local_event(store: 
&CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_local_history(&[item]) + .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -45,30 +51,17 @@ async fn test_init_event_delivered() { } #[tokio::test] -async fn test_missing_prev_error_history_required() { +async fn test_missing_prev_history_required_not_inserted() { let store = setup_service().await; let events = get_events().await; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)]) - .await; - match new { - Ok(v) => panic!("should have errored: {:?}", v), - Err(e) => { - match e { - crate::Error::InvalidArgument { error } => { - // yes fragile, but we want to make sure it's not a parsing error or something unexpected - assert!(error - .to_string() - .contains("Missing required `prev` event CIDs")); - } - e => { - panic!("unexpected error: {:?}", e); - } - }; - } - }; + .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) + .await + .unwrap(); + assert!(new.store_result.inserted.is_empty()); + assert_eq!(1, new.missing_history.len()); } #[tokio::test] @@ -100,13 +93,13 @@ async fn test_prev_in_same_write_history_required() { let init: &(EventId, Vec) = &events[0]; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ + .insert_events_from_carfiles_local_api(&[ ReconItem::new(&data.0, &data.1), ReconItem::new(&init.0, &init.1), ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; @@ -150,7 +143,7 @@ async fn test_missing_prev_pending_recon() { check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; // This happens out of band, so give it a moment to make sure everything is updated - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) @@ -178,9 +171,8 @@ async fn missing_prev_pending_recon_should_deliver_without_stream_update() { // now we add the second event, it should quickly become deliverable let data = &events[1]; add_and_assert_new_recon_event(&store, ReconItem::new(&data.0, &data.1)).await; - check_deliverable(&store.pool, &data.0.cid().unwrap(), false).await; // This happens out of band, so give it a moment to make sure everything is updated - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) @@ -234,7 +226,7 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat // this _could_ be deliverable immediately if we checked but for now we just send to the other task, // so `check_deliverable` could return true or false depending on timing (but probably false). 
    // as this is an implementation detail and we'd prefer true, we just use HW ordering to make sure it's been delivered
-    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
     let (_, delivered) = store
         .events_since_highwater_mark(0, i64::MAX)
         .await
@@ -266,3 +258,118 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat
     ];
     assert_eq!(expected, delivered);
 }
+
+async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: usize) {
+    loop {
+        let (_, delivered) = store
+            .events_since_highwater_mark(0, i64::MAX)
+            .await
+            .unwrap();
+        let total = delivered.len();
+        if total < expected_delivered {
+            tracing::trace!(
+                "found {} delivered, waiting for {}",
+                total,
+                expected_delivered
+            );
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+        } else {
+            break;
+        }
+    }
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn recon_lots_of_streams() {
+    // adds `per_stream` events to `num_streams` streams, mixes up the event order for each stream, inserts half
+    // the events for each stream before mixing up the stream order and inserting the rest
+    // took about a minute on my machine to run 100 streams with 1000 events each, mostly spent inserting, since ordering only gets 5 seconds
+    let per_stream = 100;
+    let num_streams = 10;
+    let store = setup_service().await;
+    let expected = per_stream * num_streams;
+    let mut streams = Vec::new();
+    let mut all_cids = Vec::new();
+    let mut expected_stream_order = Vec::new();
+    let mut cid_to_stream_map = HashMap::new();
+
+    for i in 0..num_streams {
+        let mut events = crate::tests::get_n_events(per_stream).await;
+        let cids = events
+            .iter()
+            .map(|e| e.0.cid().unwrap())
+            .collect::<Vec<_>>();
+        cids.iter().for_each(|cid| {
+            cid_to_stream_map.insert(*cid, i);
+        });
+        expected_stream_order.push(cids.clone());
+        all_cids.extend(cids);
+        assert_eq!(per_stream, events.len());
+        events.shuffle(&mut thread_rng());
+        streams.push(events);
+    }
+    let mut total_added = 0;
+
+    assert_eq!(expected, all_cids.len());
+
+    tracing::debug!(?all_cids, "starting test");
+    for stream in streams.iter_mut() {
+        while let Some(event) = stream.pop() {
+            if stream.len() > per_stream / 2 {
+                total_added += 1;
+                add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await;
+            } else {
+                total_added += 1;
+                add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await;
+                break;
+            }
+        }
+    }
+    streams.shuffle(&mut thread_rng());
+    for stream in streams.iter_mut() {
+        while let Some(event) = stream.pop() {
+            total_added += 1;
+            add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await;
+        }
+    }
+    // first just make sure they were all inserted (not delivered yet)
+    for (i, cid) in all_cids.iter().enumerate() {
+        let (exists, _delivered) =
+            ceramic_store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid)
+                .await
+                .unwrap();
+        assert!(exists, "idx: {}. missing cid: {}", i, cid);
missing cid: {}", i, cid); + } + + assert_eq!(expected, total_added); + tokio::time::timeout( + std::time::Duration::from_secs(5), + validate_all_delivered(&store, expected), + ) + .await + .unwrap(); + + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + + assert_eq!(expected, delivered.len()); + let mut streams_at_the_end = Vec::new(); + for _ in 0..num_streams { + streams_at_the_end.push(Vec::with_capacity(per_stream)); + } + for cid in delivered { + let stream = cid_to_stream_map.get(&cid).unwrap(); + let stream = streams_at_the_end.get_mut(*stream).unwrap(); + stream.push(cid); + } + // now we check that all the events are deliverable + for cid in all_cids.iter() { + check_deliverable(&store.pool, cid, true).await; + } + // and make sure the events were delivered for each stream streams in the same order as they were at the start + for (i, stream) in expected_stream_order.iter().enumerate() { + assert_eq!(*stream, streams_at_the_end[i]); + } +} diff --git a/store/Cargo.toml b/store/Cargo.toml index 250021320..deb815be4 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -13,19 +13,22 @@ anyhow.workspace = true async-trait.workspace = true ceramic-api.workspace = true ceramic-core.workspace = true +ceramic-event.workspace = true ceramic-metrics.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true +ipld-core.workspace = true iroh-bitswap.workspace = true iroh-car.workspace = true itertools = "0.12.0" -multihash.workspace = true multihash-codetable.workspace = true +multihash.workspace = true prometheus-client.workspace = true -thiserror.workspace = true recon.workspace = true +serde_ipld_dagcbor.workspace = true sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true [dev-dependencies] diff --git a/store/src/event_metadata.rs b/store/src/event_metadata.rs new file mode 100644 index 000000000..28822a576 --- /dev/null +++ b/store/src/event_metadata.rs @@ -0,0 +1,50 @@ +use cid::Cid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(missing_docs)] +pub enum EventType { + Init, + Data, + Time, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event header wrapper for use in the store crate. +/// TODO: replace this with something from the event crate +#[allow(missing_docs)] +pub enum EventHeader { + Init { + cid: Cid, + }, + Data { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, + Time { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, +} + +impl EventHeader { + /// Returns the event type of the event header + pub fn event_type(&self) -> EventType { + match self { + EventHeader::Init { .. } => EventType::Init, + EventHeader::Data { .. } => EventType::Data, + EventHeader::Time { .. } => EventType::Time, + } + } + + /// Returns the stream CID of the event + pub fn stream_cid(&self) -> Cid { + match self { + EventHeader::Init { cid, .. } => *cid, + EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. 
+                *stream_cid
+            }
+        }
+    }
+}
diff --git a/store/src/lib.rs b/store/src/lib.rs
index 217c8751d..49f588310 100644
--- a/store/src/lib.rs
+++ b/store/src/lib.rs
@@ -10,8 +10,12 @@ pub use error::Error;
 pub use metrics::{Metrics, StoreMetricsMiddleware};
 pub use sql::{
     entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent,
-    CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore,
-    SqliteTransaction,
+    CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool,
+    SqliteRootStore, SqliteTransaction,
 };
 
 pub(crate) type Result<T> = std::result::Result<T, Error>;
+
+// TODO: replace with types from the event crate
+mod event_metadata;
+pub use event_metadata::{EventHeader, EventType};
diff --git a/store/src/metrics.rs b/store/src/metrics.rs
index 855d4e960..be8442e72 100644
--- a/store/src/metrics.rs
+++ b/store/src/metrics.rs
@@ -13,7 +13,7 @@ use prometheus_client::{
     },
     registry::Registry,
 };
-use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult};
+use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult};
 use tokio::time::Instant;
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
@@ -253,7 +253,7 @@ where
         Ok(new)
     }
 
-    async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<InsertResult> {
+    async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult<recon::InsertResult> {
         let res = StoreMetricsMiddleware::<S>::record(
             &self.metrics,
             "insert_many",
diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs
index 2e9d055de..01e22fb9f 100644
--- a/store/src/sql/access/event.rs
+++ b/store/src/sql/access/event.rs
@@ -7,7 +7,7 @@ use std::{
 use anyhow::anyhow;
 use ceramic_core::{event_id::InvalidEventId, EventId};
 use cid::Cid;
-use recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a};
+use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a};
 
 use crate::{
     sql::{
@@ -18,11 +18,56 @@ use crate::{
         query::{EventQuery, ReconQuery, ReconType, SqlBackend},
         sqlite::SqliteTransaction,
     },
-    CeramicOneBlock, CeramicOneEventBlock, Error, Result, SqlitePool,
+    CeramicOneBlock, CeramicOneEventBlock, Error, EventHeader, Result, SqlitePool,
};
 
 static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0);
 
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// An event that was inserted into the database
+pub struct InsertedEvent {
+    /// The event order key that was inserted
+    pub order_key: EventId,
+    /// The event header metadata including the stream CID and type of event
+    pub header: EventHeader,
+    /// Whether the event was marked as deliverable
+    pub deliverable: bool,
+    /// Whether the event was a new key
+    pub new_key: bool,
+}
+
+impl InsertedEvent {
+    /// Create a new inserted event
+    fn new(order_key: EventId, new_key: bool, header: EventHeader, deliverable: bool) -> Self {
+        Self {
+            order_key,
+            header,
+            deliverable,
+            new_key,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+/// The result of inserting events into the database
+pub struct InsertResult {
+    /// The events that were inserted in this batch, with their insertion metadata
+    pub inserted: Vec<InsertedEvent>,
+}
+
+impl InsertResult {
+    fn new(inserted: Vec<InsertedEvent>) -> Self {
+        Self { inserted }
+    }
+
+    /// The count of new keys added in this batch
+    pub fn count_new_keys(&self) -> usize {
+        self.inserted.iter().filter(|e| e.new_key).count()
+    }
+}
+
 /// Access to the ceramic event table and related logic
 pub struct CeramicOneEvent {}
 
@@ -110,49 +155,80 @@ impl CeramicOneEvent {
         Ok(())
     }
 
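// A minimal usage sketch for the `InsertResult`/`InsertedEvent` types above, written
// against the API in this patch; it assumes a `batch: Vec<EventInsertable>` that was
// already parsed from carfiles and ordered prev-before-child per the `insert_many`
// contract just below.
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};

async fn insert_batch(pool: &SqlitePool, batch: Vec<EventInsertable>) -> anyhow::Result<()> {
    let res = CeramicOneEvent::insert_many(pool, &batch[..]).await?;
    // `count_new_keys` is just a filter over the per-event `new_key` flags.
    assert_eq!(
        res.count_new_keys(),
        res.inserted.iter().filter(|e| e.new_key).count()
    );
    // Events flagged deliverable were written with `delivered` set; the rest wait
    // for their `prev` before the ordering task marks them.
    for ev in res.inserted.iter().filter(|e| e.deliverable) {
        println!("deliverable: {:?}", ev.order_key);
    }
    Ok(())
}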
-    /// Insert many events into the database. This is the main function to use when storing events.
+    /// Insert many events into the database. The events and their blocks and metadata are inserted in a single
+    /// transaction and either all succeed or all are rolled back.
+    ///
+    /// IMPORTANT:
+    /// It is the caller's responsibility to order events marked deliverable correctly.
+    /// That is, events will be processed in the order they are given, so earlier events get a lower global ordering
+    /// and will be returned earlier in the feed. Events can be interleaved across different streams, but if two events
+    /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
     pub async fn insert_many(
         pool: &SqlitePool,
         to_add: &[EventInsertable],
     ) -> Result<InsertResult> {
-        let mut new_keys = vec![false; to_add.len()];
+        let mut inserted = Vec::with_capacity(to_add.len());
         let mut tx = pool.begin_tx().await.map_err(Error::from)?;
 
-        for (idx, item) in to_add.iter().enumerate() {
-            let new_key =
-                Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?;
+        for item in to_add {
+            let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?;
+            inserted.push(InsertedEvent::new(
+                item.order_key.clone(),
+                new_key,
+                item.body.header().to_owned(),
+                item.deliverable(),
+            ));
             if new_key {
-                for block in item.body.blocks.iter() {
+                for block in item.body.blocks().iter() {
                     CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
                     CeramicOneEventBlock::insert(&mut tx, block).await?;
                 }
             }
-            if !new_key && item.body.deliverable {
-                Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?;
+            // the item already existed so we didn't mark it as deliverable on insert
+            if !new_key && item.deliverable() {
+                Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?;
             }
-            new_keys[idx] = new_key;
         }
         tx.commit().await.map_err(Error::from)?;
-        let res = InsertResult::new(new_keys);
+        let res = InsertResult::new(inserted);
 
         Ok(res)
     }
 
-    /// Find events that haven't been delivered to the client and may be ready
+    /// Find events that haven't been delivered to the client and may be ready.
+    /// Returns the events and their values, and the highwater mark of the last event.
+    /// The highwater mark can be used on the next call to get the next batch of events and will be 0 when done.
     pub async fn undelivered_with_values(
         pool: &SqlitePool,
-        offset: usize,
-        limit: usize,
-    ) -> Result<Vec<(EventId, Vec<u8>)>> {
-        let all_blocks: Vec<ReconEventBlockRaw> =
+        limit: i64,
+        highwater_mark: i64,
+    ) -> Result<(Vec<(EventId, Vec<u8>)>, i64)> {
+        struct UndeliveredEventBlockRow {
+            block: ReconEventBlockRaw,
+            row_id: i64,
+        }
+
+        use sqlx::Row as _;
+
+        impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for UndeliveredEventBlockRow {
+            fn from_row(row: &sqlx::sqlite::SqliteRow) -> std::result::Result<Self, sqlx::Error> {
+                let row_id = row.try_get("rowid")?;
+                let block = ReconEventBlockRaw::from_row(row)?;
+                Ok(Self { block, row_id })
+            }
+        }
+
+        let all_blocks: Vec<UndeliveredEventBlockRow> =
             sqlx::query_as(EventQuery::undelivered_with_values())
-                .bind(limit as i64)
-                .bind(offset as i64)
+                .bind(limit)
+                .bind(highwater_mark)
                 .fetch_all(pool.reader())
                 .await?;
 
-        let values = ReconEventBlockRaw::into_carfiles(all_blocks).await?;
-        Ok(values)
+        let max_highwater = all_blocks.iter().map(|row| row.row_id).max().unwrap_or(0); // if there's nothing in the list we just return 0
+        let blocks = all_blocks.into_iter().map(|b| b.block).collect();
+        let values = ReconEventBlockRaw::into_carfiles(blocks).await?;
+        Ok((values, max_highwater))
     }
 
     /// Calculate the hash of a range of events
diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs
index 912a462ac..84a9131ff 100644
--- a/store/src/sql/access/mod.rs
+++ b/store/src/sql/access/mod.rs
@@ -4,6 +4,6 @@ mod event_block;
 mod interest;
 
 pub use block::CeramicOneBlock;
-pub use event::CeramicOneEvent;
+pub use event::{CeramicOneEvent, InsertResult, InsertedEvent};
 pub use event_block::CeramicOneEventBlock;
 pub use interest::CeramicOneInterest;
diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs
index 005e45b1c..1b4008060 100644
--- a/store/src/sql/entities/event.rs
+++ b/store/src/sql/entities/event.rs
@@ -1,13 +1,15 @@
 use anyhow::anyhow;
 use ceramic_core::EventId;
+use ceramic_event::unvalidated;
 use cid::Cid;
+use ipld_core::ipld::Ipld;
 use iroh_car::{CarHeader, CarReader, CarWriter};
 
 use std::collections::BTreeSet;
 
 use crate::{
     sql::entities::{BlockRow, EventBlockRaw},
-    Error, Result,
+    Error, EventHeader, Result,
 };
 
 pub async fn rebuild_car(blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
@@ -50,19 +52,40 @@ pub struct EventInsertable {
 }
 
 impl EventInsertable {
-    /// Try to build the EventInsertable struct. Will error if the key and body don't match.
-    pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result<Self> {
-        if order_key.cid().as_ref() != Some(&body.cid) {
-            return Err(Error::new_app(anyhow!(
-                "Event ID and body CID do not match: {:?} != {:?}",
-                order_key.cid(),
-                body.cid
-            )))?;
-        }
+    /// Try to build the EventInsertable struct from a carfile.
+    pub async fn try_new(order_key: EventId, body: &[u8]) -> Result<Self> {
+        let cid = order_key.cid().ok_or_else(|| {
+            Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
+        })?;
+        let body = EventInsertableBody::try_from_carfile(cid, body).await?;
         Ok(Self { order_key, body })
     }
 
-    /// change the deliverable status of the event
+    /// Get the CID of the event
+    pub fn cid(&self) -> Cid {
+        self.body.cid
+    }
+
+    /// Get the stream CID
+    pub fn stream_cid(&self) -> Cid {
+        self.body.header.stream_cid()
+    }
+
+    /// Get the previous event CID, if any
+    pub fn prev(&self) -> Option<Cid> {
+        match &self.body.header {
+            EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev),
+            EventHeader::Init { .. } => None,
+        }
+    }
+
+    /// Whether this event is currently deliverable
+    pub fn deliverable(&self) -> bool {
+        self.body.deliverable
+    }
+
+    /// Mark the event as deliverable.
+    /// This will be used when inserting the event to make sure the field is updated accordingly.
     pub fn set_deliverable(&mut self, deliverable: bool) {
         self.body.deliverable = deliverable;
     }
@@ -72,24 +95,42 @@ impl EventInsertable {
 /// The type we use to insert events into the database
 pub struct EventInsertableBody {
     /// The event CID i.e. the root CID from the car file
-    pub cid: Cid,
-    /// Whether this event is deliverable to clients or is waiting for more data
-    pub deliverable: bool,
+    cid: Cid,
+    /// The event header data about the event type and stream
+    header: EventHeader,
+    /// Whether the event is deliverable i.e. its prev has been delivered and the chain is continuous to an init event
+    deliverable: bool,
     /// The blocks of the event
     // could use a map but there aren't that many blocks per event (right?)
-    pub blocks: Vec<EventBlockRaw>,
+    blocks: Vec<EventBlockRaw>,
 }
 
 impl EventInsertableBody {
-    /// Create a new EventInsertRaw struct. Deliverable is set to false by default.
-    pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>) -> Self {
+    /// Create a new EventInsertableBody struct; the caller supplies whether the event is deliverable.
+    pub fn new(
+        cid: Cid,
+        header: EventHeader,
+        blocks: Vec<EventBlockRaw>,
+        deliverable: bool,
+    ) -> Self {
         Self {
             cid,
-            deliverable: false,
+            header,
             blocks,
+            deliverable,
         }
     }
 
+    /// Get the CID of the event
+    pub fn cid(&self) -> Cid {
+        self.cid
+    }
+
+    /// Get the blocks of the event
+    pub fn blocks(&self) -> &Vec<EventBlockRaw> {
+        &self.blocks
+    }
+
     /// Find a block from the carfile for a given CID if it's included
     pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> {
         self.blocks
@@ -103,6 +144,12 @@ impl EventInsertableBody {
             .ok_or_else(|| Error::new_app(anyhow!("Event data is missing data for CID {}", cid)))
     }
 
+    /// Returns the event header
+    /// TODO: should come from event crate types
+    pub fn header(&self) -> &EventHeader {
+        &self.header
+    }
+
     /// Builds a new EventInsertRaw from a CAR file. Will error if the CID in the EventID doesn't match the
     /// first root of the carfile.
     pub async fn try_from_carfile(event_cid: Cid, val: &[u8]) -> Result<Self> {
@@ -136,6 +183,67 @@ impl EventInsertableBody {
             blocks.push(ebr);
             idx += 1;
         }
-        Ok(Self::new(event_cid, blocks))
+
+        Self::build_from_blocks(event_cid, blocks)
+    }
+
+    /// Takes the event CID and blocks to finish parsing the header/IPLD data
+    fn build_from_blocks(event_cid: Cid, blocks: Vec<EventBlockRaw>) -> Result<Self> {
+        let ev_block = blocks
+            .iter()
+            .find(|b| b.cid() == event_cid)
+            .ok_or_else(|| {
+                Error::new_app(anyhow!(
+                    "event block not found in car file: cid={}",
+                    event_cid
+                ))
+            })?;
+        let event_ipld: unvalidated::RawEvent<Ipld> =
+            serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| {
+                Error::new_invalid_arg(
+                    anyhow::anyhow!(e).context("event block is not valid event format"),
+                )
+            })?;
+
+        let cid = event_cid; // purely for convenience writing out the match
+        let (deliverable, header) = match event_ipld {
+            unvalidated::RawEvent::Time(t) => (
+                false,
+                EventHeader::Time {
+                    cid,
+                    stream_cid: t.id(),
+                    prev: t.prev(),
+                },
+            ),
+            unvalidated::RawEvent::Signed(signed) => {
+                let link = signed.link().ok_or_else(|| {
+                    Error::new_invalid_arg(anyhow::anyhow!("event should have a link"))
+                })?;
+                let link = blocks.iter().find(|b| b.cid() == link).ok_or_else(|| {
+                    Error::new_invalid_arg(anyhow::anyhow!("prev CID missing from carfile"))
+                })?;
+                let payload: unvalidated::Payload<Ipld> =
+                    serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| {
+                        Error::new_invalid_arg(
+                            anyhow::anyhow!(e).context("Failed to follow event link"),
+                        )
+                    })?;
+
+                match payload {
+                    unvalidated::Payload::Data(d) => (
+                        false,
+                        EventHeader::Data {
+                            cid,
+                            stream_cid: *d.id(),
+                            prev: *d.prev(),
+                        },
+                    ),
+                    unvalidated::Payload::Init(_init) => (true, EventHeader::Init { cid }),
+                }
+            }
+            unvalidated::RawEvent::Unsigned(_init) => (true, EventHeader::Init { cid }),
+        };
+
+        Ok(Self::new(cid, header, blocks, deliverable))
     }
 }
diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs
index 884b04b9b..437de753a 100644
--- a/store/src/sql/mod.rs
+++ b/store/src/sql/mod.rs
@@ -6,7 +6,10 @@ mod sqlite;
 #[cfg(test)]
 mod test;
 
-pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest};
+pub use access::{
+    CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult,
+    InsertedEvent,
+};
 pub use root::SqliteRootStore;
 pub use sqlite::{SqlitePool, SqliteTransaction};
 
diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs
index fc652dfcf..2969a3244 100644
--- a/store/src/sql/query.rs
+++ b/store/src/sql/query.rs
@@ -81,20 +81,21 @@ impl EventQuery {
 
     /// Find event CIDs that have not yet been delivered to the client
     /// Useful after a restart, or if the task managing delivery has availability to try old events
+    /// Requires binding two parameters:
+    ///     $1: limit (i64)
+    ///     $2: rowid (i64)
     pub fn undelivered_with_values() -> &'static str {
         r#"SELECT
-            key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes
+            key.order_key, key.event_cid, eb.codec, eb.root, eb.idx, b.multihash, b.bytes, key.rowid
         FROM (
             SELECT
-                e.cid as event_cid, e.order_key
+                e.cid as event_cid, e.order_key, e.rowid
             FROM ceramic_one_event e
             WHERE
                 EXISTS (SELECT 1 FROM ceramic_one_event_block where event_cid = e.cid)
-                AND e.delivered IS NULL
+                AND e.delivered IS NULL and e.rowid > $2
             LIMIT
                 $1
-            OFFSET
-                $2
         ) key
         JOIN
             ceramic_one_event_block eb ON key.event_cid = eb.event_cid
diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs
index d0e55c777..ad7c1be06 100644
--- a/store/src/sql/test.rs
+++ 
b/store/src/sql/test.rs @@ -30,11 +30,7 @@ fn random_event(cid: &str) -> EventInsertable { let cid = order_key.cid().unwrap(); EventInsertable { order_key, - body: EventInsertableBody { - cid, - deliverable: false, - blocks: vec![], - }, + body: EventInsertableBody::new(cid, crate::EventHeader::Init { cid }, vec![], true), } } @@ -48,8 +44,7 @@ async fn hash_range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let hash = CeramicOneEvent::hash_range( &pool, @@ -70,8 +65,7 @@ async fn range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let ids = CeramicOneEvent::range( &pool, @@ -128,10 +122,11 @@ async fn range_query() { #[tokio::test] async fn undelivered_with_values() { let pool = SqlitePool::connect_in_memory().await.unwrap(); - let res = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000) + let (res, hw) = CeramicOneEvent::undelivered_with_values(&pool, 0, 10000) .await .unwrap(); assert_eq!(res.len(), 0); + assert_eq!(hw, 0); } #[tokio::test]
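// The non-blocking hand-off to the ordering task in `insert_events_from_carfiles_recon`
// above follows a standard tokio pattern: drop on `Full` (recon can rediscover events
// from the db) and treat `Closed` as fatal. A generic sketch, with a `u64` payload
// standing in for the real `InsertedEvent` messages:
use tokio::sync::mpsc::{self, error::TrySendError};

fn forward(tx: &mpsc::Sender<u64>, item: u64) -> Result<(), &'static str> {
    match tx.try_send(item) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(dropped)) => {
            // Queue full: drop the notification; the background task will rediscover
            // the event from the db once it drains.
            eprintln!("dropping {dropped}; it will be marked deliverable later");
            Ok(())
        }
        // A closed receiver means the ordering task is gone; callers treat this as fatal.
        Err(TrySendError::Closed(_)) => Err("delivery task closed"),
    }
}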
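// How the reworked `EventInsertable` API above is meant to be driven: `try_new` now
// parses the carfile itself and derives the header, so `prev()`/`deliverable()` are
// available without touching the database. A sketch; `order_key` and `car_bytes` are
// assumed inputs, not taken from this diff.
use ceramic_core::EventId;
use ceramic_store::EventInsertable;

async fn classify(order_key: EventId, car_bytes: &[u8]) -> anyhow::Result<()> {
    let ev = EventInsertable::try_new(order_key, car_bytes).await?;
    match ev.prev() {
        // Data/Time events only become deliverable once their `prev` is delivered.
        Some(prev) => println!("event {} waits on {}", ev.cid(), prev),
        // Init events have no `prev` and are deliverable immediately.
        None => assert!(ev.deliverable()),
    }
    Ok(())
}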
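// The pagination pattern implied by the new `undelivered_with_values(pool, limit,
// highwater_mark)` signature and the rowid-based query above: feed each returned
// highwater mark into the next call until a batch comes back empty. A sketch; the
// batch size of 1000 is an arbitrary choice here.
use ceramic_store::{CeramicOneEvent, SqlitePool};

async fn drain_undelivered(pool: &SqlitePool) -> anyhow::Result<usize> {
    let mut highwater = 0i64;
    let mut total = 0usize;
    loop {
        let (batch, hw) = CeramicOneEvent::undelivered_with_values(pool, 1000, highwater).await?;
        if batch.is_empty() {
            break; // hw is 0 here; nothing left to page through
        }
        total += batch.len();
        highwater = hw; // resume strictly after the last rowid seen
    }
    Ok(total)
}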