diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index 17b17a619..1bfd9cad8 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -50,8 +50,8 @@ impl OrderEvents { unreachable!("Init events should have been filtered out since they're always deliverable"); } Some(prev) => { - if let Some(in_mem) = new_cids.get(&prev) { - if *in_mem { + if let Some(in_mem_is_deliverable) = new_cids.get(&prev) { + if *in_mem_is_deliverable { event.body.set_deliverable(true); *new_cids.get_mut(&event.cid()).expect("CID must exist") = true; deliverable.push((event, header)); diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index ded1cc898..786d1340c 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -1,13 +1,16 @@ use std::collections::{HashMap, VecDeque}; +use anyhow::anyhow; use ceramic_core::EventId; +use ceramic_event::unvalidated; use ceramic_store::{CeramicOneEvent, SqlitePool}; use cid::Cid; +use ipld_core::ipld::Ipld; use tracing::{debug, error, info, trace, warn}; use crate::{CeramicEventService, Error, Result}; -use super::service::{EventMetadata, InsertableBodyWithMeta}; +use super::service::{DiscoveredEvent, EventMetadata}; type StreamCid = Cid; type EventCid = Cid; @@ -18,7 +21,7 @@ pub struct DeliverableTask { pub(crate) _handle: tokio::task::JoinHandle<()>, /// Currently only receives events discovered over recon that are out of order and need to be marked ready (deliverable) /// when their prev chain is discovered and complete (i.e. my prev is deliverable then I am deliverable). - pub(crate) tx_inserted: tokio::sync::mpsc::Sender, + pub(crate) tx_inserted: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -37,8 +40,7 @@ impl OrderingTask { /// Spawn a task to run the ordering task background process in a loop pub async fn run(pool: SqlitePool, q_depth: usize) -> DeliverableTask { - let (tx_inserted, rx_inserted) = - tokio::sync::mpsc::channel::(q_depth); + let (tx_inserted, rx_inserted) = tokio::sync::mpsc::channel::(q_depth); let handle = tokio::spawn(async move { Self::run_loop(pool, rx_inserted).await }); @@ -50,7 +52,7 @@ impl OrderingTask { async fn run_loop( pool: SqlitePool, - mut rx_inserted: tokio::sync::mpsc::Receiver, + mut rx_inserted: tokio::sync::mpsc::Receiver, ) { let mut state = OrderingState::new(); @@ -117,22 +119,28 @@ impl StreamEvent { // TODO: Condense the multiple DB queries happening here into a single query let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(pool, &cid).await?; if exists { - let parsed_body = - if let Some(body) = CeramicEventService::load_by_cid(pool, cid).await? { - body - } else { - warn!(%cid, "No event body found for event that should exist"); - return Ok(None); - }; + let data = CeramicOneEvent::value_by_cid(pool, &cid) + .await? + .ok_or_else(|| { + Error::new_app(anyhow!( + "Missing event data for event that must exist: CID={}", + cid + )) + })?; + let (_cid, parsed) = unvalidated::Event::::decode_car(data.as_slice(), false) + .await + .map_err(Error::new_app)?; + + let metadata = EventMetadata::from(parsed); - let known_prev = match &parsed_body.metadata { - EventMetadata::Init { cid, .. } => { + let known_prev = match &metadata { + EventMetadata::Init => { assert!( deliverable, "Init event must always be deliverable. Found undelivered CID: {}", cid ); - StreamEvent::InitEvent(*cid) + StreamEvent::InitEvent(cid) } EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => { if deliverable { @@ -152,13 +160,13 @@ impl StreamEvent { } } -impl From for StreamEvent { - fn from(ev: InsertableBodyWithMeta) -> Self { +impl From for StreamEvent { + fn from(ev: DiscoveredEvent) -> Self { match ev.metadata { - EventMetadata::Init { cid, .. } => StreamEvent::InitEvent(cid), - EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => { - let meta = StreamEventMetadata::new(cid, prev); - if ev.body.deliverable() { + EventMetadata::Init => StreamEvent::InitEvent(ev.cid), + EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => { + let meta = StreamEventMetadata::new(ev.cid, prev); + if ev.known_deliverable { StreamEvent::KnownDeliverable(meta) } else { StreamEvent::Undelivered(meta) @@ -389,9 +397,9 @@ impl OrderingState { } /// Add a stream to the list of streams to process. - fn add_inserted_events(&mut self, events: Vec) { + fn add_inserted_events(&mut self, events: Vec) { for ev in events { - let stream_cid = ev.metadata.stream_cid(); + let stream_cid = ev.stream_cid(); let event = ev.into(); self.add_stream_event(stream_cid, event); } @@ -486,24 +494,27 @@ impl OrderingState { ) -> Result { trace!(cnt=%event_data.len(), "Processing undelivered events batch"); let mut event_cnt = 0; - for (event_id, carfile) in event_data { - let event_cid = event_id.cid().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", event_id)) - })?; + for (_event_id, carfile) in event_data { + let (cid, parsed_event) = + unvalidated::Event::::decode_car(carfile.as_slice(), false) + .await + .map_err(Error::new_app)?; - let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?; + let metadata = EventMetadata::from(parsed_event); - let event = match &loaded.metadata { - EventMetadata::Init { cid, .. } => { + let (stream_cid, loaded) = match &metadata { + EventMetadata::Init => { unreachable!("Init events should not be undelivered. CID={}", cid); } - EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => { - StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev)) - } + EventMetadata::Data { stream_cid, prev } + | EventMetadata::Time { stream_cid, prev } => ( + stream_cid, + StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)), + ), }; event_cnt += 1; - self.add_stream_event(loaded.metadata.stream_cid(), event); + self.add_stream_event(*stream_cid, loaded); } self.process_streams(pool).await?; @@ -542,7 +553,7 @@ mod test { let events = get_n_events(n).await; for event in events { let (event, _) = - CeramicEventService::parse_event_carfile_order_key(event.0.to_owned(), &event.1) + CeramicEventService::validate_discovered_event(event.0.to_owned(), &event.1) .await .unwrap(); res.push(event); diff --git a/service/src/event/service.rs b/service/src/event/service.rs index 982804e10..6f7dd6fa8 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -81,86 +81,35 @@ impl CeramicEventService { .await?; Ok(()) } - /// This function is used to parse the event from the carfile and return the insertable event and the previous cid pointer. - /// Probably belongs in the event crate. - pub(crate) async fn parse_event_carfile_order_key( - event_id: EventId, + + /// Currently only verifies that the event parses into a valid ceramic event, determining whether it's + /// immediately deliverable because it's an init event or it needs review. + /// In the future, we will need to do more event validation (verify all EventID pieces, hashes, signatures, etc). + pub(crate) async fn validate_discovered_event( + event_id: ceramic_core::EventId, carfile: &[u8], ) -> Result<(EventInsertable, EventMetadata)> { - let mut insertable = EventInsertable::try_from_carfile(event_id, carfile).await?; - - let header = Self::parse_event_body(&mut insertable.body).await?; - Ok((insertable, header)) - } + let event_cid = event_id.cid().ok_or_else(|| { + Error::new_app(anyhow::anyhow!("EventId missing CID. EventID={}", event_id)) + })?; + + let (cid, parsed_event) = unvalidated::Event::::decode_car(carfile, false) + .await + .map_err(Error::new_app)?; + + if event_cid != cid { + return Err(Error::new_app(anyhow::anyhow!( + "EventId CID ({}) does not match the body CID ({})", + event_cid, + cid + ))); + } - pub(crate) async fn parse_event_carfile_cid( - cid: ceramic_core::Cid, - carfile: &[u8], - ) -> Result { + let metadata = EventMetadata::from(parsed_event); let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?; + body.set_deliverable(matches!(metadata, EventMetadata::Init { .. })); - let header = Self::parse_event_body(&mut body).await?; - Ok(InsertableBodyWithMeta { - body, - metadata: header, - }) - } - - pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result { - let cid = body.cid(); // purely for convenience writing out the match - let ev_block = body.block_for_cid(&cid)?; - - trace!(count=%body.blocks().len(), %cid, "parsing event blocks"); - let event_ipld: unvalidated::RawEvent = - serde_ipld_dagcbor::from_slice(&ev_block.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("event block is not valid event format"), - ) - })?; - - let (deliverable, header) = match event_ipld { - unvalidated::RawEvent::Time(t) => ( - false, - EventMetadata::Time { - cid, - stream_cid: t.id(), - prev: t.prev(), - }, - ), - unvalidated::RawEvent::Signed(signed) => { - let link = signed.link().ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("event should have a link")) - })?; - let link = body - .blocks() - .iter() - .find(|b| b.cid() == link) - .ok_or_else(|| { - Error::new_invalid_arg(anyhow::anyhow!("prev CID missing from carfile")) - })?; - let payload: unvalidated::Payload = - serde_ipld_dagcbor::from_slice(&link.bytes).map_err(|e| { - Error::new_invalid_arg( - anyhow::anyhow!(e).context("Failed to follow event link"), - ) - })?; - - match payload { - unvalidated::Payload::Data(d) => ( - false, - EventMetadata::Data { - cid, - stream_cid: *d.id(), - prev: *d.prev(), - }, - ), - unvalidated::Payload::Init(_init) => (true, EventMetadata::Init { cid }), - } - } - unvalidated::RawEvent::Unsigned(_init) => (true, EventMetadata::Init { cid }), - }; - body.set_deliverable(deliverable); - Ok(header) + Ok((EventInsertable::try_new(event_id, body)?, metadata)) } #[tracing::instrument(skip(self, items), level = tracing::Level::DEBUG, fields(items = items.len()))] @@ -212,7 +161,7 @@ impl CeramicEventService { for event in items { let insertable = - Self::parse_event_carfile_order_key(event.key.to_owned(), event.value).await?; + Self::validate_discovered_event(event.key.to_owned(), event.value).await?; to_insert.push(insertable); } @@ -256,12 +205,13 @@ impl CeramicEventService { .iter() .find(|(i, _)| i.order_key == ev.order_key) { - let new = InsertableBodyWithMeta { - body: ev.body.clone(), + let discovered = DiscoveredEvent { + cid: ev.cid(), + known_deliverable: ev.deliverable(), metadata: metadata.to_owned(), }; - trace!(event_cid=%ev.cid(), deliverable=%ev.deliverable(), "sending delivered to ordering task"); - if let Err(e) = self.delivery_task.tx_inserted.try_send(new) { + trace!(?discovered, "sending delivered to ordering task"); + if let Err(e) = self.delivery_task.tx_inserted.try_send(discovered) { match e { tokio::sync::mpsc::error::TrySendError::Full(e) => { warn!(event=?e, limit=%PENDING_EVENTS_CHANNEL_DEPTH, "Delivery task full. Dropping event and will not be able to mark deliverable new stream event arrives or process is restarted"); @@ -287,24 +237,6 @@ impl CeramicEventService { missing_history, }) } - - pub(crate) async fn load_by_cid( - pool: &SqlitePool, - cid: ceramic_core::Cid, - ) -> Result> { - let data = if let Some(ev) = CeramicOneEvent::value_by_cid(pool, &cid).await? { - ev - } else { - return Ok(None); - }; - - let mut body = EventInsertableBody::try_from_carfile(cid, &data).await?; - let header = Self::parse_event_body(&mut body).await?; - Ok(Some(InsertableBodyWithMeta { - body, - metadata: header, - })) - } } #[derive(Debug, PartialEq, Eq, Default)] @@ -330,42 +262,61 @@ impl From for Vec { } } -#[derive(Debug, Clone)] -pub(crate) struct InsertableBodyWithMeta { - pub(crate) body: EventInsertableBody, - pub(crate) metadata: EventMetadata, +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DiscoveredEvent { + pub cid: ceramic_core::Cid, + pub known_deliverable: bool, + pub metadata: EventMetadata, +} + +impl DiscoveredEvent { + pub(crate) fn stream_cid(&self) -> ceramic_core::Cid { + match self.metadata { + EventMetadata::Init => self.cid, + EventMetadata::Data { stream_cid, .. } | EventMetadata::Time { stream_cid, .. } => { + stream_cid + } + } + } } #[derive(Debug, Clone, PartialEq, Eq)] /// An event header wrapper for use in the store crate. /// TODO: replace this with something from the event crate pub(crate) enum EventMetadata { - Init { - cid: ceramic_core::Cid, - }, + /// The init CID and stream CID are the same + Init, Data { - cid: ceramic_core::Cid, stream_cid: ceramic_core::Cid, prev: ceramic_core::Cid, }, Time { - cid: ceramic_core::Cid, stream_cid: ceramic_core::Cid, prev: ceramic_core::Cid, }, } -impl EventMetadata { - /// Returns the stream CID of the event - pub(crate) fn stream_cid(&self) -> ceramic_core::Cid { - match self { - EventMetadata::Init { cid, .. } => *cid, - EventMetadata::Data { stream_cid, .. } | EventMetadata::Time { stream_cid, .. } => { - *stream_cid - } +impl From> for EventMetadata { + fn from(value: unvalidated::Event) -> Self { + match value { + unvalidated::Event::Time(t) => EventMetadata::Time { + stream_cid: t.id(), + prev: t.prev(), + }, + + unvalidated::Event::Signed(signed) => match signed.payload() { + unvalidated::Payload::Data(d) => EventMetadata::Data { + stream_cid: *d.id(), + prev: *d.prev(), + }, + unvalidated::Payload::Init(_init) => EventMetadata::Init, + }, + unvalidated::Event::Unsigned(_init) => EventMetadata::Init, } } +} +impl EventMetadata { pub(crate) fn prev(&self) -> Option { match self { EventMetadata::Init { .. } => None,