Skip to content

Commit

Permalink
Support normal channel operation with async signing
Browse files Browse the repository at this point in the history
This is a do-over of #2653, wherein we support asynchronous signing for
'normal' channel operation. This involves allowing the following
`ChannelSigner` methods to return an `Err` result, indicating that the
requested value is not available:

- get_per_commitment_point
- release_commitment_secret
- sign_counterparty_commitment

When the value does become available, channel operation can be resumed by
invoking `signer_unblocked`.

Note that this adds the current and next per-commitment point to the state
that is persisted by the channel monitor.
  • Loading branch information
waterson committed Jan 27, 2024
1 parent 51d9ee3 commit 1bea55f
Show file tree
Hide file tree
Showing 10 changed files with 1,777 additions and 267 deletions.
190 changes: 175 additions & 15 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use lightning::ln::functional_test_utils::*;
use lightning::offers::invoice::{BlindedPayInfo, UnsignedBolt12Invoice};
use lightning::offers::invoice_request::UnsignedInvoiceRequest;
use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath};
use lightning::util::test_channel_signer::{TestChannelSigner, EnforcementState};
use lightning::util::test_channel_signer::{TestChannelSigner, EnforcementState, ops};
use lightning::util::errors::APIError;
use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;
Expand All @@ -72,6 +72,8 @@ use std::sync::atomic;
use std::io::Cursor;
use bitcoin::bech32::u5;

#[allow(unused)]
const ASYNC_OPS: u32 = ops::GET_PER_COMMITMENT_POINT | ops::RELEASE_COMMITMENT_SECRET | ops::SIGN_COUNTERPARTY_COMMITMENT;
const MAX_FEE: u32 = 10_000;
struct FuzzEstimator {
ret_val: atomic::AtomicU32,
Expand Down Expand Up @@ -297,7 +299,6 @@ impl SignerProvider for KeyProvider {
inner,
state,
disable_revocation_policy_check: false,
available: Arc::new(Mutex::new(true)),
})
}

Expand Down Expand Up @@ -829,7 +830,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == node_id {
for update_add in update_add_htlcs.iter() {
out.locked_write(format!("Delivering update_add_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_add_htlc to node {} from node {}.\n", idx, $node).as_bytes());
if !$corrupt_forward {
dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), update_add);
} else {
Expand All @@ -844,19 +845,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}
}
for update_fulfill in update_fulfill_htlcs.iter() {
out.locked_write(format!("Delivering update_fulfill_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fulfill_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fulfill_htlc(&nodes[$node].get_our_node_id(), update_fulfill);
}
for update_fail in update_fail_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fail_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fail_htlc(&nodes[$node].get_our_node_id(), update_fail);
}
for update_fail_malformed in update_fail_malformed_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_malformed_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fail_malformed_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fail_malformed_htlc(&nodes[$node].get_our_node_id(), update_fail_malformed);
}
if let Some(msg) = update_fee {
out.locked_write(format!("Delivering update_fee to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fee to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fee(&nodes[$node].get_our_node_id(), &msg);
}
let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() ||
Expand All @@ -873,7 +874,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
} });
break;
}
out.locked_write(format!("Delivering commitment_signed to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering commitment_signed to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_commitment_signed(&nodes[$node].get_our_node_id(), &commitment_signed);
break;
}
Expand All @@ -882,15 +883,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering revoke_and_ack to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering revoke_and_ack to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg);
}
}
},
events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering channel_reestablish to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering channel_reestablish to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg);
}
}
Expand All @@ -913,7 +914,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
_ => if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled message event {:?}", event)
panic!("Unhandled message event on node {}, {:?}", $node, event)
},
}
if $limit_events != ProcessMessages::AllMessages {
Expand Down Expand Up @@ -1289,6 +1290,118 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
},
0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },

#[cfg(async_signing)]
0xa0 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa1 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[0].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa2 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[0].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa3 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[0].signer_unblocked(None);
}

#[cfg(async_signing)]
0xa4 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa5 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa6 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa7 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[1].signer_unblocked(None);
}

#[cfg(async_signing)]
0xa8 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa9 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xaa => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xab => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[1].signer_unblocked(None);
}

#[cfg(async_signing)]
0xac => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xad => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[2].signer_unblocked(None);
}
#[cfg(async_signing)]
0xae => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[2].signer_unblocked(None);
}
#[cfg(async_signing)]
0xaf => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[2].signer_unblocked(None);
}

