Skip to content

Commit

Permalink
Merge pull request lightningdevkit#2967 from tnull/2024-03-refactor-d…
Browse files Browse the repository at this point in the history
…rop-handle-message

Split `PeerManager::handle_message` to avoid explicit `mem::drop`
  • Loading branch information
G8XSU authored Apr 5, 2024
2 parents 1e54dd6 + f2ecf8d commit 3eb61f7
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,15 +1591,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}

/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
///
/// Returns the message back if it needs to be broadcasted to all other peers.
fn handle_message(
&self,
peer_mutex: &Mutex<Peer>,
mut peer_lock: MutexGuard<Peer>,
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
peer_lock: MutexGuard<Peer>,
message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
let logger = WithContext::from(&self.logger, Some(their_node_id), None);

let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? {
Some(processed_message) => processed_message,
None => return Ok(None),
};

self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger)
}

// Conducts all message processing that requires us to hold the `peer_lock`.
//
// Returns `None` if the message was fully processed and otherwise returns the message back to
// allow it to be subsequently processed by `do_handle_message_without_peer_lock`.
fn do_handle_message_holding_peer_lock<'a>(
&self,
mut peer_lock: MutexGuard<Peer>,
message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
their_node_id: &PublicKey,
logger: &WithContext<'a, L>
) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError>
{
peer_lock.received_message_since_timer_tick = true;

// Need an Init as first message
Expand Down Expand Up @@ -1680,8 +1702,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peer_lock.received_channel_announce_since_backlogged = true;
}

mem::drop(peer_lock);
Ok(Some(message))
}

// Conducts all message processing that doesn't require us to hold the `peer_lock`.
//
// Returns the message back if it needs to be broadcasted to all other peers.
fn do_handle_message_without_peer_lock<'a>(
&self,
peer_mutex: &Mutex<Peer>,
message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
their_node_id: &PublicKey,
logger: &WithContext<'a, L>
) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError>
{
if is_gossip_msg(message.type_id()) {
log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id));
} else {
Expand Down Expand Up @@ -1883,7 +1917,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
Ok(should_forward)
}

fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
match msg {
wire::Message::ChannelAnnouncement(ref msg) => {
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
Expand Down Expand Up @@ -2275,7 +2309,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
// We do not have the peers write lock, so we just store that we're
// about to disconnect the peer and do it after we finish
// processing most messages.
let msg = msg.map(|msg| wire::Message::<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
let msg = msg.map(|msg| wire::Message::<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
peers_to_disconnect.insert(node_id, msg);
},
msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
Expand Down

0 comments on commit 3eb61f7

Please sign in to comment.