Add a parallel async event handler to OnionMessenger and pass it directly to BackgroundProcessor #3060
Conversation
This allows us to have a bound on any `OnionMessenger` without having to create bounds for every generic in `OnionMessenger`.
If we have a handful of futures we want to make progress on simultaneously, we need some way to poll all of them in series, which we add here in the form of `MultiFuturePoller`. It's probably not as efficient as some of the options in the `futures` crate, but it is very trivial and not that bad.
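For illustration, here is a minimal sketch of what such a poller might look like. The `MultiFuturePoller` name comes from this PR; the exact shape below (a `Vec` of optional futures, each live one polled in series on every wakeup, completing once all are done) is an assumed simplification, and the no-op waker and `main` driver exist only to make the sketch runnable:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Sketch of a MultiFuturePoller-style combinator: on each poll, poll every
// still-live future in series; complete once all of them have finished.
struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(Vec<Option<F>>);

impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut have_pending = false;
        for fut_opt in self.0.iter_mut() {
            if let Some(fut) = fut_opt {
                match Pin::new(fut).poll(cx) {
                    // A completed future is dropped so it is never polled again.
                    Poll::Ready(()) => *fut_opt = None,
                    Poll::Pending => have_pending = true,
                }
            }
        }
        if have_pending { Poll::Pending } else { Poll::Ready(()) }
    }
}

// Minimal no-op waker so we can drive the poller without a real executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker { RawWaker::new(std::ptr::null(), &VTABLE) }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut poller =
        MultiFuturePoller(vec![Some(std::future::ready(())), Some(std::future::ready(()))]);
    // Both futures are immediately ready, so a single poll completes the set.
    assert_eq!(Pin::new(&mut poller).poll(&mut cx), Poll::Ready(()));
    println!("all futures completed");
}
```

The `Unpin` bound keeps the sketch simple; the real type could instead hold pinned boxed futures to accept arbitrary `async` blocks.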
Force-pushed from e5106f9 to a915c6f.
Codecov Report:

@@            Coverage Diff             @@
##             main    #3060      +/-  ##
==========================================
+ Coverage   89.83%   92.08%   +2.25%
==========================================
  Files         116      118       +2
  Lines       96472   113057   +16585
  Branches    96472   113057   +16585
==========================================
+ Hits        86663   104113   +17450
+ Misses       7244     6726     -518
+ Partials     2565     2218     -347
Force-pushed from 62317c8 to 15d1135.
Looks like 1.63 builds are sad.
/// A type implementing [`EntropySource`]
type EntropySource: EntropySource + ?Sized;
/// A type that may be dereferenced to [`Self::EntropySource`]
type ES: Deref<Target = Self::EntropySource>;
/// A type implementing [`NodeSigner`]
type NodeSigner: NodeSigner + ?Sized;
/// A type that may be dereferenced to [`Self::NodeSigner`]
type NS: Deref<Target = Self::NodeSigner>;
/// A type implementing [`Logger`]
type Logger: Logger + ?Sized;
/// A type that may be dereferenced to [`Self::Logger`]
type L: Deref<Target = Self::Logger>;
/// A type implementing [`NodeIdLookUp`]
type NodeIdLookUp: NodeIdLookUp + ?Sized;
/// A type that may be dereferenced to [`Self::NodeIdLookUp`]
type NL: Deref<Target = Self::NodeIdLookUp>;
/// A type implementing [`MessageRouter`]
type MessageRouter: MessageRouter + ?Sized;
/// A type that may be dereferenced to [`Self::MessageRouter`]
type MR: Deref<Target = Self::MessageRouter>;
/// A type implementing [`OffersMessageHandler`]
type OffersMessageHandler: OffersMessageHandler + ?Sized;
/// A type that may be dereferenced to [`Self::OffersMessageHandler`]
type OMH: Deref<Target = Self::OffersMessageHandler>;
/// A type implementing [`CustomOnionMessageHandler`]
type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
/// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
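For readers unfamiliar with this associated-type pattern, here is a stripped-down, hypothetical sketch of how it works (the one-generic `OnionMessenger`, `ConsoleLogger`, and `log_via` below are illustrative stand-ins, not LDK's actual types): each generic parameter gets a mirror associated type plus an accessor, so callers can bound on a single trait instead of repeating every generic bound.

```rust
use std::ops::Deref;

// Hypothetical stand-in trait; LDK's real Logger differs.
trait Logger {
    fn log(&self, msg: &str) -> String;
}

struct ConsoleLogger;
impl Logger for ConsoleLogger {
    fn log(&self, msg: &str) -> String {
        format!("[console] {msg}")
    }
}

// A drastically simplified OnionMessenger with a single generic parameter.
struct OnionMessenger<L: Deref>
where
    L::Target: Logger,
{
    logger: L,
}

// The "A-trait" pattern: one associated type per generic parameter, plus an
// accessor, so callers write `OM: AOnionMessenger` instead of repeating every
// bound of `OnionMessenger` itself.
trait AOnionMessenger {
    type Logger: Logger + ?Sized;
    type L: Deref<Target = Self::Logger>;
    fn get_om(&self) -> &OnionMessenger<Self::L>;
}

impl<L: Deref> AOnionMessenger for OnionMessenger<L>
where
    L::Target: Logger,
{
    type Logger = L::Target;
    type L = L;
    fn get_om(&self) -> &OnionMessenger<L> {
        self
    }
}

// A function bounded on the single A-trait rather than on each generic.
fn log_via<OM: AOnionMessenger>(om: &OM, msg: &str) -> String {
    om.get_om().logger.log(msg)
}

fn main() {
    let om = OnionMessenger { logger: &ConsoleLogger };
    assert_eq!(log_via(&om, "hello"), "[console] hello");
    println!("{}", log_via(&om, "hello"));
}
```

This is the same trick `AChannelManager` uses: the background processor can then take `OM: Deref` where `OM::Target: AOnionMessenger` instead of a dozen generics.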
nit: end sentences in periods
I don't think they're full sentences?
`AChannelManager`'s docs have the same sentence structure and use periods. Feel free to ignore, though.
Some(Event::OnionMessageIntercepted { .. }) => {
	futures.push(Some(handler(next_event.unwrap())));
},
Could this parallelism affect how users structure their database storage for intercepted OMs? Just wondering if some extra docs could be helpful.
Hmm, not sure why it would matter? DBs should generally have no issue with parallel insertions, no matter the structure really.
Force-pushed from 15d1135 to 8e6dcaa.
LGTM pending a second reviewer.
// We process events in parallel, but we want to complete `OnionMessageIntercepted` events
// prior to `OnionMessagePeerConnected` ones.
Why not have `OnionMessenger` store the events in two separate `Vec`s? Seems like the code would be a lot simpler.
Uhhh, right, duh, that would be simpler.
In the next commit, `OnionMessenger` events are handled in parallel using Rust async. When we do that, we'll want to handle `OnionMessageIntercepted` events prior to `OnionMessagePeerConnected` ones. While we'd generally prefer to handle all events in the order they were generated, if we want to handle them in parallel, we don't want an `OnionMessageIntercepted` event to start being processed and then handle an `OnionMessagePeerConnected` event before the first completes. This could cause us to store a freshly-intercepted message for a peer in a DB that was just wiped because the peer is now connected. This does run the risk of processing an `OnionMessagePeerConnected` event prior to an `OnionMessageIntercepted` event (because a peer connected, then disconnected, then we received a message for that peer, all before any events were handled), but that is somewhat less likely, and discarding a message in a rare race is better than leaving a message lying around undelivered. Thus, here, we store `OnionMessenger` events in separate `Vec`s which we can pull from in message-type order.
This adds an `OnionMessenger::process_pending_events_async` mirroring the same in `ChannelManager`. However, unlike the one in `ChannelManager`, this processes the events in parallel by spawning all futures and using the new `MultiFuturePoller`. Because `OnionMessenger` just generates a stream of messages to store/fetch, we first process all the events to store new messages, `await` them, then process all the events to fetch stored messages, ensuring reordering shouldn't result in lost messages (unless we race with a peer disconnection, which could happen anyway).
When `OnionMessenger` first developed a timer and events interface, we accessed the `OnionMessenger` indirectly via the `PeerManager`. While this is a fairly awkward interface, it avoided a large pile of generics on the background processor interfaces. However, since we now have an `AOnionMessenger` trait, this concern is no longer significant. Further, because we now want to use the built-in `OnionMessenger` async event processing method, we really need a direct reference to the `OnionMessenger` in the background processor, which we add here optionally.
This never really made a lot of sense from an API perspective, but was required to avoid handing the background processor an explicit `OnionMessenger`, which we are now doing. Thus, we can simply drop these bounds as unnecessary.
Force-pushed from 8e6dcaa to fadb268.
LGTM
Force-pushed from fadb268 to 21aebd2.
Addressed @jkczyz's point and squashed:

$ git diff-tree -U1 fadb26875 21aebd2d7
diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 6b7cd5b05..a17f514a5 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -57,5 +57,2 @@ use std::time::Instant;
-#[cfg(not(feature = "std"))]
-use alloc::boxed::Box;
-
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
@@ -696,3 +693,3 @@ where
let fetch_time = &fetch_time;
- Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68
+ async move { // We should be able to drop the Box once our MSRV is 1.68
if let Some(network_graph) = network_graph {
@@ -711,3 +708,3 @@ where
event_handler(event).await;
- })
+ }
};
LGTM after the comment about …
21aebd2
to
eedceeb
Compare
🤦

$ git diff-tree -U1 21aebd2d eedceeb3
diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index a17f514a5..a5d56c459 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -693,3 +693,3 @@ where
let fetch_time = &fetch_time;
- async move { // We should be able to drop the Box once our MSRV is 1.68
+ async move {
if let Some(network_graph) = network_graph {
Unfortunately tests are still failing due to …
Doh! That's why the Box was there...
Force-pushed from eedceeb to fadb268.
Reverted to include the `Box`.
lol ffs, merged without noticing there were still fixup commits... oh well, they were just doc fixes so not really a big deal.