diff --git a/service/src/event/order_events.rs b/service/src/event/order_events.rs index a13719318..2a1c837fb 100644 --- a/service/src/event/order_events.rs +++ b/service/src/event/order_events.rs @@ -5,11 +5,11 @@ use cid::Cid; use crate::Result; -use super::service::EventHeader; +use super::service::EventMetadata; pub(crate) struct OrderEvents { - pub(crate) deliverable: Vec<(EventInsertable, EventHeader)>, - pub(crate) missing_history: Vec<(EventInsertable, EventHeader)>, + pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>, + pub(crate) missing_history: Vec<(EventInsertable, EventMetadata)>, } impl OrderEvents { @@ -18,7 +18,7 @@ impl OrderEvents { /// Will look up the prev from the database if needed to check if it's deliverable (could possibly change this for recon and allow the ordering task to handle it?) pub async fn try_new( pool: &SqlitePool, - mut candidate_events: Vec<(EventInsertable, EventHeader)>, + mut candidate_events: Vec<(EventInsertable, EventMetadata)>, ) -> Result { let mut new_cids: HashMap = HashMap::from_iter( candidate_events diff --git a/service/src/event/ordering_task.rs b/service/src/event/ordering_task.rs index d3ce40f72..05cd0c0f7 100644 --- a/service/src/event/ordering_task.rs +++ b/service/src/event/ordering_task.rs @@ -7,7 +7,7 @@ use tracing::{debug, error, info, trace, warn}; use crate::{CeramicEventService, Error, Result}; -use super::service::{EventHeader, InsertableBodyWithHeader}; +use super::service::{EventMetadata, InsertableBodyWithMeta}; /// How many events to select at once to see if they've become deliverable when we have downtime /// Used at startup and occassionally in case we ever dropped something @@ -27,7 +27,7 @@ pub struct DeliverableTask { pub(crate) _handle: tokio::task::JoinHandle<()>, /// Currently events discovered over recon that are out of order and need to be marked ready (deliverable) /// when their prev chain is discovered and complete (i.e. my prev is deliverable then I am deliverable). - pub(crate) tx_inserted: tokio::sync::mpsc::Sender, + pub(crate) tx_inserted: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -36,7 +36,7 @@ pub struct OrderingTask {} impl OrderingTask { pub async fn run(pool: SqlitePool, q_depth: usize, load_delivered: bool) -> DeliverableTask { let (tx_inserted, rx_inserted) = - tokio::sync::mpsc::channel::(q_depth); + tokio::sync::mpsc::channel::(q_depth); let handle = tokio::spawn(async move { Self::run_loop(pool, load_delivered, rx_inserted).await }); @@ -50,7 +50,7 @@ impl OrderingTask { async fn run_loop( pool: SqlitePool, load_undelivered: bool, - mut rx_inserted: tokio::sync::mpsc::Receiver, + mut rx_inserted: tokio::sync::mpsc::Receiver, ) { // before starting, make sure we've updated any events in the database we missed // this could take a long time. possibly we want to put it in another task so we can start processing events immediately @@ -146,7 +146,7 @@ impl StreamEvent { }; let known_prev = match &parsed_body.header { - EventHeader::Init { cid, .. } => { + EventMetadata::Init { cid, .. } => { if !deliverable { warn!(%cid,"Found init event in database that wasn't previously marked as deliverable. Updating now..."); let mut tx = pool.begin_tx().await?; @@ -155,7 +155,7 @@ impl StreamEvent { } StreamEvent::InitEvent(*cid) } - EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => { + EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => { if deliverable { trace!(%cid, "Found deliverable event in database"); StreamEvent::KnownDeliverable(StreamEventMetadata::new(cid, *prev)) @@ -173,11 +173,11 @@ impl StreamEvent { } } -impl From for StreamEvent { - fn from(ev: InsertableBodyWithHeader) -> Self { +impl From for StreamEvent { + fn from(ev: InsertableBodyWithMeta) -> Self { match ev.header { - EventHeader::Init { cid, .. } => StreamEvent::InitEvent(cid), - EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => { + EventMetadata::Init { cid, .. } => StreamEvent::InitEvent(cid), + EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => { let meta = StreamEventMetadata::new(cid, prev); if ev.body.deliverable() { StreamEvent::KnownDeliverable(meta) @@ -388,7 +388,7 @@ impl OrderingState { /// Add a stream to the list of streams to process. /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them. /// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time. - fn add_inserted_events(&mut self, events: Vec) { + fn add_inserted_events(&mut self, events: Vec) { for ev in events { let stream_cid = ev.header.stream_cid(); let event = ev.into(); @@ -484,12 +484,12 @@ impl OrderingState { let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?; let event = match &loaded.header { - EventHeader::Init { cid, .. } => { + EventMetadata::Init { cid, .. } => { warn!(%cid,"Found init event in database that wasn't previously marked as deliverable. Updating now..."); to_store_asap.push(*cid); StreamEvent::InitEvent(*cid) } - EventHeader::Data { cid, prev, .. } | EventHeader::Time { cid, prev, .. } => { + EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => { StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev)) } }; diff --git a/service/src/event/service.rs b/service/src/event/service.rs index d6682fb0d..1e02532d4 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -72,7 +72,7 @@ impl CeramicEventService { pub(crate) async fn parse_event_carfile_order_key( event_id: EventId, carfile: &[u8], - ) -> Result<(EventInsertable, EventHeader)> { + ) -> Result<(EventInsertable, EventMetadata)> { let mut insertable = EventInsertable::try_from_carfile(event_id, carfile).await?; let header = Self::parse_event_body(&mut insertable.body).await?; @@ -82,14 +82,14 @@ impl CeramicEventService { pub(crate) async fn parse_event_carfile_cid( cid: ceramic_core::Cid, carfile: &[u8], - ) -> Result { + ) -> Result { let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?; let header = Self::parse_event_body(&mut body).await?; - Ok(InsertableBodyWithHeader { body, header }) + Ok(InsertableBodyWithMeta { body, header }) } - pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result { + pub(crate) async fn parse_event_body(body: &mut EventInsertableBody) -> Result { let cid = body.cid(); // purely for convenience writing out the match let ev_block = body.block_for_cid(&cid)?; @@ -104,7 +104,7 @@ impl CeramicEventService { let (deliverable, header) = match event_ipld { unvalidated::RawEvent::Time(t) => ( false, - EventHeader::Time { + EventMetadata::Time { cid, stream_cid: t.id(), prev: t.prev(), @@ -131,16 +131,16 @@ impl CeramicEventService { match payload { unvalidated::Payload::Data(d) => ( false, - EventHeader::Data { + EventMetadata::Data { cid, stream_cid: *d.id(), prev: *d.prev(), }, ), - unvalidated::Payload::Init(_init) => (true, EventHeader::Init { cid }), + unvalidated::Payload::Init(_init) => (true, EventMetadata::Init { cid }), } } - unvalidated::RawEvent::Unsigned(_init) => (true, EventHeader::Init { cid }), + unvalidated::RawEvent::Unsigned(_init) => (true, EventMetadata::Init { cid }), }; body.set_deliverable(deliverable); Ok(header) @@ -239,7 +239,7 @@ impl CeramicEventService { .iter() .find(|(i, _)| i.order_key == ev.order_key) { - let new = InsertableBodyWithHeader { + let new = InsertableBodyWithMeta { body: ev.body.clone(), header: header.to_owned(), }; @@ -274,7 +274,7 @@ impl CeramicEventService { pub(crate) async fn load_by_cid( pool: &SqlitePool, cid: ceramic_core::Cid, - ) -> Result> { + ) -> Result> { let data = if let Some(ev) = CeramicOneEvent::value_by_cid(pool, &cid).await? { ev } else { @@ -283,7 +283,7 @@ impl CeramicEventService { let mut body = EventInsertableBody::try_from_carfile(cid, &data).await?; let header = Self::parse_event_body(&mut body).await?; - Ok(Some(InsertableBodyWithHeader { body, header })) + Ok(Some(InsertableBodyWithMeta { body, header })) } } @@ -311,15 +311,15 @@ impl From for Vec { } #[derive(Debug, Clone)] -pub(crate) struct InsertableBodyWithHeader { +pub(crate) struct InsertableBodyWithMeta { pub(crate) body: EventInsertableBody, - pub(crate) header: EventHeader, + pub(crate) header: EventMetadata, } #[derive(Debug, Clone, PartialEq, Eq)] /// An event header wrapper for use in the store crate. /// TODO: replace this with something from the event crate -pub(crate) enum EventHeader { +pub(crate) enum EventMetadata { Init { cid: ceramic_core::Cid, }, @@ -335,12 +335,12 @@ pub(crate) enum EventHeader { }, } -impl EventHeader { +impl EventMetadata { /// Returns the stream CID of the event pub(crate) fn stream_cid(&self) -> ceramic_core::Cid { match self { - EventHeader::Init { cid, .. } => *cid, - EventHeader::Data { stream_cid, .. } | EventHeader::Time { stream_cid, .. } => { + EventMetadata::Init { cid, .. } => *cid, + EventMetadata::Data { stream_cid, .. } | EventMetadata::Time { stream_cid, .. } => { *stream_cid } } @@ -348,8 +348,8 @@ impl EventHeader { pub(crate) fn prev(&self) -> Option { match self { - EventHeader::Init { .. } => None, - EventHeader::Data { prev, .. } | EventHeader::Time { prev, .. } => Some(*prev), + EventMetadata::Init { .. } => None, + EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => Some(*prev), } } }