-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: massive commit addressing most of the comments/discussion today
- modified InsertResult across store/api/recon to keep guarantees: recon bools are in the right order and api will only fail individual writes rather than the entire batch (this was an existing bug) - rename ceramic_one_event_header -> ceramic_one_event_metadata and associated types/files - fix IOD code to work. probably does more work than necessary but without sending every changed stream to the task or doing things inline, we can miss events that are being processed while new writes come in (will optimze later as needed) - includes only sending deliverable recon events to the ordering task, rather than all changes to streams, as API writes should have been ordered and we handle discovering changes while we process in the task - removed deliverable/ordering logic from store crate. still need shared event crate type for interface - renamed lots of things: removed "commit" wording, use deliverable instead of delivered - added better docs/comments outstanding: - migrations are still not done - controller.. we didn't talk about how we want to normalize/store controller info
- Loading branch information
Showing
25 changed files
with
660 additions
and
451 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
-- Add down migration script here | ||
DROP TABLE IF EXISTS "ceramic_one_event_metadata"; |
4 changes: 3 additions & 1 deletion
4
...sqlite/20240530125008_event_header.up.sql → ...lite/20240530125008_event_metadata.up.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
-- Add up migration script here | ||
CREATE TABLE IF NOT EXISTS "ceramic_one_event_header" ( | ||
CREATE TABLE IF NOT EXISTS "ceramic_one_event_metadata" ( | ||
"cid" BLOB NOT NULL, -- event cid | ||
"event_type" INTEGER NOT NULL, -- enum EventType: Init, Data, Time | ||
"stream_cid" BLOB NOT NULL, -- id field in header. can't have FK because stream may not exist until we discover it but should reference ceramic_one_stream(cid) | ||
"prev" BLOB, -- prev event cid (can't have FK because node may not know about prev event) | ||
PRIMARY KEY(cid), | ||
FOREIGN KEY(cid) REFERENCES ceramic_one_event(cid) | ||
); | ||
|
||
CREATE INDEX IF NOT EXISTS "idx_ceramic_one_event_metadata_stream_cid" ON "ceramic_one_event_metadata" ("stream_cid"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
mod order_events; | ||
mod ordering_task; | ||
mod service; | ||
mod store; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
use std::collections::HashSet; | ||
|
||
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; | ||
use cid::Cid; | ||
|
||
use crate::Result; | ||
|
||
pub(crate) struct OrderEvents { | ||
pub(crate) deliverable: Vec<EventInsertable>, | ||
pub(crate) missing_history: Vec<EventInsertable>, | ||
} | ||
|
||
impl OrderEvents { | ||
/// Groups the events into lists those with a delivered prev and those without. This can be used to return an error if the event is required to have history. | ||
/// The events will be marked as deliverable so that they can be passed directly to the store to be persisted. | ||
pub async fn try_new( | ||
pool: &SqlitePool, | ||
mut candidate_events: Vec<EventInsertable>, | ||
) -> Result<Self> { | ||
// move all the init events to the front so we make sure to add them first and get the deliverable order correct | ||
let new_cids: HashSet<Cid> = HashSet::from_iter(candidate_events.iter().map(|e| e.cid())); | ||
let mut deliverable = Vec::with_capacity(candidate_events.len()); | ||
candidate_events.retain(|e| { | ||
if e.deliverable() { | ||
deliverable.push(e.clone()); | ||
false | ||
} else { | ||
true | ||
} | ||
}); | ||
if candidate_events.is_empty() { | ||
return Ok(OrderEvents { | ||
deliverable, | ||
missing_history: Vec::new(), | ||
}); | ||
} | ||
|
||
let mut prevs_in_memory = Vec::with_capacity(candidate_events.len()); | ||
let mut missing_history = Vec::with_capacity(candidate_events.len()); | ||
|
||
while let Some(mut event) = candidate_events.pop() { | ||
match &event.prev() { | ||
None => { | ||
unreachable!("Init events should have been filtered out since they're always deliverable"); | ||
} | ||
Some(prev) => { | ||
if new_cids.contains(prev) { | ||
prevs_in_memory.push(event.clone()); | ||
continue; | ||
} else { | ||
let (_exists, prev_deliverable) = | ||
CeramicOneEvent::deliverable_by_cid(pool, prev).await?; | ||
if prev_deliverable { | ||
event.set_deliverable(true); | ||
deliverable.push(event); | ||
} else { | ||
// technically, we may have the "rosetta stone" event in memory that could unlock this chain, if we loaded everything and recursed, | ||
// but the immediate prev is not in this set and has not been delivered to the client yet, so they shouldn't have known how to | ||
// construct this event so we'll consider this missing history. This can be used to return an error if the event is required to have history. | ||
missing_history.push(event); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// We add the events to the deliverable list until nothing changes. | ||
// It should be a small set and it will shrink each loop, so continually looping is acceptable. | ||
loop { | ||
let mut made_changes = false; | ||
while let Some(mut event) = prevs_in_memory.pop() { | ||
match &event.prev() { | ||
None => { | ||
unreachable!( | ||
"Init events should have been filtered out of the in memory set" | ||
); | ||
} | ||
Some(prev) => { | ||
// a hashset would be better loopkup but we're not going to have that many events so hashing | ||
// for a handful of lookups and then convert back to a vec probably isn't worth it. | ||
if deliverable.iter().any(|e| e.cid() == *prev) { | ||
event.set_deliverable(true); | ||
deliverable.push(event); | ||
made_changes = true; | ||
} else { | ||
prevs_in_memory.push(event); | ||
} | ||
} | ||
} | ||
} | ||
if !made_changes { | ||
missing_history.extend(prevs_in_memory); | ||
break; | ||
} | ||
} | ||
|
||
Ok(OrderEvents { | ||
deliverable, | ||
missing_history, | ||
}) | ||
} | ||
} |
Oops, something went wrong.