Skip to content

Commit

Permalink
fix: correct/better IOD and more
Browse files Browse the repository at this point in the history
- store events include more metadata to avoid reparsing the body repeatedly
- store returns InsertResult with more useful info for clients
- rewrote the IOD to be correct and more performant, including test of long streams
  • Loading branch information
dav1do committed Jun 17, 2024
1 parent 554eec0 commit cfea650
Show file tree
Hide file tree
Showing 18 changed files with 1,149 additions and 967 deletions.
3 changes: 1 addition & 2 deletions service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ceramic-store.workspace = true
cid.workspace = true
hex.workspace = true
ipld-core.workspace = true
serde_ipld_dagcbor.workspace = true
iroh-bitswap.workspace = true
multihash-codetable.workspace = true
recon.workspace = true
Expand All @@ -32,8 +31,8 @@ ipld-core.workspace = true
multibase.workspace = true
paste = "1.0"
rand.workspace = true
serde.workspace = true
serde_ipld_dagcbor.workspace = true
serde.workspace = true
test-log.workspace = true
tmpdir.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions service/src/event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod order_events;
mod ordering_task;
mod service;
mod store;
Expand Down
102 changes: 102 additions & 0 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::collections::HashSet;

use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use cid::Cid;

use crate::Result;

pub(crate) struct OrderEvents {
pub(crate) deliverable: Vec<EventInsertable>,
pub(crate) missing_history: Vec<EventInsertable>,
}

impl OrderEvents {
/// Groups the events into lists of those with a delivered prev and those without. This can be used to return an error if the event is required to have history.
/// The events will be marked as deliverable so that they can be passed directly to the store to be persisted.
/// Will look up the prev from the database if needed to check if it's deliverable (could possibly change this for recon and allow the ordering task to handle it?)
pub async fn try_new(
pool: &SqlitePool,
mut candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid()));
let mut deliverable = Vec::with_capacity(candidate_events.len());
candidate_events.retain(|e| {
if e.deliverable() {
deliverable.push(e.clone());
false
} else {
true
}
});
if candidate_events.is_empty() {
return Ok(OrderEvents {
deliverable,
missing_history: Vec::new(),
});
}

let mut prevs_in_memory = Vec::with_capacity(candidate_events.len());
let mut missing_history = Vec::with_capacity(candidate_events.len());

while let Some(mut event) = candidate_events.pop() {
match &event.prev() {
None => {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Some(prev) => {
if new_cids.contains(prev) {
prevs_in_memory.push(event.clone());
continue;
} else {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, prev).await?;
if prev_deliverable {
event.set_deliverable(true);
deliverable.push(event);
} else {
// technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed,
// but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have known how to
// construct this event so we'll consider this missing history. This can be used to return an error if the event is required to have history.
missing_history.push(event);
}
}
}
}
}

// We add the events to the deliverable list until nothing changes.
// It should be a small set and it will shrink each loop, so continually looping is acceptable.
loop {
let mut made_changes = false;
while let Some(mut event) = prevs_in_memory.pop() {
match &event.prev() {
None => {
unreachable!(
"Init events should have been filtered out of the in memory set"
);
}
Some(prev) => {
// a hashset would be better loopkup but we're not going to have that many events so hashing
// for a handful of lookups and then convert back to a vec probably isn't worth it.
if deliverable.iter().any(|e| e.cid() == *prev) {
event.set_deliverable(true);
deliverable.push(event);
made_changes = true;
} else {
prevs_in_memory.push(event);
}
}
}
}
if !made_changes {
missing_history.extend(prevs_in_memory);
break;
}
}

Ok(OrderEvents {
deliverable,
missing_history,
})
}
}
Loading

0 comments on commit cfea650

Please sign in to comment.