0xf0 => {
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
Expand Down Expand Up @@ -1382,10 +1495,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
// after we resolve all pending events.
// First make sure there are no pending monitor updates, resetting the error state
// and calling force_channel_monitor_updated for each monitor.
*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

out.locked_write(b"Restoring monitors...\n");
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
Expand All @@ -1404,7 +1514,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}

// Next, make sure peers are all connected to each other
out.locked_write(b"Reconnecting peers...\n");

if chan_a_disconnected {
out.locked_write(b"Reconnecting node 0 and node 1...\n");
nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init {
features: nodes[1].init_features(), networks: None, remote_network_address: None
}, true).unwrap();
Expand All @@ -1414,6 +1527,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
chan_a_disconnected = false;
}
if chan_b_disconnected {
out.locked_write(b"Reconnecting node 1 and node 2...\n");
nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init {
features: nodes[2].init_features(), networks: None, remote_network_address: None
}, true).unwrap();
Expand All @@ -1423,8 +1537,33 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
chan_b_disconnected = false;
}

out.locked_write(b"Restoring signers...\n");

*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

#[cfg(async_signing)]
{
for state in keys_manager_a.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
for state in keys_manager_b.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
for state in keys_manager_c.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
nodes[0].signer_unblocked(None);
nodes[1].signer_unblocked(None);
nodes[2].signer_unblocked(None);
}

out.locked_write(b"Running event queues to quiescence...\n");

for i in 0..std::usize::MAX {
if i == 100 { panic!("It may take may iterations to settle the state, but it should not take forever"); }

// Then, make sure any current forwards make their way to their destination
if process_msg_events!(0, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(1, false, ProcessMessages::AllMessages) { continue; }
Expand All @@ -1437,13 +1576,34 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
break;
}

out.locked_write(b"All channels restored to normal operation.\n");

// Finally, make sure that at least one end of each channel can make a substantial payment
assert!(
send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut payment_id, &mut payment_idx) ||
send_payment(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut payment_id, &mut payment_idx));
out.locked_write(b"Successfully sent a payment between node 0 and node 1.\n");

assert!(
send_payment(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut payment_id, &mut payment_idx) ||
send_payment(&nodes[2], &nodes[1], chan_b, 10_000_000, &mut payment_id, &mut payment_idx));
out.locked_write(b"Successfully sent a payment between node 1 and node 2.\n");

out.locked_write(b"Flushing pending messages.\n");
for i in 0..std::usize::MAX {
if i == 100 { panic!("It may take may iterations to settle the state, but it should not take forever"); }

// Then, make sure any current forwards make their way to their destination
if process_msg_events!(0, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(1, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(2, false, ProcessMessages::AllMessages) { continue; }
// ...making sure any pending PendingHTLCsForwardable events are handled and
// payments claimed.
if process_events!(0, false) { continue; }
if process_events!(1, false) { continue; }
if process_events!(2, false) { continue; }
break;
}

last_htlc_clear_fee_a = fee_est_a.ret_val.load(atomic::Ordering::Acquire);
last_htlc_clear_fee_b = fee_est_b.ret_val.load(atomic::Ordering::Acquire);
Expand Down
4 changes: 1 addition & 3 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2944,9 +2944,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
},
commitment_txid: htlc.commitment_txid,
per_commitment_number: htlc.per_commitment_number,
per_commitment_point: self.onchain_tx_handler.signer.get_per_commitment_point(
htlc.per_commitment_number, &self.onchain_tx_handler.secp_ctx,
),
per_commitment_point: htlc.per_commitment_point,
feerate_per_kw: 0,
htlc: htlc.htlc,
preimage: htlc.preimage,
Expand Down
3 changes: 3 additions & 0 deletions lightning/src/chain/onchaintx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub(crate) struct ExternalHTLCClaim {
pub(crate) htlc: HTLCOutputInCommitment,
pub(crate) preimage: Option<PaymentPreimage>,
pub(crate) counterparty_sig: Signature,
pub(crate) per_commitment_point: bitcoin::secp256k1::PublicKey,
}

// Represents the different types of claims for which events are yielded externally to satisfy said
Expand Down Expand Up @@ -1177,9 +1178,11 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
})
.map(|(htlc_idx, htlc)| {
let counterparty_htlc_sig = holder_commitment.counterparty_htlc_sigs[htlc_idx];

ExternalHTLCClaim {
commitment_txid: trusted_tx.txid(),
per_commitment_number: trusted_tx.commitment_number(),
per_commitment_point: trusted_tx.per_commitment_point(),
htlc: htlc.clone(),
preimage: *preimage,
counterparty_sig: counterparty_htlc_sig,
Expand Down
Loading

0 comments on commit 1bea55f

Please sign in to comment.