diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index c19fed5c360..9cc7cc4bafd 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -2971,6 +2971,86 @@ where
 		Ok(counterparty_node_id)
 	}
 
+	#[cfg(test)]
+	// Function to test removal of the peer from per_peer_state midway through a force close.
+	fn test_force_close_channel_with_peer(&self, channel_id: &ChannelId, peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool)
+	-> Result<PublicKey, APIError> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(peer_node_id)
+			.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
+		let (update_opt, counterparty_node_id) = {
+			let mut peer_state = peer_state_mutex.lock().unwrap();
+			let closure_reason = if let Some(peer_msg) = peer_msg {
+				ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
+			} else {
+				ClosureReason::HolderForceClosed
+			};
+			if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) {
+				log_error!(self.logger, "Force-closing channel {}", channel_id);
+				self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
+				let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
+				mem::drop(peer_state);
+				mem::drop(per_peer_state);
+				match chan_phase {
+					ChannelPhase::Funded(mut chan) => {
+						self.finish_close_channel(chan.context.force_shutdown(broadcast));
+						(self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
+					},
+					ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
+						self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
+						// Unfunded channels have no update to broadcast.
+						(None, chan_phase.context().get_counterparty_node_id())
+					},
+				}
+			} else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() {
+				log_error!(self.logger, "Force-closing channel {}", &channel_id);
+				// N.B. that we don't send any channel close event here: we
+				// don't have a user_channel_id, and we never sent any opening
+				// events anyway.
+				(None, *peer_node_id)
+			} else {
+				return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", channel_id, peer_node_id) });
+			}
+		};
+
+		// Test: the peer_state corresponding to counterparty_node_id is removed at this point.
+		{
+			let mut per_peer_state = self.per_peer_state.write().unwrap();
+			per_peer_state.remove(peer_node_id);
+		}
+
+		if let Some(update) = update_opt {
+			// Try to queue the `BroadcastChannelUpdate` with the peer we just force-closed on;
+			// if that peer has been removed, pool the message so it can be broadcast via
+			// whichever peer we are connected to later.
+			let broadcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
+				msg: update
+			};
+
+			let per_peer_state = self.per_peer_state.read().unwrap();
+
+			// Attempt to get the peer_state_mutex for the peer we force-closed on (the counterparty).
+			let peer_state_mutex_opt = per_peer_state.get(peer_node_id);
+
+			match peer_state_mutex_opt {
+				Some(peer_state_mutex) => {
+					let mut peer_state = peer_state_mutex.lock().unwrap();
+					peer_state.pending_msg_events.push(broadcast_message_evt);
+				}
+				None => {
+					// If we could not find the counterparty in our per_peer_state, we pool
+					// the message in pending_broadcast_messages and broadcast it later.
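+					// (Per the test below, a pooled message is moved into the
+					// pending_msg_events of the next peer that connects, so the
+					// channel_update should still reach the network once we regain
+					// any peer connection.)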
+					let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+					pending_broadcast_messages.push(broadcast_message_evt);
+					log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now. Will try rebroadcasting later.");
+				}
+			}
+		}
+
+		Ok(counterparty_node_id)
+	}
+
 	fn force_close_sending_error(&self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 		match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) {
@@ -11456,6 +11536,69 @@ mod tests {
 		}
 	}
 
+	fn do_test_rebroadcasting_of_force_close_msg_to_a_peer(connected: bool) {
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+		nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+		nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+		if !connected {
+			nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+			nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+		}
+
+		nodes[0].node.test_force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+		check_added_monitors!(nodes[0], 1);
+		check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+		if connected {
+			// Assert that the channel_update message has been added to node[2]'s pending msg events.
+			let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+			let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
+			assert_eq!(peer_state_2.pending_msg_events.len(), 1);
+		} else {
+			{
+				// Assert that the channel_update message has been pooled in pending_broadcast_messages.
+				let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+				assert_eq!(pending_broadcast_messages.len(), 1);
+			}
+			// Now nodes 0 and 2 reconnect.
+			nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+				features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+			}, true).unwrap();
+			nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+				features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+			}, false).unwrap();
+
+			{
+				// Assert that the channel_update message has been moved to node[2]'s pending msg events.
+				let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+				let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
+				assert_eq!(peer_state_2.pending_msg_events.len(), 1);
+			}
+
+			{
+				// Assert that the pooled message has been drained from pending_broadcast_messages.
+				let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+				assert_eq!(pending_broadcast_messages.len(), 0);
+			}
+		}
+
+		let _ = nodes[0].node.get_and_clear_pending_msg_events();
+	}
+
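+	// Exercise both scenarios: with node 2 still connected, the channel_update lands
+	// directly in its pending_msg_events; with node 2 disconnected, it is pooled in
+	// pending_broadcast_messages and handed to node 2 once the peers reconnect.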
+	#[test]
+	fn test_rebroadcasting_of_force_close_msg_to_a_peer() {
+		do_test_rebroadcasting_of_force_close_msg_to_a_peer(false);
+		do_test_rebroadcasting_of_force_close_msg_to_a_peer(true);
+	}
+
 	#[test]
 	fn test_drop_disconnected_peers_when_removing_channels() {
 		let chanmon_cfgs = create_chanmon_cfgs(2);