fix: IOD long streams could remain undelivered #387
Conversation
Force-pushed from cfea650 to 41778d4
Force-pushed from 41778d4 to 4fed1e9
Force-pushed from 4fed1e9 to 1cd9f15
It's still pretty tough to follow the control flow here, though I do think this is better than the previous version.
Left a bunch of comments. Admittedly, a lot of my comments are kind of nit-picky. It's just that with code this complicated, any little bit that improves readability and reduces the cognitive overhead for someone trying to read and understand this code is extra valuable, I think.
Force-pushed from 1cd9f15 to 75a65f2
Force-pushed from 6ebe671 to 4a32725
Force-pushed from 9c6cdc2 to 90379ed
- Now correct: handles long streams out of order successfully in all cases (afaict). Should be more performant, as it only does work when necessary.
- Includes a test of long streams.
- Use a map of {cid: deliverable} to reduce the number of entries we have to review at the end.
- Use a VecDeque instead of a Vec to fix a bug where it would just process the same event and exit rather than find the in-memory history.
- Need to write tests to exercise this: it would have rejected an API write that had a prev in one batch, while recon events would have been sorted out. It would be hard to encounter, but it merits tests precisely because it's hard to encounter.
- Process all undelivered events at server startup so we can fail the process, not just the task.
- Adjust the loop to a while condition.
- Update some comments/function names.
- Fail loudly (i.e. panic) on anything expected to be unreachable (next up: refactor to make those states unrepresentable).
- Modified the approach to keep everything in the maps until we're done and change things in place when an event is identified as deliverable. This has the advantage of better cancel safety and more consistency about what is in the maps.
- Also renamed some things from header -> metadata, changed the struct sent to the ordering task to avoid sending a body we don't need, and updated the ordering task correspondingly.
The check should have been cid_map before, too.
We should never discover undelivered init events while processing the startup batch, but we can correct them, so we aren't going to crash.
Don't want the process to appear to be hanging without any indication of what is going on if there is a large backlog of undelivered events.
Use `send` instead of `try_send` to enforce backpressure; clean up comments/docs.
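The `send` vs. `try_send` distinction above is the standard bounded-channel backpressure trade-off. As a minimal sketch (using the standard library's `sync_channel` as an analogy; the actual code presumably uses an async channel): a blocking `send` makes a fast producer wait for the consumer, while `try_send` fails fast when the channel is full.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A bounded channel with capacity 1: `send` blocks until there is room
    // (backpressure), while `try_send` fails fast when the buffer is full.
    let (tx, rx) = mpsc::sync_channel::<u32>(1);

    // Fill the only slot, then show `try_send` failing fast.
    tx.send(1).unwrap();
    assert!(tx.try_send(2).is_err()); // channel full: no backpressure, just an error

    // A consumer draining the channel lets a blocking `send` proceed.
    let consumer = thread::spawn(move || {
        let mut got = Vec::new();
        while let Ok(v) = rx.recv() {
            got.push(v);
        }
        got
    });
    tx.send(2).unwrap(); // blocks until the consumer frees a slot, then succeeds
    drop(tx); // closing the channel ends the consumer's recv loop
    assert_eq!(consumer.join().unwrap(), vec![1, 2]);
}
```

With `try_send`, a burst of events would have been dropped or errored instead of slowing the producer down to the ordering task's pace.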
Force-pushed from 40f6e25 to 4ed6721
If we're not done, we keep one event in memory until next time to optimize for in-order streams. We also try to process all undelivered events for a stream we're tracking, since things fork and we don't know what happened while we were in the queue.
Co-authored-by: Spencer T Brody <[email protected]>
LGTM. It's merged, but I do have a few places where code readability could be improved. Not urgent, but thought I'd share.
```rust
use super::service::EventMetadata;

pub(crate) struct OrderEvents {
    pub(crate) deliverable: Vec<(EventInsertable, EventMetadata)>,
```
If these are meant to be read only can we add getters for them?
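A minimal sketch of the getter pattern being suggested, with simplified stand-in types (not the crate's actual definitions): private fields plus a borrowing getter give read access without exposing mutation.

```rust
// Hypothetical stand-ins for the PR's types, just to illustrate the pattern.
#[derive(Clone, Debug, PartialEq)]
struct EventInsertable(String);
#[derive(Clone, Debug, PartialEq)]
struct EventMetadata(String);

// Fields stay private so code outside the module cannot mutate them;
// read access goes through a borrowing getter instead of a pub(crate) field.
pub struct OrderEvents {
    deliverable: Vec<(EventInsertable, EventMetadata)>,
}

impl OrderEvents {
    pub fn new(deliverable: Vec<(EventInsertable, EventMetadata)>) -> Self {
        Self { deliverable }
    }

    /// Read-only view of the deliverable events.
    pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] {
        &self.deliverable
    }
}

fn main() {
    let ord = OrderEvents::new(vec![(
        EventInsertable("e1".into()),
        EventMetadata("init".into()),
    )]);
    assert_eq!(ord.deliverable().len(), 1);
}
```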
```rust
for event in stream_1.iter().chain(stream_2.iter()) {
    let insertable = CeramicEventService::validate_discovered_event(
        event.0.to_owned(),
        event.1.as_slice(),
    )
    .await
    .unwrap();
    to_insert.push(insertable);
}
```
This loop exists in a few places in this test code. Can we abstract it into a function for clarity in reading the code?
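One possible shape for that helper, sketched with simplified stand-in types and a stubbed validation function (the real code calls `CeramicEventService::validate_discovered_event`, which is async and fallible):

```rust
// Hypothetical event pair: (order key, car bytes). A stub stands in for the
// real async CeramicEventService::validate_discovered_event call.
type RawEvent = (String, Vec<u8>);

#[derive(Debug, PartialEq)]
struct Insertable {
    key: String,
}

fn validate_discovered_event(key: String, _car: &[u8]) -> Insertable {
    Insertable { key }
}

/// One place for the "validate everything in these streams" loop,
/// instead of repeating it in each test.
fn validate_all<'a>(streams: impl IntoIterator<Item = &'a RawEvent>) -> Vec<Insertable> {
    streams
        .into_iter()
        .map(|(key, car)| validate_discovered_event(key.clone(), car.as_slice()))
        .collect()
}

fn main() {
    let stream_1: Vec<RawEvent> = vec![("a".to_string(), vec![0u8])];
    let stream_2: Vec<RawEvent> = vec![("b".to_string(), vec![1u8])];
    // The test bodies shrink to a single call per insert batch.
    let to_insert = validate_all(stream_1.iter().chain(stream_2.iter()));
    assert_eq!(to_insert.len(), 2);
}
```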
```rust
let mut after_1 = Vec::with_capacity(10);
let mut after_2 = Vec::with_capacity(10);
for (event, _) in ordered.deliverable {
    assert!(event.deliverable());
    if stream_1.iter().any(|e| e.0 == event.order_key) {
        after_1.push(event.order_key.clone());
    } else {
        after_2.push(event.order_key.clone());
    }
}
```
This logic to separate the events into groups by their stream exists in a few places. Can we also make this into a function? That would help the test code read well, as it doesn't get distracted by the particulars of how we split the data into groups.
Not necessary, but this grouping concept exists in itertools; see https://docs.rs/itertools/latest/itertools/trait.Itertools.html#method.group_by. Might make the code a little clearer.
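For a two-way split like this, the standard library's `Iterator::partition` is enough on its own. A minimal sketch with stand-in key types (the real code partitions on `EventInsertable` order keys):

```rust
/// Split ordered events into two groups depending on whether their key
/// belongs to the first stream — the logic the review suggests extracting.
/// String keys stand in for the real order keys.
fn split_by_stream(ordered: Vec<String>, stream_1_keys: &[String]) -> (Vec<String>, Vec<String>) {
    ordered
        .into_iter()
        .partition(|key| stream_1_keys.contains(key))
}

fn main() {
    let stream_1_keys = vec!["a".to_string(), "b".to_string()];
    let ordered = vec!["a".to_string(), "x".to_string(), "b".to_string()];
    let (after_1, after_2) = split_by_stream(ordered, &stream_1_keys);
    assert_eq!(after_1, vec!["a", "b"]);
    assert_eq!(after_2, vec!["x"]);
}
```

`itertools::group_by` generalizes this to n groups keyed on consecutive runs, but for exactly two buckets `partition` keeps the dependency surface smaller.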
```rust
}
let to_insert = to_insert_with_metadata
    .iter()
    .map(|(e, _)| e.clone())
```
Why do we need to clone all the events before inserting them?
removed by refactoring
```rust
let missing_history = ordered
    .missing_history
    .iter()
    .map(|(e, _)| e.order_key.clone())
    .collect();
```
```rust
async fn parse_item<'a>(
    item: &ReconItem<'a, EventId>,
) -> Result<(EventInsertable, Option<DeliverableMetadata>)> {
    let cid = item.key.cid().ok_or_else(|| {
        Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", item.key))
    })?;
    // we want to end a conversation if any of the events aren't ceramic events and not store them
    // this includes making sure the key matched the body cid
    let (insertable_body, maybe_prev) =
        CeramicEventService::parse_event_carfile(cid, item.value).await?;
    let insertable = EventInsertable::try_new(item.key.to_owned(), insertable_body)?;
    Ok((insertable, maybe_prev))
}
```
```rust
let to_insert_with_metadata = if history_required {
    ordered.deliverable
} else {
    ordered
        .deliverable
        .into_iter()
        .chain(ordered.missing_history)
        .collect()
};
```
Instead of cloning the missing history, why not have two separate calls to CeramicOneEvent::insert_many? Maybe that is slower than cloning since it's two transactions, but thought I'd ask.
Yeah, I wanted to avoid hitting the database twice, but I adjusted things with the change to an iterator, which avoids needing to allocate.
```rust
if history_required {
    return Ok(InsertResult {
        store_result: res,
        missing_history,
    });
```
Nit: can you remove this early return and guard the discovered logic with `!history_required`?
I'm pretty sure I asked for the exact opposite in an earlier review 😅.
Can I ask why you don't like the early return? Personally I'm a big fan of early returns, I like removing the extra indentation layer and I think it helps reduce cognitive load by letting me see that this case is over and dealt with and I don't need to think about it any more, vs being in a big if block where there's some overhead to remembering what block I'm in and what the condition in play is right now.
I generally dislike early returns because they make it harder to determine the different code branches, as they can be easy to miss. In this case both returns return the same value, so they were really the same code branch, just duplicated.
Not a strong preference here either way.
fascinating! I've never heard an argument against early returns before. I often push for early returns when I do code reviews because I've always felt they improve code readability. I always thought that opinion was non-controversial 😅
Good to know it's not as clear-cut as I thought. I still like them but don't feel strongly either way. Maybe I'll stop pushing as hard for others to use them going forward
I also don't like early returns unless they're right away. At least in rust I prefer to rely on the fact that it's an expression and returns something.
That said, I think I can extract the big if block so it's easy to understand (and hopefully satisfies everyone 😁).
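The two shapes being debated can be sketched side by side. These use a simplified stand-in `InsertResult` (not the crate's actual struct) and elide the discovery logic; note that in the expression style the `if` is itself the function's value, which is the Rust-specific point made above.

```rust
#[derive(Debug, PartialEq)]
struct InsertResult {
    stored: usize,
    missing_history: usize,
}

// Early-return style: the history_required case is dealt with and dismissed,
// so the reader can stop tracking it for the rest of the function.
fn finish_early_return(history_required: bool, stored: usize, missing: usize) -> InsertResult {
    if history_required {
        return InsertResult { stored, missing_history: missing };
    }
    // ...discovered-events logic would run here for the non-history case...
    InsertResult { stored, missing_history: missing }
}

// Expression style: no `return` keyword; both branches produce the value,
// making it harder to miss that there are exactly two exits.
fn finish_expression(history_required: bool, stored: usize, missing: usize) -> InsertResult {
    if history_required {
        InsertResult { stored, missing_history: missing }
    } else {
        // ...discovered-events logic would run here...
        InsertResult { stored, missing_history: missing }
    }
}

fn main() {
    // Both shapes compute the same result in both branches, which is the
    // "same code branch, just duplicated" observation from the thread.
    assert_eq!(finish_early_return(true, 3, 1), finish_expression(true, 3, 1));
    assert_eq!(finish_early_return(false, 3, 1), finish_expression(false, 3, 1));
}
```

Extracting the big `if` body into its own function, as proposed, gives the expression style without a deeply nested block.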
```rust
            .push(DeliveredEvent::new(ev.body.cid, init_cid));
        self.insert_now.push(ev);
    }
    let res = CeramicOneEvent::insert_many(&self.pool, &to_insert[..]).await?;
```
In reading the code, it seems like if we change this method to accept an iterator instead of a slice, we would not have to do as much cloning of the data. For example, we could keep the to_insert vector as a tuple of the insertable and the metadata and use an iterator to hide the metadata. This would avoid having to map the insertable back to its metadata below.
yeah, I thought that too but hesitated to add another refactor. I did it in the new branch and it removes some allocations (the clone above as well as collects)
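A minimal sketch of the iterator-accepting signature being discussed, with stand-in types; the signature and the counting body are illustrative assumptions, not the crate's actual `insert_many` API:

```rust
// Hypothetical stand-ins for the PR's types.
#[derive(Debug)]
struct EventInsertable {
    cid: String,
}
#[derive(Debug)]
struct Metadata {
    init: bool,
}

// Taking an iterator of borrowed events lets callers keep (event, metadata)
// tuples together and lend only the event half, with no clone or collect.
// The real version writes to the store; here we just count what we'd insert.
fn insert_many<'a>(events: impl Iterator<Item = &'a EventInsertable>) -> usize {
    events.count()
}

fn main() {
    let to_insert: Vec<(EventInsertable, Metadata)> = vec![
        (EventInsertable { cid: "c1".into() }, Metadata { init: true }),
        (EventInsertable { cid: "c2".into() }, Metadata { init: false }),
    ];
    // Borrow only the event half of each tuple; the metadata stays paired
    // with its event for the post-insert bookkeeping, no lookup needed.
    let inserted = insert_many(to_insert.iter().map(|(e, _)| e));
    assert_eq!(inserted, 2);
    assert!(to_insert[0].1.init); // to_insert is still owned and intact
}
```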
```rust
let metadata = EventMetadata::from(parsed_event);
let mut body = EventInsertableBody::try_from_carfile(cid, carfile).await?;
body.set_deliverable(matches!(metadata, EventMetadata::Init { .. }));
```
Why is this logic here instead of part of the event ordering bits? While it's true that any init event is deliverable, it seems like this logic should live with the other logic that knows an init event doesn't have a prev.
I moved it to the Ordering task. Originally I thought it might be less likely to ever be missed if it just happened right away but I think it makes sense in the new location.
```rust
/// Returns `false` if we have more work to do and should be retained for future processing
fn processing_completed(&mut self) -> bool {
    // if we're done, we don't need to bother cleaning up since we get dropped
    if !self
```
Nit: instead of `!...any(Undelivered)`, use `...all(!Undelivered)`.
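The two spellings are logically identical by De Morgan's law; `all` states the positive condition directly instead of negating an `any`. A self-contained sketch with a stand-in status enum:

```rust
#[derive(PartialEq)]
enum EventStatus {
    Delivered,
    Undelivered,
}

// "Nothing is undelivered", spelled as a negated `any`.
fn done_any(events: &[EventStatus]) -> bool {
    !events.iter().any(|e| *e == EventStatus::Undelivered)
}

// The same condition, spelled as `all` with the predicate negated inside.
fn done_all(events: &[EventStatus]) -> bool {
    events.iter().all(|e| *e != EventStatus::Undelivered)
}

fn main() {
    let pending = vec![EventStatus::Delivered, EventStatus::Undelivered];
    let done = vec![EventStatus::Delivered];
    // Equivalent on every input, including the empty slice.
    assert_eq!(done_any(&pending), done_all(&pending));
    assert_eq!(done_any(&done), done_all(&done));
    assert_eq!(done_any(&[]), done_all(&[]));
    assert!(!done_all(&pending));
    assert!(done_all(&done));
}
```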
Addresses AES-83, replaces part of #375, and incorporates a lot of the feedback from that one. Depends on #390.
This changes the initial "deliver all undelivered events" process to happen when creating a CeramicEventService, which means it could take a while and will block (or, if it fails, prevent) the server from starting rather than happening in the background.

There are new/better tests. Some were deleted or replaced because they were specific to the IOD implementation details; the higher API is now tested exclusively. The general premise is to send recon events to the task, and it will track them in memory if they need history. Delivered events will be stored if we have the stream in memory, and will trigger a review of the stream if they unblock something we have. If we don't have the stream, we drop them (but we need to learn about them just in case we needed them).

When reviewing streams, we order as many events as we can and then insert them. We use mutable state, but things should be cancel- and retry-safe, as we don't drop things until we've successfully committed to the database. The previous bugs were due to some state-management mistakes about when to keep things around, when to trigger a review, and when to look things up (so pretty much everything 😞). This implementation should be simpler, handles these situations correctly, does less work, and uses an enum rather than flags to make things harder to mess up, but it is pretty much a rewrite of ordering_task.rs.

I'm not happy with the structs needed to wrap different varieties of the event (unvalidated, stored with order key or without, etc.). Cleaning that up will make event validation easier. This should fix the test in ceramicnetwork/js-ceramic#3205.