Delay broadcasting Channel Updates until connected to peers #2731

Merged · 1 commit · Apr 4, 2024
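In short, the diff below stops pushing `BroadcastChannelUpdate` events into a specific peer's `pending_msg_events` queue (where they could sit unseen, or be lost, while that peer is disconnected). Instead, `ChannelManager` gains a node-level `pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>` buffer; every site that used to queue a broadcast per-peer now pushes into that buffer, and `get_and_clear_pending_msg_events` drains it only once at least one peer is connected. A minimal sketch of the resulting pattern; `Event`, `Peer`, and `Manager` here are illustrative stand-ins, not the real `MessageSendEvent`, `PeerState`, or `ChannelManager`:

```rust
use std::sync::Mutex;

// Illustrative stand-ins for LDK's MessageSendEvent / PeerState / ChannelManager.
#[derive(Debug)]
enum Event { BroadcastChannelUpdate(&'static str) }

struct Peer { is_connected: bool, pending_msg_events: Vec<Event> }

struct Manager {
    peers: Mutex<Vec<Peer>>,
    // Node-level cache: broadcasts wait here until some peer is connected.
    pending_broadcast_messages: Mutex<Vec<Event>>,
}

impl Manager {
    fn queue_broadcast(&self, ev: Event) {
        self.pending_broadcast_messages.lock().unwrap().push(ev);
    }

    fn get_and_clear_pending_msg_events(&self) -> Vec<Event> {
        let mut out = Vec::new();
        let mut any_connected = false;
        for peer in self.peers.lock().unwrap().iter_mut() {
            out.append(&mut peer.pending_msg_events);
            any_connected |= peer.is_connected;
        }
        // Drain the broadcast cache only when someone can actually hear us;
        // otherwise the events stay queued for a later call.
        if any_connected {
            out.append(&mut self.pending_broadcast_messages.lock().unwrap());
        }
        out
    }
}

fn main() {
    let mgr = Manager {
        peers: Mutex::new(vec![Peer { is_connected: false, pending_msg_events: Vec::new() }]),
        pending_broadcast_messages: Mutex::new(Vec::new()),
    };
    mgr.queue_broadcast(Event::BroadcastChannelUpdate("update for closed channel"));
    assert!(mgr.get_and_clear_pending_msg_events().is_empty()); // no peer connected yet
    mgr.peers.lock().unwrap()[0].is_connected = true;
    assert_eq!(mgr.get_and_clear_pending_msg_events().len(), 1); // drained now
}
```

The point is that a broadcast generated while the node has no connections is delayed, not dropped: it simply waits for the next drain after a reconnect.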
lightning/src/ln/channelmanager.rs — 106 additions & 28 deletions
@@ -892,7 +892,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`].
- is_connected: bool,
+ pub is_connected: bool,
}

impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -1392,6 +1392,9 @@ where

pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,

+ /// Tracks the message events that are to be broadcasted when we are connected to some peer.
+ pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,

entropy_source: ES,
node_signer: NS,
signer_provider: SP,
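The new buffer is deliberately node-level and guarded by its own `Mutex`, so a caller can queue a broadcast without first finding or locking any peer's state. A hedged sketch of that locking shape; the function name and `String` payload are illustrative, not LDK API:

```rust
use std::sync::Mutex;

// Sketch: pushing into the node-level broadcast queue takes only that
// queue's own lock, independent of any per-peer state lock.
fn queue_broadcast(pending: &Mutex<Vec<String>>, update: String) {
    pending.lock().unwrap().push(update);
} // guard dropped here; a later drain empties the Vec with `append`
```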
@@ -1976,7 +1979,7 @@ macro_rules! handle_error {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
- let mut msg_events = Vec::with_capacity(2);
+ let mut msg_event = None;

if let Some((shutdown_res, update_option)) = shutdown_finish {
let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -1988,7 +1991,8 @@

$self.finish_close_channel(shutdown_res);
if let Some(update) = update_option {
- msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -1998,17 +2002,17 @@

if let msgs::ErrorAction::IgnoreError = err.action {
} else {
- msg_events.push(events::MessageSendEvent::HandleError {
+ msg_event = Some(events::MessageSendEvent::HandleError {
node_id: $counterparty_node_id,
action: err.action.clone()
});
}

- if !msg_events.is_empty() {
+ if let Some(msg_event) = msg_event {
let per_peer_state = $self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
let mut peer_state = peer_state_mutex.lock().unwrap();
- peer_state.pending_msg_events.append(&mut msg_events);
+ peer_state.pending_msg_events.push(msg_event);
}
}
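A note on the `handle_error!` changes above: with the broadcast now routed to the node-level cache, at most one per-peer event (the `HandleError`) can be produced here, which is why the two-slot `Vec` collapses into a simple `Option`.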

@@ -2466,6 +2470,7 @@ where
funding_batch_states: Mutex::new(BTreeMap::new()),

pending_offers_messages: Mutex::new(Vec::new()),
+ pending_broadcast_messages: Mutex::new(Vec::new()),

entropy_source,
node_signer,
@@ -2957,17 +2962,11 @@ where
}
};
if let Some(update) = update_opt {
- // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
- // not try to broadcast it via whatever peer we have.
- let per_peer_state = self.per_peer_state.read().unwrap();
- let a_peer_state_opt = per_peer_state.get(peer_node_id)
- .ok_or(per_peer_state.values().next());
- if let Ok(a_peer_state_mutex) = a_peer_state_opt {
- let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
- });
- }
+ // If we have some Channel Update to broadcast, we cache it and broadcast it later.
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
}

Ok(counterparty_node_id)
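Worth spelling out why the removed fallback was unreliable: `Option::ok_or` puts the fallback peer into the `Err` variant, and the old `if let Ok(..)` handled only the `Ok` case, so when the peer we force-closed on was already gone the update was silently dropped rather than broadcast "via whatever peer we have". A tiny self-contained illustration of that behavior (the values are made up):

```rust
fn main() {
    // The peer we force-closed on is no longer in the map...
    let target: Option<&str> = None;
    // ...and some other peer exists as a would-be fallback.
    let fallback: Option<&str> = Some("other_peer");

    // `ok_or` maps the fallback into the Err variant, not the Ok one.
    let chosen: Result<&str, Option<&str>> = target.ok_or(fallback);

    // An `if let Ok(peer) = chosen { .. }` therefore never runs here,
    // which is why caching the broadcast instead is the safer design.
    assert!(chosen.is_err());
}
```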
@@ -4043,6 +4042,7 @@ where
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;

for channel_id in channel_ids {
if !peer_state.has_channel(channel_id) {
return Err(APIError::ChannelUnavailable {
@@ -4059,7 +4059,8 @@
}
if let ChannelPhase::Funded(channel) = channel_phase {
if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
@@ -4969,7 +4970,8 @@ where
if n >= DISABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -4983,7 +4985,8 @@
if n >= ENABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -6642,9 +6645,8 @@ where
}
if let Some(ChannelPhase::Funded(chan)) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -7304,7 +7306,8 @@ where
if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -7489,7 +7492,8 @@ where
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -8209,7 +8213,7 @@ where
/// will randomly be placed first or last in the returned array.
///
/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
- `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+ `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
/// the `MessageSendEvent`s to the specific peer they were generated under.
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let events = RefCell::new(Vec::new());
@@ -8229,6 +8233,7 @@ where
result = NotifyOption::DoPersist;
}

+ let mut is_any_peer_connected = false;
let mut pending_events = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8237,6 +8242,15 @@
if peer_state.pending_msg_events.len() > 0 {
pending_events.append(&mut peer_state.pending_msg_events);
}
+ if peer_state.is_connected {
+ is_any_peer_connected = true
+ }
}

+ // Ensure that we are connected to some peers before getting broadcast messages.
+ if is_any_peer_connected {
+ let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+ pending_events.append(&mut broadcast_msgs);
+ }

if !pending_events.is_empty() {
@@ -8441,6 +8455,7 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;

peer_state.channel_by_id.retain(|_, phase| {
match phase {
// Retain unfunded channels.
@@ -8513,7 +8528,8 @@ where
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
@@ -8960,7 +8976,12 @@ where
// Gossip
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
- &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+ // [`ChannelManager::pending_broadcast_messages`] holds the [`BroadcastChannelUpdate`]
+ // events now; this arm is only here to keep the match exhaustive.
+ &events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+ debug_assert!(false, "This event shouldn't have been here");
+ false
+ },
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
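The `debug_assert!(false, ..)` arm above follows a common idiom: the event can no longer legitimately appear in a per-peer queue, so debug builds scream if one does, while release builds fail safe by simply dropping the stray event from the retained set. A generic sketch of the idiom, with an illustrative `QueueEvent` enum rather than LDK's `MessageSendEvent`:

```rust
// Illustrative event kinds; not LDK's real MessageSendEvent.
#[derive(Debug)]
enum QueueEvent { SendToPeer, Broadcast }

/// `retain`-style predicate: `true` keeps the event across a disconnect.
fn keep_on_disconnect(ev: &QueueEvent) -> bool {
    match ev {
        // Peer-directed messages are dropped; they get rebuilt on reconnect.
        QueueEvent::SendToPeer => false,
        // Broadcasts now live in the node-level queue, so one showing up in a
        // per-peer queue is a logic error: flag it in debug, drop it in release.
        QueueEvent::Broadcast => {
            debug_assert!(false, "broadcasts shouldn't be queued per-peer");
            false
        },
    }
}

fn main() {
    let mut per_peer_queue = vec![QueueEvent::SendToPeer];
    per_peer_queue.retain(keep_on_disconnect);
    assert!(per_peer_queue.is_empty());
}
```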
@@ -11149,6 +11170,8 @@ where

pending_offers_messages: Mutex::new(Vec::new()),

+ pending_broadcast_messages: Mutex::new(Vec::new()),

entropy_source: args.entropy_source,
node_signer: args.node_signer,
signer_provider: args.signer_provider,
@@ -11678,6 +11701,61 @@ mod tests {
}
}

+ #[test]
+ fn test_channel_update_cached() {
+ let chanmon_cfgs = create_chanmon_cfgs(3);
+ let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+ let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

+ let chan = create_announced_chan_between_nodes(&nodes, 0, 1);

+ nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+ check_added_monitors!(nodes[0], 1);
+ check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);

+ // Confirm that the channel_update was not sent immediately to node[1] but was cached.
+ let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_1_events.len(), 0);

+ {
+ // Assert that the ChannelUpdate message has been added to node[0]'s pending broadcast messages
+ let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+ assert_eq!(pending_broadcast_messages.len(), 1);
+ }

+ // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+ nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+ nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());

+ nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+ nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());

+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_0_events.len(), 0);

+ // Now we reconnect to a peer
+ nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+ features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+ }, true).unwrap();
+ nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+ features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+ }, false).unwrap();

+ // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+ assert_eq!(node_0_events.len(), 1);
+ match &node_0_events[0] {
+ MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+ _ => panic!("Unexpected event"),
+ }
+ {
+ // Assert that the ChannelUpdate message has been cleared from nodes[0]'s pending broadcast messages
+ let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+ assert_eq!(pending_broadcast_messages.len(), 0);
+ }
+ }

#[test]
fn test_drop_disconnected_peers_when_removing_channels() {
let chanmon_cfgs = create_chanmon_cfgs(2);
Review comment (Collaborator): It seems all the test changes in this file from here down can be reverted.