From 4201fa8b45614cdb78d55f41e3973835289d4770 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 12 Jun 2024 23:14:56 -0600 Subject: [PATCH] fix: massive commit addressing most of the comments/discussion today - modified InsertResult across store/api/recon to keep guarantees: recon bools are in the right order and api will only fail individual writes rather than the entire batch (this was an existing bug) - rename ceramic_one_event_header -> ceramic_one_event_metadata and associated types/files - fix IOD code to work. probably does more work than necessary but without sending every changed stream to the task or doing things inline, we can miss events that are being processed while new writes come in (will optimize later as needed) - includes only sending deliverable recon events to the ordering task, rather than all changes to streams, as API writes should have been ordered and we handle discovering changes while we process in the task - removed deliverable/ordering logic from store crate. still need shared event crate type for interface - renamed lots of things: removed "commit" wording, use deliverable instead of delivered - added better docs/comments outstanding: - migrations are still not done - controller: we didn't talk about how we want to normalize/store controller info --- api/src/lib.rs | 2 +- api/src/server.rs | 56 ++- api/src/tests.rs | 55 ++- .../20240530125008_event_header.down.sql | 2 - .../20240530125008_event_metadata.down.sql | 2 + ...l => 20240530125008_event_metadata.up.sql} | 4 +- service/src/event/mod.rs | 1 + service/src/event/order_events.rs | 102 ++++++ service/src/event/ordering_task.rs | 339 +++++++++--------- service/src/event/service.rs | 128 ++++--- service/src/event/store.rs | 15 +- service/src/tests/mod.rs | 14 +- service/src/tests/ordering.rs | 37 +- store/src/lib.rs | 6 +- store/src/metrics.rs | 11 +- store/src/migration.rs | 6 +- store/src/sql/access/event.rs | 162 +++------ store/src/sql/access/mod.rs | 4 +- store/src/sql/access/stream.rs | 34 +- store/src/sql/entities/event.rs | 72 +++- .../{event_header.rs => event_metadata.rs} | 19 +- store/src/sql/entities/mod.rs | 6 +- store/src/sql/entities/stream.rs | 19 +- store/src/sql/mod.rs | 4 +- store/src/sql/test.rs | 11 +- 25 files changed, 660 insertions(+), 451 deletions(-) delete mode 100644 migrations/sqlite/20240530125008_event_header.down.sql create mode 100644 migrations/sqlite/20240530125008_event_metadata.down.sql rename migrations/sqlite/{20240530125008_event_header.up.sql => 20240530125008_event_metadata.up.sql} (72%) create mode 100644 service/src/event/order_events.rs rename store/src/sql/entities/{event_header.rs => event_metadata.rs} (67%) diff --git a/api/src/lib.rs b/api/src/lib.rs index 7cb8cca41..b306ac7c1 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -3,7 +3,7 @@ mod server; pub use resume_token::ResumeToken; -pub use server::{EventStore, InterestStore, Server}; +pub use server::{EventInsertResult, EventStore, InterestStore, Server}; #[cfg(test)] mod tests; diff --git a/api/src/server.rs b/api/src/server.rs index 2ce7d297a..5289f51de 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -6,6 +6,7 @@ mod event; +use std::collections::HashMap; use std::time::Duration; use std::{future::Future, ops::Range}; use std::{marker::PhantomData, ops::RangeBounds}; @@ -162,11 +163,28 @@ impl<S: InterestStore> InterestStore for Arc<S> { } } +#[derive(Debug, Clone)] +pub struct EventInsertResult { + id: EventId, + // if set, the reason this event couldn't be inserted + failed: Option<String>, +} + +impl 
EventInsertResult { + pub fn new(id: EventId, failed: Option) -> Self { + Self { id, failed } + } + + pub fn success(&self) -> bool { + self.failed.is_none() + } +} + /// Trait for accessing persistent storage of Events #[async_trait] pub trait EventStore: Send + Sync { /// Returns (new_key, new_value) where true if was newly inserted, false if it already existed. - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -199,7 +217,7 @@ pub trait EventStore: Send + Sync { #[async_trait::async_trait] impl EventStore for Arc { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result> { self.as_ref().insert_many(items).await } @@ -241,7 +259,7 @@ impl EventStore for Arc { struct EventInsert { id: EventId, data: Vec, - tx: tokio::sync::oneshot::Sender>, + tx: tokio::sync::oneshot::Sender>, } struct InsertTask { @@ -325,25 +343,35 @@ where if events.is_empty() { return; } - let mut oneshots = Vec::with_capacity(events.len()); + let mut oneshots = HashMap::with_capacity(events.len()); let mut items = Vec::with_capacity(events.len()); events.drain(..).for_each(|req: EventInsert| { - oneshots.push(req.tx); + oneshots.insert(req.id.to_bytes(), req.tx); items.push((req.id, req.data)); }); tracing::trace!("calling insert many with {} items.", items.len()); match event_store.insert_many(&items).await { Ok(results) => { tracing::debug!("insert many returned {} results.", results.len()); - for (tx, result) in oneshots.into_iter().zip(results.into_iter()) { - if let Err(e) = tx.send(Ok(result)) { - tracing::warn!("failed to send success response to api listener: {:?}", e); + for result in results { + if let Some(tx) = oneshots.remove(&result.id.to_bytes()) { + if let Err(e) = tx.send(Ok(result)) { + tracing::warn!( + "failed to send success response to api listener: {:?}", + e + ); + } + } else { + tracing::warn!( + "lost channel to respond to API listener for event ID: {:?}", + result.id + ); } } } Err(e) => { tracing::warn!("failed to insert events: {e}"); - for tx in oneshots.into_iter() { + for tx in oneshots.into_values() { if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) { tracing::warn!("failed to send failed response to api listener: {:?}", e); } @@ -495,7 +523,7 @@ where .await? .map_err(|_| ErrorResponse::new("Database service not available".to_owned()))?; - let _new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) + let new = tokio::time::timeout(INSERT_REQUEST_TIMEOUT, rx) .await .map_err(|_| { ErrorResponse::new("Timeout waiting for database service response".to_owned()) @@ -503,7 +531,13 @@ where .map_err(|_| ErrorResponse::new("No response. Database service crashed".to_owned()))? 
.map_err(|e| ErrorResponse::new(format!("Failed to insert event: {e}")))?; - Ok(EventsPostResponse::Success) + if let Some(failed) = new.failed { + Ok(EventsPostResponse::BadRequest(BadRequestResponse::new( + failed, + ))) + } else { + Ok(EventsPostResponse::Success) + } } pub async fn post_interests( diff --git a/api/src/tests.rs b/api/src/tests.rs index 6ef42c7eb..5f94acb3b 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -5,6 +5,7 @@ use std::{ops::Range, str::FromStr, sync::Arc}; use crate::server::decode_multibase_data; use crate::server::BuildResponse; use crate::server::Server; +use crate::EventInsertResult; use crate::{EventStore, InterestStore}; use anyhow::Result; @@ -121,7 +122,7 @@ mock! { pub EventStoreTest {} #[async_trait] impl EventStore for EventStoreTest { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; + async fn insert_many(&self, items: &[(EventId, Vec)]) -> Result>; async fn range_with_values( &self, range: Range, @@ -198,7 +199,12 @@ async fn create_event() { .expect_insert_many() .with(predicate::eq(args)) .times(1) - .returning(|_| Ok(vec![true])); + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| EventInsertResult::new(id.clone(), None)) + .collect()) + }); let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( @@ -211,6 +217,51 @@ async fn create_event() { .unwrap(); assert!(matches!(resp, EventsPostResponse::Success)); } + +#[tokio::test] +async fn create_event_fails() { + let peer_id = PeerId::random(); + let network = Network::Mainnet; + let expected_event_id = EventId::try_from(hex::decode(DATA_EVENT_ID).unwrap()).unwrap(); + + // Remove whitespace from event CAR file + let event_data = DATA_EVENT_CAR + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let mock_interest = MockAccessInterestStoreTest::new(); + let mut mock_event_store = MockEventStoreTest::new(); + mock_get_init_event(&mut mock_event_store); + let args = vec![( + expected_event_id.clone(), + decode_multibase_data(&event_data).unwrap(), + )]; + + mock_event_store + .expect_insert_many() + .with(predicate::eq(args)) + .times(1) + .returning(|input| { + Ok(input + .iter() + .map(|(id, _)| { + EventInsertResult::new(id.clone(), Some("Event is missing prev".to_string())) + }) + .collect()) + }); + let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store)); + let resp = server + .events_post( + models::EventData { + data: event_data.to_string(), + }, + &Context, + ) + .await + .unwrap(); + assert!(matches!(resp, EventsPostResponse::BadRequest(_))); +} + #[tokio::test] #[traced_test] async fn register_interest_sort_value() { diff --git a/migrations/sqlite/20240530125008_event_header.down.sql b/migrations/sqlite/20240530125008_event_header.down.sql deleted file mode 100644 index 3c9d50f94..000000000 --- a/migrations/sqlite/20240530125008_event_header.down.sql +++ /dev/null @@ -1,2 +0,0 @@ --- Add down migration script here -DROP TABLE IF EXISTS "ceramic_one_event_header"; \ No newline at end of file diff --git a/migrations/sqlite/20240530125008_event_metadata.down.sql b/migrations/sqlite/20240530125008_event_metadata.down.sql new file mode 100644 index 000000000..cda23cd1f --- /dev/null +++ b/migrations/sqlite/20240530125008_event_metadata.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS "ceramic_one_event_metadata"; \ No newline at end of file diff --git a/migrations/sqlite/20240530125008_event_header.up.sql 
b/migrations/sqlite/20240530125008_event_metadata.up.sql similarity index 72% rename from migrations/sqlite/20240530125008_event_header.up.sql rename to migrations/sqlite/20240530125008_event_metadata.up.sql index 70d4932d7..318fac30d 100644 --- a/migrations/sqlite/20240530125008_event_header.up.sql +++ b/migrations/sqlite/20240530125008_event_metadata.up.sql @@ -1,5 +1,5 @@ -- Add up migration script here -CREATE TABLE IF NOT EXISTS "ceramic_one_event_header" ( +CREATE TABLE IF NOT EXISTS "ceramic_one_event_metadata" ( "cid" BLOB NOT NULL, -- event cid "event_type" INTEGER NOT NULL, -- enum EventType: Init, Data, Time "stream_cid" BLOB NOT NULL, -- id field in header. can't have FK because stream may not exist until we discover it but should reference ceramic_one_stream(cid) @@ -7,3 +7,5 @@ CREATE TABLE IF NOT EXISTS "ceramic_one_event_metadata" ( PRIMARY KEY(cid), FOREIGN KEY(cid) REFERENCES ceramic_one_event(cid) ); + +CREATE INDEX IF NOT EXISTS "idx_ceramic_one_event_metadata_stream_cid" ON "ceramic_one_event_metadata" ("stream_cid"); diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs index f7dce573a..69e12b26c 100644 --- a/service/src/event/mod.rs +++ b/service/src/event/mod.rs @@ -1,3 +1,4 @@ +mod order_events; mod ordering_task; mod service; mod store; diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs new file mode 100644 index 000000000..ae4c70e72 --- /dev/null +++ b/service/src/event/order_events.rs @@ -0,0 +1,102 @@ +use std::collections::HashSet; + +use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; +use cid::Cid; + +use crate::Result; + +pub(crate) struct OrderEvents { + pub(crate) deliverable: Vec<EventInsertable>, + pub(crate) missing_history: Vec<EventInsertable>, +} + +impl OrderEvents { + /// Groups the events into two lists: those whose prev is already delivered and those that are missing history. This can be used to return an error if the event is required to have history. + /// The deliverable events are marked as such so that they can be passed directly to the store to be persisted.
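 + /// + /// A sketch of the intended behavior (hypothetical usage; `pool` and the events are stand-ins built the way the service tests build them): + /// ```ignore + /// // `init` is deliverable on its own; `data2`'s prev is neither in the batch + /// // nor delivered in the db, so it ends up in missing_history. + /// let ordered = OrderEvents::try_new(&pool, vec![init, data2]).await?; + /// assert_eq!(1, ordered.deliverable.len()); + /// assert_eq!(1, ordered.missing_history.len()); + /// ```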
+ pub async fn try_new( + pool: &SqlitePool, + mut candidate_events: Vec<EventInsertable>, + ) -> Result<Self> { + // move all the init events to the front so we make sure to add them first and get the deliverable order correct + let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid())); + let mut deliverable = Vec::with_capacity(candidate_events.len()); + candidate_events.retain(|e| { + if e.deliverable() { + deliverable.push(e.clone()); + false + } else { + true + } + }); + if candidate_events.is_empty() { + return Ok(OrderEvents { + deliverable, + missing_history: Vec::new(), + }); + } + + let mut prevs_in_memory = Vec::with_capacity(candidate_events.len()); + let mut missing_history = Vec::with_capacity(candidate_events.len()); + + while let Some(mut event) = candidate_events.pop() { + match &event.prev() { + None => { + unreachable!("Init events should have been filtered out since they're always deliverable"); + } + Some(prev) => { + if new_cids.contains(prev) { + prevs_in_memory.push(event.clone()); + continue; + } else { + let (_exists, prev_deliverable) = + CeramicOneEvent::deliverable_by_cid(pool, prev).await?; + if prev_deliverable { + event.set_deliverable(true); + deliverable.push(event); + } else { + // technically, we may have the "rosetta stone" event in memory that could unlock this chain if we loaded everything and recursed. + // however, the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have been able + // to construct this event; we consider it missing history. This can be used to return an error if the event is required to have history. + missing_history.push(event); + } + } + } + } + } + + // We add the events to the deliverable list until nothing changes. + // It should be a small set and it will shrink each loop, so continually looping is acceptable. + loop { + let mut made_changes = false; + while let Some(mut event) = prevs_in_memory.pop() { + match &event.prev() { + None => { + unreachable!( + "Init events should have been filtered out of the in memory set" + ); + } + Some(prev) => { + // a hashset would be a better lookup but we're not going to have that many events, so hashing + // for a handful of lookups and then converting back to a vec probably isn't worth it. + if deliverable.iter().any(|e| e.cid() == *prev) { + event.set_deliverable(true); + deliverable.push(event); + made_changes = true; + } else { + prevs_in_memory.push(event); + } + } + } + } + if !made_changes { + missing_history.extend(prevs_in_memory); + break; + } + } + + Ok(OrderEvents { + deliverable, + missing_history, + }) + } +} diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index cdc2c8903..fb7f90388 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -1,17 +1,13 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use ceramic_store::{CandidateEvent, CeramicOneEvent, CeramicOneStream, SqlitePool, StreamCommit}; +use ceramic_store::{ + CeramicOneEvent, CeramicOneStream, InsertedEvent, SqlitePool, StreamEventMetadata, +}; use cid::Cid; -use itertools::Itertools; use tracing::{debug, error, info, trace, warn}; use crate::{Error, Result}; -/// How often should we try to review our internal state to see if we missed something? /// Should this query the database to try to discover any events? Probably add some "channel full" global flag so we /// know to start querying the database for more events when we recover.
-const CHECK_ALL_INTERVAL_SECONDS: u64 = 60 * 10; // 10 minutes - type StreamCid = Cid; type EventCid = Cid; type PrevCid = Cid; @@ -19,8 +15,7 @@ type PrevCid = Cid; #[derive(Debug)] pub struct DeliverableTask { pub(crate) _handle: tokio::task::JoinHandle<()>, - pub(crate) tx_delivered: tokio::sync::mpsc::Sender, - pub(crate) tx_stream_update: tokio::sync::mpsc::Sender, + pub(crate) tx_delivered: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -28,25 +23,21 @@ pub struct OrderingTask {} impl OrderingTask { pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask { - let (tx_delivered, rx_delivered) = tokio::sync::mpsc::channel::(q_depth); - let (tx_stream_update, rx_stream_update) = tokio::sync::mpsc::channel::(q_depth); + let (tx_delivered, rx_delivered) = tokio::sync::mpsc::channel::(q_depth); - let handle = tokio::spawn(async move { - Self::run_loop(pool, load_delivered, rx_delivered, rx_stream_update).await - }); + let handle = + tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_delivered).await }); DeliverableTask { _handle: handle, tx_delivered, - tx_stream_update, } } async fn run_loop( pool: SqlitePool, load_undelivered: bool, - mut rx_delivered: tokio::sync::mpsc::Receiver, - mut rx_stream_update: tokio::sync::mpsc::Receiver, + mut rx_delivered: tokio::sync::mpsc::Receiver, ) { // before starting, make sure we've updated any events in the database we missed let mut state = OrderingState::new(); @@ -60,48 +51,47 @@ impl OrderingTask { return; } - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(CHECK_ALL_INTERVAL_SECONDS)); loop { + // review anything we couldn't finish in case new writes came in while we were processing + // should no-op, but we don't want to require a deliverable event to trigger a review just in case + if state + .process_streams(&pool) + .await + .map_err(Self::log_error) + .is_err() + { + return; + } let mut delivered_events = Vec::with_capacity(100); - let mut updated_streams = Vec::with_capacity(100); - tokio::select! { - _new = rx_delivered.recv_many(&mut delivered_events, 100) => { - debug!(?delivered_events, "incoming writes!"); + if rx_delivered.recv_many(&mut delivered_events, 100).await > 0 { + debug!(?delivered_events, "new delivered events!"); + for event in delivered_events { + state.add_stream(event.stream_cid); } - _new = rx_stream_update.recv_many(&mut updated_streams, 100) => { - debug!(?updated_streams, "incoming updates!"); - for stream in updated_streams { - state.add_stream(stream); - } - } - _ = interval.tick() => { - // For this and the else branch, we allow the loop to complete and if state has anything to process, it will be processed. - debug!(stream_count=%state.streams.len(),"Process undelivered stream events interval triggered."); - } - else => { - debug!(stream_count=%state.streams.len(), "Server dropped the ordering task. Processing once more before exiting..."); - } - }; - if !delivered_events.is_empty() - && state - .process_delivered_events(&pool, delivered_events) + if state + .process_streams(&pool) .await .map_err(Self::log_error) .is_err() - { - return; - } + { + return; + } + } else if rx_delivered.is_closed() { + debug!( + "Server dropped the delivered events channel. Processing streams in memory once more before exiting." 
+ ); - if state - .process_streams(&pool) - .await - .map_err(Self::log_error) - .is_err() - { - return; + if state + .process_streams(&pool) + .await + .map_err(Self::log_error) + .is_err() + { + return; + } + break; } } } @@ -128,17 +118,28 @@ impl OrderingTask { #[derive(Debug, Clone, Default)] /// ~540 bytes per event in this struct pub(crate) struct StreamEvents { - _cid: StreamCid, + /// Map of `event.prev` to `event.cid` for quick lookup of the next event in the stream. prev_map: HashMap, - cid_map: HashMap, + /// Map of `event.cid` to `metadata` for quick lookup of the event metadata. + cid_map: HashMap, + /// Events that can be delivered FIFO order for the stream + deliverable: VecDeque, + /// The total number of events in the stream when we started + total_events: usize, } impl StreamEvents { - fn new(cid: StreamCid, events: I) -> Self + fn new(_cid: StreamCid, events: I) -> Self where - I: IntoIterator, + I: ExactSizeIterator, { - let mut new = Self::new_empty(cid); + let total_events = events.len(); + let mut new = Self { + prev_map: HashMap::with_capacity(total_events), + cid_map: HashMap::with_capacity(total_events), + deliverable: VecDeque::with_capacity(total_events), + total_events, + }; for event in events { new.add_event(event); @@ -146,23 +147,26 @@ impl StreamEvents { new } - fn new_empty(cid: StreamCid) -> Self { - Self { - _cid: cid, - prev_map: HashMap::default(), - cid_map: HashMap::default(), - } + async fn new_from_db(stream: StreamCid, pool: &SqlitePool) -> Result { + let stream_events = CeramicOneStream::load_stream_events(pool, stream).await?; + trace!(?stream_events, "Loaded stream events for ordering"); + Ok(Self::new(stream, stream_events.into_iter())) + } + + fn is_empty(&self) -> bool { + // The init event can remain in the cid_map so we use the prev_map + self.prev_map.is_empty() } /// returns true if this is a new event. - fn add_event(&mut self, event: StreamCommit) -> bool { + fn add_event(&mut self, event: StreamEventMetadata) -> bool { if let Some(prev) = event.prev { self.prev_map.insert(prev, event.cid); } self.cid_map.insert(event.cid, event).is_none() } - fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { + fn remove_by_event_cid(&mut self, cid: &Cid) -> Option { if let Some(ev) = self.cid_map.remove(cid) { if let Some(prev) = ev.prev { self.prev_map.remove(&prev); @@ -185,27 +189,41 @@ impl StreamEvents { fn delivered_events(&self) -> impl Iterator { self.cid_map .iter() - .filter_map(|(cid, event)| if event.delivered { Some(cid) } else { None }) + .filter_map(|(cid, event)| if event.deliverable { Some(cid) } else { None }) } - fn order_events(&mut self, start_with: EventCid) -> VecDeque { - let mut deliverable = VecDeque::with_capacity(self.prev_map.len()); + async fn order_events(pool: &SqlitePool, stream: StreamCid) -> Result { + let mut to_process = Self::new_from_db(stream, pool).await?; + if to_process.delivered_events().count() == 0 { + return Ok(to_process); + } - deliverable.push_back(start_with); - self.remove_by_event_cid(&start_with); - let mut tip = start_with; - // technically, could be in deliverable set as well if the stream is forking? 
- while let Some(next_event) = self.remove_by_prev_cid(&tip) { - deliverable.push_back(next_event); - tip = next_event; + let stream_event_count = to_process.cid_map.len(); + let delivered_cids = to_process.delivered_events().cloned().collect::>(); + let mut start_with = VecDeque::with_capacity(stream_event_count - delivered_cids.len()); + + for cid in delivered_cids { + if let Some(next_event) = to_process.remove_by_prev_cid(&cid) { + to_process.remove_by_event_cid(&next_event); + start_with.push_back(next_event); + } } - deliverable + + while let Some(new_tip) = start_with.pop_front() { + to_process.deliverable.push_back(new_tip); + let mut tip = new_tip; + while let Some(next_event) = to_process.remove_by_prev_cid(&tip) { + to_process.deliverable.push_back(next_event); + tip = next_event; + } + } + Ok(to_process) } } +#[derive(Debug)] pub struct OrderingState { streams: HashSet, - processed_streams: HashSet, deliverable: VecDeque, } @@ -213,92 +231,58 @@ impl OrderingState { fn new() -> Self { Self { streams: HashSet::new(), - processed_streams: HashSet::new(), deliverable: VecDeque::new(), } } /// Add a stream to the list of streams to process. This implies it has undelivered events and is worthwhile to attempt. - fn add_stream(&mut self, stream: StreamCid) { - self.processed_streams.remove(&stream); - self.streams.insert(stream); + fn add_stream(&mut self, stream: StreamCid) -> bool { + self.streams.insert(stream) } /// Process every stream we know about that has undelivered events that should be "unlocked" now. This could be adjusted to process commit things in batches, /// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried. Events that are /// delivered multiple times will not change the original delivered state. async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> { - for stream in &self.streams { - let stream_events = CeramicOneStream::load_stream_commits(pool, *stream).await?; - trace!(?stream_events, "Loaded stream events for ordering"); - let mut to_process = StreamEvents::new(*stream, stream_events.into_iter()); - if to_process.delivered_events().count() == 0 { - return Ok(()); - } - - let mut start_with = VecDeque::with_capacity(to_process.cid_map.len()); - let delivered_cids = to_process.delivered_events().cloned().collect::>(); - - for cid in delivered_cids { - if let Some(needs_me) = to_process.remove_by_prev_cid(&cid) { - start_with.push_back(needs_me); + let mut stream_cnt = HashMap::new(); + // we need to handle the fact that new writes can come in without knowing they're deliverable because we're still in the process of updating them. + // so when we finish the loop, we need to check if any of our streams had new writes and try again, if nothing changed we exit. + // this could certainly be optimized. we could check the count of events to make sure it's different, or we could load undelivered events and keep track of our + // total state, as anything forking that was deliverable would arrive on the incoming channel. for now, streams are short and this is probably sufficient. + // e.g. track in_progress as a HashMap and update it as we go. 
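 + // In outline, the loop below: (1) re-orders every known stream from the db and + // queues whatever became deliverable, (2) marks the queued events delivered in one + // tx and drops fully-processed streams, (3) repeats until a pass finds nothing new.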
+ loop { + let mut processed_streams = Vec::with_capacity(self.streams.len()); + for stream in &self.streams { + let ordered_events = StreamEvents::order_events(pool, *stream).await?; + stream_cnt.insert(*stream, ordered_events.total_events); + if ordered_events.is_empty() { + processed_streams.push(*stream); } + self.deliverable.extend(ordered_events.deliverable); } - while let Some(new_tip) = start_with.pop_front() { - self.deliverable.extend(to_process.order_events(new_tip)); - } - self.processed_streams.insert(*stream); - } - - if !self.deliverable.is_empty() { - tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver"); - let mut tx = pool.begin_tx().await?; - // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them. - // Could use `pop_front` but we want to make sure we commit and then clear everything at once. - for cid in &self.deliverable { - CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; - } - tx.commit().await?; - self.deliverable.clear(); - self.streams - .retain(|stream| !self.processed_streams.contains(stream)); - self.processed_streams.clear(); - } - Ok(()) - } - - /// We group the events into their stream and see if something was waiting for one of these. If so, we process the stream. - async fn process_delivered_events( - &mut self, - pool: &SqlitePool, - events: Vec, - ) -> Result<()> { - tracing::trace!("Processing {} delivered events for ordering", events.len()); - let stream_events: HashMap> = events - .into_iter() - .into_group_map_by(|event| event.stream_cid) - .into_iter() - .map(|(stream_cid, events)| { - ( - stream_cid, - events - .into_iter() - .map(|event| event.cid) - .collect::>(), - ) - }) - .collect(); - - for (stream, event_cids) in stream_events { - for cid in event_cids { - let (_exists, delivered) = CeramicOneEvent::delivered_by_cid(pool, &cid).await?; - if delivered { - self.add_stream(stream); + if !self.deliverable.is_empty() { + tracing::debug!(count=%self.deliverable.len(), "Marking events as ready to deliver"); + let mut tx = pool.begin_tx().await?; + // We process the ready events as a FIFO queue so they are marked delivered before events that were added after and depend on them. + // Could use `pop_front` but we want to make sure we commit and then clear everything at once. 
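 + // If the transaction fails partway through, nothing is marked and the whole pass + // can be retried, which is what keeps this processing idempotent (see the doc comment above).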
+ for cid in &self.deliverable { + CeramicOneEvent::mark_ready_to_deliver(&mut tx, cid).await?; + } + tx.commit().await?; + self.deliverable.clear(); + self.streams + .retain(|stream| !processed_streams.contains(stream)); + // not strictly necessary as the next loop will not do anything, but we can avoid allocating a new vec + if self.streams.is_empty() { break; } + } else { + break; } + debug!(stream_state=?self, ?processed_streams, "Finished processing streams"); } + Ok(()) } @@ -315,9 +299,11 @@ impl OrderingState { let cids = CeramicOneStream::load_stream_cids_with_undelivered_events(pool, cid).await?; last_cid = cids.last().cloned(); + trace!(count=cids.len(), stream_cids=?cids, "Discovered streams with undelivered events"); for cid in cids { - self.add_stream(cid); - streams_discovered += 1; + if self.add_stream(cid) { + streams_discovered += 1; + } } self.process_streams(pool).await?; } @@ -351,51 +337,56 @@ mod test { let pool = SqlitePool::connect_in_memory().await.unwrap(); let s1_events = get_n_events(3).await; let s2_events = get_n_events(5).await; - let mut insert_later = Vec::with_capacity(2); let mut all_insertable = Vec::with_capacity(8); - for (i, event) in s1_events.iter().enumerate() { + for event in s1_events.iter() { let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); - all_insertable.push(insertable.clone()); - if i == 1 { - insert_later.push(insertable); - } else { - CeramicOneEvent::insert_many(&pool, &[insertable], false) - .await - .unwrap(); - } + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); + + all_insertable.push(insertable); } - for (i, event) in s2_events.iter().enumerate() { + for event in s2_events.iter() { let insertable = EventInsertable::try_new(event.0.to_owned(), &event.1) .await .unwrap(); - all_insertable.push(insertable.clone()); + let expected_deliverable = insertable.deliverable(); + let res = CeramicOneEvent::insert_many(&pool, &[insertable.clone()]) + .await + .unwrap(); + assert_eq!(expected_deliverable, res.inserted[0].deliverable); - if i == 1 { - insert_later.push(insertable); - } else { - CeramicOneEvent::insert_many(&pool, &[insertable], false) - .await - .unwrap(); - } + all_insertable.push(insertable); } - CeramicOneEvent::insert_many(&pool, &insert_later[..], false) - .await - .unwrap(); - for event in &all_insertable { - let (_exists, delivered) = CeramicOneEvent::delivered_by_cid(&pool, &event.cid()) + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) .await .unwrap(); - // init events are always delivered and the last two would have been okay.. but the rest should have been skipped + // init events are always delivered and the others should have been skipped if event.cid() == event.stream_cid() - || event.order_key == s1_events[1].0 - || event.order_key == s2_events[1].0 + || event.order_key == s1_events[0].0 + || event.order_key == s2_events[0].0 { - assert!(delivered); + assert!( + delivered, + "Event {:?} was not delivered. 
init={:?}, s1={:?}, s2={:?}", + event.cid(), + event.stream_cid(), + s1_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + s2_events + .iter() + .map(|(e, _)| e.cid().unwrap()) + .collect::>(), + ); } else { assert!(!delivered); } @@ -407,7 +398,7 @@ mod test { assert_eq!(2, total); for event in &all_insertable { - let (_exists, delivered) = CeramicOneEvent::delivered_by_cid(&pool, &event.cid()) + let (_exists, delivered) = CeramicOneEvent::deliverable_by_cid(&pool, &event.cid()) .await .unwrap(); assert!(delivered); diff --git a/service/src/event/service.rs b/service/src/event/service.rs index bc615d963..491d5cc55 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,11 +1,11 @@ -use std::collections::HashSet; - use ceramic_core::EventId; use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; -use recon::InsertResult; use tracing::{trace, warn}; -use super::ordering_task::{DeliverableTask, OrderingTask}; +use super::{ + order_events::OrderEvents, + ordering_task::{DeliverableTask, OrderingTask}, +}; use crate::{Error, Result}; @@ -75,7 +75,7 @@ impl CeramicEventService { pub(crate) async fn insert_events_from_carfiles_local_api<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - ) -> Result { + ) -> Result { self.insert_events(items, true).await } @@ -88,14 +88,26 @@ impl CeramicEventService { &self, items: &[recon::ReconItem<'a, EventId>], ) -> Result { - self.insert_events(items, false).await + let res = self.insert_events(items, false).await?; + let mut keys = vec![false; items.len()]; + // we need to put things back in the right order that the recon trait expects, even though we don't really care about the result + for (i, item) in items.iter().enumerate() { + let new_key = res + .store_result + .inserted + .iter() + .find(|e| e.order_key == *item.key) + .map_or(false, |e| e.new_key); // TODO: should we error if it's not in this set + keys[i] = new_key; + } + Ok(recon::InsertResult::new(keys)) } async fn insert_events<'a>( &self, items: &[recon::ReconItem<'a, EventId>], - require_history: bool, - ) -> Result { + history_required: bool, + ) -> Result { if items.is_empty() { return Ok(InsertResult::default()); } @@ -107,45 +119,75 @@ impl CeramicEventService { to_insert.push(insertable); } - let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..], require_history).await?; - for ev in res.delivered { - trace!(event=?ev, "sending delivered to ordering task"); - if let Err(e) = self.delivery_task.tx_delivered.try_send(ev) { - match e { - tokio::sync::mpsc::error::TrySendError::Full(e) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. 
shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); - } - } - } - } - let updated_streams = res - .undelivered + let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; + + let missing_history = ordered + .missing_history .iter() - .map(|ev| ev.stream_cid) - .collect::>(); - for stream_cid in updated_streams { - trace!(event=?stream_cid, "sending updated to ordering task"); - if let Err(e) = self.delivery_task.tx_stream_update.try_send(stream_cid) { - match e { - tokio::sync::mpsc::error::TrySendError::Full(e) => { - // we should only be doing this during recon, in which case we can rediscover events. - // the delivery task will start picking up these events once it's drained since they are stored in the db - warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); - } - tokio::sync::mpsc::error::TrySendError::Closed(_) => { - warn!("Delivery task closed. shutting down"); - return Err(Error::new_fatal(anyhow::anyhow!("Delivery task closed"))); + .map(|e| e.order_key.clone()) + .collect(); + + let to_insert = if history_required { + ordered.deliverable + } else { + ordered + .deliverable + .into_iter() + .chain(ordered.missing_history) + .collect() + }; + + let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?; + // api writes shouldn't have any missed pieces that need ordering so we don't send those + if !history_required { + for ev in &res.inserted { + if ev.deliverable { + trace!(event=?ev, "sending delivered to ordering task"); + if let Err(e) = self.delivery_task.tx_delivered.try_send(ev.clone()) { + match e { + tokio::sync::mpsc::error::TrySendError::Full(e) => { + // we should only be doing this during recon, in which case we can rediscover events. + // the delivery task will start picking up these events once it's drained since they are stored in the db + warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable until queue drains"); + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + warn!("Delivery task closed. shutting down"); + return Err(Error::new_fatal(anyhow::anyhow!( + "Delivery task closed" + ))); + } + } } } } } - // TODO: this order is different than the original request!! 
- Ok(recon::InsertResult::new(res.keys)) + + Ok(InsertResult { + store_result: res, + missing_history, + }) + } +} + +#[derive(Debug, PartialEq, Eq, Default)] +pub struct InsertResult { + pub(crate) store_result: ceramic_store::InsertResult, + pub(crate) missing_history: Vec, +} + +impl From for Vec { + fn from(res: InsertResult) -> Self { + let mut api_res = + Vec::with_capacity(res.store_result.inserted.len() + res.missing_history.len()); + for ev in res.store_result.inserted { + api_res.push(ceramic_api::EventInsertResult::new(ev.order_key, None)); + } + for ev in res.missing_history { + api_res.push(ceramic_api::EventInsertResult::new( + ev, + Some("Failed to insert event as `prev` event was missing".to_owned()), + )); + } + api_res } } diff --git a/service/src/event/store.rs b/service/src/event/store.rs index 94e1f5e7d..fef6f4661 100644 --- a/service/src/event/store.rs +++ b/service/src/event/store.rs @@ -5,7 +5,7 @@ use ceramic_core::EventId; use ceramic_store::{CeramicOneBlock, CeramicOneEvent}; use cid::Cid; use iroh_bitswap::Block; -use recon::{HashCount, InsertResult, ReconItem, Result as ReconResult, Sha256a}; +use recon::{HashCount, ReconItem, Result as ReconResult, Sha256a}; use crate::event::CeramicEventService; @@ -25,7 +25,10 @@ impl recon::Store for CeramicEventService { /// Insert new keys into the key space. /// Returns true for each key if it did not previously exist, in the /// same order as the input iterator. - async fn insert_many(&self, items: &[ReconItem<'_, Self::Key>]) -> ReconResult { + async fn insert_many( + &self, + items: &[ReconItem<'_, Self::Key>], + ) -> ReconResult { let res = self.insert_events_from_carfiles_recon(items).await?; Ok(res) } @@ -104,7 +107,10 @@ impl iroh_bitswap::Store for CeramicEventService { #[async_trait::async_trait] impl ceramic_api::EventStore for CeramicEventService { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let items = items .iter() .map(|(key, val)| ReconItem::new(key, val.as_slice())) @@ -112,7 +118,8 @@ impl ceramic_api::EventStore for CeramicEventService { let res = self .insert_events_from_carfiles_local_api(&items[..]) .await?; - Ok(res.keys) + + Ok(res.into()) } async fn range_with_values( diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index 8576a0ee8..25a29e424 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -119,7 +119,7 @@ pub(crate) async fn check_deliverable( cid: &Cid, deliverable: bool, ) { - let (exists, delivered) = ceramic_store::CeramicOneEvent::delivered_by_cid(pool, cid) + let (exists, delivered) = ceramic_store::CeramicOneEvent::deliverable_by_cid(pool, cid) .await .unwrap(); assert!(exists); @@ -163,7 +163,10 @@ async fn data_event( } // returns init + N events -async fn get_n_events_with_model(model: &StreamId, number: usize) -> Vec<(EventId, Vec)> { +async fn get_init_plus_n_events_with_model( + model: &StreamId, + number: usize, +) -> Vec<(EventId, Vec)> { let signer = Box::new(signer().await); let init = init_event(model, &signer).await; @@ -201,17 +204,18 @@ async fn get_n_events_with_model(model: &StreamId, number: usize) -> Vec<(EventI pub(crate) async fn get_events_return_model() -> (StreamId, Vec<(EventId, Vec)>) { let model = StreamId::document(random_cid()); - let events = get_n_events_with_model(&model, 3).await; + let events = get_init_plus_n_events_with_model(&model, 3).await; (model, events) } // builds init -> data -> data that 
are a stream (will be a different stream each call) pub(crate) async fn get_events() -> Vec<(EventId, Vec)> { let model = StreamId::document(random_cid()); - get_n_events_with_model(&model, 3).await + get_init_plus_n_events_with_model(&model, 3).await } +// Get N events with the same model (init + N-1 data events) pub(crate) async fn get_n_events(number: usize) -> Vec<(EventId, Vec)> { let model = &StreamId::document(random_cid()); - get_n_events_with_model(model, number).await + get_init_plus_n_events_with_model(model, number - 1).await } diff --git a/service/src/tests/ordering.rs b/service/src/tests/ordering.rs index d0c174a1b..385752dd4 100644 --- a/service/src/tests/ordering.rs +++ b/service/src/tests/ordering.rs @@ -35,7 +35,7 @@ async fn add_and_assert_new_local_event(store: &CeramicEventService, item: Recon .insert_events_from_carfiles_local_api(&[item]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(1, new); } @@ -49,32 +49,17 @@ async fn test_init_event_delivered() { } #[tokio::test] -async fn test_missing_prev_error_history_required() { +async fn test_missing_prev_history_required_not_inserted() { let store = setup_service().await; let events = get_events().await; let data = &events[1]; let new = store .insert_events_from_carfiles_local_api(&[ReconItem::new(&data.0, &data.1)]) - .await; - match new { - Ok(v) => panic!("should have errored: {:?}", v), - Err(e) => { - match e { - crate::Error::InvalidArgument { error } => { - // yes fragile, but we want to make sure it's not a parsing error or something unexpected - assert!( - error.to_string().contains("Missing history for event"), - "{:?}", - error.to_string() - ); - } - e => { - panic!("unexpected error: {:?}", e); - } - }; - } - }; + .await + .unwrap(); + assert!(new.store_result.inserted.is_empty()); + assert_eq!(1, new.missing_history.len()); } #[tokio::test] @@ -112,7 +97,7 @@ async fn test_prev_in_same_write_history_required() { ]) .await .unwrap(); - let new = new.keys.into_iter().filter(|k| *k).count(); + let new = new.store_result.count_new_keys(); assert_eq!(2, new); check_deliverable(&store.pool, &init.0.cid().unwrap(), true).await; check_deliverable(&store.pool, &data.0.cid().unwrap(), true).await; @@ -294,7 +279,7 @@ async fn validate_all_delivered(store: &CeramicEventService, expected_delivered: #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn recon_lots_of_streams() { - // adds 101 events to 10 streams, mixes up the event order for each stream, inserts half + // adds 100 events to 10 streams, mixes up the event order for each stream, inserts half // the events for each stream before mixing up the stream order and inserting the rest let per_stream = 100; let num_streams = 10; @@ -303,7 +288,7 @@ async fn recon_lots_of_streams() { let mut all_cids = Vec::new(); let expected = per_stream * num_streams; for _ in 0..num_streams { - let mut events = crate::tests::get_n_events(per_stream - 1).await; + let mut events = crate::tests::get_n_events(per_stream).await; let cids = events .iter() .map(|e| e.0.cid().unwrap()) @@ -339,12 +324,12 @@ async fn recon_lots_of_streams() { // first just make sure they were all inserted (not delivered yet) for (i, cid) in all_cids.iter().enumerate() { let (exists, _delivered) = - ceramic_store::CeramicOneEvent::delivered_by_cid(&store.pool, cid) + ceramic_store::CeramicOneEvent::deliverable_by_cid(&store.pool, cid) .await .unwrap(); assert!(exists, "idx: {}. 
missing cid: {}", i, cid); } - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + tokio::time::sleep(std::time::Duration::from_millis(5000)).await; assert_eq!(expected, total_added); tokio::time::timeout( diff --git a/store/src/lib.rs b/store/src/lib.rs index 65a500300..8c419696a 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -11,9 +11,9 @@ pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use migration::DataMigrator; pub use sql::{ - entities::EventInsertable, entities::EventInsertableBody, CandidateEvent, CeramicOneBlock, - CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, InsertResult, - Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, StreamCommit, + entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, + CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream, InsertResult, InsertedEvent, + Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, StreamEventMetadata, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/metrics.rs b/store/src/metrics.rs index 855d4e960..18fbfbf78 100644 --- a/store/src/metrics.rs +++ b/store/src/metrics.rs @@ -13,7 +13,7 @@ use prometheus_client::{ }, registry::Registry, }; -use recon::{AssociativeHash, HashCount, InsertResult, ReconItem, Result as ReconResult}; +use recon::{AssociativeHash, HashCount, ReconItem, Result as ReconResult}; use tokio::time::Instant; #[derive(Clone, Debug, Hash, PartialEq, Eq)] @@ -158,7 +158,10 @@ impl ceramic_api::EventStore for StoreMetricsMiddleware where S: ceramic_api::EventStore, { - async fn insert_many(&self, items: &[(EventId, Vec)]) -> anyhow::Result> { + async fn insert_many( + &self, + items: &[(EventId, Vec)], + ) -> anyhow::Result> { let new_keys = StoreMetricsMiddleware::::record( &self.metrics, "api_insert_many", @@ -166,7 +169,7 @@ where ) .await?; - let key_cnt = new_keys.iter().filter(|k| **k).count(); + let key_cnt = new_keys.iter().filter(|k| k.success()).count(); self.metrics.record(&InsertEvent { cnt: key_cnt as u64, @@ -253,7 +256,7 @@ where Ok(new) } - async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { + async fn insert_many(&self, items: &[ReconItem<'_, K>]) -> ReconResult { let res = StoreMetricsMiddleware::::record( &self.metrics, "insert_many", diff --git a/store/src/migration.rs b/store/src/migration.rs index 691b45fbb..73a1631e8 100644 --- a/store/src/migration.rs +++ b/store/src/migration.rs @@ -1,12 +1,10 @@ use std::sync::OnceLock; use sqlx::{prelude::FromRow, types::chrono}; -use tracing::{info}; +use tracing::info; use crate::{ - sql::{ - entities::{EventCid, EventHeader, ReconEventBlockRaw}, - }, + sql::entities::{EventCid, EventHeader, ReconEventBlockRaw}, CeramicOneStream, Error, EventInsertableBody, Result, SqlitePool, }; diff --git a/store/src/sql/access/event.rs b/store/src/sql/access/event.rs index 8109a8970..26d7734af 100644 --- a/store/src/sql/access/event.rs +++ b/store/src/sql/access/event.rs @@ -13,8 +13,8 @@ use recon::{AssociativeHash, HashCount, Key, Result as ReconResult, Sha256a}; use crate::{ sql::{ entities::{ - rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventCid, EventHeader, - EventInsertable, EventType, OrderKey, ReconEventBlockRaw, ReconHash, StreamCid, + rebuild_car, BlockRow, CountRow, DeliveredEventRow, EventHeader, EventInsertable, + OrderKey, ReconEventBlockRaw, ReconHash, StreamCid, }, query::{EventQuery, ReconQuery, ReconType, SqlBackend}, sqlite::SqliteTransaction, @@ -24,50 
+24,48 @@ use crate::{ static GLOBAL_COUNTER: AtomicI64 = AtomicI64::new(0); -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -/// An event that has been delivered to the client. Generally returned from a batch of inserts. -pub struct CandidateEvent { - /// The Event CID - pub cid: EventCid, +#[derive(Debug, Clone, PartialEq, Eq)] +/// An event that was inserted into the database +pub struct InsertedEvent { + /// The event order key that was inserted + pub order_key: EventId, /// The Stream CID pub stream_cid: StreamCid, + /// Whether the event was marked as deliverable + pub deliverable: bool, + /// Whether the event was a new key + pub new_key: bool, } -impl CandidateEvent { +impl InsertedEvent { /// Create a new delivered event - fn new(cid: Cid, stream_cid: StreamCid) -> Self { - Self { cid, stream_cid } + fn new(order_key: EventId, new_key: bool, stream_cid: StreamCid, deliverable: bool) -> Self { + Self { + order_key, + stream_cid, + deliverable, + new_key, + } } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] /// The result of inserting events into the database pub struct InsertResult { - /// True for new keys, false for existing keys. In same order as the input. BAH THIS ISN'T TRUE - pub keys: Vec<bool>, - /// The events that were delivered because they were init events or their previous events were known - pub delivered: Vec<CandidateEvent>, - /// Undelivered events in this batch - pub undelivered: Vec<CandidateEvent>, + /// The events that were inserted in this batch, with their deliverable status + pub inserted: Vec<InsertedEvent>, } impl InsertResult { - fn new( - keys: Vec<bool>, - delivered: Vec<CandidateEvent>, - undelivered: Vec<CandidateEvent>, - ) -> Self { - Self { - keys, - delivered, - undelivered, - } + /// The count of new keys added in this batch + pub fn count_new_keys(&self) -> usize { + self.inserted.iter().filter(|e| e.new_key).count() } } -impl From<InsertResult> for recon::InsertResult { - fn from(res: InsertResult) -> Self { - Self { keys: res.keys } +impl InsertResult { + fn new(inserted: Vec<InsertedEvent>) -> Self { + Self { inserted } } } @@ -79,7 +77,8 @@ impl CeramicOneEvent { GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst) } - async fn insert_key( + /// Insert the event and its hash into the ceramic_one_event table + async fn insert_event( tx: &mut SqliteTransaction<'_>, key: &EventId, deliverable: bool, @@ -123,56 +122,6 @@ impl CeramicOneEvent { Err(err) => Err(err.into()), } } - - /// Splits the evtents into deliverable and non-deliverable events. Non-deliverable events are checked against the database to see if they can be delivered. - /// If `require_history` is true, an error is returned if any event is missing history, oherwise they will not be marked as deliverable. - async fn with_deliverable_marker<'a>( - events: &'a [EventInsertable], - pool: &SqlitePool, - require_history: bool, - ) -> Result<Vec<(bool, &'a EventInsertable)>> { - // move all the init events to the front so we make sure to add them first and get the deliverable order correct - let mut insert = Vec::with_capacity(events.len()); - let mut insert_check_history = Vec::with_capacity(events.len()); - for event in events { - if event.body.event_type() == EventType::Init { - insert.push((true, event)); - } else { - insert_check_history.push(event); - } - } - - for event in insert_check_history { - match &event.body.header { - EventHeader::Init { .. } => { - unreachable!("Init events should have been filtered out") - } - EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. 
} => { - // check for prev in this set and fallback to database - if insert.iter().any(|(_, e)| e.body.cid == *prev) { - insert.push((true, event)); - } else { - let (_, delivered) = Self::delivered_by_cid(pool, prev).await?; - if !delivered { - if require_history { - return Err(Error::new_invalid_arg(anyhow!( - "Missing history for event '{}' (prev: {})", - cid, - prev - ))); - } else { - insert.push((false, event)); - } - } else { - insert.push((true, event)); - } - } - } - } - } - - Ok(insert) - } } impl CeramicOneEvent { @@ -207,32 +156,33 @@ impl CeramicOneEvent { Ok(()) } - /// Insert many events into the database. If require_history is false, events can be stored without their previous events being present. - /// For local writes (i.e. over the API), `require_history` should be true. For events discovered over recon, it should be false. + /// Insert many events into the database. The events and their blocks and metadata are inserted in a single + /// transaction and are either all successful or all rolled back. + /// + /// IMPORTANT: + /// It is the caller's responsibility to order events marked deliverable correctly. + /// That is, events will be processed in the order they are given so earlier events are given a lower global ordering + /// and will be returned earlier in the feed. Events can be interleaved across different streams, but if two events + /// depend on each other, the `prev` must come first in the list to ensure the correct order for indexers and consumers. pub async fn insert_many( pool: &SqlitePool, to_add: &[EventInsertable], - require_history: bool, ) -> Result<InsertResult> { - let mut new_keys = vec![false; to_add.len()]; - let mut delivered = Vec::with_capacity(to_add.len()); - let mut undelivered = Vec::with_capacity(to_add.len()); - // TODO: this changes the order of the keys so the response is not in the same order as the input as claimed - // we currently throw this result away in recon but the recon::Store trait claims that it will be met. would be nice to remove that claim - let to_add = Self::with_deliverable_marker(to_add, pool, require_history).await?; + let mut inserted = Vec::with_capacity(to_add.len()); let mut tx = pool.begin_tx().await.map_err(Error::from)?; - for (idx, (deliverable, item)) in to_add.iter().enumerate() { - let new_key = Self::insert_key(&mut tx, &item.order_key, *deliverable).await?; - let candidate = CandidateEvent::new(item.cid(), item.stream_cid()); - if *deliverable { - delivered.push(candidate); - // the insert failed so we didn't mark it as deliverable.. is this possible? - if !new_key { - Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?; - } - } else { - undelivered.push(candidate); + for item in to_add { + let new_key = + Self::insert_event(&mut tx, &item.order_key, item.body.deliverable).await?; + inserted.push(InsertedEvent::new( + item.order_key.clone(), + new_key, + item.stream_cid(), + item.body.deliverable, + )); + // the insert failed so we didn't mark it as deliverable.. is this possible? 
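 + // (Our reading: if the event already existed in an undeliverable state and this copy + // is deliverable, e.g. its prev has since been discovered, the insert above is a no-op, + // so the delivered state has to be updated explicitly here.)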
+ if item.body.deliverable && !new_key { + Self::mark_ready_to_deliver(&mut tx, &item.cid()).await?; } if new_key { for block in item.body.blocks.iter() { @@ -245,12 +195,9 @@ impl CeramicOneEvent { CeramicOneStream::insert_event_header_tx(&mut tx, &item.body.header).await?; } - - // see above: this order has changed - new_keys[idx] = new_key; } tx.commit().await.map_err(Error::from)?; - let res = InsertResult::new(new_keys, delivered, undelivered); + let res = InsertResult::new(inserted); Ok(res) } @@ -364,8 +311,9 @@ impl CeramicOneEvent { } /// Finds if an event exists and has been previously delivered, meaning anything that depends on it can be delivered. - /// (bool, bool) = (exists, delivered) - pub async fn delivered_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { + /// returns (bool, bool) = (exists, deliverable) + /// We don't guarantee that a client has seen the event, just that it's been marked as deliverable and they could. + pub async fn deliverable_by_cid(pool: &SqlitePool, key: &Cid) -> Result<(bool, bool)> { #[derive(sqlx::FromRow)] struct CidExists { exists: bool, diff --git a/store/src/sql/access/mod.rs b/store/src/sql/access/mod.rs index 61dbcc2e9..bd6eefc3c 100644 --- a/store/src/sql/access/mod.rs +++ b/store/src/sql/access/mod.rs @@ -5,7 +5,7 @@ mod interest; mod stream; pub use block::CeramicOneBlock; -pub use event::{CandidateEvent, CeramicOneEvent, InsertResult}; +pub use event::{CeramicOneEvent, InsertResult, InsertedEvent}; pub use event_block::CeramicOneEventBlock; pub use interest::CeramicOneInterest; -pub use stream::{CeramicOneStream, StreamCommit}; +pub use stream::{CeramicOneStream, StreamEventMetadata}; diff --git a/store/src/sql/access/stream.rs b/store/src/sql/access/stream.rs index 042c45408..2da8abe3c 100644 --- a/store/src/sql/access/stream.rs +++ b/store/src/sql/access/stream.rs @@ -3,7 +3,7 @@ use cid::Cid; use crate::{ sql::entities::{ - EventHeader, EventHeaderRow, EventType, IncompleteStream, StreamCid, StreamCommitRow, + EventHeader, EventMetadataRow, EventType, IncompleteStream, StreamCid, StreamEventRow, StreamRow, }, Error, Result, SqlitePool, SqliteTransaction, @@ -14,19 +14,19 @@ pub struct CeramicOneStream {} #[derive(Debug, Clone, PartialEq, Eq)] /// Represents a stream event in a way that allows ordering it in the stream. It is metadata and not the event payload itself. -pub struct StreamCommit { +pub struct StreamEventMetadata { /// The event CID pub cid: Cid, /// The previous event CID pub prev: Option, - /// Whether the event has been delivered - pub delivered: bool, + /// Whether the event is deliverable + pub deliverable: bool, } -impl TryFrom for StreamCommit { +impl TryFrom for StreamEventMetadata { type Error = crate::Error; - fn try_from(row: StreamCommitRow) -> std::result::Result { + fn try_from(row: StreamEventRow) -> std::result::Result { let cid = Cid::try_from(row.cid) .map_err(|e| Error::new_app(anyhow!("Invalid event cid: {}", e)))?; let prev = row @@ -37,19 +37,19 @@ impl TryFrom for StreamCommit { Ok(Self { cid, prev, - delivered: row.delivered, + deliverable: row.deliverable, }) } } impl CeramicOneStream { - /// Load the events (commits) for a given stream - pub async fn load_stream_commits( + /// Load the events for a given stream. Will return nothing if the stream does not exist (i.e. the init event is undiscovered). 
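 + /// Note: the underlying query has no ORDER BY, so rows come back in whatever order + /// SQLite chooses; callers (e.g. the ordering task) are expected to order the events + /// themselves via the prev pointers.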
+ pub async fn load_stream_events( pool: &SqlitePool, stream_cid: StreamCid, - ) -> Result<Vec<StreamCommit>> { + ) -> Result<Vec<StreamEventMetadata>> { let rows: Vec<(Vec<u8>, Option<Vec<u8>>, bool)> = - sqlx::query_as(StreamCommitRow::fetch_by_stream_cid()) + sqlx::query_as(StreamEventRow::fetch_by_stream_cid()) .bind(stream_cid.to_bytes()) .fetch_all(pool.reader()) .await?; @@ -60,10 +60,10 @@ impl CeramicOneStream { let cid = Cid::try_from(cid).expect("cid"); let prev = prev.map(Cid::try_from).transpose().expect("prev"); - StreamCommit { + StreamEventMetadata { cid, prev, - delivered, + deliverable: delivered, } }) .collect(); @@ -72,13 +72,15 @@ impl CeramicOneStream { } /// Load streams with undelivered events to see if they need to be delivered now. + /// `cid_cursor` is the last CID that was processed; results start after it. + /// Start with `Cid::default()` to start at the beginning. pub async fn load_stream_cids_with_undelivered_events( pool: &SqlitePool, - last_cid: StreamCid, + cid_cursor: StreamCid, ) -> Result<Vec<StreamCid>> { let streams: Vec<IncompleteStream> = sqlx::query_as(IncompleteStream::fetch_all_with_undelivered()) - .bind(last_cid.to_bytes()) + .bind(cid_cursor.to_bytes()) .bind(1000) .fetch_all(pool.reader()) .await?; @@ -134,7 +136,7 @@ impl CeramicOneStream { ), }; - let _res = sqlx::query(EventHeaderRow::insert()) + let _res = sqlx::query(EventMetadataRow::insert()) .bind(cid) .bind(stream_cid) .bind(event_type) diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index 28b90f2d9..5f6ec4a3f 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -72,6 +72,25 @@ impl EventInsertable { pub fn stream_cid(&self) -> Cid { self.body.header.stream_cid() } + + /// Get the previous event CID if any + pub fn prev(&self) -> Option<Cid> { + match &self.body.header { + EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev), + EventHeader::Init { .. } => None, + } + } + + /// Whether this event is deliverable currently + pub fn deliverable(&self) -> bool { + self.body.deliverable + } + + /// Mark the event as deliverable. + /// This will be used when inserting the event to make sure the field is updated accordingly. + pub fn set_deliverable(&mut self, deliverable: bool) { + self.body.deliverable = deliverable; + } } #[derive(Debug, Clone)] @@ -81,6 +100,8 @@ pub struct EventInsertableBody { pub(crate) cid: Cid, /// The event header data about the event type and stream pub(crate) header: EventHeader, + /// Whether the event is deliverable, i.e. its prev has been delivered and the chain is continuous to an init event + pub(crate) deliverable: bool, /// The blocks of the event // could use a map but there aren't that many blocks per event (right?) pub(crate) blocks: Vec<EventBlockRaw>, } impl EventInsertableBody { /// Create a new EventInsertRaw struct. Deliverable is set to false by default. 
-    pub fn new(cid: Cid, header: EventHeader, blocks: Vec<EventBlockRaw>) -> Self {
+    pub fn new(
+        cid: Cid,
+        header: EventHeader,
+        blocks: Vec<EventBlockRaw>,
+        deliverable: bool,
+    ) -> Self {
         Self {
             cid,
             header,
             blocks,
+            deliverable,
         }
     }
 
@@ -177,12 +204,15 @@ impl EventInsertableBody {
 
         let cid = event_cid;
 
-        let header = match event_ipld {
-            unvalidated::RawEvent::Time(t) => EventHeader::Time {
-                cid,
-                stream_cid: t.id(),
-                prev: t.prev(),
-            },
+        let (deliverable, header) = match event_ipld {
+            unvalidated::RawEvent::Time(t) => (
+                false,
+                EventHeader::Time {
+                    cid,
+                    stream_cid: t.id(),
+                    prev: t.prev(),
+                },
+            ),
             unvalidated::RawEvent::Signed(signed) => {
                 let link = signed.link().ok_or_else(|| {
                     Error::new_invalid_arg(anyhow::anyhow!("event should have a link"))
@@ -198,24 +228,30 @@ impl EventInsertableBody {
                 })?;
 
                 match payload {
-                    unvalidated::Payload::Data(d) => EventHeader::Data {
-                        cid,
-                        stream_cid: *d.id(),
-                        prev: *d.prev(),
-                    },
+                    unvalidated::Payload::Data(d) => (
+                        false,
+                        EventHeader::Data {
+                            cid,
+                            stream_cid: *d.id(),
+                            prev: *d.prev(),
+                        },
+                    ),
                     unvalidated::Payload::Init(init) => {
                         let header = init.header().to_owned();
-                        EventHeader::Init { cid, header }
+                        (true, EventHeader::Init { cid, header })
                     }
                 }
             }
-            unvalidated::RawEvent::Unsigned(init) => EventHeader::Init {
-                cid,
-                header: init.header().to_owned(),
-            },
+            unvalidated::RawEvent::Unsigned(init) => (
+                true,
+                EventHeader::Init {
+                    cid,
+                    header: init.header().to_owned(),
+                },
+            ),
         };
 
-        Ok(Self::new(event_cid, header, blocks))
+        Ok(Self::new(event_cid, header, blocks, deliverable))
     }
 }
diff --git a/store/src/sql/entities/event_header.rs b/store/src/sql/entities/event_metadata.rs
similarity index 67%
rename from store/src/sql/entities/event_header.rs
rename to store/src/sql/entities/event_metadata.rs
index 8d5be03f1..b40d829b0 100644
--- a/store/src/sql/entities/event_header.rs
+++ b/store/src/sql/entities/event_metadata.rs
@@ -8,20 +8,17 @@ pub enum EventType {
 }
 
 #[derive(Debug, Clone)]
-pub struct EventHeaderRow {
-    pub cid: Vec<u8>,
-    pub stream_cid: Vec<u8>,
-    pub event_type: EventType,
-    pub prev: Option<Vec<u8>>,
-}
+pub struct EventMetadataRow {}
 
-impl EventHeaderRow {
+impl EventMetadataRow {
     pub fn insert() -> &'static str {
-        "INSERT INTO ceramic_one_event_header (cid, stream_cid, event_type, prev) VALUES ($1, $2, $3, $4)"
+        "INSERT INTO ceramic_one_event_metadata (cid, stream_cid, event_type, prev) VALUES ($1, $2, $3, $4)"
     }
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
+/// An event header wrapper for use in the store crate.
+/// TODO: replace this with something from the event crate
 pub enum EventHeader {
     Init {
         cid: Cid,
@@ -40,7 +37,8 @@ pub enum EventHeader {
 }
 
 impl EventHeader {
-    pub fn event_type(&self) -> EventType {
+    /// Returns the event type of the event header
+    pub(crate) fn event_type(&self) -> EventType {
         match self {
             EventHeader::Init { .. } => EventType::Init,
             EventHeader::Data { .. } => EventType::Data,
@@ -48,7 +46,8 @@ impl EventHeader {
         }
     }
 
-    pub fn stream_cid(&self) -> Cid {
+    /// Returns the stream CID of the event
+    pub(crate) fn stream_cid(&self) -> Cid {
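+        // An init event starts its stream, so its own CID doubles as the stream CID;
+        // data and time events carry the stream CID explicitly.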
         match self {
             EventHeader::Init { cid, .. } => *cid,
             EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. } => {
diff --git a/store/src/sql/entities/mod.rs b/store/src/sql/entities/mod.rs
index 780db4f0f..12a2bda20 100644
--- a/store/src/sql/entities/mod.rs
+++ b/store/src/sql/entities/mod.rs
@@ -1,7 +1,7 @@
 mod block;
 mod event;
 mod event_block;
-mod event_header;
+mod event_metadata;
 mod hash;
 mod stream;
 mod utils;
@@ -9,9 +9,9 @@ mod utils;
 pub use block::{BlockBytes, BlockRow};
 pub use event::{rebuild_car, EventInsertable, EventInsertableBody};
 pub use event_block::{EventBlockRaw, ReconEventBlockRaw};
-pub use event_header::{EventHeader, EventHeaderRow, EventType};
+pub use event_metadata::{EventHeader, EventMetadataRow, EventType};
 pub use hash::{BlockHash, ReconHash};
-pub use stream::{IncompleteStream, StreamCommitRow, StreamRow};
+pub use stream::{IncompleteStream, StreamEventRow, StreamRow};
 pub use utils::{CountRow, DeliveredEventRow, OrderKey};
diff --git a/store/src/sql/entities/stream.rs b/store/src/sql/entities/stream.rs
index bb2ae5229..1d21cb49b 100644
--- a/store/src/sql/entities/stream.rs
+++ b/store/src/sql/entities/stream.rs
@@ -17,19 +17,21 @@ impl StreamRow {
 }
 
 #[derive(Debug, Clone, sqlx::FromRow)]
-pub struct StreamCommitRow {
+pub struct StreamEventRow {
     pub cid: Vec<u8>,
     pub prev: Option<Vec<u8>>,
-    pub delivered: bool,
+    pub deliverable: bool,
 }
 
-impl StreamCommitRow {
+impl StreamEventRow {
+    /// Requires binding one argument:
+    ///     $1 = stream_cid (bytes)
     pub fn fetch_by_stream_cid() -> &'static str {
         r#"
-            SELECT eh.cid as "cid", eh.prev as "prev",
-                e.delivered IS NOT NULL as "delivered"
+            SELECT e.cid as "cid", eh.prev as "prev",
+                e.delivered IS NOT NULL as "deliverable"
             FROM ceramic_one_stream s
-                JOIN ceramic_one_event_header eh on eh.stream_cid = s.cid
+                JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid
                 JOIN ceramic_one_event e on e.cid = eh.cid
             WHERE s.cid = $1"#
     }
@@ -41,11 +43,14 @@ pub struct IncompleteStream {
 }
 
 impl IncompleteStream {
+    /// Requires binding two arguments:
+    ///     $1 = stream_cid (bytes)
+    ///     $2 = limit (usize)
     pub fn fetch_all_with_undelivered() -> &'static str {
         r#"
             SELECT DISTINCT s.cid as "stream_cid"
             FROM ceramic_one_stream s
-                JOIN ceramic_one_event_header eh on eh.stream_cid = s.cid
+                JOIN ceramic_one_event_metadata eh on eh.stream_cid = s.cid
                 JOIN ceramic_one_event e on e.cid = eh.cid
             WHERE e.delivered is NULL and s.cid > $1
             LIMIT $2"#
diff --git a/store/src/sql/mod.rs b/store/src/sql/mod.rs
index cfcd43fa7..886b241b5 100644
--- a/store/src/sql/mod.rs
+++ b/store/src/sql/mod.rs
@@ -7,8 +7,8 @@ mod sqlite;
 mod test;
 
 pub use access::{
-    CandidateEvent, CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest,
-    CeramicOneStream, InsertResult, StreamCommit,
+    CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneStream,
+    InsertResult, InsertedEvent, StreamEventMetadata,
 };
 pub use root::SqliteRootStore;
 pub use sqlite::{SqlitePool, SqliteTransaction};
diff --git a/store/src/sql/test.rs b/store/src/sql/test.rs
index ad17e4073..d20ac4ed1 100644
--- a/store/src/sql/test.rs
+++ b/store/src/sql/test.rs
@@ -33,6 +33,7 @@ fn random_event(cid: &str) -> EventInsertable {
         body: EventInsertableBody {
             cid,
             blocks: vec![],
+            deliverable: true,
             header: super::entities::EventHeader::Init {
                 cid,
                 header: ceramic_event::unvalidated::init::Header::new(
@@ -54,12 +55,11 @@ async fn hash_range_query() {
     let first = random_event("baeabeiazgwnti363jifhxaeaegbluw4ogcd2t5hsjaglo46wuwcgajqa5u");
     let second = random_event("baeabeihyl35xdlfju3zrkvy2exmnl6wics3rc5ppz7hwg7l7g4brbtnpny");
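+    // Both events are new to this fresh pool, so both should be counted as new keys below.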
-    let x = CeramicOneEvent::insert_many(&pool, &[first, second], true)
+    let x = CeramicOneEvent::insert_many(&pool, &[first, second])
         .await
         .unwrap();
 
-    let new = x.keys.into_iter().filter(|x| *x).count();
-    assert_eq!(new, 2);
+    assert_eq!(x.count_new_keys(), 2);
 
     let hash = CeramicOneEvent::hash_range(
         &pool,
@@ -76,12 +76,11 @@ async fn range_query() {
     let first = random_event("baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524");
     let second = random_event("baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty");
     let pool = SqlitePool::connect_in_memory().await.unwrap();
-    let x = CeramicOneEvent::insert_many(&pool, &[first, second], true)
+    let x = CeramicOneEvent::insert_many(&pool, &[first, second])
         .await
         .unwrap();
 
-    let new = x.keys.into_iter().filter(|x| *x).count();
-    assert_eq!(new, 2);
+    assert_eq!(x.count_new_keys(), 2);
 
     let ids = CeramicOneEvent::range(
         &pool,