Skip to content

Commit

Permalink
Merge pull request #2558 from waterson/pr-2554
Browse files Browse the repository at this point in the history
Handle retrying sign_counterparty_commitment failures
  • Loading branch information
TheBlueMatt authored Nov 2, 2023
2 parents d795e24 + 014a336 commit 281a0ae
Show file tree
Hide file tree
Showing 8 changed files with 687 additions and 130 deletions.
1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl SignerProvider for KeyProvider {
inner,
state,
disable_revocation_policy_check: false,
available: Arc::new(Mutex::new(true)),
})
}

Expand Down
323 changes: 323 additions & 0 deletions lightning/src/ln/async_signer_tests.rs

Large diffs are not rendered by default.

318 changes: 213 additions & 105 deletions lightning/src/ln/channel.rs

Large diffs are not rendered by default.

99 changes: 85 additions & 14 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3802,7 +3802,7 @@ where

let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let (chan, msg) = match peer_state.channel_by_id.remove(temporary_channel_id) {
let (chan, msg_opt) = match peer_state.channel_by_id.remove(temporary_channel_id) {
Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
let funding_txo = find_funding_output(&chan, &funding_transaction)?;

Expand Down Expand Up @@ -3841,10 +3841,12 @@ where
}),
};

peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id: chan.context.get_counterparty_node_id(),
msg,
});
if let Some(msg) = msg_opt {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id: chan.context.get_counterparty_node_id(),
msg,
});
}
match peer_state.channel_by_id.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
panic!("Generated duplicate funding txid?");
Expand Down Expand Up @@ -6229,7 +6231,7 @@ where

let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let (chan, funding_msg, monitor) =
let (chan, funding_msg_opt, monitor) =
match peer_state.channel_by_id.remove(&msg.temporary_channel_id) {
Some(ChannelPhase::UnfundedInboundV1(inbound_chan)) => {
match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) {
Expand All @@ -6252,17 +6254,20 @@ where
None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
};

match peer_state.channel_by_id.entry(funding_msg.channel_id) {
match peer_state.channel_by_id.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
Err(MsgHandleErrInternal::send_err_msg_no_close(
"Already had channel with the new channel_id".to_owned(),
chan.context.channel_id()
))
},
hash_map::Entry::Vacant(e) => {
let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
match id_to_peer_lock.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
funding_msg.channel_id))
chan.context.channel_id()))
},
hash_map::Entry::Vacant(i_e) => {
let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
Expand All @@ -6274,10 +6279,12 @@ where
// hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
// accepted payment from yet. We do, however, need to wait to send our channel_ready
// until we have persisted our monitor.
peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id: counterparty_node_id.clone(),
msg: funding_msg,
});
if let Some(msg) = funding_msg_opt {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id: counterparty_node_id.clone(),
msg,
});
}

if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
Expand All @@ -6288,9 +6295,13 @@ where
Ok(())
} else {
log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
let channel_id = match funding_msg_opt {
Some(msg) => msg.channel_id,
None => chan.context.channel_id(),
};
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
funding_msg.channel_id));
channel_id));
}
}
}
Expand Down Expand Up @@ -7216,6 +7227,66 @@ where
has_update
}

/// When a call to a [`ChannelSigner`] method returns an error, this indicates that the signer
/// is (temporarily) unavailable, and the operation should be retried later.
///
/// This method allows for that retry - either checking for any signer-pending messages to be
/// attempted in every channel, or in the specifically provided channel.
///
/// [`ChannelSigner`]: crate::sign::ChannelSigner
#[cfg(test)] // This is only implemented for one signer method, and should be private until we
// actually finish implementing it fully.
pub fn signer_unblocked(&self, channel_opt: Option<(PublicKey, ChannelId)>) {
// Take the persistence guard so any state changes made while draining signer-pending
// messages are flushed to the persister when we return.
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

// Closure applied to each candidate channel: ask the channel for any messages that were
// held back while the signer was unavailable and queue them as MessageSendEvents.
// Only funded channels are handled here; unfunded phases are skipped.
let unblock_chan = |phase: &mut ChannelPhase<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| {
let node_id = phase.context().get_counterparty_node_id();
if let ChannelPhase::Funded(chan) = phase {
let msgs = chan.signer_maybe_unblocked(&self.logger);
// Re-send any commitment update that was blocked on a signature.
if let Some(updates) = msgs.commitment_update {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id,
updates,
});
}
if let Some(msg) = msgs.funding_signed {
pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id,
msg,
});
}
if let Some(msg) = msgs.funding_created {
pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id,
msg,
});
}
// channel_ready goes through the macro so announcement-signature handling stays
// consistent with the normal send path.
if let Some(msg) = msgs.channel_ready {
send_channel_ready!(self, pending_msg_events, chan, msg);
}
}
};

