
refactor: modify IOD task to hopefully be more readable
- modified the approach to keep everything in the maps until we're done and change things in place once they're identified as deliverable.
this has the advantage of better cancel safety and more consistency about what is in the maps.
- also renamed some things from header->metadata
dav1do committed Jun 20, 2024
1 parent 8cb990a commit a8c2c32
Showing 2 changed files with 146 additions and 105 deletions.
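
To illustrate the in-place approach the commit message describes, here is a minimal, hypothetical sketch (simplified types; `Event` and `Stream` below are stand-ins, not the service's real structs, though they loosely mirror the `cid_map`/`new_deliverable` fields in the diff) of keeping every event in one map and flipping its state in place once it is found deliverable, so a cancelled run leaves the map consistent and the work can simply be redone:

use std::collections::{HashMap, VecDeque};

type Cid = u64; // stand-in for the real EventCid

#[derive(Debug, Clone)]
enum Event {
    Undelivered { cid: Cid, prev: Cid },
    Deliverable { cid: Cid },
}

#[derive(Default)]
struct Stream {
    // every event stays in this map for the lifetime of the task
    cid_map: HashMap<Cid, Event>,
    // cids queued for persistence; cid_map already reflects their new state,
    // so being cancelled mid-write only means redoing idempotent inserts
    new_deliverable: VecDeque<Cid>,
}

impl Stream {
    // flip the event's state in place instead of moving it between collections
    fn mark_deliverable(&mut self, cid: Cid) {
        if let Some(ev) = self.cid_map.get_mut(&cid) {
            if matches!(ev, Event::Undelivered { .. }) {
                *ev = Event::Deliverable { cid };
                self.new_deliverable.push_back(cid);
            }
        }
    }

    // called once the queued cids have been written to the database
    fn processing_completed(&mut self) {
        self.new_deliverable.clear();
    }
}
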
229 changes: 132 additions & 97 deletions service/src/event/ordering_task.rs
@@ -105,6 +105,13 @@ enum StreamEvent {
}

impl StreamEvent {
fn is_deliverable(&self) -> bool {
match self {
StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => true,
StreamEvent::Undelivered(_) => false,
}
}

/// Builds a stream event from the database if it exists.
async fn load_by_cid(pool: &SqlitePool, cid: EventCid) -> Result<Option<Self>> {
// TODO: Condense the multiple DB queries happening here into a single query
@@ -118,7 +125,7 @@ impl StreamEvent {
return Ok(None);
};

let known_prev = match &parsed_body.header {
let known_prev = match &parsed_body.metadata {
EventMetadata::Init { cid, .. } => {
assert!(
deliverable,
@@ -147,7 +154,7 @@

impl From<InsertableBodyWithMeta> for StreamEvent {
fn from(ev: InsertableBodyWithMeta) -> Self {
match ev.header {
match ev.metadata {
EventMetadata::Init { cid, .. } => StreamEvent::InitEvent(cid),
EventMetadata::Data { cid, prev, .. } | EventMetadata::Time { cid, prev, .. } => {
let meta = StreamEventMetadata::new(cid, prev);
@@ -173,10 +180,11 @@ impl StreamEventMetadata {
}
}

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
/// ~540 bytes per event in this struct
pub(crate) struct StreamEvents {
/// Map of `event.prev` to `event.cid` to find the previous event easily.
/// Map of `event.prev` to `event.cid` to determine which event depended on a newly
/// discovered deliverable event.
prev_map: HashMap<PrevCid, EventCid>,
/// Map of `event.cid` to `metadata` for quick lookup of the event metadata.
cid_map: HashMap<EventCid, StreamEvent>,
@@ -186,21 +194,26 @@ pub(crate) struct StreamEvents {
new_deliverable: VecDeque<EventCid>,
}

impl Default for StreamEvents {
fn default() -> Self {
Self {
prev_map: HashMap::default(),
cid_map: HashMap::default(),
// default to true so we try to follow the event history on the first batch loading
// will also avoid any possible race conditions if we somehow get things out of order on the channel
should_process: true,
new_deliverable: VecDeque::new(),
}
}
}

impl StreamEvents {
fn new(event: StreamEvent) -> Self {
let mut new = Self::default();
new.add_event(event);
new
}

// we'll be processed if something in memory depends on this event
fn update_should_process_for_new_delivered(&mut self, new_cid: &EventCid) {
// don't reset the flag if we're already supposed to be processed
if !self.should_process {
self.should_process = self.prev_map.contains_key(new_cid);
}
}

/// returns true if this is a new event.
fn add_event(&mut self, event: StreamEvent) -> bool {
let cid = match &event {
@@ -217,7 +230,7 @@ impl StreamEvents {
self.prev_map.insert(meta.prev, meta.cid);
if !self.should_process {
// we depend on something in memory
self.should_process = self.prev_map.contains_key(&meta.prev)
self.should_process = self.prev_map.contains_key(&meta.prev);
}
meta.cid
}
@@ -226,115 +239,138 @@ impl StreamEvents {
self.cid_map.insert(cid, event).is_none()
}

fn remove_by_prev_cid(&mut self, prev: &Cid) -> Option<EventCid> {
if let Some(cid) = self.prev_map.remove(prev) {
self.cid_map.remove(&cid);
Some(cid)
} else {
None
/// we'll be processed if something in memory depends on this event
fn update_should_process_for_new_delivered(&mut self, new_cid: &EventCid) {
// don't reset the flag if we're already supposed to be processed
if !self.should_process {
self.should_process = self.prev_map.contains_key(new_cid);
}
}

/// Called when we've persisted the deliverable events to the database and can clean up our state.
/// Returns `true` if we're finished processing and can be dropped from memory.
/// Returns `false` if we have more work to do and should be retained for future processing (i.e we have more we need to discover)
/// Returns `false` if we have more work to do and should be retained for future processing
fn processing_completed(&mut self) -> bool {
self.should_process = false;

for cid in self.new_deliverable.iter() {
if let Some(ev) = self.cid_map.get_mut(cid) {
match ev {
StreamEvent::InitEvent(cid)
| StreamEvent::KnownDeliverable(StreamEventMetadata { cid, .. }) => {
unreachable!("should not have found a delivered event in our deliverable queue CID={}", cid);
}
StreamEvent::Undelivered(meta) => {
// we're delivered now
*ev = StreamEvent::KnownDeliverable(meta.clone());
}
}
}
}
self.new_deliverable.clear();
!self
.cid_map
.iter()
.any(|(_, ev)| matches!(ev, StreamEvent::Undelivered(_)))
}

/// When we discover the prev event is deliverable, we can mark ourselves as deliverable.
/// This adds us to the queue to insert in the database and updates our state to deliverable.
fn discovered_deliverable_prev(&mut self, cid: EventCid) {
self.new_deliverable.push_back(cid);
let event = self
.cid_map
.get_mut(&cid)
.expect("Must have event in cid_map");
match event {
StreamEvent::InitEvent(cid)
| StreamEvent::KnownDeliverable(StreamEventMetadata { cid, .. }) => {
unreachable!(
"should not have found a deliverable event in our undelivered queue: {}",
cid,
)
}
StreamEvent::Undelivered(meta) => {
// we're delivered now
*self.cid_map.get_mut(&cid).unwrap() = StreamEvent::KnownDeliverable(meta.clone());
}
}
}

async fn order_events(&mut self, pool: &SqlitePool) -> Result<()> {
// We collect everything we can into memory and then order things.
// If our prev is the init event or already been delivered, we can mark ourselves as deliverable.
// If our prev wasn't deliverable yet, we track it and repeat (i.e. follow its prev if we don't have it)

let mut deliverable_queue = VecDeque::new();
let mut undelivered =
// If our prev is deliverable then we can mark ourselves as deliverable. If our prev wasn't deliverable yet,
// we track it and repeat (i.e. add it to our state and the set we're iterating to attempt to load its prev).
// We mutate our state as we go, adding things to the queue and changing their known deliverability so that
// if we get canceled while querying the database, we can pick up where we left off. Our queue will still
// have all the events in the order they need to be inserted, and the cid_map state will reflect their deliverability.
let mut undelivered_q =
VecDeque::from_iter(self.cid_map.iter().filter_map(|(cid, ev)| match ev {
StreamEvent::Undelivered(meta) => {
assert_eq!(meta.cid, *cid);
Some((meta.cid, meta.prev))
debug_assert_eq!(meta.cid, *cid);
Some(meta.clone())
}
_ => None,
}));

while let Some((cid, prev)) = undelivered.pop_front() {
if let Some(prev_event) = self.cid_map.get(&prev) {
match prev_event {
StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
trace!(
%prev,
%cid,
"Found event whose prev is already in memory and IS deliverable!"
);
deliverable_queue.push_back(cid)
debug!(count=%undelivered_q.len(), "undelivered events to process");

while let Some(StreamEventMetadata {
cid: undelivered_cid,
prev: desired_prev,
}) = undelivered_q.pop_front()
{
if let Some(known_prev) = self.cid_map.get(&desired_prev) {
if known_prev.is_deliverable() {
trace!(
%undelivered_cid,
%desired_prev,
"Found event whose prev is already in memory and IS deliverable!"
);
self.discovered_deliverable_prev(undelivered_cid);
} else {
trace!(
%undelivered_cid,
%desired_prev,
"Found event whose prev is already in memory but NOT deliverable."
);
// nothing to do until it arrives on the channel
}
} else if let Some(discovered_prev) =
StreamEvent::load_by_cid(pool, desired_prev).await?
{
match &discovered_prev {
// we found our prev in the database and it's deliverable, so we're deliverable now
StreamEvent::InitEvent(cid)
| StreamEvent::KnownDeliverable(StreamEventMetadata { cid, .. }) => {
trace!(prev=%cid, cid=%undelivered_cid, "Found deliverable prev event in database");
self.discovered_deliverable_prev(undelivered_cid);
}
StreamEvent::Undelivered(_) => {
trace!(
%prev,
%cid,
"Found event whose prev is already in memory but NOT deliverable."
);
// nothing to do until it arrives on the channel
// it's not deliverable yet so we track it and append it to the queue we're iterating to search for its prev.
// if we find something deliverable, it will end up marking this new event and then the original event deliverable
// in the final loop at the end.
StreamEvent::Undelivered(prev_meta) => {
undelivered_q.push_back(StreamEventMetadata {
cid: prev_meta.cid,
prev: prev_meta.prev,
});
self.add_event(discovered_prev);
}
}
} else {
let prev_event = StreamEvent::load_by_cid(pool, prev).await?;
if let Some(known_prev) = prev_event {
match &known_prev {
StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
deliverable_queue.push_back(cid);
}
StreamEvent::Undelivered(undelivered_ev) => {
// add this to the queue we're processing and try to follow it back to something deliverable
undelivered.push_back((undelivered_ev.cid, undelivered_ev.prev));
}
}
self.add_event(known_prev);
} else {
trace!("Found event that depends on another event we haven't discovered yet");
}
trace!("Found event that depends on another event we haven't discovered yet");
}
}

let mut newly_ready = deliverable_queue.clone();
let mut newly_ready = self.new_deliverable.clone();
while let Some(cid) = newly_ready.pop_front() {
if let Some(now_ready_ev) = self.remove_by_prev_cid(&cid) {
if let Some(ev) = self.cid_map.get(&now_ready_ev) {
match ev {
StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
// this would result in a no-op as we'd just re-update it, but we'll fail loudly so tests crash if it ever happens and we can avoid it ever happening
unreachable!("should not have found a deliverable event when we expected only undelivered events! CID={}", now_ready_ev);
}
StreamEvent::Undelivered(_) => {
newly_ready.push_back(now_ready_ev);
}
if let Some(now_ready) = self.prev_map.get(&cid) {
let ev = self
.cid_map
.get(now_ready)
.expect("must have value in cid_map if it's in prev_map")
.to_owned();
match ev {
StreamEvent::InitEvent(cid) => {
unreachable!("should not have found an undelivered init event and added it to our delivery queue {}", cid);
}
StreamEvent::KnownDeliverable(_) => {
// This is fine as we could have already discovered and added ourselves to the queue above.
// We get marked as KnownDeliverable in that case and we don't have anything more to do.
}
StreamEvent::Undelivered(meta) => {
// This might have unlocked something else, so we add it to the back of the queue to check.
newly_ready.push_back(meta.cid);
self.discovered_deliverable_prev(meta.cid);
}
}
deliverable_queue.push_back(now_ready_ev);
newly_ready.push_back(now_ready_ev);
}
}
self.new_deliverable = deliverable_queue;
debug!(count=%self.new_deliverable.len(), "deliverable events discovered");
Ok(())
}
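
For a rough picture of the loop at the end of `order_events` above, this is a simplified, hypothetical sketch (it only models the `prev_map` shape, not the full `StreamEvent` state) of how one newly deliverable event can ripple forward through everything that was waiting on it:

use std::collections::{HashMap, VecDeque};

type Cid = u64; // stand-in for the real EventCid

// prev_map maps event.prev -> event.cid ("who is waiting on this cid").
// newly_ready is seeded with cids that were just marked deliverable; any event
// waiting on one of them becomes deliverable too and is pushed back onto the
// queue so it can unlock further events in turn.
fn ripple_deliverable(prev_map: &HashMap<Cid, Cid>, mut newly_ready: VecDeque<Cid>) -> Vec<Cid> {
    let mut unlocked = Vec::new();
    while let Some(cid) = newly_ready.pop_front() {
        if let Some(&waiting) = prev_map.get(&cid) {
            unlocked.push(waiting);
            newly_ready.push_back(waiting);
        }
    }
    unlocked
}
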
Expand All @@ -359,7 +395,7 @@ impl OrderingState {
/// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time.
fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithMeta>) {
for ev in events {
let stream_cid = ev.header.stream_cid();
let stream_cid = ev.metadata.stream_cid();
let event = ev.into();
self.add_stream_event(stream_cid, event);
}
@@ -378,12 +414,11 @@ impl OrderingState {
/// but for now it assumes it can process all the streams and events in one go. It should be idempotent, so if it fails, it can be retried.
async fn process_streams(&mut self, pool: &SqlitePool) -> Result<()> {
for (_stream_cid, stream_events) in self.pending_by_stream.iter_mut() {
if !stream_events.should_process {
continue;
if stream_events.should_process {
stream_events.order_events(pool).await?;
self.deliverable
.extend(stream_events.new_deliverable.iter());
}
stream_events.order_events(pool).await?;
self.deliverable
.extend(stream_events.new_deliverable.iter());
}

self.persist_ready_events(pool).await?;
@@ -420,7 +455,7 @@ impl OrderingState {
// We can start processing and we'll follow the stream history if we have it. In that case, we either arrive
// at the beginning and mark them all delivered, or we find a gap and stop processing and leave them in memory.
// In this case, we won't discover them until we start running recon with a peer, so maybe we should drop them
// or otherwise mark them ignored somehow.
// or otherwise mark them ignored somehow. When this function ends, we do drop everything so for now it's probably okay.
event_cnt += state
.process_undelivered_events_batch(pool, undelivered)
.await?;
@@ -450,7 +485,7 @@ impl OrderingState {

let loaded = CeramicEventService::parse_event_carfile_cid(event_cid, &carfile).await?;

let event = match &loaded.header {
let event = match &loaded.metadata {
EventMetadata::Init { cid, .. } => {
unreachable!("Init events should not be undelivered. CID={}", cid);
}
@@ -460,7 +495,7 @@ impl OrderingState {
};

event_cnt += 1;
self.add_stream_event(loaded.header.stream_cid(), event);
self.add_stream_event(loaded.metadata.stream_cid(), event);
}
self.process_streams(pool).await?;

