Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support normal channel operation with async signing #2849

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 175 additions & 15 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use lightning::ln::functional_test_utils::*;
use lightning::offers::invoice::{BlindedPayInfo, UnsignedBolt12Invoice};
use lightning::offers::invoice_request::UnsignedInvoiceRequest;
use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath};
use lightning::util::test_channel_signer::{TestChannelSigner, EnforcementState};
use lightning::util::test_channel_signer::{TestChannelSigner, EnforcementState, ops};
use lightning::util::errors::APIError;
use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;
Expand All @@ -72,6 +72,8 @@ use std::sync::atomic;
use std::io::Cursor;
use bitcoin::bech32::u5;

#[allow(unused)]
const ASYNC_OPS: u32 = ops::GET_PER_COMMITMENT_POINT | ops::RELEASE_COMMITMENT_SECRET | ops::SIGN_COUNTERPARTY_COMMITMENT;
const MAX_FEE: u32 = 10_000;
struct FuzzEstimator {
ret_val: atomic::AtomicU32,
Expand Down Expand Up @@ -297,7 +299,6 @@ impl SignerProvider for KeyProvider {
inner,
state,
disable_revocation_policy_check: false,
available: Arc::new(Mutex::new(true)),
})
}

Expand Down Expand Up @@ -829,7 +830,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == node_id {
for update_add in update_add_htlcs.iter() {
out.locked_write(format!("Delivering update_add_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_add_htlc to node {} from node {}.\n", idx, $node).as_bytes());
if !$corrupt_forward {
dest.handle_update_add_htlc(&nodes[$node].get_our_node_id(), update_add);
} else {
Expand All @@ -844,19 +845,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}
}
for update_fulfill in update_fulfill_htlcs.iter() {
out.locked_write(format!("Delivering update_fulfill_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fulfill_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fulfill_htlc(&nodes[$node].get_our_node_id(), update_fulfill);
}
for update_fail in update_fail_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fail_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fail_htlc(&nodes[$node].get_our_node_id(), update_fail);
}
for update_fail_malformed in update_fail_malformed_htlcs.iter() {
out.locked_write(format!("Delivering update_fail_malformed_htlc to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fail_malformed_htlc to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fail_malformed_htlc(&nodes[$node].get_our_node_id(), update_fail_malformed);
}
if let Some(msg) = update_fee {
out.locked_write(format!("Delivering update_fee to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering update_fee to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_update_fee(&nodes[$node].get_our_node_id(), &msg);
}
let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() ||
Expand All @@ -873,7 +874,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
} });
break;
}
out.locked_write(format!("Delivering commitment_signed to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering commitment_signed to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_commitment_signed(&nodes[$node].get_our_node_id(), &commitment_signed);
break;
}
Expand All @@ -882,15 +883,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering revoke_and_ack to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering revoke_and_ack to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_revoke_and_ack(&nodes[$node].get_our_node_id(), msg);
}
}
},
events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
for (idx, dest) in nodes.iter().enumerate() {
if dest.get_our_node_id() == *node_id {
out.locked_write(format!("Delivering channel_reestablish to node {}.\n", idx).as_bytes());
out.locked_write(format!("Delivering channel_reestablish to node {} from node {}.\n", idx, $node).as_bytes());
dest.handle_channel_reestablish(&nodes[$node].get_our_node_id(), msg);
}
}
Expand All @@ -913,7 +914,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
_ => if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
panic!("Unhandled message event {:?}", event)
panic!("Unhandled message event on node {}, {:?}", $node, event)
},
}
if $limit_events != ProcessMessages::AllMessages {
Expand Down Expand Up @@ -1289,6 +1290,118 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
},
0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },

#[cfg(async_signing)]
0xa0 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa1 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[0].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa2 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[0].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa3 => {
let states = keys_manager_a.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[0].signer_unblocked(None);
}

#[cfg(async_signing)]
0xa4 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa5 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa6 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xa7 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[1].signer_unblocked(None);
}

#[cfg(async_signing)]
0xa8 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xa9 => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xaa => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[1].signer_unblocked(None);
}
#[cfg(async_signing)]
0xab => {
let states = keys_manager_b.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 2);
states.values().last().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[1].signer_unblocked(None);
}

#[cfg(async_signing)]
0xac => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_unavailable(ASYNC_OPS);
}
#[cfg(async_signing)]
0xad => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::GET_PER_COMMITMENT_POINT);
nodes[2].signer_unblocked(None);
}
#[cfg(async_signing)]
0xae => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::RELEASE_COMMITMENT_SECRET);
nodes[2].signer_unblocked(None);
}
#[cfg(async_signing)]
0xaf => {
let states = keys_manager_c.enforcement_states.lock().unwrap();
assert_eq!(states.len(), 1);
states.values().next().unwrap().lock().unwrap().set_signer_available(ops::SIGN_COUNTERPARTY_COMMITMENT);
nodes[2].signer_unblocked(None);
}

