Skip to content

Commit

Permalink
refactor: use event crate carfile parsing
Browse files Browse the repository at this point in the history
also changed struct sent to ordering task to avoid sending body we don't need and the ordering task correspondingly
  • Loading branch information
dav1do committed Jun 20, 2024
1 parent 82089d2 commit 4a32725
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 153 deletions.
4 changes: 2 additions & 2 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl OrderEvents {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Some(prev) => {
if let Some(in_mem) = new_cids.get(&prev) {
if *in_mem {
if let Some(in_mem_is_deliverable) = new_cids.get(&prev) {
if *in_mem_is_deliverable {
event.body.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
Expand Down
83 changes: 47 additions & 36 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::collections::{HashMap, VecDeque};

use anyhow::anyhow;
use ceramic_core::EventId;
use ceramic_event::unvalidated;
use ceramic_store::{CeramicOneEvent, SqlitePool};
use cid::Cid;
use ipld_core::ipld::Ipld;
use tracing::{debug, error, info, trace, warn};

use crate::{CeramicEventService, Error, Result};
use crate::{Error, Result};

use super::service::{EventMetadata, InsertableBodyWithMeta};
use super::service::{DiscoveredEvent, EventMetadata};

type StreamCid = Cid;
type EventCid = Cid;
Expand All @@ -18,7 +21,7 @@ pub struct DeliverableTask {
pub(crate) _handle: tokio::task::JoinHandle<()>,
/// Currently only receives events discovered over recon that are out of order and need to be marked ready (deliverable)
/// when their prev chain is discovered and complete (i.e. my prev is deliverable then I am deliverable).
pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertableBodyWithMeta>,
pub(crate) tx_inserted: tokio::sync::mpsc::Sender<DiscoveredEvent>,
}

#[derive(Debug)]
Expand All @@ -37,8 +40,7 @@ impl OrderingTask {

/// Spawn a task to run the ordering task background process in a loop
pub async fn run(pool: SqlitePool, q_depth: usize) -> DeliverableTask {
let (tx_inserted, rx_inserted) =
tokio::sync::mpsc::channel::<InsertableBodyWithMeta>(q_depth);
let (tx_inserted, rx_inserted) = tokio::sync::mpsc::channel::<DiscoveredEvent>(q_depth);

let handle = tokio::spawn(async move { Self::run_loop(pool, rx_inserted).await });

Expand All @@ -50,7 +52,7 @@ impl OrderingTask {

async fn run_loop(
pool: SqlitePool,
mut rx_inserted: tokio::sync::mpsc::Receiver<InsertableBodyWithMeta>,
mut rx_inserted: tokio::sync::mpsc::Receiver<DiscoveredEvent>,
) {
let mut state = OrderingState::new();

Expand Down Expand Up @@ -117,22 +119,28 @@ impl StreamEvent {
// TODO: Condense the multiple DB queries happening here into a single query
let (exists, deliverable) = CeramicOneEvent::deliverable_by_cid(pool, &cid).await?;
if exists {
let parsed_body =
if let Some(body) = CeramicEventService::load_by_cid(pool, cid).await? {
body
} else {
warn!(%cid, "No event body found for event that should exist");
return Ok(None);
};
let data = CeramicOneEvent::value_by_cid(pool, &cid)
.await?
.ok_or_else(|| {
Error::new_app(anyhow!(
"Missing event data for event that must exist: CID={}",
cid
))
})?;
let (_cid, parsed) = unvalidated::Event::<Ipld>::decode_car(data.as_slice(), false)
.await
.map_err(Error::new_app)?;

let metadata = EventMetadata::from(parsed);

let known_prev = match &parsed_body.metadata {
EventMetadata::Init { cid, .. } => {
let known_prev = match &metadata {
EventMetadata::Init => {
assert!(
deliverable,
"Init event must always be deliverable. Found undelivered CID: {}",
cid
);
StreamEvent::InitEvent(*cid)
StreamEvent::InitEvent(cid)
}
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => {
if deliverable {
Expand All @@ -152,13 +160,13 @@ impl StreamEvent {
}
}

impl From<InsertableBodyWithMeta> for StreamEvent {
fn from(ev: InsertableBodyWithMeta) -> Self {
impl From<DiscoveredEvent> for StreamEvent {
fn from(ev: DiscoveredEvent) -> Self {
match ev.metadata {
EventMetadata::Init { cid, .. } => StreamEvent::InitEvent(cid),
EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => {
let meta = StreamEventMetadata::new(cid, prev);
if ev.body.deliverable() {
EventMetadata::Init => StreamEvent::InitEvent(ev.cid),
EventMetadata::Data { prev, .. } | EventMetadata::Time { prev, .. } => {
let meta = StreamEventMetadata::new(ev.cid, prev);
if ev.known_deliverable {
StreamEvent::KnownDeliverable(meta)
} else {
StreamEvent::Undelivered(meta)
Expand Down Expand Up @@ -389,9 +397,9 @@ impl OrderingState {
}

/// Add a stream to the list of streams to process.
fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithMeta>) {
fn add_inserted_events(&mut self, events: Vec<DiscoveredEvent>) {
for ev in events {
let stream_cid = ev.metadata.stream_cid();
let stream_cid = ev.stream_cid();
let event = ev.into();
self.add_stream_event(stream_cid, event);
}
Expand Down Expand Up @@ -486,24 +494,27 @@ impl OrderingState {
) -> Result<usize> {
trace!(cnt=%event_data.len(), "Processing undelivered events batch");
let mut event_cnt = 0;
for (event_id, carfile) in event_data {
let event_cid = event_id.cid().ok_or_else(|| {
Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", event_id))
})?;
for (_event_id, carfile) in event_data {
let (cid, parsed_event) =
unvalidated::Event::<Ipld>::decode_car(carfile.as_slice(), false)
.await
.map_err(Error::new_app)?;

let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?;
let metadata = EventMetadata::from(parsed_event);

let event = match &loaded.metadata {
EventMetadata::Init { cid, .. } => {
let (stream_cid, loaded) = match &metadata {
EventMetadata::Init => {
unreachable!("Init events should not be undelivered. CID={}", cid);
}
EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => {
StreamEvent::Undelivered(StreamEventMetadata::new(*cid, *prev))
}
EventMetadata::Data { stream_cid, prev }
| EventMetadata::Time { stream_cid, prev } => (
stream_cid,
StreamEvent::Undelivered(StreamEventMetadata::new(cid, *prev)),
),
};

event_cnt += 1;
self.add_stream_event(loaded.metadata.stream_cid(), event);
self.add_stream_event(*stream_cid, loaded);
}
self.process_streams(pool).await?;

Expand Down Expand Up @@ -542,7 +553,7 @@ mod test {
let events = get_n_events(n).await;
for event in events {
let (event, _) =
CeramicEventService::parse_event_carfile_order_key(event.0.to_owned(), &event.1)
CeramicEventService::validate_discovered_event(event.0.to_owned(), &event.1)
.await
.unwrap();
res.push(event);
Expand Down
Loading

0 comments on commit 4a32725

Please sign in to comment.