From 100185b5295153e5290bacd5632c616fb8121db9 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 13 Jun 2024 18:39:59 -0600 Subject: [PATCH] feat: use the stream and event_metadata tables when discovering events - populated when discovering events - rewrote IOD using it (been running `recon_lots_of_streams` in a loop and it keeps passing) - created crate specific InsertResult structs (api, recon, service, store). If any api batch write fails because of something in the service (i.e. no prev), it won't fail other writes in the batch --- api/src/lib.rs | 2 +- api/src/server.rs | 56 +- api/src/tests.rs | 55 +- service/Cargo.toml | 3 +- service/src/event/mod.rs | 1 + service/src/event/order_events.rs | 102 +++ service/src/event/ordering_task.rs | 851 +++++++---------------- service/src/event/service.rs | 366 +++------- service/src/event/store.rs | 23 +- service/src/tests/mod.rs | 89 +-- service/src/tests/ordering.rs | 135 +++- store/Cargo.toml | 7 +- store/src/lib.rs | 4 +- store/src/metrics.rs | 11 +- store/src/sql/access/event.rs | 120 +++- store/src/sql/access/mod.rs | 4 +- store/src/sql/access/stream.rs | 151 ++++ store/src/sql/entities/event.rs | 152 +++- store/src/sql/entities/event_block.rs | 4 + store/src/sql/entities/event_metadata.rs | 58 ++ store/src/sql/entities/hash.rs | 2 +- store/src/sql/entities/mod.rs | 8 +- store/src/sql/entities/stream.rs | 68 ++ store/src/sql/entities/utils.rs | 4 +- store/src/sql/mod.rs | 5 +- store/src/sql/query.rs | 8 +- store/src/sql/test.rs | 19 +- 27 files changed, 1258 insertions(+), 1050 deletions(-) create mode 100644 service/src/event/order_events.rs create mode 100644 store/src/sql/access/stream.rs create mode 100644 store/src/sql/entities/event_metadata.rs create mode 100644 store/src/sql/entities/stream.rs diff --git a/api/src/lib.rs b/api/src/lib.rs index 7cb8cca41..b306ac7c1 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -3,7 +3,7 @@ mod server; pub use resume_token::ResumeToken; -pub use server::{EventStore, InterestStore, Server}; +pub use server::{EventInsertResult, EventStore, InterestStore, Server}; #[cfg(test)] mod tests; diff --git a/api/src/server.rs b/api/src/server.rs index 2ce7d297a..5289f51de 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -6,6 +6,7 @@ mod event; +use std::collections::HashMap; use std::time::Duration; use std::{future::Future, ops::Range}; use std::{marker::PhantomData, ops::RangeBounds}; @@ -162,11 +163,28 @@ impl InterestStore for Arc { } } +#[derive(Debug, Clone)] +pub struct EventInsertResult { + id: EventId, + // if set, the reason this event couldn't be inserted + failed: Option, +} + +impl EventInsertResult { + pub fn new(id: EventId, failed: Option) -> Self { + Self { id, failed } + } + + pub fn success(&self) -> bool { + self.failed.is_none() + } +} + /// Trait for accessing persistent storage of Events #[async_trait] pub trait EventStore: Send + Sync { /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed. 
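The new `EventInsertResult` lets one bad event in a batch fail on its own instead of aborting the whole write. A minimal, self-contained sketch of how a caller might split such a batch response; the simplified struct and the `split_batch` helper are illustrative stand-ins, not this crate's API:

// Hypothetical stand-in for the crate's EventInsertResult: a failed event
// carries its own reason instead of failing the whole batch.
#[derive(Debug, Clone)]
struct EventInsertResult {
    id: String,             // stands in for EventId
    failed: Option<String>, // if set, the reason this event couldn't be inserted
}

impl EventInsertResult {
    fn success(&self) -> bool {
        self.failed.is_none()
    }
}

// Successes keep flowing; failures are reported per event.
fn split_batch(
    results: Vec<EventInsertResult>,
) -> (Vec<EventInsertResult>, Vec<EventInsertResult>) {
    results.into_iter().partition(|r| r.success())
}

fn main() {
    let results = vec![
        EventInsertResult { id: "a".into(), failed: None },
        EventInsertResult { id: "b".into(), failed: Some("Event is missing prev".into()) },
    ];
    let (ok, bad) = split_batch(results);
    assert_eq!(ok.len(), 1);
    assert_eq!(bad[0].id, "b");
}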
- async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -199,7 +217,7 @@ pub trait EventStore: Send + Sync { #[async_trait::async_trait] impl EventStore for Arc { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { self.as_ref().insert_many(items).await } @@ -241,7 +259,7 @@ impl EventStore for Arc { struct EventInsert { id: EventId, data: Vec, - tx: tokio::sync::oneshot::Sender>, + tx: tokio::sync::oneshot::Sender>, } struct InsertTask { @@ -325,25 +343,35 @@ where if events.is_empty() { return; } - let mut oneshots = Vec::with_capacity(events.len()); + let mut oneshots = HashMap::with_capacity(events.len()); let mut items = Vec::with_capacity(events.len()); events.drain(..).for_each(|req: EventInsert| { - oneshots.push(req.tx); + oneshots.insert(req.id.to_bytes(), req.tx); items.push((req.id, req.data)); }); tracing::trace!("calling insert many with {} items.", items.len()); match event_store.insert_many(&items).await { Ok(results) => { tracing::debug!("insert many returned {} results.", results.len()); - for (tx, result) in oneshots.into_iter().zip(results.into_iter()) { - if let Err(e) = tx.send(Ok(result)) { - tracing::warn!("failed to send success response to api listener: {:?}", e); + for result in results { + if let Some(tx) = oneshots.remove(&result.id.to_bytes()) { + if let Err(e) = tx.send(Ok(result)) { + tracing::warn!( + "failed to send success response to api listener: {:?}", + e + ); + } + } else { + tracing::warn!( + "lost channel to respond to API listener for event ID: {:?}", + result.id + ); } } } Err(e) => { tracing::warn!("failed to insert events: {e}"); - for tx in oneshots.into_iter() { + for tx in oneshots.into_values() { if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) { tracing::warn!("failed to send failed response to api listener: {:?}", e); } @@ -495,7 +523,7 @@ where .await? .map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?; - let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) + let new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) .await .map_err(|_| { ErrorResponse::new("Timeout waiting for database service response".to_owned()) @@ -503,7 +531,13 @@ where .map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))? .map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?; - Ok(EventsPostResponse::Success) + if let Some(failed) = new.failed { + Ok(EventsPostResponse::BadRequest(BadRequestResponse::new( + failed, + ))) + } else { + Ok(EventsPostResponse::Success) + } } pub async fn post_interests( diff --git a/api/src/tests.rs b/api/src/tests.rs index b7837951b..0b1de6e33 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -5,6 +5,7 @@ use std::{ops::Range, str::FromStr, sync::Arc}; use crate::server::decode_multibase_data; use crate::server::BuildResponse; use crate::server::Server; +use crate::EventInsertResult; use crate::{EventStore, InterestStore}; use anyhow::Result; @@ -121,7 +122,7 @@ mock! 
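Because results now come back per event rather than positionally, the server keys its oneshot response channels by event-id bytes and looks each result up instead of zipping by index. A rough, self-contained model of that routing pattern, assuming tokio's `oneshot` channels, plain `Vec<u8>` ids, and `String` payloads (the names here are illustrative, not the crate's types):

use std::collections::HashMap;
use tokio::sync::oneshot;

// Route batch results back to per-request channels by id rather than by position,
// so out-of-order or partial results still reach the right caller.
fn route_results(
    mut waiting: HashMap<Vec<u8>, oneshot::Sender<Result<String, String>>>,
    results: Vec<(Vec<u8>, Result<String, String>)>,
) {
    for (id, outcome) in results {
        match waiting.remove(&id) {
            // The receiver may have timed out and dropped, so ignore send errors.
            Some(tx) => {
                let _ = tx.send(outcome);
            }
            None => eprintln!("no waiting caller for event id {:?}", id),
        }
    }
    // Anything left in `waiting` never got a result; those callers observe a
    // closed channel and can surface an error.
}

fn main() {
    let (tx, mut rx) = oneshot::channel();
    let waiting = HashMap::from([(vec![1u8], tx)]);
    route_results(waiting, vec![(vec![1u8], Ok("stored".to_string()))]);
    assert_eq!(rx.try_recv().unwrap(), Ok("stored".to_string()));
}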
{ pub EventStoreTest {} #[async_trait] impl EventStore for EventStoreTest { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -198,7 +199,12 @@ async fn create_event() { .expect_insert_many() .with(predicate::eq(args)) .times(1) - .returning(|_| Ok(vec![true])); + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| EventInsertResult::new(id.clone(), None)) + .collect()) + }); let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( @@ -211,6 +217,51 @@ async fn create_event() { .unwrap(); assert!(matches!(resp, EventsPostResponse::Success)); } + +#[tokio::test] +async fn create_event_fails() { + let peer_id = PeerId::random(); + let network = Network::Mainnet; + let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); + + // Remove whitespace from event CAR file + let event_data = DATA_EVENT_CAR + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let mock_interest = MockAccessInterestStoreTest::new(); + let mut mock_event_store = MockEventStoreTest::new(); + mock_get_init_event(&mut mock_event_store); + let args = vec![( + expected_event_id.clone(), + decode_multibase_data(&event_data).unwrap(), + )]; + + mock_event_store + .expect_insert_many() + .with(predicate::eq(args)) + .times(1) + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| { + EventInsertResult::new(id.clone(), Some("Event is missing prev".to_string())) + }) + .collect()) + }); + let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let resp = server + .events_post( + models::EventData { + data: event_data.to_string(), + }, + &Context, + ) + .await + .unwrap(); + assert!(matches!(resp, EventsPostResponse::BadRequest(_))); +} + #[tokio::test] #[traced_test] async fn register_interest_sort_value() { diff --git a/service/Cargo.toml b/service/Cargo.toml index bb65f422f..b68033ba9 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -18,7 +18,6 @@ ceramic-store.workspace = true cid.workspace = true hex.workspace = true ipld-core.workspace = true -serde_ipld_dagcbor.workspace = true iroh-bitswap.workspace = true multihash-codetable.workspace = true recon.workspace = true @@ -32,8 +31,8 @@ ipld-core.workspace = true multibase.workspace = true paste = "1.0" rand.workspace = true -serde.workspace = true serde_ipld_dagcbor.workspace = true +serde.workspace = true test-log.workspace = true tmpdir.workspace = true tokio.workspace = true diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs index f7dce573a..69e12b26c 100644 --- a/service/src/event/mod.rs +++ b/service/src/event/mod.rs @@ -1,3 +1,4 @@ +mod order_events; mod ordering_task; mod service; mod store; diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs new file mode 100644 index 000000000..ae4c70e72 --- /dev/null +++ b/service/src/event/order_events.rs @@ -0,0 +1,102 @@ +use std::collections::HashSet; + +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; +use cid::Cid; + +use crate::Result; + +pub(crate) struct OrderEvents { + pub(crate) deliverable: Vec, + pub(crate) missing_history: Vec, +} + +impl OrderEvents { + /// Groups the events into lists those with a delivered prev and those without. This can be used to return an error if the event is required to have history. 
+    /// The events will be marked as deliverable so that they can be passed directly to the store to be persisted.
+    pub async fn try_new(
+        pool: &SqlitePool,
+        mut candidate_events: Vec<EventInsertable>,
+    ) -> Result<Self> {
+        // pull out the events that are already deliverable (e.g. init events) so they're added
+        // first and the deliverable order stays correct
+        let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid()));
+        let mut deliverable = Vec::with_capacity(candidate_events.len());
+        candidate_events.retain(|e| {
+            if e.deliverable() {
+                deliverable.push(e.clone());
+                false
+            } else {
+                true
+            }
+        });
+        if candidate_events.is_empty() {
+            return Ok(OrderEvents {
+                deliverable,
+                missing_history: Vec::new(),
+            });
+        }
+
+        let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
+        let mut missing_history = Vec::with_capacity(candidate_events.len());
+
+        while let Some(mut event) = candidate_events.pop() {
+            match &event.prev() {
+                None => {
+                    unreachable!("Init events should have been filtered out since they're always deliverable");
+                }
+                Some(prev) => {
+                    if new_cids.contains(prev) {
+                        prevs_in_memory.push(event.clone());
+                        continue;
+                    } else {
+                        let (_exists, prev_deliverable) =
+                            CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
+                        if prev_deliverable {
+                            event.set_deliverable(true);
+                            deliverable.push(event);
+                        } else {
+                            // technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed,
+                            // but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have been able to
+                            // construct this event. We consider it missing history, which lets callers return an error when the event is required to have history.
+                            missing_history.push(event);
+                        }
+                    }
+                }
+            }
+        }
+
+        // We add the events to the deliverable list until nothing changes.
+        // It should be a small set and it will shrink each loop, so continually looping is acceptable.
+        loop {
+            let mut made_changes = false;
+            while let Some(mut event) = prevs_in_memory.pop() {
+                match &event.prev() {
+                    None => {
+                        unreachable!(
+                            "Init events should have been filtered out of the in memory set"
+                        );
+                    }
+                    Some(prev) => {
+                        // a hashset would be a faster lookup, but we won't have many events, so hashing
+                        // for a handful of lookups and then converting back to a vec probably isn't worth it.
+ if deliverable.iter().any(|e| e.cid() == *prev) { + event.set_deliverable(true); + deliverable.push(event); + made_changes = true; + } else { + prevs_in_memory.push(event); + } + } + } + } + if !made_changes { + missing_history.extend(prevs_in_memory); + break; + } + } + + Ok(OrderEvents { + deliverable, + missing_history, + }) + } +} diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index a1e13f204..6398e04d1 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -1,73 +1,21 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use anyhow::anyhow; -use ceramic_store::{CeramicOneEvent, SqlitePool}; +use ceramic_store::{ + CeramicOneEvent, CeramicOneStream, InsertedEvent, SqlitePool, StreamEventMetadata, +}; use cid::Cid; use tracing::{debug, error, info, trace, warn}; -use crate::{CeramicEventService, Error, Result}; +use crate::{Error, Result}; -/// How many events to select at once to see if they've become deliverable when we have downtime -/// Used at startup and occassionally in case we ever dropped something -/// We keep the number small for now as we may need to traverse many prevs for each one of these and load them into memory. -const DELIVERABLE_EVENTS_BATCH_SIZE: usize = 1000; -/// How many batches of undelivered events are we willing to process on start up? -/// To avoid an infinite loop. It's going to take a long time to process `DELIVERABLE_EVENTS_BATCH_SIZE * MAX_ITERATIONS` events -const MAX_ITERATIONS: usize = 100_000_000; - -/// How often should we try to process all undelivered events in case we missed something -const CHECK_ALL_INTERVAL_SECONDS: u64 = 60 * 10; // 10 minutes - -type InitCid = cid::Cid; -type PrevCid = cid::Cid; -type EventCid = cid::Cid; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DeliveredEvent { - pub(crate) cid: Cid, - pub(crate) init_cid: InitCid, -} - -impl DeliveredEvent { - pub fn new(cid: Cid, init_cid: InitCid) -> Self { - Self { cid, init_cid } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub(crate) struct DeliverableMetadata { - pub(crate) init_cid: InitCid, - pub(crate) prev: PrevCid, -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DeliverableEvent { - pub(crate) cid: EventCid, - pub(crate) meta: DeliverableMetadata, - attempts: usize, - last_attempt: std::time::Instant, - started: std::time::Instant, - expires: Option, -} - -impl DeliverableEvent { - pub fn new(cid: Cid, meta: DeliverableMetadata, expires: Option) -> Self { - Self { - cid, - meta, - attempts: 0, - last_attempt: std::time::Instant::now(), - started: std::time::Instant::now(), - expires, - } - } -} +type StreamCid = Cid; +type EventCid = Cid; +type PrevCid = Cid; #[derive(Debug)] pub struct DeliverableTask { pub(crate) _handle: tokio::task::JoinHandle<()>, - pub(crate) tx: tokio::sync::mpsc::Sender, - pub(crate) tx_new: tokio::sync::mpsc::Sender, + pub(crate) tx_delivered: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -75,30 +23,27 @@ pub struct OrderingTask {} impl OrderingTask { pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask { - let (tx, rx) = tokio::sync::mpsc::channel::(q_depth); - let (tx_new, rx_new) = tokio::sync::mpsc::channel::(q_depth); + let (tx_delivered, rx_delivered) = tokio::sync::mpsc::channel::(q_depth); let handle = - tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx, rx_new).await }); + tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_delivered).await }); 
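`OrderEvents::try_new` above is a small fixed-point computation: keep sweeping the pending set, promoting any event whose `prev` is already deliverable, until a sweep makes no progress; whatever remains is missing history. A stripped-down model of that pass, with `u32`s standing in for CIDs (hypothetical helper, not the crate's code):

use std::collections::HashMap;

// `pending` maps each event to its prev; `delivered` seeds the pass.
fn order_chain(delivered: Vec<u32>, mut pending: HashMap<u32, u32>) -> Vec<u32> {
    let mut deliverable = delivered;
    loop {
        let mut made_changes = false;
        // Take every event whose prev is already in the deliverable list.
        let ready: Vec<u32> = pending
            .iter()
            .filter(|&(_, prev)| deliverable.contains(prev))
            .map(|(&cid, _)| cid)
            .collect();
        for cid in ready {
            pending.remove(&cid);
            deliverable.push(cid);
            made_changes = true;
        }
        if !made_changes {
            break; // anything left in `pending` is missing history
        }
    }
    deliverable
}

fn main() {
    // init event 1 is delivered; 3 -> 2 -> 1 arrived out of order, and 9's prev is unknown.
    let pending = HashMap::from([(3, 2), (2, 1), (9, 8)]);
    assert_eq!(order_chain(vec![1], pending), vec![1, 2, 3]);
}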
DeliverableTask { _handle: handle, - tx, - tx_new, + tx_delivered, } } async fn run_loop( pool: SqlitePool, load_undelivered: bool, - mut rx: tokio::sync::mpsc::Receiver, - mut rx_new: tokio::sync::mpsc::Receiver, + mut rx_delivered: tokio::sync::mpsc::Receiver, ) { // before starting, make sure we've updated any events in the database we missed let mut state = OrderingState::new(); if load_undelivered && state - .process_all_undelivered_events(&pool, MAX_ITERATIONS) + .process_all_undelivered_events(&pool) .await .map_err(Self::log_error) .is_err() @@ -106,53 +51,38 @@ impl OrderingTask { return; } - let mut last_processed = std::time::Instant::now(); loop { - let mut modified: Option> = None; - let mut need_prev_buf = Vec::with_capacity(100); - let mut newly_added_buf = Vec::with_capacity(100); - - tokio::select! { - incoming = rx.recv_many(&mut need_prev_buf, 100) => { - if incoming > 0 { - modified = Some(state.add_incoming_batch(need_prev_buf)); - } - } - new = rx_new.recv_many(&mut newly_added_buf, 100) => { - if new > 0 { - modified = Some(newly_added_buf.into_iter().map(|ev| ev.init_cid).collect::>()); - } + let mut delivered_events = Vec::with_capacity(100); + + if rx_delivered.recv_many(&mut delivered_events, 100).await > 0 { + debug!(?delivered_events, "new delivered events!"); + for event in delivered_events { + state.add_stream(event.stream_cid); } - else => { - info!(stream_count=%state.pending_by_stream.len(), "Server dropped the ordering task. Processing once more before exiting..."); - let _ = state - .process_events(&pool, None) - .await - .map_err(Self::log_error); + + if state + .process_streams(&pool) + .await + .map_err(Self::log_error) + .is_err() + { return; } - }; - // Given the math on OrderingState and the generally low number of updates to streams, we are going - // to ignore pruning until there's more of an indication that it's necessary. Just log some stats. - if last_processed.elapsed().as_secs() > CHECK_ALL_INTERVAL_SECONDS { - let stream_count = state.pending_by_stream.len(); - if stream_count > 1000 { - info!(%stream_count, "Over 1000 pending streams without recent updates."); - } else { - debug!(%stream_count, "Fewer than 1000 streams pending without recent updates."); - } - } + } else if rx_delivered.is_closed() { + debug!( + "Server dropped the delivered events channel. Attempting to processing streams in memory once more before exiting." + ); - if modified.is_some() - && state - .process_events(&pool, modified) + if state + .process_streams(&pool) .await .map_err(Self::log_error) .is_err() - { - return; + { + return; + } + break; } - last_processed = std::time::Instant::now(); } } @@ -175,599 +105,294 @@ impl OrderingTask { } } -#[derive(Debug)] -/// Rough size estimate: -/// pending_by_stream: 96 * stream_cnt + 540 * event_cnt -/// ready_events: 96 * ready_event_cnt -/// so for stream_cnt = 1000, event_cnt = 2, ready_event_cnt = 1000 -/// we get about 1 MB of memory used. -pub struct OrderingState { - /// Map of undelivered events by init CID (i.e. the stream CID). - pending_by_stream: HashMap, - /// Queue of events that can be marked ready to deliver. - /// Can be added as long as their prev is stored or in this list ahead of them. - ready_events: VecDeque, -} - #[derive(Debug, Clone, Default)] /// ~540 bytes per event in this struct pub(crate) struct StreamEvents { + /// Map of `event.prev` to `event.cid` for quick lookup of the next event in the stream. 
prev_map: HashMap, - cid_map: HashMap, + /// Map of `event.cid` to `metadata` for quick lookup of the event metadata. + cid_map: HashMap, + /// Events that can be delivered FIFO order for the stream + deliverable: VecDeque, + /// The total number of events in the stream when we started + total_events: usize, } -impl FromIterator for StreamEvents { - fn from_iter>(iter: T) -> Self { - let mut stream = Self::new(); - for item in iter { - stream.add_event(item); +impl StreamEvents { + fn new(_cid: StreamCid, events: I) -> Self + where + I: ExactSizeIterator, + { + let total_events = events.len(); + let mut new = Self { + prev_map: HashMap::with_capacity(total_events), + cid_map: HashMap::with_capacity(total_events), + deliverable: VecDeque::with_capacity(total_events), + total_events, + }; + + for event in events { + new.add_event(event); } - stream + new } -} -impl StreamEvents { - pub fn new() -> Self { - Self::default() + async fn new_from_db(stream: StreamCid, pool: &SqlitePool) -> Result { + let stream_events = CeramicOneStream::load_stream_events(pool, stream).await?; + trace!(?stream_events, "Loaded stream events for ordering"); + Ok(Self::new(stream, stream_events.into_iter())) } - /// returns Some(Stream Init CID) if this is a new event, else None. - pub fn add_event(&mut self, event: DeliverableEvent) -> Option { - let res = if self.prev_map.insert(event.meta.prev, event.cid).is_none() { - Some(event.meta.init_cid) - } else { - None - }; - self.cid_map.insert(event.cid, event); - res + fn is_empty(&self) -> bool { + // We only care if we have things that are pending to be delivered + self.prev_map.is_empty() } - pub fn is_empty(&self) -> bool { - // these should always match - self.prev_map.is_empty() && self.cid_map.is_empty() + /// returns true if this is a new event. + fn add_event(&mut self, event: StreamEventMetadata) -> bool { + if let Some(prev) = event.prev { + self.prev_map.insert(prev, event.cid); + } + self.cid_map.insert(event.cid, event).is_none() } - fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.cid_map.remove(cid) { - self.prev_map.remove(&cid.meta.prev); - Some(cid) + fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { + if let Some(ev) = self.cid_map.remove(cid) { + if let Some(prev) = ev.prev { + self.prev_map.remove(&prev); + } + Some(ev) } else { None } } - fn remove_by_prev_cid(&mut self, cid: &Cid) -> Option { - if let Some(cid) = self.prev_map.remove(cid) { + fn remove_by_prev_cid(&mut self, prev: &Cid) -> Option { + if let Some(cid) = self.prev_map.remove(prev) { self.cid_map.remove(&cid); Some(cid) } else { None } } -} -impl OrderingState { - pub fn new() -> Self { - Self { - pending_by_stream: HashMap::new(), - ready_events: VecDeque::new(), - } + fn delivered_events(&self) -> impl Iterator { + self.cid_map + .iter() + .filter_map(|(cid, event)| if event.deliverable { Some(cid) } else { None }) } - /// This will review all the events for any streams known to have undelivered events and see if any of them are now deliverable. - /// If `streams_to_process` is None, all streams will be processed, otherwise only the streams in the set will be processed. - /// Processing all streams could take a long time and not necessarily do anything productive (if we're missing a key event, we're still blocked). - /// However, passing a value for `streams_to_process` when we know something has changed is likely to have positive results and be much faster. 
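For intuition on the two maps: `prev_map` answers "which event follows this one?", so once a delivered tip is known the rest of the chain unrolls forward without touching the database. A toy version with `u32`s standing in for CIDs (hypothetical helper, not the crate's code):

use std::collections::HashMap;

// prev -> cid: follow a delivered tip forward until the chain runs out.
fn unroll(tip: u32, prev_map: &HashMap<u32, u32>) -> Vec<u32> {
    let mut out = Vec::new();
    let mut tip = tip;
    while let Some(&next) = prev_map.get(&tip) {
        out.push(next);
        tip = next;
    }
    out
}

fn main() {
    // 2 follows 1, 3 follows 2, 4 follows 3.
    let prev_map = HashMap::from([(1, 2), (2, 3), (3, 4)]);
    assert_eq!(unroll(1, &prev_map), vec![2, 3, 4]);
}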
- pub(crate) async fn process_events( - &mut self, - pool: &SqlitePool, - streams_to_process: Option>, - ) -> Result<()> { - self.persist_ready_events(pool).await?; - for (cid, stream_events) in self.pending_by_stream.iter_mut() { - if streams_to_process - .as_ref() - .map_or(false, |to_do| !to_do.contains(cid)) - { - continue; - } - let deliverable = Self::discover_deliverable_events(pool, stream_events).await?; - if !deliverable.is_empty() { - self.ready_events.extend(deliverable) - } - } - if !self.ready_events.is_empty() { - self.persist_ready_events(pool).await?; + async fn order_events(pool: &SqlitePool, stream: StreamCid) -> Result { + let mut to_process = Self::new_from_db(stream, pool).await?; + if to_process.delivered_events().count() == 0 { + return Ok(to_process); } - Ok(()) - } + let stream_event_count = to_process.cid_map.len(); + let delivered_cids = to_process.delivered_events().cloned().collect::>(); + let mut start_with = VecDeque::with_capacity(stream_event_count - delivered_cids.len()); - /// Removes deliverable events from the `prev_map` and returns them. This means prev is already delivered or in the - /// list to be marked as delivered. The input is expected to be a list of CIDs for a given stream that are waiting - /// to be processed. It will still work if it's intermixed for multiple streams, but it's not the most efficient way to use it. - /// The returned CIDs in the VeqDeque are for events that are expected to be updated FIFO i.e. vec.pop_front() - /// - /// This breaks with multi-prev as we expect a single prev for each event. The input map is expected to contain the - /// (prev <- event) relationship (that is, the value is the event that depends on the key). - pub(crate) async fn discover_deliverable_events( - pool: &SqlitePool, - stream_map: &mut StreamEvents, - ) -> Result> { - if stream_map.is_empty() { - return Ok(VecDeque::new()); + for cid in delivered_cids { + if let Some(next_event) = to_process.remove_by_prev_cid(&cid) { + to_process.remove_by_event_cid(&next_event); + start_with.push_back(next_event); + } } - let mut deliverable = VecDeque::new(); - let prev_map_cln = stream_map.prev_map.clone(); - for (prev, ev_cid) in prev_map_cln { - if stream_map.cid_map.contains_key(&prev) { - trace!( - ?prev, - cid=?ev_cid, - "Found event that depends on another event in memory" - ); - // we have it in memory so we need to order it related to others to insert correctly - // although it may not be possible if the chain just goes back to some unknown event - // once we find the first event that's deliverable, we can go back through and find the rest - continue; - } else { - let (exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &prev).await?; - if delivered { - trace!(deliverable=?ev_cid, "Found delivered prev in database. Adding to ready list"); - deliverable.push_back(ev_cid); - stream_map.remove_by_event_cid(&ev_cid); - } else if exists { - trace!("Found undelivered prev in database. Building data to check for deliverable."); - // if it's not in memory, we need to read it from the db and parse it for the prev value to add it to our set - let data = CeramicOneEvent::value_by_cid(pool, &prev) - .await? 
- .ok_or_else(|| { - Error::new_app(anyhow!( - "Missing data for event that exists should be impossible" - )) - })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(prev, &data).await?; - - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - trace!(cid=%event.cid, "Adding event discovered in database to stream pending list"); - stream_map.add_event(event); - } else { - warn!(event_cid=%insertable_body.cid,"Found undelivered event with no prev while processing pending. Should not happen."); - deliverable.push_back(insertable_body.cid); - stream_map.remove_by_event_cid(&ev_cid); - } - } else { - trace!( - ?ev_cid, - "Found event that depends on unknown event. Will check later." - ); - } + while let Some(new_tip) = start_with.pop_front() { + to_process.deliverable.push_back(new_tip); + let mut tip = new_tip; + while let Some(next_event) = to_process.remove_by_prev_cid(&tip) { + to_process.deliverable.push_back(next_event); + tip = next_event; } } - let mut newly_ready = deliverable.clone(); - while let Some(cid) = newly_ready.pop_front() { - if let Some(now_ready_ev) = stream_map.remove_by_prev_cid(&cid) { - deliverable.push_back(now_ready_ev); - newly_ready.push_back(now_ready_ev); - } + Ok(to_process) + } +} + +#[derive(Debug)] +pub struct OrderingState { + streams: HashSet, + deliverable: VecDeque, +} + +impl OrderingState { + fn new() -> Self { + Self { + streams: HashSet::new(), + deliverable: VecDeque::new(), } - debug!(?deliverable, "deliverable events discovered"); + } - Ok(deliverable) + /// Add a stream to the list of streams to process. This implies it has undelivered events and is worthwhile to attempt. + fn add_stream(&mut self, stream: StreamCid) -> bool { + self.streams.insert(stream) } - /// Process all undelivered events in the database. This is a blocking operation that could take a long time. - /// It is intended to be run at startup but could be used on an interval or after some errors to recover. - pub(crate) async fn process_all_undelivered_events( - &mut self, - pool: &SqlitePool, - max_iterations: usize, - ) -> Result<()> { - let mut cnt = 0; - let mut offset: usize = 0; - while cnt < max_iterations { - cnt += 1; - let (new, found) = self - .add_undelivered_batch(pool, offset, DELIVERABLE_EVENTS_BATCH_SIZE) - .await?; - if new == 0 { - break; - } else { - // We can start processing and we'll follow the stream history if we have it. In that case, we either arrive - // at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory. - // In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them - // or otherwise mark them ignored somehow. - self.process_events(pool, None).await?; - if new < DELIVERABLE_EVENTS_BATCH_SIZE { - break; + /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to process commit things in batches, + /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried. Events that are + /// delivered multiple times will not change the original delivered state. + async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> { + let mut stream_cnt = HashMap::new(); + // we need to handle the fact that new writes can come in without knowing they're deliverable because we're still in the process of updating them. 
+ // so when we finish the loop and we had streams we couldn't complete, we try again to see if they had new writes. if nothing changed we exit. + // this could certainly be optimized. we could only query the count of events, or we could load undelivered events and keep track of our + // total state, as anything forking that was deliverable would arrive on the incoming channel. for now, streams are short and this is probably sufficient. + loop { + let mut processed_streams = Vec::with_capacity(self.streams.len()); + for stream in &self.streams { + let ordered_events = StreamEvents::order_events(pool, *stream).await?; + stream_cnt.insert(*stream, ordered_events.total_events); + if ordered_events.is_empty() { + processed_streams.push(*stream); } - offset = offset.saturating_add(found); - } - if cnt >= max_iterations { - warn!(batch_size=DELIVERABLE_EVENTS_BATCH_SIZE, iterations=%max_iterations, "Exceeded max iterations for finding undelivered events!"); - break; + self.deliverable.extend(ordered_events.deliverable); } - } - if self.ready_events.is_empty() { - Ok(()) - } else { - self.persist_ready_events(pool).await?; - Ok(()) - } - } - /// Add a batch of events from the database to the pending list to be processed. - /// Returns the (#events new events found , #events returned by query) - async fn add_undelivered_batch( - &mut self, - pool: &SqlitePool, - offset: usize, - limit: usize, - ) -> Result<(usize, usize)> { - let undelivered = CeramicOneEvent::undelivered_with_values(pool, offset, limit).await?; - trace!(count=%undelivered.len(), "Found undelivered events to process"); - if undelivered.is_empty() { - return Ok((0, 0)); - } - let found = undelivered.len(); - let mut new = 0; - for (key, data) in undelivered { - let event_cid = key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", key)) - })?; - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(event_cid, &data).await?; - if let Some(prev) = maybe_prev { - let event = DeliverableEvent::new(insertable_body.cid, prev, None); - if self.track_pending(event).is_some() { - new += 1; + let found_events = !self.deliverable.is_empty(); + if found_events { + tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver"); + let mut tx = pool.begin_tx().await?; + // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them. + // Could use `pop_front` but we want to make sure we commit and then clear everything at once. + for cid in &self.deliverable { + CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; } - } else { - // safe to ignore in tests, shows up because when we mark init events as undelivered even though they don't have a prev - info!(event_cid=%insertable_body.cid, "Found undelivered event with no prev while processing undelivered. Should not happen. 
Likely means events were dropped before.");
-                self.ready_events.push_back(insertable_body.cid);
-                new += 1; // we treat this as new since it might unlock something else but it's not actually going in our queue is it's a bit odd
+                tx.commit().await?;
+                self.deliverable.clear();
             }
-        }
-        trace!(%new, %found, "Adding undelivered events to pending set");
-        Ok((new, found))
-    }
-
-    fn add_incoming_batch(&mut self, events: Vec<DeliverableEvent>) -> HashSet<InitCid> {
-        let mut updated_streams = HashSet::with_capacity(events.len());
-        for event in events {
-            if let Some(updated_stream) = self.track_pending(event) {
-                updated_streams.insert(updated_stream);
+            self.streams
+                .retain(|stream| !processed_streams.contains(stream));
+            // not strictly necessary as the next loop will not do anything but we can avoid allocating
+            if self.streams.is_empty() || !found_events {
+                break;
             }
+
+            debug!(stream_state=?self, ?processed_streams, "Finished processing streams loop with more to do");
         }
-        updated_streams
-    }
+        debug!(stream_state=?self, "Finished processing streams");
 
-    /// returns the init event CID (stream CID) if this is a new event
-    fn track_pending(&mut self, event: DeliverableEvent) -> Option<InitCid> {
-        self.pending_by_stream
-            .entry(event.meta.init_cid)
-            .or_default()
-            .add_event(event)
+        Ok(())
     }
 
-    /// Modify all the events that are ready to be marked as delivered.
-
-    /// We should improve the error handling and likely add some batching if the number of ready events is very high.
-    /// We copy the events up front to avoid losing any events if the task is cancelled.
-    async fn persist_ready_events(&mut self, pool: &SqlitePool) -> Result<()> {
-        if !self.ready_events.is_empty() {
-            let mut to_process = self.ready_events.clone(); // to avoid cancel loss
-            tracing::debug!(count=%self.ready_events.len(), "Marking events as ready to deliver");
-            let mut tx = pool.begin_tx().await?;
-
-            // We process the ready events as a FIFO queue so they are marked delivered before events
-            // that were added after and depend on them.
-            while let Some(cid) = to_process.pop_front() {
-                CeramicOneEvent::mark_ready_to_deliver(&mut tx, &cid).await?;
+    /// Processes all streams with undelivered events, returning the total number of streams identified.
+    /// It loops, discovering and processing batches of streams, until no streams with undelivered
+    /// events remain. This is useful for bootstrapping the ordering task in case we missed/dropped
+    /// something in the past. Anything we can't process now requires discovering from a peer, so there
+    /// isn't really any advantage to keeping it in memory and trying again later.
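A standalone model of the resume-at-highwater pagination used by `process_all_undelivered_events` just below, with `u32`s for stream CIDs and a stubbed `load_page` in place of `CeramicOneStream::load_stream_cids_with_undelivered_events` (all names here are illustrative):

// Each call returns one page of stream CIDs plus an optional marker to resume
// from; `None` means everything has been seen.
fn load_page(resume_at: usize) -> (Vec<u32>, Option<usize>) {
    const ALL: [u32; 5] = [10, 11, 12, 13, 14];
    let page: Vec<u32> = ALL.iter().copied().skip(resume_at).take(2).collect();
    let next = resume_at + page.len();
    (page, if next < ALL.len() { Some(next) } else { None })
}

fn main() {
    let mut discovered = Vec::new();
    let mut resume_at = Some(0);
    while let Some(hw) = resume_at {
        let (cids, next) = load_page(hw);
        discovered.extend(cids);
        resume_at = next;
        // the real task calls `process_streams` here before fetching more
    }
    assert_eq!(discovered, vec![10, 11, 12, 13, 14]);
}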
+ async fn process_all_undelivered_events(&mut self, pool: &SqlitePool) -> Result { + tracing::trace!("Processing all undelivered events for ordering"); + + let mut streams_discovered = 0; + let mut resume_at = Some(0); + while let Some(highwater_mark) = resume_at { + let (cids, hw_mark) = + CeramicOneStream::load_stream_cids_with_undelivered_events(pool, highwater_mark) + .await?; + resume_at = hw_mark; + trace!(count=cids.len(), stream_cids=?cids, "Discovered streams with undelivered events"); + for cid in cids { + if self.add_stream(cid) { + streams_discovered += 1; + } } - tx.commit().await?; - self.ready_events.clear(); // safe to clear since we are past any await points and hold exclusive access + self.process_streams(pool).await?; } - Ok(()) + + Ok(streams_discovered) } } #[cfg(test)] mod test { use ceramic_store::EventInsertable; - use multihash_codetable::{Code, MultihashDigest}; - use recon::ReconItem; - use crate::tests::{build_event, check_deliverable, random_block, TestEventInfo}; + use crate::tests::get_n_events; use super::*; - /// these events are init events so they should have been delivered - /// need to build with data events that have the prev stored already - async fn build_insertable_undelivered() -> EventInsertable { - let TestEventInfo { - event_id: id, car, .. - } = build_event().await; - let cid = id.cid().unwrap(); - - let (body, _meta) = CeramicEventService::parse_event_carfile(cid, &car) - .await - .unwrap(); - assert!(!body.deliverable); - EventInsertable::try_new(id, body).unwrap() - } - - fn assert_stream_map_elems(map: &StreamEvents, size: usize) { - assert_eq!(size, map.cid_map.len(), "{:?}", map); - assert_eq!(size, map.prev_map.len(), "{:?}", map); - } - - fn build_linked_events( - number: usize, - stream_cid: Cid, - first_prev: Cid, - ) -> Vec { - let mut events = Vec::with_capacity(number); - - let first_cid = random_block().cid; - events.push(DeliverableEvent::new( - first_cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: first_prev, - }, - None, - )); - - for i in 1..number { - let random = random_block(); - let ev = DeliverableEvent::new( - random.cid, - DeliverableMetadata { - init_cid: stream_cid, - prev: events[i - 1].cid, - }, - None, - ); - events.push(ev); - } - - events - } - #[tokio::test] - async fn test_none_deliverable_without_first() { - // they events all point to the one before but A has never been delivered so we can't do anything - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - let events = build_linked_events(4, stream_cid, missing); - let mut prev_map = StreamEvents::from_iter(events); - + async fn test_undelivered_batch_empty() { + let _ = ceramic_metrics::init_local_tracing(); let pool = SqlitePool::connect_in_memory().await.unwrap(); - - let deliverable = super::OrderingState::discover_deliverable_events(&pool, &mut prev_map) + let total = OrderingState::new() + .process_all_undelivered_events(&pool) .await .unwrap(); - - assert_eq!(0, deliverable.len()); + assert_eq!(0, total); } #[tokio::test] - async fn test_all_deliverable_one_stream() { + async fn test_undelivered_streams_all() { let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. 
- } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - - let events = build_linked_events(4, stream_cid, one_cid); - let expected = VecDeque::from_iter(events.iter().map(|ev| ev.cid)); - let mut prev_map = StreamEvents::from_iter(events); + let pool = SqlitePool::connect_in_memory().await.unwrap(); + let s1_events = get_n_events(3).await; + let s2_events = get_n_events(5).await; + let mut all_insertable = Vec::with_capacity(8); - assert_stream_map_elems(&prev_map, 4); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + for event in s1_events.iter() { + let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); - assert_eq!(4, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 0); - } - - #[tokio::test] - async fn test_some_deliverable_one_stream() { - let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert(&store, &ReconItem::new(&one_id, &one_car)) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary")); - let missing = Cid::new_v1(0x71, Code::Sha2_256.digest(b"missing")); - - let mut deliverable_events = build_linked_events(6, stream_cid, one_cid); - let stuck_events = build_linked_events(8, stream_cid, missing); - let expected = VecDeque::from_iter(deliverable_events.iter().map(|ev| ev.cid)); - deliverable_events.extend(stuck_events); - let mut prev_map = StreamEvents::from_iter(deliverable_events); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + all_insertable.push(insertable); + } + for event in s2_events.iter() { + let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); - assert_eq!(6, deliverable.len()); - assert_eq!(expected, deliverable); - assert_stream_map_elems(&prev_map, 8); - } + all_insertable.push(insertable); + } - #[tokio::test] - // expected to be per stream but all events are combined for the history required version currently so - // this needs to work as well - async fn test_all_deliverable_multiple_streams() { - let _ = ceramic_metrics::init_local_tracing(); - let TestEventInfo { - event_id: one_id, - car: one_car, - .. - } = build_event().await; - let TestEventInfo { - event_id: two_id, - car: two_car, - .. 
- } = build_event().await; - let one_cid = one_id.cid().unwrap(); - let two_cid = two_id.cid().unwrap(); - let store = CeramicEventService::new(SqlitePool::connect_in_memory().await.unwrap()) - .await - .unwrap(); - recon::Store::insert_many( - &store, - &[ - ReconItem::new(&one_id, &one_car), - ReconItem::new(&two_id, &two_car), - ], - ) - .await - .unwrap(); - - check_deliverable(&store.pool, &one_cid, true).await; - check_deliverable(&store.pool, &two_cid, true).await; - - let stream_cid = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-one")); - let stream_cid_2 = Cid::new_v1(0x71, Code::Sha2_256.digest(b"arbitrary-two")); - - let mut events_a = build_linked_events(4, stream_cid, one_cid); - let mut events_b = build_linked_events(10, stream_cid_2, two_cid); - let expected_a = VecDeque::from_iter(events_a.iter().map(|ev| ev.cid)); - let expected_b = VecDeque::from_iter(events_b.iter().map(|ev| ev.cid)); - // we expect the events to be in the prev chain order, but they can be intervleaved across streams - // we reverse the items in the input to proov this (it's a hashmap internally so there is no order, but still) - events_a.reverse(); - events_b.reverse(); - events_a.extend(events_b); - assert_eq!(14, events_a.len()); - let mut prev_map = StreamEvents::from_iter(events_a); - - assert_stream_map_elems(&prev_map, 14); - let deliverable = - super::OrderingState::discover_deliverable_events(&store.pool, &mut prev_map) + for event in &all_insertable { + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) .await .unwrap(); - - assert_eq!(14, deliverable.len()); - assert_eq!(0, prev_map.cid_map.len(), "{:?}", prev_map); - assert_eq!(0, prev_map.prev_map.len(), "{:?}", prev_map); - - let mut split_a = VecDeque::new(); - let mut split_b = VecDeque::new(); - for cid in deliverable { - if expected_a.contains(&cid) { - split_a.push_back(cid); - } else if expected_b.contains(&cid) { - split_b.push_back(cid); + // init events are always delivered and the others should have been skipped + if event.cid() == event.stream_cid() + || event.order_key == s1_events[0].0 + || event.order_key == s2_events[0].0 + { + assert!( + delivered, + "Event {:?} was not delivered. 
init={:?}, s1={:?}, s2={:?}", + event.cid(), + event.stream_cid(), + s1_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + s2_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + ); } else { - panic!("Unexpected CID in deliverable list: {:?}", cid); + assert!(!delivered); } } - - assert_eq!(expected_a, split_a); - assert_eq!(expected_b, split_b); - } - - #[tokio::test] - async fn test_undelivered_batch_empty() { - let _ = ceramic_metrics::init_local_tracing(); - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let (new, found) = OrderingState::new() - .add_undelivered_batch(&pool, 0, 10) - .await - .unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - } - - #[tokio::test] - async fn test_undelivered_batch_offset() { - let _ = ceramic_metrics::init_local_tracing(); - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let insertable = build_insertable_undelivered().await; - - let _new = CeramicOneEvent::insert_many(&pool, &[insertable]) + let total = OrderingState::new() + .process_all_undelivered_events(&pool) .await .unwrap(); - let mut state = OrderingState::new(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(1, found); - assert_eq!(1, new); - let (new, found) = state.add_undelivered_batch(&pool, 10, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - state.persist_ready_events(&pool).await.unwrap(); - let (new, found) = state.add_undelivered_batch(&pool, 0, 10).await.unwrap(); - assert_eq!(0, new); - assert_eq!(0, found); - } - - #[tokio::test] - async fn test_undelivered_batch_all() { - let _ = ceramic_metrics::init_local_tracing(); - - let pool = SqlitePool::connect_in_memory().await.unwrap(); - let mut undelivered = Vec::with_capacity(10); - for _ in 0..10 { - let insertable = build_insertable_undelivered().await; - undelivered.push(insertable); + assert_eq!(2, total); + for event in &all_insertable { + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) + .await + .unwrap(); + assert!(delivered); } - - let (hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) - .await - .unwrap(); - assert_eq!(0, hw); - assert!(event.is_empty()); - - let _new = CeramicOneEvent::insert_many(&pool, &undelivered[..]) - .await - .unwrap(); - - let mut state = OrderingState::new(); - state - .process_all_undelivered_events(&pool, 1) - .await - .unwrap(); - - let (_hw, event) = CeramicOneEvent::new_events_since_value(&pool, 0, 1000) - .await - .unwrap(); - assert_eq!(event.len(), 10); } } diff --git a/service/src/event/service.rs b/service/src/event/service.rs index daddd3531..296e8e30f 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,16 +1,10 @@ -use std::collections::{HashMap, HashSet}; - use ceramic_core::EventId; -use ceramic_event::unvalidated; -use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; -use cid::Cid; -use ipld_core::ipld::Ipld; -use recon::{InsertResult, ReconItem}; +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; use tracing::{trace, warn}; -use super::ordering_task::{ - DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState, - OrderingTask, StreamEvents, +use super::{ + order_events::OrderEvents, + ordering_task::{DeliverableTask, OrderingTask}, }; use crate::{Error, Result}; @@ -41,7 +35,7 @@ impl CeramicEventService { } /// Skip loading all undelivered events from the database on startup (for 
testing) - #[allow(dead_code)] // used in tests + #[cfg(test)] pub(crate) async fn new_without_undelivered(pool: SqlitePool) -> Result { CeramicOneEvent::init_delivered_order(&pool).await?; @@ -72,71 +66,16 @@ impl CeramicEventService { Ok(()) } - /// This function is used to parse the event from the carfile and return the insertable event and the previous cid pointer. - /// Probably belongs in the event crate. - pub(crate) async fn parse_event_carfile( - event_cid: cid::Cid, - carfile: &[u8], - ) -> Result<(EventInsertableBody, Option)> { - let insertable = EventInsertableBody::try_from_carfile(event_cid, carfile).await?; - let ev_block = insertable.block_for_cid(&insertable.cid)?; - - trace!(count=%insertable.blocks.len(), cid=%event_cid, "parsing event blocks"); - let event_ipld: unvalidated::RawEvent = - serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("event block is not valid event format"), - ) - })?; - - let maybe_init_prev = match event_ipld { - unvalidated::RawEvent::Time(t) => Some((t.id(), t.prev())), - unvalidated::RawEvent::Signed(signed) => { - let link = signed.link().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("event should have a link")) - })?; - let link = insertable.block_for_cid(&link).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("prev CID missing from carfile"), - ) - })?; - let payload: unvalidated::Payload = - serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("Failed to follow event link"), - ) - })?; - - match payload { - unvalidated::Payload::Data(d) => Some((*d.id(), *d.prev())), - unvalidated::Payload::Init(_init) => None, - } - } - unvalidated::RawEvent::Unsigned(_init) => None, - }; - let meta = maybe_init_prev.map(|(cid, prev)| DeliverableMetadata { - init_cid: cid, - prev, - }); - Ok((insertable, meta)) - } - #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] /// This function is used to insert events from a carfile requiring that the history is local to the node. /// This is likely used in API contexts when a user is trying to insert events. Events discovered from /// peers can come in any order and we will discover the prev chain over time. Use /// `insert_events_from_carfiles_remote_history` for that case. - pub(crate) async fn insert_events_from_carfiles_local_history<'a>( + pub(crate) async fn insert_events_from_carfiles_local_api<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - ) -> Result { - if items.is_empty() { - return Ok(InsertResult::default()); - } - - let ordering = - InsertEventOrdering::discover_deliverable_local_history(items, &self.pool).await?; - self.process_events(ordering).await + ) -> Result { + self.insert_events(items, true).await } #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] @@ -144,233 +83,110 @@ impl CeramicEventService { /// This is used in recon contexts when we are discovering events from peers in a recon but not ceramic order and /// don't have the complete order. To enforce that the history is local, e.g. in API contexts, use /// `insert_events_from_carfiles_local_history`. 
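Both public entry points now funnel into a single `insert_events` path and differ only in `history_required`: API writes reject events whose prev is unknown, while recon writes store them and let the ordering task catch up. Roughly, the branching looks like this (simplified sketch with `u32` ids; `Ordered` mirrors the `OrderEvents` output, and the helper is hypothetical):

struct Ordered {
    deliverable: Vec<u32>,
    missing_history: Vec<u32>,
}

// Returns (events to write, events to reject with a per-event error).
fn events_to_insert(ordered: Ordered, history_required: bool) -> (Vec<u32>, Vec<u32>) {
    if history_required {
        // API: only deliverable events are written; the rest come back as
        // per-event failures ("Event is missing prev").
        (ordered.deliverable, ordered.missing_history)
    } else {
        // Recon: write everything; missing-history events stay undelivered in
        // the DB until their prev arrives.
        let all: Vec<u32> = ordered
            .deliverable
            .into_iter()
            .chain(ordered.missing_history)
            .collect();
        (all, Vec::new())
    }
}

fn main() {
    let ordered = Ordered { deliverable: vec![1, 2], missing_history: vec![9] };
    let (stored, rejected) = events_to_insert(ordered, true);
    assert_eq!((stored, rejected), (vec![1, 2], vec![9]));
}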
- pub(crate) async fn insert_events_from_carfiles_remote_history<'a>( + pub(crate) async fn insert_events_from_carfiles_recon<'a>( &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { - if items.is_empty() { - return Ok(InsertResult::default()); + let res = self.insert_events(items, false).await?; + let mut keys = vec![false; items.len()]; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + for (i, item) in items.iter().enumerate() { + let new_key = res + .store_result + .inserted + .iter() + .find(|e| e.order_key == *item.key) + .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set + keys[i] = new_key; } - - let ordering = InsertEventOrdering::discover_deliverable_remote_history(items).await?; - self.process_events(ordering).await + Ok(recon::InsertResult::new(keys)) } - async fn process_events(&self, ordering: InsertEventOrdering) -> Result { - let res = CeramicOneEvent::insert_many(&self.pool, &ordering.insert_now[..]).await?; - - for ev in ordering.background_task_deliverable { - trace!(cid=%ev.0, prev=%ev.1.prev, init=%ev.1.init_cid, "sending to delivery task"); - if let Err(e) = self - .delivery_task - .tx - .try_send(DeliverableEvent::new(ev.0, ev.1, None)) - { - match e { - tokio::sync::mpsc::error::TrySendError::Full(e) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(cid=%e.cid, meta=?e.meta, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - for new in ordering.notify_task_new { - if let Err(e) = self.delivery_task.tx_new.try_send(new) { - match e { - tokio::sync::mpsc::error::TrySendError::Full(ev) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(attempt=?ev, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Notify new task full"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - Ok(res) - } -} - -struct InsertEventOrdering { - insert_now: Vec, - notify_task_new: Vec, - background_task_deliverable: HashMap, -} - -impl InsertEventOrdering { - /// This will mark events as deliverable if their prev exists locally. Otherwise they will be - /// sorted into the bucket for the background task to process. 
- pub(crate) async fn discover_deliverable_remote_history<'a>( - items: &[ReconItem<'a, EventId>], - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - result.mark_event_deliverable_later(insertable, meta); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } + async fn insert_events<'a>( + &self, + items: &[recon::ReconItem<'a, EventId>], + history_required: bool, + ) -> Result { + if items.is_empty() { + return Ok(InsertResult::default()); } - Ok(result) - } + let mut to_insert = Vec::with_capacity(items.len()); - /// This will error if any of the events doesn't have its prev on the local node (in the database/memory or in this batch). - pub(crate) async fn discover_deliverable_local_history<'a>( - items: &[ReconItem<'a, EventId>], - pool: &SqlitePool, - ) -> Result { - let mut result = Self { - insert_now: Vec::with_capacity(items.len()), - notify_task_new: Vec::with_capacity(items.len()), - background_task_deliverable: HashMap::new(), - }; - - let mut insert_after_history_check: Vec<(DeliverableMetadata, EventInsertable)> = - Vec::with_capacity(items.len()); - - for item in items { - let (insertable, maybe_prev) = Self::parse_item(item).await?; - if let Some(meta) = maybe_prev { - insert_after_history_check.push((meta, insertable)); - } else { - let init_cid = insertable.body.cid; - result.mark_event_deliverable_now(insertable, init_cid); - } + for event in items { + let insertable = EventInsertable::try_new(event.key.to_owned(), event.value).await?; + to_insert.push(insertable); } - trace!(local_events_checking=%insert_after_history_check.len(), "checking local history"); - result - .verify_history_inline(pool, insert_after_history_check) - .await?; - Ok(result) - } - - async fn parse_item<'a>( - item: &ReconItem<'a, EventId>, - ) -> Result<(EventInsertable, Option)> { - let cid = item.key.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", item.key)) - })?; - // we want to end a conversation if any of the events aren't ceramic events and not store them - // this includes making sure the key matched the body cid - let (insertable_body, maybe_prev) = - CeramicEventService::parse_event_carfile(cid, item.value).await?; - let insertable = EventInsertable::try_new(item.key.to_owned(), insertable_body)?; - Ok((insertable, maybe_prev)) - } - - fn mark_event_deliverable_later( - &mut self, - insertable: EventInsertable, - meta: DeliverableMetadata, - ) { - self.background_task_deliverable - .insert(insertable.body.cid, meta); - self.insert_now.push(insertable); - } - - fn mark_event_deliverable_now(&mut self, mut ev: EventInsertable, init_cid: Cid) { - ev.deliverable(true); - self.notify_task_new - .push(DeliveredEvent::new(ev.body.cid, init_cid)); - self.insert_now.push(ev); - } - - async fn verify_history_inline( - &mut self, - pool: &SqlitePool, - to_check: Vec<(DeliverableMetadata, EventInsertable)>, - ) -> Result<()> { - if to_check.is_empty() { - return Ok(()); - } + let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; - let incoming_deliverable_cids: HashSet = self - .insert_now + let missing_history = ordered + .missing_history .iter() - .filter_map(|e| { - if e.body.deliverable { - Some(e.body.cid) - } else { - 
None - } - }) + .map(|e| e.order_key.clone()) .collect(); - // ideally, this map would be per stream, but we are just processing all of them together for now - let mut to_check_map = StreamEvents::new(); - - let required_to_find = to_check.len(); - let mut found_in_batch = 0; - let mut insert_if_greenlit = HashMap::with_capacity(required_to_find); - - for (meta, ev) in to_check { - if incoming_deliverable_cids.contains(&meta.prev) { - trace!(new=%ev.body.cid, prev=%meta.prev, "prev event being added in same batch"); - found_in_batch += 1; - self.mark_event_deliverable_now(ev, meta.init_cid); - } else { - trace!(new=%ev.body.cid, prev=%meta.prev, "will check for prev event in db"); + let to_insert = if history_required { + ordered.deliverable + } else { + ordered + .deliverable + .into_iter() + .chain(ordered.missing_history) + .collect() + }; - let _new = to_check_map.add_event(DeliverableEvent::new( - ev.body.cid, - meta.to_owned(), - None, - )); - insert_if_greenlit.insert(ev.body.cid, (meta, ev)); + let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + // api writes shouldn't have any missed pieces that need ordering so we don't send those + if !history_required { + for ev in &res.inserted { + if ev.deliverable { + trace!(event=?ev, "sending delivered to ordering task"); + if let Err(e) = self.delivery_task.tx_delivered.try_send(ev.clone()) { + match e { + tokio::sync::mpsc::error::TrySendError::Full(e) => { + // we should only be doing this during recon, in which case we can rediscover events. + // the delivery task will start picking up these events once it's drained since they are stored in the db + warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + warn!("Delivery task closed. shutting down"); + return Err(Error::new_fatal(anyhow::anyhow!( + "Delivery task closed" + ))); + } + } + } + } } } - if to_check_map.is_empty() { - return Ok(()); - } - - let deliverable = - OrderingState::discover_deliverable_events(pool, &mut to_check_map).await?; - if deliverable.len() != required_to_find - found_in_batch { - let missing = insert_if_greenlit - .values() - .filter_map(|(_, ev)| { - if !deliverable.contains(&ev.body.cid) { - Some(ev.body.cid) - } else { - None - } - }) - .collect::>(); + Ok(InsertResult { + store_result: res, + missing_history, + }) + } +} - tracing::info!(?missing, ?deliverable, "Missing required `prev` event CIDs"); +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, +} - Err(Error::new_invalid_arg(anyhow::anyhow!( - "Missing required `prev` event CIDs: {:?}", - missing - ))) - } else { - // we need to use the deliverable list's order because we might depend on something in the same batch, and that function will - // ensure we have a queue in the correct order. So we follow the order and use our insert_if_greenlit map to get the details. 
- for cid in deliverable { - if let Some((meta, insertable)) = insert_if_greenlit.remove(&cid) { - self.mark_event_deliverable_now(insertable, meta.init_cid); - } else { - warn!(%cid, "Didn't find event to insert in memory when it was expected"); - } - } - Ok(()) +impl From for Vec { + fn from(res: InsertResult) -> Self { + let mut api_res = + Vec::with_capacity(res.store_result.inserted.len() + res.missing_history.len()); + for ev in res.store_result.inserted { + api_res.push(ceramic_api::EventInsertResult::new(ev.order_key, None)); + } + for ev in res.missing_history { + api_res.push(ceramic_api::EventInsertResult::new( + ev, + Some("Failed to insert event as `prev` event was missing".to_owned()), + )); } + api_res } } diff --git a/service/src/event/store.rs b/service/src/event/store.rs index e61506bcf..fef6f4661 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -16,7 +16,7 @@ impl recon::Store for CeramicEventService { async fn insert(&self, item: &ReconItem<'_, Self::Key>) -> ReconResult { let res = self - .insert_events_from_carfiles_remote_history(&[item.to_owned()]) + .insert_events_from_carfiles_recon(&[item.to_owned()]) .await?; Ok(res.keys.first().copied().unwrap_or(false)) @@ -25,10 +25,11 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { - let res = self - .insert_events_from_carfiles_remote_history(items) - .await?; + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { + let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -106,15 +107,19 @@ impl iroh_bitswap::Store for CeramicEventService { #[async_trait::async_trait] impl ceramic_api::EventStore for CeramicEventService { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let items = items .iter() .map(|(key, val)| ReconItem::new(key, val.as_slice())) .collect::>(); let res = self - .insert_events_from_carfiles_local_history(&items[..]) + .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + + Ok(res.into()) } async fn range_with_values( diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index de36fb97a..25a29e424 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -114,22 +114,12 @@ fn gen_rand_bytes() -> [u8; SIZE] { arr } -pub(crate) fn random_block() -> Block { - let mut data = [0u8; 1024]; - rand::Rng::fill(&mut ::rand::thread_rng(), &mut data); - let hash = Code::Sha2_256.digest(&data); - Block { - cid: Cid::new_v1(0x00, hash), - data: data.to_vec().into(), - } -} - pub(crate) async fn check_deliverable( pool: &ceramic_store::SqlitePool, cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid) + let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); @@ -172,28 +162,13 @@ async fn data_event( 
signed::Event::from_payload(unvalidated::Payload::Data(commit), signer.to_owned()).unwrap()
 }
 
-async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec<u8>); 3] {
+// returns init + N events
+async fn get_init_plus_n_events_with_model(
+    model: &StreamId,
+    number: usize,
+) -> Vec<(EventId, Vec<u8>)> {
     let signer = Box::new(signer().await);
 
-    let data = gen_rand_bytes::<50>();
-    let data2 = gen_rand_bytes::<50>();
-
-    let data = ipld!({
-        "radius": 1,
-        "red": 2,
-        "green": 3,
-        "blue": 4,
-        "raw": data.as_slice(),
-    });
-
-    let data2 = ipld!({
-        "radius": 1,
-        "red": 2,
-        "green": 3,
-        "blue": 4,
-        "raw": data2.as_slice(),
-    });
-
     let init = init_event(model, &signer).await;
     let init_cid = init.envelope_cid();
     let (event_id, car) = (
@@ -202,33 +177,45 @@ async fn get_events_with_model(model: &StreamId) -> [(EventId, Vec<u8>); 3] {
     );
     let init_cid = event_id.cid().unwrap();
 
-    let data = data_event(init_cid, init_cid, data, &signer).await;
-    let cid = data.envelope_cid();
-    let (data_id, data_car) = (
-        build_event_id(&data.envelope_cid(), &init_cid, model),
-        data.encode_car().await.unwrap(),
-    );
-    let data2 = data_event(init_cid, cid, data2, &signer).await;
-    let (data_id_2, data_car_2) = (
-        build_event_id(&data2.envelope_cid(), &init_cid, model),
-        data2.encode_car().await.unwrap(),
-    );
-    [
-        (event_id, car),
-        (data_id, data_car),
-        (data_id_2, data_car_2),
-    ]
+    let mut events = Vec::with_capacity(number);
+    events.push((event_id, car));
+    let mut prev = init_cid;
+    for _ in 0..number {
+        let data = gen_rand_bytes::<50>();
+        let data = ipld!({
+            "radius": 1,
+            "red": 2,
+            "green": 3,
+            "blue": 4,
+            "raw": data.as_slice(),
+        });
+
+        let data = data_event(init_cid, prev, data, &signer).await;
+        let (data_id, data_car) = (
+            build_event_id(&data.envelope_cid(), &init_cid, model),
+            data.encode_car().await.unwrap(),
+        );
+        prev = data_id.cid().unwrap();
+        events.push((data_id, data_car));
+    }
+    events
 }
 
-pub(crate) async fn get_events_return_model() -> (StreamId, [(EventId, Vec<u8>); 3]) {
+pub(crate) async fn get_events_return_model() -> (StreamId, Vec<(EventId, Vec<u8>)>) {
     let model = StreamId::document(random_cid());
-    let events = get_events_with_model(&model).await;
+    let events = get_init_plus_n_events_with_model(&model, 3).await;
     (model, events)
 }
 
-// builds init -> data -> data that are a stream (will be a different stream each call)
-pub(crate) async fn get_events() -> [(EventId, Vec<u8>); 3] {
+// builds an init event plus three data events that form a single stream (a different stream each call)
+pub(crate) async fn get_events() -> Vec<(EventId, Vec<u8>)> {
     let model = StreamId::document(random_cid());
-    get_events_with_model(&model).await
+    get_init_plus_n_events_with_model(&model, 3).await
+}
+
+// Get N events with the same model (init + N-1 data events)
+pub(crate) async fn get_n_events(number: usize) -> Vec<(EventId, Vec<u8>)> {
+    let model = &StreamId::document(random_cid());
+    get_init_plus_n_events_with_model(model, number - 1).await
 }
diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs
index ddcbc7f23..61caae825 100644
--- a/service/src/tests/ordering.rs
+++ b/service/src/tests/ordering.rs
@@ -1,5 +1,7 @@
 use ceramic_api::EventStore;
 use ceramic_core::EventId;
+use rand::seq::SliceRandom;
+use rand::thread_rng;
 use recon::ReconItem;
 
 use crate::{
@@ -12,14 +14,16 @@ async fn setup_service() -> CeramicEventService {
     let conn = ceramic_store::SqlitePool::connect_in_memory()
        .await
        .unwrap();
+
    CeramicEventService::new_without_undelivered(conn)
        .await
        .unwrap()
 }
 
 async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) {
+    
tracing::trace!("inserted event: {}", item.key.cid().unwrap()); let new = store - .insert_events_from_carfiles_remote_history(&[item]) + .insert_events_from_carfiles_recon(&[item]) .await .unwrap(); let new = new.keys.into_iter().filter(|k| *k).count(); @@ -28,10 +32,10 @@ async fn add_and_assert_new_recon_event(store: &CeramicEventService, item: Recon async fn add_and_assert_new_local_event(store: &CeramicEventService, item: ReconItem<'_, EventId>) { let new = store - .insert_events_from_carfiles_local_history(&[item]) + .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -45,30 +49,17 @@ async fn test_init_event_delivered() { } #[tokio::test] -async fn test_missing_prev_error_history_required() { +async fn test_missing_prev_history_required_not_inserted() { let store = setup_service().await; let events = get_events().await; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ReconItem::new(&data.0, &data.1)]) - .await; - match new { - Ok(v) => panic!("should have errored: {:?}", v), - Err(e) => { - match e { - crate::Error::InvalidArgument { error } => { - // yes fragile, but we want to make sure it's not a parsing error or something unexpected - assert!(error - .to_string() - .contains("Missing required `prev` event CIDs")); - } - e => { - panic!("unexpected error: {:?}", e); - } - }; - } - }; + .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) + .await + .unwrap(); + assert!(new.store_result.inserted.is_empty()); + assert_eq!(1, new.missing_history.len()); } #[tokio::test] @@ -100,13 +91,13 @@ async fn test_prev_in_same_write_history_required() { let init: &(EventId, Vec) = &events[0]; let data = &events[1]; let new = store - .insert_events_from_carfiles_local_history(&[ + .insert_events_from_carfiles_local_api(&[ ReconItem::new(&data.0, &data.1), ReconItem::new(&init.0, &init.1), ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; @@ -178,7 +169,6 @@ async fn missing_prev_pending_recon_should_deliver_without_stream_update() { // now we add the second event, it should quickly become deliverable let data = &events[1]; add_and_assert_new_recon_event(&store, ReconItem::new(&data.0, &data.1)).await; - check_deliverable(&store.pool, &data.0.cid().unwrap(), false).await; // This happens out of band, so give it a moment to make sure everything is updated tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -234,7 +224,7 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat // this _could_ be deliverable immediately if we checked but for now we just send to the other task, // so `check_deliverable` could return true or false depending on timing (but probably false). 
// as this is an implementation detail and we'd prefer true, we just use HW ordering to make sure it's been delivered - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let (_, delivered) = store .events_since_highwater_mark(0, i64::MAX) .await @@ -266,3 +256,96 @@ async fn multiple_streams_missing_prev_recon_should_deliver_without_stream_updat ]; assert_eq!(expected, delivered); } + +async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: usize) { + loop { + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + let total = delivered.len(); + if total < expected_delivered { + tracing::trace!( + "found {} delivered, waiting for {}", + total, + expected_delivered + ); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } else { + break; + } + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn recon_lots_of_streams() { + // adds 100 events to 10 streams, mixes up the event order for each stream, inserts half + // the events for each stream before mixing up the stream order and inserting the rest + let per_stream = 100; + let num_streams = 10; + let store = setup_service().await; + let mut streams = Vec::new(); + let mut all_cids = Vec::new(); + let expected = per_stream * num_streams; + for _ in 0..num_streams { + let mut events = crate::tests::get_n_events(per_stream).await; + let cids = events + .iter() + .map(|e| e.0.cid().unwrap()) + .collect::>(); + all_cids.extend(cids); + assert_eq!(per_stream, events.len()); + events.shuffle(&mut thread_rng()); + streams.push(events); + } + let mut total_added = 0; + + assert_eq!(expected, all_cids.len()); + tracing::debug!(?all_cids, "starting test"); + for stream in streams.iter_mut() { + while let Some(event) = stream.pop() { + if stream.len() > per_stream / 2 { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + } else { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + break; + } + } + } + streams.shuffle(&mut thread_rng()); + for stream in streams.iter_mut() { + while let Some(event) = stream.pop() { + total_added += 1; + add_and_assert_new_recon_event(&store, ReconItem::new(&event.0, &event.1)).await; + } + } + // first just make sure they were all inserted (not delivered yet) + for (i, cid) in all_cids.iter().enumerate() { + let (exists, _delivered) = + ceramic_store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid) + .await + .unwrap(); + assert!(exists, "idx: {}. 
missing cid: {}", i, cid); + } + + assert_eq!(expected, total_added); + tokio::time::timeout( + std::time::Duration::from_secs(5), + validate_all_delivered(&store, expected), + ) + .await + .unwrap(); + + let (_, delivered) = store + .events_since_highwater_mark(0, i64::MAX) + .await + .unwrap(); + + assert_eq!(expected, delivered.len()); + // now we check that all the events are deliverable + for cid in all_cids.iter() { + check_deliverable(&store.pool, cid, true).await; + } +} diff --git a/store/Cargo.toml b/store/Cargo.toml index df193391d..35318fa86 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -13,19 +13,22 @@ anyhow.workspace = true async-trait.workspace = true ceramic-api.workspace = true ceramic-core.workspace = true +ceramic-event.workspace = true ceramic-metrics.workspace = true cid.workspace = true futures.workspace = true hex.workspace = true +ipld-core.workspace = true iroh-bitswap.workspace = true iroh-car.workspace = true itertools = "0.12.0" -multihash.workspace = true multihash-codetable.workspace = true +multihash.workspace = true prometheus-client.workspace = true -thiserror.workspace = true recon.workspace = true +serde_ipld_dagcbor.workspace = true sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/store/src/lib.rs b/store/src/lib.rs index f76bdf598..8c419696a 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -12,8 +12,8 @@ pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use migration::DataMigrator; pub use sql::{ entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, - CeramicOneEventBlock, CeramicOneInterest, Migrations, SqlitePool, SqliteRootStore, - SqliteTransaction, + CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, InsertResult, InsertedEvent, + Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, StreamEventMetadata, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 855d4e960..18fbfbf78 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -13,7 +13,7 @@ use prometheus_client::{ }, registry::Registry, }; -use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; use tokio::time::Instant; #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -158,7 +158,10 @@ impl ceramic_api::EventStore for StoreMetricsMiddleware where S: ceramic_api::EventStore, { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let new_keys = StoreMetricsMiddleware::::record( &self.metrics, "api_insert_many", @@ -166,7 +169,7 @@ where ) .await?; - let key_cnt = new_keys.iter().filter(|k| **k).count(); + let key_cnt = new_keys.iter().filter(|k| k.success()).count(); self.metrics.record(&InsertEvent { cnt: key_cnt as u64, @@ -253,7 +256,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 88014e1c9..4c19fe3ad 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -7,33 +7,93 @@ use std::{ use anyhow::anyhow; use ceramic_core::{event_id::InvalidEventId, EventId}; use cid::Cid; -use 
recon::{AssociativeHash, HashCount, InsertResult, Key, Result as ReconResult, Sha256a};
+
+use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a};
 
 use crate::{
     sql::{
         entities::{
-            rebuild_car, BlockRow, CountRow, DeliveredEvent, EventInsertable, OrderKey,
-            ReconEventBlockRaw, ReconHash,
+            rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventHeader, EventInsertable,
+            OrderKey, ReconEventBlockRaw, ReconHash, StreamCid,
         },
         query::{EventQuery, ReconQuery, ReconType, SqlBackend},
         sqlite::SqliteTransaction,
     },
-    CeramicOneBlock, CeramicOneEventBlock, Error, Result, SqlitePool,
+    CeramicOneBlock, CeramicOneEventBlock, CeramicOneStream, Error, Result, SqlitePool,
 };
 
 static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0);
 
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// An event that was inserted into the database
+pub struct InsertedEvent {
+    /// The event order key that was inserted
+    pub order_key: EventId,
+    /// The Stream CID
+    pub stream_cid: StreamCid,
+    /// Whether the event was marked as deliverable
+    pub deliverable: bool,
+    /// Whether the event was a new key
+    pub new_key: bool,
+}
+
+impl InsertedEvent {
+    /// Create a new inserted event
+    fn new(order_key: EventId, new_key: bool, stream_cid: StreamCid, deliverable: bool) -> Self {
+        Self {
+            order_key,
+            stream_cid,
+            deliverable,
+            new_key,
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+/// The result of inserting events into the database
+pub struct InsertResult {
+    /// The events that were inserted in this batch, along with their deliverable status
+    pub inserted: Vec<InsertedEvent>,
+}
+
+impl InsertResult {
+    /// The count of new keys added in this batch
+    pub fn count_new_keys(&self) -> usize {
+        self.inserted.iter().filter(|e| e.new_key).count()
+    }
+}
+
+impl InsertResult {
+    fn new(inserted: Vec<InsertedEvent>) -> Self {
+        Self { inserted }
+    }
+}
+
 /// Access to the ceramic event table and related logic
 pub struct CeramicOneEvent {}
 
 impl CeramicOneEvent {
-    async fn insert_key(tx: &mut SqliteTransaction<'_>, key: &EventId) -> Result<bool> {
+    fn next_deliverable() -> i64 {
+        GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst)
+    }
+
+    /// Insert the event and its hash into the ceramic_one_event table
+    async fn insert_event(
+        tx: &mut SqliteTransaction<'_>,
+        key: &EventId,
+        deliverable: bool,
+    ) -> Result<bool> {
         let id = key.as_bytes();
         let cid = key
             .cid()
             .map(|cid| cid.to_bytes())
             .ok_or_else(|| Error::new_app(anyhow!("Event CID is required")))?;
         let hash = Sha256a::digest(key);
+        let delivered: Option<i64> = if deliverable {
+            Some(Self::next_deliverable())
+        } else {
+            None
+        };
 
         let resp = sqlx::query(ReconQuery::insert_event())
             .bind(id)
@@ -46,6 +106,7 @@ impl CeramicOneEvent {
             .bind(hash.as_u32s()[5])
             .bind(hash.as_u32s()[6])
             .bind(hash.as_u32s()[7])
+            .bind(delivered)
             .execute(&mut **tx.inner())
             .await;
 
@@ -87,7 +148,7 @@ impl CeramicOneEvent {
     pub async fn mark_ready_to_deliver(conn: &mut SqliteTransaction<'_>, key: &Cid) -> Result<()> {
         // Fetch add happens with an open transaction (on one writer for the db) so we're guaranteed to get a unique value
         sqlx::query(EventQuery::mark_ready_to_deliver())
-            .bind(GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst))
+            .bind(Self::next_deliverable())
             .bind(&key.to_bytes())
             .execute(&mut **conn.inner())
             .await?;
@@ -95,33 +156,51 @@ impl CeramicOneEvent {
         Ok(())
     }
 
-    /// Insert many events into the database.
+    /// Insert many events into the database. The events and their blocks and metadata are inserted in a single
+    /// transaction; either all of them are inserted or the transaction is rolled back.
+    ///
+    /// IMPORTANT:
+    /// It is the caller's responsibility to order events marked deliverable correctly.
+    /// That is, events will be processed in the order they are given, so earlier events are given a lower global ordering
+    /// and will be returned earlier in the feed. Events can be interleaved with different streams, but if two events
+    /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers.
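+    ///
+    /// A sketch of the expected call pattern (illustrative only and not compiled; assumes `init`
+    /// and `data` are `EventInsertable` values for the same stream where `data` depends on `init`):
+    /// ```ignore
+    /// // `init` must precede `data` so it receives the lower global ordering.
+    /// let batch = vec![init, data];
+    /// let res = CeramicOneEvent::insert_many(&pool, &batch).await?;
+    /// assert_eq!(res.count_new_keys(), 2);
+    /// ```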
     pub async fn insert_many(
         pool: &SqlitePool,
         to_add: &[EventInsertable],
     ) -> Result<InsertResult> {
-        let mut new_keys = vec![false; to_add.len()];
+        let mut inserted = Vec::with_capacity(to_add.len());
         let mut tx = pool.begin_tx().await.map_err(Error::from)?;
 
-        for (idx, item) in to_add.iter().enumerate() {
-            let new_key = Self::insert_key(&mut tx, &item.order_key).await?;
+        for item in to_add {
+            let new_key =
+                Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?;
+            inserted.push(InsertedEvent::new(
+                item.order_key.clone(),
+                new_key,
+                item.stream_cid(),
+                item.body.deliverable,
+            ));
+            // the event already existed, so the insert above did not set `delivered`; mark it deliverable now
+            if item.body.deliverable && !new_key {
+                Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?;
+            }
             if new_key {
                 for block in item.body.blocks.iter() {
                     CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
                     CeramicOneEventBlock::insert(&mut tx, block).await?;
                 }
+                if let EventHeader::Init { header, .. } = &item.body.header {
+                    CeramicOneStream::insert_tx(&mut tx, item.stream_cid(), header).await?;
+                }
+
+                CeramicOneStream::insert_event_header_tx(&mut tx, &item.body.header).await?;
             }
-            if item.body.deliverable {
-                Self::mark_ready_to_deliver(&mut tx, &item.body.cid).await?;
-            }
-            new_keys[idx] = new_key;
         }
         tx.commit().await.map_err(Error::from)?;
-        let res = InsertResult::new(new_keys);
+        let res = InsertResult::new(inserted);
 
         Ok(res)
     }
 
-
     /// Find events that haven't been delivered to the client and may be ready
     pub async fn undelivered_with_values(
         pool: &SqlitePool,
@@ -220,13 +299,13 @@ impl CeramicOneEvent {
         delivered: i64,
         limit: i64,
     ) -> Result<(i64, Vec)> {
-        let rows: Vec<DeliveredEvent> = sqlx::query_as(EventQuery::new_delivered_events())
+        let rows: Vec<DeliveredEventRow> = sqlx::query_as(EventQuery::new_delivered_events())
             .bind(delivered)
             .bind(limit)
             .fetch_all(pool.reader())
             .await?;
 
-        DeliveredEvent::parse_query_results(delivered, rows)
+        DeliveredEventRow::parse_query_results(delivered, rows)
     }
 
     /// Finds the event data by a given EventId i.e. "order key".
@@ -248,8 +327,9 @@ impl CeramicOneEvent {
     }
 
     /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered.
-    /// (bool, bool) = (exists, delivered)
-    pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> {
+    /// returns (bool, bool) = (exists, deliverable)
+    /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could.
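+    ///
+    /// A sketch of the intended use (illustrative only and not compiled; assumes a connected pool
+    /// and a known event `cid`):
+    /// ```ignore
+    /// let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(&pool, &cid).await?;
+    /// if exists && !deliverable {
+    ///     // the event is stored, but its `prev` chain is not yet complete
+    /// }
+    /// ```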
+ pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { #[derive(sqlx::FromRow)] struct CidExists { exists: bool, diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs index 912a462ac..bd6eefc3c 100644 --- a/store/src/sql/access/mod.rs +++ b/store/src/sql/access/mod.rs @@ -2,8 +2,10 @@ mod block; mod event; mod event_block; mod interest; +mod stream; pub use block::CeramicOneBlock; -pub use event::CeramicOneEvent; +pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; pub use interest::CeramicOneInterest; +pub use stream::{CeramicOneStream, StreamEventMetadata}; diff --git a/store/src/sql/access/stream.rs b/store/src/sql/access/stream.rs new file mode 100644 index 000000000..2197ee69e --- /dev/null +++ b/store/src/sql/access/stream.rs @@ -0,0 +1,151 @@ +use anyhow::anyhow; +use cid::Cid; + +use crate::{ + sql::entities::{ + EventHeader, EventMetadataRow, EventType, IncompleteStream, StreamCid, StreamEventRow, + StreamRow, + }, + Error, Result, SqlitePool, SqliteTransaction, +}; + +/// Access to the stream and related tables. Generally querying events as a stream. +pub struct CeramicOneStream {} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// Represents a stream event in a way that allows ordering it in the stream. It is metadata and not the event payload itself. +pub struct StreamEventMetadata { + /// The event CID + pub cid: Cid, + /// The previous event CID + pub prev: Option, + /// Whether the event is deliverable + pub deliverable: bool, +} + +impl TryFrom for StreamEventMetadata { + type Error = crate::Error; + + fn try_from(row: StreamEventRow) -> std::result::Result { + let cid = Cid::try_from(row.cid) + .map_err(|e| Error::new_app(anyhow!("Invalid event cid: {}", e)))?; + let prev = row + .prev + .map(Cid::try_from) + .transpose() + .map_err(|e| Error::new_app(anyhow!("Invalid event prev: {}", e)))?; + Ok(Self { + cid, + prev, + deliverable: row.deliverable, + }) + } +} + +impl CeramicOneStream { + /// Load the events for a given stream. Will return nothing if the stream does not exist (i.e. the init event is undiscovered). + pub async fn load_stream_events( + pool: &SqlitePool, + stream_cid: StreamCid, + ) -> Result> { + let rows: Vec<(Vec, Option>, bool)> = + sqlx::query_as(StreamEventRow::fetch_by_stream_cid()) + .bind(stream_cid.to_bytes()) + .fetch_all(pool.reader()) + .await?; + + let res = rows + .into_iter() + .map(|(cid, prev, delivered)| { + let cid = Cid::try_from(cid).expect("cid"); + let prev = prev.map(Cid::try_from).transpose().expect("prev"); + + StreamEventMetadata { + cid, + prev, + deliverable: delivered, + } + }) + .collect(); + + Ok(res) + } + + /// Load streams with undelivered events to see if they need to be delivered now. + /// highwater_mark is the i64 processed that you want to start after. + /// Start with `0` to start at the beginning. Will return None if there are no more streams to process. 
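+    ///
+    /// A sketch of paging through all streams with undelivered events (illustrative only and not
+    /// compiled; the loop body is a stand-in for whatever reordering the caller performs):
+    /// ```ignore
+    /// let mut highwater = 0;
+    /// loop {
+    ///     let (streams, next) =
+    ///         CeramicOneStream::load_stream_cids_with_undelivered_events(&pool, highwater).await?;
+    ///     for stream_cid in streams {
+    ///         let events = CeramicOneStream::load_stream_events(&pool, stream_cid).await?;
+    ///         // inspect `events` and mark anything now deliverable
+    ///     }
+    ///     match next {
+    ///         Some(mark) => highwater = mark,
+    ///         None => break,
+    ///     }
+    /// }
+    /// ```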
+ pub async fn load_stream_cids_with_undelivered_events( + pool: &SqlitePool, + highwater_mark: i64, + ) -> Result<(Vec, Option)> { + let streams: Vec = + sqlx::query_as(IncompleteStream::fetch_all_with_undelivered()) + .bind(highwater_mark) + .bind(100) + .fetch_all(pool.reader()) + .await?; + + let row_id = streams.iter().map(|s| s.row_id).max(); + let streams = streams.into_iter().map(|s| s.stream_cid).collect(); + Ok((streams, row_id)) + } + + pub(crate) async fn insert_tx( + tx: &mut SqliteTransaction<'_>, + stream_cid: StreamCid, + header: &ceramic_event::unvalidated::init::Header, + ) -> Result<()> { + let _resp = sqlx::query(StreamRow::insert()) + .bind(stream_cid.to_bytes()) + .bind(header.sep()) + .bind(header.model()) + .fetch_one(&mut **tx.inner()) + .await?; + + Ok(()) + } + + pub(crate) async fn insert_event_header_tx( + tx: &mut SqliteTransaction<'_>, + header: &EventHeader, + ) -> Result<()> { + let (cid, event_type, stream_cid, prev) = match header { + EventHeader::Init { cid, .. } => ( + cid.to_bytes(), + EventType::Init, + header.stream_cid().to_bytes(), + None, + ), + EventHeader::Data { + cid, + stream_cid, + prev, + } => ( + cid.to_bytes(), + EventType::Data, + stream_cid.to_bytes(), + Some(prev.to_bytes()), + ), + EventHeader::Time { + cid, + stream_cid, + prev, + } => ( + cid.to_bytes(), + EventType::Time, + stream_cid.to_bytes(), + Some(prev.to_bytes()), + ), + }; + + let _res = sqlx::query(EventMetadataRow::insert()) + .bind(cid) + .bind(stream_cid) + .bind(event_type) + .bind(prev) + .execute(&mut **tx.inner()) + .await?; + + Ok(()) + } +} diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 32610eb12..442b9323b 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -1,6 +1,8 @@ use anyhow::anyhow; use ceramic_core::EventId; +use ceramic_event::unvalidated; use cid::Cid; +use ipld_core::ipld::Ipld; use iroh_car::{CarHeader, CarReader, CarWriter}; use std::collections::BTreeSet; @@ -10,6 +12,8 @@ use crate::{ Error, Result, }; +use super::{EventHeader, EventType}; + pub async fn rebuild_car(blocks: Vec) -> Result>> { if blocks.is_empty() { return Ok(None); @@ -50,20 +54,41 @@ pub struct EventInsertable { } impl EventInsertable { - /// Try to build the EventInsertable struct. Will error if the key and body don't match. - pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result { - if order_key.cid().as_ref() != Some(&body.cid) { - return Err(Error::new_app(anyhow!( - "Event ID and body CID do not match: {:?} != {:?}", - order_key.cid(), - body.cid - )))?; - } + /// Try to build the EventInsertable struct from a carfile. + pub async fn try_new(order_key: EventId, body: &[u8]) -> Result { + let cid = order_key.cid().ok_or_else(|| { + Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key)) + })?; + let body = EventInsertableBody::try_from_carfile(cid, body).await?; Ok(Self { order_key, body }) } - /// change the deliverable status of the event - pub fn deliverable(&mut self, deliverable: bool) { + /// Get the CID of the event + pub fn cid(&self) -> Cid { + self.body.cid + } + + /// Get the stream CID + pub fn stream_cid(&self) -> Cid { + self.body.header.stream_cid() + } + + /// Get the previous event CID if any + pub fn prev(&self) -> Option { + match &self.body.header { + EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev), + EventHeader::Init { .. 
} => None,
+        }
+    }
+
+    /// Whether this event is currently deliverable
+    pub fn deliverable(&self) -> bool {
+        self.body.deliverable
+    }
+
+    /// Mark the event as deliverable.
+    /// This will be used when inserting the event to make sure the field is updated accordingly.
+    pub fn set_deliverable(&mut self, deliverable: bool) {
         self.body.deliverable = deliverable;
     }
 }
@@ -72,24 +97,48 @@ impl EventInsertable {
 /// The type we use to insert events into the database
 pub struct EventInsertableBody {
     /// The event CID i.e. the root CID from the car file
-    pub cid: Cid,
-    /// Whether this event is deliverable to clients or is waiting for more data
-    pub deliverable: bool,
+    pub(crate) cid: Cid,
+    /// The event header data about the event type and stream
+    pub(crate) header: EventHeader,
+    /// Whether the event is deliverable i.e. its `prev` has been delivered and the chain is continuous back to an init event
+    pub(crate) deliverable: bool,
     /// The blocks of the event
     // could use a map but there aren't that many blocks per event (right?)
-    pub blocks: Vec<EventBlockRaw>,
+    pub(crate) blocks: Vec<EventBlockRaw>,
 }
 
 impl EventInsertableBody {
-    /// Create a new EventInsertRaw struct. Deliverable is set to false by default.
-    pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>) -> Self {
+    /// Create a new EventInsertableBody. The deliverable flag is set by the caller based on the parsed header.
+    pub fn new(
+        cid: Cid,
+        header: EventHeader,
+        blocks: Vec<EventBlockRaw>,
+        deliverable: bool,
+    ) -> Self {
         Self {
             cid,
-            deliverable: false,
+            header,
             blocks,
+            deliverable,
         }
     }
 
+    /// Get the CID of the event
+    pub fn cid(&self) -> Cid {
+        self.cid
+    }
+
+    /// The type of the event (init, data or time) as determined by its header
+    pub fn event_type(&self) -> EventType {
+        self.header.event_type()
+    }
+
+    /// Get the blocks of the event
+    pub fn blocks(&self) -> &Vec<EventBlockRaw> {
+        &self.blocks
+    }
+
     /// Find a block from the carfile for a given CID if it's included
     pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> {
         self.blocks
@@ -136,6 +185,73 @@ impl EventInsertableBody {
             blocks.push(ebr);
             idx += 1;
         }
-        Ok(Self::new(event_cid, blocks))
+
+        let ev_block = blocks
+            .iter()
+            .find(|b| b.cid() == event_cid)
+            .ok_or_else(|| {
+                Error::new_app(anyhow!(
+                    "event block not found in car file: cid={}",
+                    event_cid
+                ))
+            })?;
+        let event_ipld: unvalidated::RawEvent<Ipld> =
+            serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| {
+                Error::new_invalid_arg(
+                    anyhow::anyhow!(e).context("event block is not valid event format"),
+                )
+            })?;
+
+        let cid = event_cid;
+
+        let (deliverable, header) = match event_ipld {
+            unvalidated::RawEvent::Time(t) => (
+                false,
+                EventHeader::Time {
+                    cid,
+                    stream_cid: t.id(),
+                    prev: t.prev(),
+                },
+            ),
+            unvalidated::RawEvent::Signed(signed) => {
+                let link = signed.link().ok_or_else(|| {
+                    Error::new_invalid_arg(anyhow::anyhow!("event should have a link"))
+                })?;
+                let link = blocks.iter().find(|b| b.cid() == link).ok_or_else(|| {
+                    Error::new_invalid_arg(anyhow::anyhow!("linked payload block missing from carfile"))
+                })?;
+                let payload: unvalidated::Payload<Ipld> =
+                    serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| {
+                        Error::new_invalid_arg(
+                            anyhow::anyhow!(e).context("Failed to follow event link"),
+                        )
+                    })?;
+
+                match payload {
+                    unvalidated::Payload::Data(d) => (
+                        false,
+                        EventHeader::Data {
+                            cid,
+                            stream_cid: *d.id(),
+                            prev: *d.prev(),
+                        },
+                    ),
+                    unvalidated::Payload::Init(init) => {
+                        let header = init.header().to_owned();
+
+                        (true, EventHeader::Init { cid, header })
+                    }
+                }
+            }
+            unvalidated::RawEvent::Unsigned(init) => (
+                true,
EventHeader::Init { + cid, + header: init.header().to_owned(), + }, + ), + }; + + Ok(Self::new(event_cid, header, blocks, deliverable)) } } diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index e40698a69..58b80f2f6 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -134,4 +134,8 @@ impl EventBlockRaw { bytes, }) } + + pub fn cid(&self) -> Cid { + Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner()) + } } diff --git a/store/src/sql/entities/event_metadata.rs b/store/src/sql/entities/event_metadata.rs new file mode 100644 index 000000000..b40d829b0 --- /dev/null +++ b/store/src/sql/entities/event_metadata.rs @@ -0,0 +1,58 @@ +use cid::Cid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +pub enum EventType { + Init, + Data, + Time, +} + +#[derive(Debug, Clone)] +pub struct EventMetadataRow {} + +impl EventMetadataRow { + pub fn insert() -> &'static str { + "INSERT INTO ceramic_one_event_metadata (cid, stream_cid, event_type, prev) VALUES ($1, $2, $3, $4)" + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event header wrapper for use in the store crate. +/// TODO: replace this with something from the event crate +pub enum EventHeader { + Init { + cid: Cid, + header: ceramic_event::unvalidated::init::Header, + }, + Data { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, + Time { + cid: Cid, + stream_cid: Cid, + prev: Cid, + }, +} + +impl EventHeader { + /// Returns the event type of the event header + pub(crate) fn event_type(&self) -> EventType { + match self { + EventHeader::Init { .. } => EventType::Init, + EventHeader::Data { .. } => EventType::Data, + EventHeader::Time { .. } => EventType::Time, + } + } + + /// Returns the stream CID of the event + pub(crate) fn stream_cid(&self) -> Cid { + match self { + EventHeader::Init { cid, .. } => *cid, + EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. 
} => { + *stream_cid + } + } + } +} diff --git a/store/src/sql/entities/hash.rs b/store/src/sql/entities/hash.rs index 926e6af1e..a914b808f 100644 --- a/store/src/sql/entities/hash.rs +++ b/store/src/sql/entities/hash.rs @@ -4,7 +4,7 @@ use sqlx::{sqlite::SqliteRow, Row as _}; use crate::{Error, Result}; -#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)] +#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] pub struct BlockHash(Multihash<64>); impl BlockHash { diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs index 8103296af..f03d7c9ce 100644 --- a/store/src/sql/entities/mod.rs +++ b/store/src/sql/entities/mod.rs @@ -1,12 +1,18 @@ mod block; mod event; mod event_block; +mod event_metadata; mod hash; +mod stream; mod utils; pub use block::{BlockBytes, BlockRow}; pub use event::{rebuild_car, EventInsertable, EventInsertableBody}; pub use event_block::{EventBlockRaw, ReconEventBlockRaw}; +pub use event_metadata::{EventHeader, EventMetadataRow, EventType}; pub use hash::{BlockHash, ReconHash}; +pub use stream::{IncompleteStream, StreamEventRow, StreamRow}; -pub use utils::{CountRow, DeliveredEvent, OrderKey}; +pub use utils::{CountRow, DeliveredEventRow, OrderKey}; + +pub type StreamCid = cid::Cid; diff --git a/store/src/sql/entities/stream.rs b/store/src/sql/entities/stream.rs new file mode 100644 index 000000000..9916fcd4f --- /dev/null +++ b/store/src/sql/entities/stream.rs @@ -0,0 +1,68 @@ +use cid::Cid; +use sqlx::{sqlite::SqliteRow, Row}; + +use super::StreamCid; + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StreamRow { + pub cid: Vec, + pub sep: String, + pub sep_val: Vec, +} + +impl StreamRow { + pub fn insert() -> &'static str { + "INSERT INTO ceramic_one_stream (cid, sep, sep_value) VALUES ($1, $2, $3) returning cid" + } +} + +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StreamEventRow { + pub cid: Vec, + pub prev: Option>, + pub deliverable: bool, +} + +impl StreamEventRow { + /// Requires binding one argument: + /// $1 = stream_cid (bytes) + pub fn fetch_by_stream_cid() -> &'static str { + r#" + SELECT e.cid as "cid", eh.prev as "prev", + e.delivered IS NOT NULL as "deliverable" + FROM ceramic_one_stream s + JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid + JOIN ceramic_one_event e on e.cid = eh.cid + WHERE s.cid = $1"# + } +} + +#[derive(Debug, Clone)] +pub struct IncompleteStream { + pub stream_cid: StreamCid, + pub row_id: i64, +} + +impl IncompleteStream { + /// Requires binding two arguments: + /// $1 = highwater mark (i64) + /// $2 = limit (usize) + pub fn fetch_all_with_undelivered() -> &'static str { + r#" + SELECT DISTINCT s.cid as "stream_cid", s.rowid + FROM ceramic_one_stream s + JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid + JOIN ceramic_one_event e on e.cid = eh.cid + WHERE e.delivered is NULL and s.rowid > $1 + LIMIT $2"# + } +} + +impl sqlx::FromRow<'_, SqliteRow> for IncompleteStream { + fn from_row(row: &SqliteRow) -> sqlx::Result { + let cid: Vec = row.try_get("stream_cid")?; + let row_id: i64 = row.try_get("rowid")?; + let stream_cid = Cid::try_from(cid).map_err(|e| sqlx::Error::Decode(Box::new(e)))?; + Ok(Self { stream_cid, row_id }) + } +} diff --git a/store/src/sql/entities/utils.rs b/store/src/sql/entities/utils.rs index d8efc5f5a..3b6555607 100644 --- a/store/src/sql/entities/utils.rs +++ b/store/src/sql/entities/utils.rs @@ -24,12 +24,12 @@ impl TryFrom for EventId { } #[derive(sqlx::FromRow)] -pub struct DeliveredEvent { +pub struct DeliveredEventRow { pub cid: Vec, pub 
new_highwater_mark: i64, } -impl DeliveredEvent { +impl DeliveredEventRow { /// assumes rows are sorted by `delivered` ascending pub fn parse_query_results(current: i64, rows: Vec) -> Result<(i64, Vec)> { let max: i64 = rows.last().map_or(current, |r| r.new_highwater_mark + 1); diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs index 884b04b9b..886b241b5 100644 --- a/store/src/sql/mod.rs +++ b/store/src/sql/mod.rs @@ -6,7 +6,10 @@ mod sqlite; #[cfg(test)] mod test; -pub use access::{CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest}; +pub use access::{ + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, + InsertResult, InsertedEvent, StreamEventMetadata, +}; pub use root::SqliteRootStore; pub use sqlite::{SqlitePool, SqliteTransaction}; diff --git a/store/src/sql/query.rs b/store/src/sql/query.rs index c73ffc9d1..fcc935b9a 100644 --- a/store/src/sql/query.rs +++ b/store/src/sql/query.rs @@ -124,7 +124,7 @@ impl EventQuery { /// $1 = delivered (i64) /// $2 = cid (bytes) pub fn mark_ready_to_deliver() -> &'static str { - "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2;" + "UPDATE ceramic_one_event SET delivered = $1 WHERE cid = $2 and delivered is NULL;" } } @@ -170,11 +170,13 @@ impl ReconQuery { "INSERT INTO ceramic_one_event ( order_key, cid, ahash_0, ahash_1, ahash_2, ahash_3, - ahash_4, ahash_5, ahash_6, ahash_7 + ahash_4, ahash_5, ahash_6, ahash_7, + delivered ) VALUES ( $1, $2, $3, $4, $5, $6, - $7, $8, $9, $10 + $7, $8, $9, $10, + $11 );" } diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs index d0e55c777..6e581977d 100644 --- a/store/src/sql/test.rs +++ b/store/src/sql/test.rs @@ -32,8 +32,19 @@ fn random_event(cid: &str) -> EventInsertable { order_key, body: EventInsertableBody { cid, - deliverable: false, blocks: vec![], + deliverable: true, + header: super::entities::EventHeader::Init { + cid, + header: ceramic_event::unvalidated::init::Header::new( + vec![CONTROLLER.to_string()], + SEP_KEY.to_string(), + vec![3, 2, 45, 8], + None, + None, + None, + ), + }, }, } } @@ -48,8 +59,7 @@ async fn hash_range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let hash = CeramicOneEvent::hash_range( &pool, @@ -70,8 +80,7 @@ async fn range_query() { .await .unwrap(); - let new = x.keys.into_iter().filter(|x| *x).count(); - assert_eq!(new, 2); + assert_eq!(x.count_new_keys(), 2); let ids = CeramicOneEvent::range( &pool,