Skip to content

Commit

Permalink
Add test for the change in previous commit
Browse files Browse the repository at this point in the history
- Also add a correction in accquiring locks in last commit
  • Loading branch information
shaavan committed Dec 9, 2023
1 parent 313e824 commit 163d535
Showing 1 changed file with 143 additions and 0 deletions.
143 changes: 143 additions & 0 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2971,6 +2971,86 @@ where
Ok(counterparty_node_id)
}

#[cfg(test)]
// Function to test the peer removing from per_peer_state midway of a force close.
fn test_force_close_channel_with_peer(&self, channel_id: &ChannelId, peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool)
-> Result<PublicKey, APIError> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(peer_node_id)
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
let (update_opt, counterparty_node_id) = {
let mut peer_state = peer_state_mutex.lock().unwrap();
let closure_reason = if let Some(peer_msg) = peer_msg {
ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
} else {
ClosureReason::HolderForceClosed
};
if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) {
log_error!(self.logger, "Force-closing channel {}", channel_id);
self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
mem::drop(peer_state);
mem::drop(per_peer_state);
match chan_phase {
ChannelPhase::Funded(mut chan) => {
self.finish_close_channel(chan.context.force_shutdown(broadcast));
(self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
},
ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
// Unfunded channel has no update
(None, chan_phase.context().get_counterparty_node_id())
},
}
} else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() {
log_error!(self.logger, "Force-closing channel {}", &channel_id);
// N.B. that we don't send any channel close event here: we
// don't have a user_channel_id, and we never sent any opening
// events anyway.
(None, *peer_node_id)
} else {
return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", channel_id, peer_node_id) });
}
};

// Test: The peer_state corresponding to counterparty_node is removed at this point
{
let mut per_peer_state = self.per_peer_state.write().unwrap();
per_peer_state.remove(peer_node_id);
}

if let Some(update) = update_opt {
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
// not try to broadcast it via whatever peer we are connected to.
let brodcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
};

let per_peer_state = self.per_peer_state.read().unwrap();

// Attempt to get the peer_state_mutex for the peer we force-closed on (counterparty).
let peer_state_mutex_opt = per_peer_state.get(peer_node_id);

match peer_state_mutex_opt {
Some(peer_state_mutex) => {
let mut peer_state = peer_state_mutex.lock().unwrap();
peer_state.pending_msg_events.push(brodcast_message_evt);
}
None => {
// If we could not find the couterparty in our per_peer_state, we poll
// the messages together in pending_broadcast_messages, and broadcast
// them later.
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(brodcast_message_evt);
log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now.
Will try rebroadcasting later.");
}
}
}

Ok(counterparty_node_id)
}

fn force_close_sending_error(&self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) {
Expand Down Expand Up @@ -11456,6 +11536,69 @@ mod tests {
}
}

fn do_test_rebroadcasting_of_force_close_msg_to_a_peer(connected: bool) {
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

let chan = create_announced_chan_between_nodes(&nodes, 0, 1);

nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());

if !connected {
nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
}

nodes[0].node.test_force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
check_added_monitors!(nodes[0], 1);
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);

if connected {
// Assert that channelUpdate message has been added to node[2] pending msg events
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
assert_eq!(peer_state_2.pending_msg_events.len(), 1);
}
else {
{
// Assert that channelUpdate message has been added to node[2] pending msg events
let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
assert_eq!(pending_broadcast_messages.len(), 1);
}
// Now node 0, and 2 reconnects
nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
features: nodes[1].node.init_features(), networks: None, remote_network_address: None
}, true).unwrap();
nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
features: nodes[0].node.init_features(), networks: None, remote_network_address: None
}, false).unwrap();

{
// Assert that channelUpdate message has been added to node[2] pending msg events
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
assert_eq!(peer_state_2.pending_msg_events.len(), 1);
}

{
// Assert that channelUpdate message has been added to node[2] pending msg events
let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
assert_eq!(pending_broadcast_messages.len(), 0);
}
}

let _ = nodes[0].node.get_and_clear_pending_msg_events();
}

#[test]
fn test_rebroadcasting_of_force_close_msg_to_a_peer() {
do_test_rebroadcasting_of_force_close_msg_to_a_peer(false);
do_test_rebroadcasting_of_force_close_msg_to_a_peer(true);
}

#[test]
fn test_drop_disconnected_peers_when_removing_channels() {
let chanmon_cfgs = create_chanmon_cfgs(2);
Expand Down

0 comments on commit 163d535

Please sign in to comment.