diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b5fc9996e29..09a17e42bfa 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -892,7 +892,7 @@ pub(super) struct PeerState where SP::Target: SignerProvider { /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. - is_connected: bool, + pub is_connected: bool, } impl PeerState where SP::Target: SignerProvider { @@ -1392,6 +1392,9 @@ where pending_offers_messages: Mutex>>, + /// Tracks the message events that are to be broadcasted when we are connected to some peer. + pending_broadcast_messages: Mutex>, + entropy_source: ES, node_signer: NS, signer_provider: SP, @@ -1976,7 +1979,7 @@ macro_rules! handle_error { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => { - let mut msg_events = Vec::with_capacity(2); + let mut msg_event = None; if let Some((shutdown_res, update_option)) = shutdown_finish { let counterparty_node_id = shutdown_res.counterparty_node_id; @@ -1988,7 +1991,8 @@ macro_rules! handle_error { $self.finish_close_channel(shutdown_res); if let Some(update) = update_option { - msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -1998,17 +2002,17 @@ macro_rules! handle_error { if let msgs::ErrorAction::IgnoreError = err.action { } else { - msg_events.push(events::MessageSendEvent::HandleError { + msg_event = Some(events::MessageSendEvent::HandleError { node_id: $counterparty_node_id, action: err.action.clone() }); } - if !msg_events.is_empty() { + if let Some(msg_event) = msg_event { let per_peer_state = $self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) { let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.pending_msg_events.append(&mut msg_events); + peer_state.pending_msg_events.push(msg_event); } } @@ -2466,6 +2470,7 @@ where funding_batch_states: Mutex::new(BTreeMap::new()), pending_offers_messages: Mutex::new(Vec::new()), + pending_broadcast_messages: Mutex::new(Vec::new()), entropy_source, node_signer, @@ -2957,17 +2962,11 @@ where } }; if let Some(update) = update_opt { - // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if - // not try to broadcast it via whatever peer we have. - let per_peer_state = self.per_peer_state.read().unwrap(); - let a_peer_state_opt = per_peer_state.get(peer_node_id) - .ok_or(per_peer_state.values().next()); - if let Ok(a_peer_state_mutex) = a_peer_state_opt { - let mut a_peer_state = a_peer_state_mutex.lock().unwrap(); - a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } + // If we have some Channel Update to broadcast, we cache it and broadcast it later. + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); } Ok(counterparty_node_id) @@ -4043,6 +4042,7 @@ where .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + for channel_id in channel_ids { if !peer_state.has_channel(channel_id) { return Err(APIError::ChannelUnavailable { @@ -4059,7 +4059,8 @@ where } if let ChannelPhase::Funded(channel) = channel_phase { if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { node_id: channel.context.get_counterparty_node_id(), @@ -4969,7 +4970,8 @@ where if n >= DISABLE_GOSSIP_TICKS { chan.set_channel_update_status(ChannelUpdateStatus::Disabled); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -4983,7 +4985,8 @@ where if n >= ENABLE_GOSSIP_TICKS { chan.set_channel_update_status(ChannelUpdateStatus::Enabled); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -6642,9 +6645,8 @@ where } if let Some(ChannelPhase::Funded(chan)) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -7304,7 +7306,8 @@ where if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) { failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed)); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -7489,7 +7492,8 @@ where // We're done with this channel. We got a closing_signed and sent back // a closing_signed with a closing transaction to broadcast. if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -8209,7 +8213,7 @@ where /// will randomly be placed first or last in the returned array. /// /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate` - /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among + /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); @@ -8229,6 +8233,7 @@ where result = NotifyOption::DoPersist; } + let mut is_any_peer_connected = false; let mut pending_events = Vec::new(); let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { @@ -8237,6 +8242,15 @@ where if peer_state.pending_msg_events.len() > 0 { pending_events.append(&mut peer_state.pending_msg_events); } + if peer_state.is_connected { + is_any_peer_connected = true + } + } + + // Ensure that we are connected to some peers before getting broadcast messages. + if is_any_peer_connected { + let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap(); + pending_events.append(&mut broadcast_msgs); } if !pending_events.is_empty() { @@ -8441,6 +8455,7 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|_, phase| { match phase { // Retain unfunded channels. @@ -8513,7 +8528,8 @@ where let reason_message = format!("{}", reason); failed_channels.push(channel.context.force_shutdown(true, reason)); if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -8960,7 +8976,12 @@ where // Gossip &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, - &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + // [`ChannelManager::pending_broadcast_events`] holds the [`BroadcastChannelUpdate`] + // This check here is to ensure exhaustivity. + &events::MessageSendEvent::BroadcastChannelUpdate { .. } => { + debug_assert!(false, "This event shouldn't have been here"); + false + }, &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, @@ -11149,6 +11170,8 @@ where pending_offers_messages: Mutex::new(Vec::new()), + pending_broadcast_messages: Mutex::new(Vec::new()), + entropy_source: args.entropy_source, node_signer: args.node_signer, signer_provider: args.signer_provider, @@ -11678,6 +11701,61 @@ mod tests { } } + #[test] + fn test_channel_update_cached() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let chan = create_announced_chan_between_nodes(&nodes, 0, 1); + + nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap(); + check_added_monitors!(nodes[0], 1); + check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000); + + // Confirm that the channel_update was not sent immediately to node[1] but was cached. + let node_1_events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(node_1_events.len(), 0); + + { + // Assert that ChannelUpdate message has been added to node[0] pending broadcast messages + let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap(); + assert_eq!(pending_broadcast_messages.len(), 1); + } + + // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id()); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id()); + nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + let node_0_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(node_0_events.len(), 0); + + // Now we reconnect to a peer + nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init { + features: nodes[2].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); + nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); + + // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages + let node_0_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(node_0_events.len(), 1); + match &node_0_events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => (), + _ => panic!("Unexpected event"), + } + { + // Assert that ChannelUpdate message has been cleared from nodes[0] pending broadcast messages + let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap(); + assert_eq!(pending_broadcast_messages.len(), 0); + } + } + #[test] fn test_drop_disconnected_peers_when_removing_channels() { let chanmon_cfgs = create_chanmon_cfgs(2); diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index e7fc68924ef..fa1b1e7905f 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1531,11 +1531,29 @@ macro_rules! check_warn_msg { }} } +/// Checks if at least one peer is connected. +fn is_any_peer_connected(node: &Node) -> bool { + let peer_state = node.node.per_peer_state.read().unwrap(); + for (_, peer_mutex) in peer_state.iter() { + let peer = peer_mutex.lock().unwrap(); + if peer.is_connected { return true; } + } + false +} + /// Check that a channel's closing channel update has been broadcasted, and optionally /// check whether an error message event has occurred. pub fn check_closed_broadcast(node: &Node, num_channels: usize, with_error_msg: bool) -> Vec { + let mut dummy_connected = false; + if !is_any_peer_connected(node) { + connect_dummy_node(&node); + dummy_connected = true; + } let msg_events = node.node.get_and_clear_pending_msg_events(); assert_eq!(msg_events.len(), if with_error_msg { num_channels * 2 } else { num_channels }); + if dummy_connected { + disconnect_dummy_node(&node); + } msg_events.into_iter().filter_map(|msg_event| { match msg_event { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { @@ -3039,6 +3057,28 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec(node: &Node<'a, 'b, 'c>) { + let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap(); + + let mut dummy_init_features = InitFeatures::empty(); + dummy_init_features.set_static_remote_key_required(); + + let init_dummy = msgs::Init { + features: dummy_init_features, + networks: None, + remote_network_address: None + }; + + node.node.peer_connected(&node_id_dummy, &init_dummy, true).unwrap(); + node.onion_messenger.peer_connected(&node_id_dummy, &init_dummy, true).unwrap(); +} + +pub fn disconnect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) { + let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap(); + node.node.peer_disconnected(&node_id_dummy); + node.onion_messenger.peer_disconnected(&node_id_dummy); +} + // Note that the following only works for CLTV values up to 128 pub const ACCEPTED_HTLC_SCRIPT_WEIGHT: usize = 137; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test pub const ACCEPTED_HTLC_SCRIPT_WEIGHT_ANCHORS: usize = 140; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test @@ -3150,15 +3190,21 @@ pub fn check_preimage_claim<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, prev_txn: &Vec< } pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec>, a: usize, b: usize, needs_err_handle: bool, expected_error: &str) { + let mut dummy_connected = false; + if !is_any_peer_connected(&nodes[a]) { + connect_dummy_node(&nodes[a]); + dummy_connected = true + } + let events_1 = nodes[a].node.get_and_clear_pending_msg_events(); assert_eq!(events_1.len(), 2); - let as_update = match events_1[0] { + let as_update = match events_1[1] { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - match events_1[1] { + match events_1[0] { MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => { assert_eq!(node_id, nodes[b].node.get_our_node_id()); assert_eq!(msg.data, expected_error); @@ -3175,17 +3221,24 @@ pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec panic!("Unexpected event"), } - + if dummy_connected { + disconnect_dummy_node(&nodes[a]); + dummy_connected = false; + } + if !is_any_peer_connected(&nodes[b]) { + connect_dummy_node(&nodes[b]); + dummy_connected = true; + } let events_2 = nodes[b].node.get_and_clear_pending_msg_events(); assert_eq!(events_2.len(), if needs_err_handle { 1 } else { 2 }); - let bs_update = match events_2[0] { + let bs_update = match events_2.last().unwrap() { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; if !needs_err_handle { - match events_2[1] { + match events_2[0] { MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => { assert_eq!(node_id, nodes[a].node.get_our_node_id()); assert_eq!(msg.data, expected_error); @@ -3197,7 +3250,9 @@ pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec panic!("Unexpected event"), } } - + if dummy_connected { + disconnect_dummy_node(&nodes[b]); + } for node in nodes { node.gossip_sync.handle_channel_update(&as_update).unwrap(); node.gossip_sync.handle_channel_update(&bs_update).unwrap(); diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 14aa14b4398..c0254e35fd9 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -2371,13 +2371,13 @@ fn channel_monitor_network_test() { connect_blocks(&nodes[3], TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + 1); let events = nodes[3].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 2); - let close_chan_update_1 = match events[0] { + let close_chan_update_1 = match events[1] { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - match events[1] { + match events[0] { MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => { assert_eq!(node_id, nodes[4].node.get_our_node_id()); }, @@ -2403,13 +2403,13 @@ fn channel_monitor_network_test() { connect_blocks(&nodes[4], TEST_FINAL_CLTV - CLTV_CLAIM_BUFFER + 2); let events = nodes[4].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 2); - let close_chan_update_2 = match events[0] { + let close_chan_update_2 = match events[1] { MessageSendEvent::BroadcastChannelUpdate { ref msg } => { msg.clone() }, _ => panic!("Unexpected event"), }; - match events[1] { + match events[0] { MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => { assert_eq!(node_id, nodes[3].node.get_our_node_id()); }, @@ -4605,7 +4605,7 @@ fn test_static_spendable_outputs_preimage_tx() { MessageSendEvent::UpdateHTLCs { .. } => {}, _ => panic!("Unexpected event"), } - match events[1] { + match events[2] { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexepected event"), } @@ -4648,7 +4648,7 @@ fn test_static_spendable_outputs_timeout_tx() { mine_transaction(&nodes[1], &commitment_tx[0]); check_added_monitors!(nodes[1], 1); let events = nodes[1].node.get_and_clear_pending_msg_events(); - match events[0] { + match events[1] { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexpected event"), } @@ -5062,7 +5062,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() { MessageSendEvent::UpdateHTLCs { .. } => {}, _ => panic!("Unexpected event"), } - match events[1] { + match events[2] { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexepected event"), } @@ -5140,7 +5140,7 @@ fn test_dynamic_spendable_outputs_local_htlc_success_tx() { MessageSendEvent::UpdateHTLCs { .. } => {}, _ => panic!("Unexpected event"), } - match events[1] { + match events[2] { MessageSendEvent::BroadcastChannelUpdate { .. } => {}, _ => panic!("Unexepected event"), } @@ -7321,6 +7321,9 @@ fn test_announce_disable_channels() { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + // Connect a dummy node for proper future events broadcasting + connect_dummy_node(&nodes[0]); + create_announced_chan_between_nodes(&nodes, 0, 1); create_announced_chan_between_nodes(&nodes, 1, 0); create_announced_chan_between_nodes(&nodes, 0, 1);