let per_peer_state = self.per_peer_state.read().unwrap();
if let Some((counterparty_node_id, channel_id)) = channel_opt {
// Targeted retry: only the specified (peer, channel) pair is checked. Unknown peers or
// channel ids are silently ignored rather than being an error.
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
unblock_chan(chan, &mut peer_state.pending_msg_events);
}
}
} else {
// Broadcast retry: walk every channel of every peer, taking one peer lock at a time.
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
for (_, chan) in peer_state.channel_by_id.iter_mut() {
unblock_chan(chan, &mut peer_state.pending_msg_events);
}
}
}
}

/// Check whether any channels have finished removing all pending updates after a shutdown
/// exchange and can now send a closing_signed.
/// Returns whether any closing_signed messages were generated.
Expand Down
43 changes: 35 additions & 8 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::util::test_utils::{panicking, TestChainMonitor, TestScorer, TestKeysI
use crate::util::errors::APIError;
use crate::util::config::{UserConfig, MaxDustHTLCExposure};
use crate::util::ser::{ReadableArgs, Writeable};
#[cfg(test)]
use crate::util::logger::Logger;

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::{Transaction, TxOut};
Expand Down Expand Up @@ -436,6 +438,25 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
pub fn get_block_header(&self, height: u32) -> BlockHeader {
self.blocks.lock().unwrap()[height as usize].0.header
}
/// Changes the channel signer's availability for the specified peer and channel.
///
/// When `available` is set to `true`, the channel signer will behave normally. When set to
/// `false`, the channel signer will act like an off-line remote signer and will return `Err`
/// for the signing methods gated on availability (e.g. `sign_counterparty_commitment`,
/// `validate_counterparty_revocation`, `sign_holder_commitment` — see
/// `TestChannelSigner` for the authoritative list).
///
/// Panics if no channel with `chan_id` exists for `peer_id`.
#[cfg(test)]
pub fn set_channel_signer_available(&self, peer_id: &PublicKey, chan_id: &ChannelId, available: bool) {
	let per_peer_state = self.node.per_peer_state.read().unwrap();
	let chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
	// Look the channel up directly — no need for the immediately-invoked closure the
	// original used; a plain match binds the signer just as well.
	let signer = match chan_lock.channel_by_id.get(chan_id) {
		Some(phase) => phase.context().get_signer(),
		None => panic!("Couldn't find a channel with id {}", chan_id),
	};
	log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);
	signer.as_ecdsa().unwrap().set_available(available);
}
}

/// If we need an unsafe pointer to a `Node` (ie to reference it in a thread
Expand Down Expand Up @@ -924,7 +945,8 @@ macro_rules! unwrap_send_err {
/// Asserts that exactly `count` new `ChannelMonitor`s were handed to the node's chain monitor
/// since the last check, then clears the recorded list so subsequent checks start fresh.
/// Nodes without a chain monitor are skipped entirely.
pub fn check_added_monitors<CM: AChannelManager, H: NodeHolder<CM=CM>>(node: &H, count: usize) {
	match node.chain_monitor() {
		Some(chain_monitor) => {
			let mut added_monitors = chain_monitor.added_monitors.lock().unwrap();
			let found = added_monitors.len();
			assert_eq!(found, count, "expected {} monitors to be added, not {}", count, found);
			// Reset so the next call only sees monitors added after this point.
			added_monitors.clear();
		},
		None => {},
	}
}
Expand Down Expand Up @@ -2119,12 +2141,13 @@ macro_rules! expect_channel_shutdown_state {
}

#[cfg(any(test, ldk_bench, feature = "_test_utils"))]
pub fn expect_channel_pending_event<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, expected_counterparty_node_id: &PublicKey) {
pub fn expect_channel_pending_event<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, expected_counterparty_node_id: &PublicKey) -> ChannelId {
let events = node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
crate::events::Event::ChannelPending { ref counterparty_node_id, .. } => {
match &events[0] {
crate::events::Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
assert_eq!(*expected_counterparty_node_id, *counterparty_node_id);
*channel_id
},
_ => panic!("Unexpected event"),
}
Expand Down Expand Up @@ -3232,24 +3255,28 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
// If a expects a channel_ready, it better not think it has received a revoke_and_ack
// from b
for reestablish in reestablish_1.iter() {
assert_eq!(reestablish.next_remote_commitment_number, 0);
let n = reestablish.next_remote_commitment_number;
assert_eq!(n, 0, "expected a->b next_remote_commitment_number to be 0, got {}", n);
}
}
if send_channel_ready.1 {
// If b expects a channel_ready, it better not think it has received a revoke_and_ack
// from a
for reestablish in reestablish_2.iter() {
assert_eq!(reestablish.next_remote_commitment_number, 0);
let n = reestablish.next_remote_commitment_number;
assert_eq!(n, 0, "expected b->a next_remote_commitment_number to be 0, got {}", n);
}
}
if send_channel_ready.0 || send_channel_ready.1 {
// If we expect any channel_ready's, both sides better have set
// next_holder_commitment_number to 1
for reestablish in reestablish_1.iter() {
assert_eq!(reestablish.next_local_commitment_number, 1);
let n = reestablish.next_local_commitment_number;
assert_eq!(n, 1, "expected a->b next_local_commitment_number to be 1, got {}", n);
}
for reestablish in reestablish_2.iter() {
assert_eq!(reestablish.next_local_commitment_number, 1);
let n = reestablish.next_local_commitment_number;
assert_eq!(n, 1, "expected b->a next_local_commitment_number to be 1, got {}", n);
}
}

