fix: clean up docs/comments
dav1do committed Jun 20, 2024
1 parent a8c2c32 commit 82089d2
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions service/src/event/ordering_task.rs
@@ -16,7 +16,7 @@ type PrevCid = Cid;
 #[derive(Debug)]
 pub struct DeliverableTask {
     pub(crate) _handle: tokio::task::JoinHandle<()>,
-    /// Currently events discovered over recon that are out of order and need to be marked ready (deliverable)
+    /// Currently only receives events discovered over recon that are out of order and need to be marked ready (deliverable)
     /// when their prev chain is discovered and complete (i.e. if my prev is deliverable then I am deliverable).
     pub(crate) tx_inserted: tokio::sync::mpsc::Sender<InsertableBodyWithMeta>,
 }
@@ -181,7 +181,7 @@ impl StreamEventMetadata {
 }

 #[derive(Debug, Clone)]
-/// ~540 bytes per event in this struct
+/// ~500 bytes per event in this struct
 pub(crate) struct StreamEvents {
     /// Map of `event.prev` to `event.cid` to determine which event depended on a newly
     /// discovered deliverable event.
@@ -276,7 +276,7 @@ impl StreamEvents {
                     )
                 }
                 StreamEvent::Undelivered(meta) => {
-                    // we're delivered now
+                    // we're deliverable now
                     *self.cid_map.get_mut(&cid).unwrap() = StreamEvent::KnownDeliverable(meta.clone());
                 }
             }
@@ -326,14 +326,11 @@ impl StreamEvents {
             {
                 match &discovered_prev {
                     // we found our prev in the database and it's deliverable, so we're deliverable now
-                    StreamEvent::InitEvent(cid)
-                    | StreamEvent::KnownDeliverable(StreamEventMetadata { cid, .. }) => {
-                        trace!(prev=%cid, cid=%undelivered_cid, "Found deliverable prev event in database");
+                    StreamEvent::InitEvent(_) | StreamEvent::KnownDeliverable(_) => {
                         self.discovered_deliverable_prev(undelivered_cid);
                     }
                     // it's not deliverable yet so we track it and append it to the queue we're iterating to search for its prev.
-                    // if we find something deliverable, it will end up marking this new event and then the original event deliverable
-                    // in the final loop at the end.
+                    // if we follow this chain to something deliverable in this loop, the values we have in memory will be updated in the final loop at the end.
                     StreamEvent::Undelivered(prev_meta) => {
                         undelivered_q.push_back(StreamEventMetadata {
                             cid: prev_meta.cid,
@@ -364,7 +361,8 @@ impl StreamEvents {
                     // We get marked as KnownDeliverable in that case and we don't have anything more to do.
                 }
                 StreamEvent::Undelivered(meta) => {
-                    // This might have unlocked something else, so we add it to the back of the queue to check.
+                    // Discovering this event's prev (and therefore this event) as deliverable might have unlocked
+                    // something else, so we add it to the back of the queue to check.
                     newly_ready.push_back(meta.cid);
                     self.discovered_deliverable_prev(meta.cid);
                 }
@@ -391,8 +389,6 @@ impl OrderingState {
     }

     /// Add a stream to the list of streams to process.
-    /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them.
-    /// We will get lots of init events we can ignore unless we need them, otherwise they'll be stuck in memory for a long time.
     fn add_inserted_events(&mut self, events: Vec<InsertableBodyWithMeta>) {
         for ev in events {
             let stream_cid = ev.metadata.stream_cid();
@@ -401,6 +397,9 @@
         }
     }

+    /// Add an event to the list of events to process. Only creates a new stream to track if it's an undelivered event.
+    /// We ignore delivered events for streams we're not tracking as we can look them up later if we need them.
+    /// As we get lots of init events, we don't want them to be stuck in memory unless we have a reason to track them.
     fn add_stream_event(&mut self, stream_cid: StreamCid, event: StreamEvent) {
         if let Some(stream) = self.pending_by_stream.get_mut(&stream_cid) {
             stream.add_event(event);
@@ -421,7 +420,16 @@
             }
         }

-        self.persist_ready_events(pool).await?;
+        match self.persist_ready_events(pool).await {
+            Ok(_) => {}
+            Err(err) => {
+                // Clear the queue as we'll rediscover it on the next run, rather than try to double update everything.
+                // We will no-op the updates so it doesn't really hurt but it's unnecessary.
+                // The StreamEvents in our pending_by_stream map all have their state updated in memory so we can pick up where we left off.
+                self.deliverable.clear();
+                return Err(err);
+            }
+        }
         // keep things that still have missing history but don't process them again until we get something new
         self.pending_by_stream
             .retain(|_, stream_events| !stream_events.processing_completed());
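The comments touched above describe the ordering rule the task enforces: an undelivered event becomes deliverable once its prev is deliverable, and marking it deliverable may in turn unlock other events that were waiting on it, which get pushed onto the queue being processed. A minimal sketch of that propagation, using simplified hypothetical types (a plain CID-to-state map rather than the crate's StreamEvents and StreamEventMetadata):

use std::collections::{HashMap, VecDeque};

type Cid = u64; // stand-in for the real Cid type

#[derive(Clone, Copy, Debug)]
enum State {
    Deliverable,
    Undelivered { prev: Cid },
}

/// Starting from `start`, mark events deliverable whenever their prev is deliverable,
/// and queue anything that was waiting on a newly deliverable event for another check.
fn propagate_deliverable(
    events: &mut HashMap<Cid, State>,
    waiting_on: &HashMap<Cid, Vec<Cid>>, // prev cid -> events that have it as prev
    start: Cid,
) {
    let mut queue = VecDeque::new();
    queue.push_back(start);
    while let Some(cid) = queue.pop_front() {
        let ready = match events.get(&cid) {
            Some(State::Undelivered { prev }) => {
                matches!(events.get(prev), Some(State::Deliverable))
            }
            _ => continue, // already deliverable or unknown: nothing to do
        };
        if ready {
            events.insert(cid, State::Deliverable);
            // this event becoming deliverable may unlock events whose prev is `cid`
            if let Some(children) = waiting_on.get(&cid) {
                queue.extend(children.iter().copied());
            }
        }
    }
}

fn main() {
    // chain: 1 (deliverable) <- 2 <- 3; resolving 2's prev should unlock 3 as well
    let mut events = HashMap::from([
        (1, State::Deliverable),
        (2, State::Undelivered { prev: 1 }),
        (3, State::Undelivered { prev: 2 }),
    ]);
    let waiting_on = HashMap::from([(1, vec![2]), (2, vec![3])]);
    propagate_deliverable(&mut events, &waiting_on, 2);
    assert!(matches!(events[&3], State::Deliverable));
}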

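The last hunk replaces a bare `?` on persist_ready_events with explicit error handling that clears the in-memory ready queue before propagating the error, while keeping the per-stream state so the next run can pick up where it left off. A rough sketch of that pattern, with simplified hypothetical types and no database or async runtime (the real method is async and takes a connection pool):

use std::collections::{HashMap, VecDeque};

type Cid = String; // stand-in for the real Cid type

#[derive(Default)]
struct OrderingState {
    /// events ready to be marked deliverable in the database
    deliverable: VecDeque<Cid>,
    /// per-stream events still waiting on history (stand-in for the StreamEvents map)
    pending_by_stream: HashMap<Cid, Vec<Cid>>,
}

impl OrderingState {
    fn persist_ready_events(&mut self) -> Result<(), String> {
        // stand-in: pretend to flush `self.deliverable` to the database
        Ok(())
    }

    fn process_streams(&mut self) -> Result<(), String> {
        match self.persist_ready_events() {
            Ok(_) => {}
            Err(err) => {
                // On failure, drop the ready queue: it will be rediscovered on the next run,
                // and the per-stream in-memory state already reflects what we learned.
                self.deliverable.clear();
                return Err(err);
            }
        }
        // keep only streams that still have missing history
        self.pending_by_stream.retain(|_, events| !events.is_empty());
        Ok(())
    }
}

fn main() {
    let mut state = OrderingState::default();
    state.deliverable.push_back("bafy-example".to_string());
    assert!(state.process_streams().is_ok());
}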