0xf0 => {
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
Expand Down Expand Up @@ -1382,10 +1495,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
// after we resolve all pending events.
// First make sure there are no pending monitor updates, resetting the error state
// and calling force_channel_monitor_updated for each monitor.
*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

out.locked_write(b"Restoring monitors...\n");
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
Expand All @@ -1404,7 +1514,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}

// Next, make sure peers are all connected to each other
out.locked_write(b"Reconnecting peers...\n");

if chan_a_disconnected {
out.locked_write(b"Reconnecting node 0 and node 1...\n");
nodes[0].peer_connected(&nodes[1].get_our_node_id(), &Init {
features: nodes[1].init_features(), networks: None, remote_network_address: None
}, true).unwrap();
Expand All @@ -1414,6 +1527,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
chan_a_disconnected = false;
}
if chan_b_disconnected {
out.locked_write(b"Reconnecting node 1 and node 2...\n");
nodes[1].peer_connected(&nodes[2].get_our_node_id(), &Init {
features: nodes[2].init_features(), networks: None, remote_network_address: None
}, true).unwrap();
Expand All @@ -1423,8 +1537,33 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
chan_b_disconnected = false;
}

out.locked_write(b"Restoring signers...\n");

*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

#[cfg(async_signing)]
{
for state in keys_manager_a.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
for state in keys_manager_b.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
for state in keys_manager_c.enforcement_states.lock().unwrap().values() {
state.lock().unwrap().set_signer_available(!0);
}
nodes[0].signer_unblocked(None);
nodes[1].signer_unblocked(None);
nodes[2].signer_unblocked(None);
}

out.locked_write(b"Running event queues to quiescence...\n");

for i in 0..std::usize::MAX {
if i == 100 { panic!("It may take many iterations to settle the state, but it should not take forever"); }

// Then, make sure any current forwards make their way to their destination
if process_msg_events!(0, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(1, false, ProcessMessages::AllMessages) { continue; }
Expand All @@ -1437,13 +1576,34 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
break;
}

out.locked_write(b"All channels restored to normal operation.\n");

// Finally, make sure that at least one end of each channel can make a substantial payment
assert!(
send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut payment_id, &mut payment_idx) ||
send_payment(&nodes[1], &nodes[0], chan_a, 10_000_000, &mut payment_id, &mut payment_idx));
out.locked_write(b"Successfully sent a payment between node 0 and node 1.\n");

assert!(
send_payment(&nodes[1], &nodes[2], chan_b, 10_000_000, &mut payment_id, &mut payment_idx) ||
send_payment(&nodes[2], &nodes[1], chan_b, 10_000_000, &mut payment_id, &mut payment_idx));
out.locked_write(b"Successfully sent a payment between node 1 and node 2.\n");

out.locked_write(b"Flushing pending messages.\n");
for i in 0..std::usize::MAX {
if i == 100 { panic!("It may take many iterations to settle the state, but it should not take forever"); }

// Then, make sure any current forwards make their way to their destination
if process_msg_events!(0, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(1, false, ProcessMessages::AllMessages) { continue; }
if process_msg_events!(2, false, ProcessMessages::AllMessages) { continue; }
// ...making sure any pending PendingHTLCsForwardable events are handled and
// payments claimed.
if process_events!(0, false) { continue; }
if process_events!(1, false) { continue; }
if process_events!(2, false) { continue; }
break;
}

last_htlc_clear_fee_a = fee_est_a.ret_val.load(atomic::Ordering::Acquire);
last_htlc_clear_fee_b = fee_est_b.ret_val.load(atomic::Ordering::Acquire);
Expand Down
4 changes: 1 addition & 3 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2944,9 +2944,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
},
commitment_txid: htlc.commitment_txid,
per_commitment_number: htlc.per_commitment_number,
per_commitment_point: self.onchain_tx_handler.signer.get_per_commitment_point(
htlc.per_commitment_number, &self.onchain_tx_handler.secp_ctx,
),
per_commitment_point: htlc.per_commitment_point,
feerate_per_kw: 0,
htlc: htlc.htlc,
preimage: htlc.preimage,
Expand Down
3 changes: 3 additions & 0 deletions lightning/src/chain/onchaintx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub(crate) struct ExternalHTLCClaim {
pub(crate) htlc: HTLCOutputInCommitment,
pub(crate) preimage: Option<PaymentPreimage>,
pub(crate) counterparty_sig: Signature,
pub(crate) per_commitment_point: bitcoin::secp256k1::PublicKey,
}

// Represents the different types of claims for which events are yielded externally to satisfy said
Expand Down Expand Up @@ -1177,9 +1178,11 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
})
.map(|(htlc_idx, htlc)| {
let counterparty_htlc_sig = holder_commitment.counterparty_htlc_sigs[htlc_idx];

ExternalHTLCClaim {
commitment_txid: trusted_tx.txid(),
per_commitment_number: trusted_tx.commitment_number(),
per_commitment_point: trusted_tx.per_commitment_point(),
htlc: htlc.clone(),
preimage: *preimage,
counterparty_sig: counterparty_htlc_sig,
Expand Down
Loading
Loading