Expand Down
2 changes: 1 addition & 1 deletion lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9045,7 +9045,7 @@ fn test_duplicate_chan_id() {
}
};
check_added_monitors!(nodes[0], 0);
nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created.unwrap());
// At this point we'll look up if the channel_id is present and immediately fail the channel
// without trying to persist the `ChannelMonitor`.
check_added_monitors!(nodes[1], 0);
Expand Down
3 changes: 3 additions & 0 deletions lightning/src/ln/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ mod monitor_tests;
#[cfg(test)]
#[allow(unused_mut)]
mod shutdown_tests;
#[cfg(test)]
#[allow(unused_mut)]
mod async_signer_tests;

pub use self::peer_channel_encryptor::LN_MAX_MSG_LEN;

Expand Down
28 changes: 26 additions & 2 deletions lightning/src/util/test_channel_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub struct TestChannelSigner {
/// Channel state used for policy enforcement
pub state: Arc<Mutex<EnforcementState>>,
pub disable_revocation_policy_check: bool,
/// When `true` (the default), the signer will respond immediately with signatures. When `false`,
/// the signer will return an error indicating that it is unavailable.
pub available: Arc<Mutex<bool>>,
}

impl PartialEq for TestChannelSigner {
Expand All @@ -71,7 +74,8 @@ impl TestChannelSigner {
Self {
inner,
state,
disable_revocation_policy_check: false
disable_revocation_policy_check: false,
available: Arc::new(Mutex::new(true)),
}
}

Expand All @@ -84,7 +88,8 @@ impl TestChannelSigner {
Self {
inner,
state,
disable_revocation_policy_check
disable_revocation_policy_check,
available: Arc::new(Mutex::new(true)),
}
}

Expand All @@ -94,6 +99,16 @@ impl TestChannelSigner {
pub fn get_enforcement_state(&self) -> MutexGuard<EnforcementState> {
self.state.lock().unwrap()
}

/// Sets whether this signer behaves as available.
///
/// With `true` (the default), calls are forwarded to the underlying signer as usual. With
/// `false`, the availability-gated signing methods return `Err`, mimicking a remote signer
/// that is temporarily offline. Intended for testing asynchronous signing.
#[cfg(test)]
pub fn set_available(&self, available: bool) {
	let mut guard = self.available.lock().unwrap();
	*guard = available;
}
}

impl ChannelSigner for TestChannelSigner {
Expand Down Expand Up @@ -133,6 +148,9 @@ impl EcdsaChannelSigner for TestChannelSigner {
self.verify_counterparty_commitment_tx(commitment_tx, secp_ctx);

{
if !*self.available.lock().unwrap() {
return Err(());
}
let mut state = self.state.lock().unwrap();
let actual_commitment_number = commitment_tx.commitment_number();
let last_commitment_number = state.last_counterparty_commitment;
Expand All @@ -149,13 +167,19 @@ impl EcdsaChannelSigner for TestChannelSigner {
}

fn validate_counterparty_revocation(&self, idx: u64, _secret: &SecretKey) -> Result<(), ()> {
	// When the test harness has marked this signer unavailable, behave like an offline
	// remote signer and refuse to validate.
	let is_available = *self.available.lock().unwrap();
	if !is_available {
		return Err(());
	}
	// Enforcement policy: revocations must arrive for the current commitment or a replay of
	// the previous one — anything else indicates a state-machine bug in the tests.
	let mut state = self.state.lock().unwrap();
	let current = state.last_counterparty_revoked_commitment;
	assert!(
		idx == current || idx == current - 1,
		"expecting to validate the current or next counterparty revocation - trying {}, current {}",
		idx, current
	);
	state.last_counterparty_revoked_commitment = idx;
	Ok(())
}

fn sign_holder_commitment(&self, commitment_tx: &HolderCommitmentTransaction, secp_ctx: &Secp256k1<secp256k1::All>) -> Result<Signature, ()> {
if !*self.available.lock().unwrap() {
return Err(());
}
let trusted_tx = self.verify_holder_commitment_tx(commitment_tx, secp_ctx);
let state = self.state.lock().unwrap();
let commitment_number = trusted_tx.commitment_number();
Expand Down

0 comments on commit 281a0ae

Please sign in to comment.