diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index dc67b198149..fcc1f8f5a64 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -3294,10 +3294,6 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool, if !close_chans_before_reload { check_closed_broadcast(&nodes[1], 1, true); check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000); - } else { - // While we forwarded the payment a while ago, we don't want to process events too early or - // we'll run background tasks we wanted to test individually. - expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, !close_only_a); } mine_transactions(&nodes[0], &[&as_closing_tx[0], bs_preimage_tx]); @@ -3308,24 +3304,33 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool, // Make sure the B<->C channel is still alive and well by sending a payment over it. let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]); reconnect_args.pending_responding_commitment_signed.1 = true; - if !close_chans_before_reload { - // TODO: If the A<->B channel was closed before we reloaded, the `ChannelManager` - // will consider the forwarded payment complete and allow the B<->C - // `ChannelMonitorUpdate` to complete, wiping the payment preimage. This should not - // be allowed, and needs fixing. - reconnect_args.pending_responding_commitment_signed_dup_monitor.1 = true; - } + // The B<->C `ChannelMonitorUpdate` shouldn't be allowed to complete, which is + // equivalent to the responding `commitment_signed` being a duplicate for node B, thus we + // need to set the `pending_responding_commitment_signed_dup_monitor` flag. + reconnect_args.pending_responding_commitment_signed_dup_monitor.1 = true; reconnect_args.pending_raa.1 = true; reconnect_nodes(reconnect_args); + + // Once the blocked `ChannelMonitorUpdate` *finally* completes, the pending + // `PaymentForwarded` event will finally be released. let (outpoint, ab_update_id, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id_ab).unwrap().clone(); nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, ab_update_id); - expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), true, false); - if !close_chans_before_reload { - // Once we call `process_pending_events` the final `ChannelMonitor` for the B<->C - // channel will fly, removing the payment preimage from it. - check_added_monitors(&nodes[1], 1); + + // If the A<->B channel was closed before we reload, we'll replay the claim against it on + // reload, causing the `PaymentForwarded` event to get replayed. + let evs = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(evs.len(), if close_chans_before_reload { 2 } else { 1 }); + for ev in evs { + if let Event::PaymentForwarded { .. } = ev { } + else { + panic!(); + } } + + // Once we call `process_pending_events` the final `ChannelMonitor` for the B<->C channel + // will fly, removing the payment preimage from it.
+ check_added_monitors(&nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); send_payment(&nodes[1], &[&nodes[2]], 100_000); } @@ -3713,3 +3718,111 @@ fn test_partial_claim_mon_update_compl_actions() { send_payment(&nodes[2], &[&nodes[3]], 100_000); assert!(!get_monitor!(nodes[3], chan_4_id).get_stored_preimages().contains_key(&payment_hash)); } + + +#[test] +fn test_claim_to_closed_channel_blocks_forwarded_preimage_removal() { + // One of the last features for async persistence we implemented was the correct blocking of + // RAA(s) which remove a preimage from an outbound channel for a forwarded payment until the + // preimage has been durably written to the closed inbound channel. + // This tests that behavior. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + // First open channels, route a payment, and force-close the first hop. + let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + let chan_b = create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 1_000_000, 500_000_000); + + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan_a.2, &nodes[1].node.get_our_node_id(), String::new()).unwrap(); + check_added_monitors!(nodes[0], 1); + let a_reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }; + check_closed_event!(nodes[0], 1, a_reason, [nodes[1].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[0], true); + + let as_commit_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(as_commit_tx.len(), 1); + + mine_transaction(&nodes[1], &as_commit_tx[0]); + check_added_monitors!(nodes[1], 1); + check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[1], true); + + // Now that B has a pending forwarded payment across it with the inbound edge on-chain, claim + // the payment on C and give B the preimage for it. + nodes[2].node.claim_funds(payment_preimage); + check_added_monitors!(nodes[2], 1); + expect_payment_claimed!(nodes[2], payment_hash, 1_000_000); + + let updates = get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id()); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + nodes[1].node.handle_update_fulfill_htlc(nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); + check_added_monitors!(nodes[1], 1); + commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); + + // At this point nodes[1] has the preimage and is waiting for the `ChannelMonitorUpdate` for + // channel A to hit disk. Until it does so, it shouldn't ever let the preimage disappear from + // channel B's `ChannelMonitor`. + assert!(get_monitor!(nodes[1], chan_b.2).get_all_current_outbound_htlcs().iter().any(|(_, (_, preimage))| *preimage == Some(payment_preimage))); + + // Once we complete the `ChannelMonitorUpdate` on channel A, and the `ChannelManager` processes + // background events (via `get_and_clear_pending_msg_events`), the final `ChannelMonitorUpdate` + // will fly and we'll drop the preimage from channel B's `ChannelMonitor`. We'll also release + // the `Event::PaymentForwarded`.
+ check_added_monitors!(nodes[1], 0); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + + nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + check_added_monitors!(nodes[1], 1); + assert!(!get_monitor!(nodes[1], chan_b.2).get_all_current_outbound_htlcs().iter().any(|(_, (_, preimage))| *preimage == Some(payment_preimage))); + expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, true, false); +} + +#[test] +fn test_claim_to_closed_channel_blocks_claimed_event() { + // One of the last features for async persistence we implemented was the correct blocking of + // event(s) until the preimage for a claimed HTLC is durably on disk in a ChannelMonitor for a + // closed channel. + // This tests that behavior. + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // First open channels, route a payment, and force-close the first hop. + let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + + nodes[0].node.force_close_broadcasting_latest_txn(&chan_a.2, &nodes[1].node.get_our_node_id(), String::new()).unwrap(); + check_added_monitors!(nodes[0], 1); + let a_reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }; + check_closed_event!(nodes[0], 1, a_reason, [nodes[1].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[0], true); + + let as_commit_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + assert_eq!(as_commit_tx.len(), 1); + + mine_transaction(&nodes[1], &as_commit_tx[0]); + check_added_monitors!(nodes[1], 1); + check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed, [nodes[0].node.get_our_node_id()], 1000000); + check_closed_broadcast!(nodes[1], true); + + // Now that B has a pending payment with the inbound HTLC on a closed channel, claim the + // payment on disk, but don't let the `ChannelMonitorUpdate` complete. This should prevent the + // `Event::PaymentClaimed` from being generated. + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + nodes[1].node.claim_funds(payment_preimage); + check_added_monitors!(nodes[1], 1); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + + // Once we complete the `ChannelMonitorUpdate` the `Event::PaymentClaimed` will become + // available. + nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2); + expect_payment_claimed!(nodes[1], payment_hash, 1_000_000); +} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b312e0055ee..d6076cc5b3d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1185,7 +1185,7 @@ impl From<&MPPClaimHTLCSource> for HTLCClaimSource { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] /// The source of an HTLC which is being claimed as a part of an incoming payment. Each part is /// tracked in [`PendingMPPClaim`] as well as in [`ChannelMonitor`]s, so that it can be converted /// to an [`HTLCClaimSource`] for claim replays on startup. 
@@ -1338,11 +1338,12 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider { /// entry here to note that the channel with the key's ID is blocked on a set of actions. actions_blocking_raa_monitor_updates: BTreeMap<ChannelId, Vec<RAAMonitorUpdateBlockingAction>>, /// The latest [`ChannelMonitor::get_latest_update_id`] value for all closed channels as they - /// exist on-disk/in our [`chain::Watch`]. This *ignores* all pending updates not yet applied - /// in [`ChannelManager::pending_background_events`]. + /// exist on-disk/in our [`chain::Watch`]. /// /// If there are any updates pending in [`Self::in_flight_monitor_updates`] this will contain - /// the highest `update_id` of all the pending in-flight updates. + /// the highest `update_id` of all the pending in-flight updates (note that any pending updates + /// not yet applied sitting in [`ChannelManager::pending_background_events`] will also be + /// considered as they are also in [`Self::in_flight_monitor_updates`]). closed_channel_monitor_update_ids: BTreeMap<ChannelId, u64>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`] @@ -2972,55 +2973,16 @@ macro_rules! handle_error { /// [`ChannelMonitor`]/channel funding transaction) to begin with. macro_rules! locked_close_channel { ($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{ - if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() { - if $self.background_events_processed_since_startup.load(Ordering::Acquire) { - handle_new_monitor_update!($self, funding_txo, update, $peer_state, - $channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER); - } else { - // We want to track the in-flight update both in `in_flight_monitor_updates` and in - // `pending_background_events` to avoid a race condition during - // `pending_background_events` processing where we complete one - // `ChannelMonitorUpdate` (but there are more pending as background events) but we - // conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to - // run post-completion actions. We could work around that with some effort, but its - // simpler to just track updates twice. - let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo) - .or_insert_with(Vec::new); - if !in_flight_updates.contains(&update) { - in_flight_updates.push(update.clone()); - } - let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo, - channel_id, - update, - }; - $self.pending_background_events.lock().unwrap().push(event); - } + if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() { + handle_new_monitor_update!($self, funding_txo, update, $peer_state, + $channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER); } // If there's a possibility that we need to generate further monitor updates for this // channel, we need to store the last update_id of it. However, we don't want to insert // into the map (which prevents the `PeerState` from being cleaned up) for channels that // never even got confirmations (which would open us up to DoS attacks).
- let mut update_id = $channel_context.get_latest_monitor_update_id(); + let update_id = $channel_context.get_latest_monitor_update_id(); if $channel_context.get_funding_tx_confirmation_height().is_some() || $channel_context.minimum_depth() == Some(0) || update_id > 1 { - // There may be some pending background events which we have to ignore when setting the - // latest update ID. - for event in $self.pending_background_events.lock().unwrap().iter() { - match event { - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, channel_id, update, .. } => { - if *channel_id == $channel_context.channel_id() && *counterparty_node_id == $channel_context.get_counterparty_node_id() { - update_id = cmp::min(update_id, update.update_id - 1); - } - }, - BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup(..) => { - // This is only generated for very old channels which were already closed - // on startup, so it should never be present for a channel that is closing - // here. - }, - BackgroundEvent::MonitorUpdatesComplete { .. } => {}, - } - } let chan_id = $channel_context.channel_id(); $peer_state.closed_channel_monitor_update_ids.insert(chan_id, update_id); } @@ -3325,8 +3287,8 @@ macro_rules! handle_new_monitor_update { }; ( $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr, - $chan_id: expr, $in_flight_updates: ident, $update_idx: ident, _internal_outer, - $completed: expr + $chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident, + _internal_outer, $completed: expr ) => { { $in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo) .or_insert_with(Vec::new); @@ -3338,8 +3300,30 @@ macro_rules! handle_new_monitor_update { $in_flight_updates.push($update); $in_flight_updates.len() - 1 }); - let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]); - handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed) + if $self.background_events_processed_since_startup.load(Ordering::Acquire) { + let update_res = $self.chain_monitor.update_channel($funding_txo, &$in_flight_updates[$update_idx]); + handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed) + } else { + // We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we + // fail to persist it. This is a fairly safe assumption, however, since anything we do + // during the startup sequence should be replayed exactly if we immediately crash. + let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: $counterparty_node_id, + funding_txo: $funding_txo, + channel_id: $chan_id, + update: $in_flight_updates[$update_idx].clone(), + }; + // We want to track the in-flight update both in `in_flight_monitor_updates` and in + // `pending_background_events` to avoid a race condition during + // `pending_background_events` processing where we complete one + // `ChannelMonitorUpdate` (but there are more pending as background events) but we + // conclude that all pending `ChannelMonitorUpdate`s have completed and it's safe to + // run post-completion actions. + // We could work around that with some effort, but it's simpler to just track updates + // twice. + $self.pending_background_events.lock().unwrap().push(event); + false + } } }; ( $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr, @@ -3347,12 +3331,35 @@ macro_rules! 
handle_new_monitor_update { ) => { { let logger = WithChannelContext::from(&$self.logger, &$chan_context, None); let chan_id = $chan_context.channel_id(); + let counterparty_node_id = $chan_context.get_counterparty_node_id(); let in_flight_updates; let idx; handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, - in_flight_updates, idx, _internal_outer, + counterparty_node_id, in_flight_updates, idx, _internal_outer, + { + let _ = in_flight_updates.remove(idx); + }) + } }; + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, + $per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE + ) => { { + let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None); + let in_flight_updates; + let idx; + handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, + $channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer, { let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() { + let update_actions = $peer_state.monitor_update_blocked_actions + .remove(&$channel_id).unwrap_or(Vec::new()); + + mem::drop($peer_state_lock); + mem::drop($per_peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + } }) } }; ( @@ -3361,10 +3368,11 @@ macro_rules! handle_new_monitor_update { ) => { { let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); let chan_id = $chan.context.channel_id(); + let counterparty_node_id = $chan.context.get_counterparty_node_id(); let in_flight_updates; let idx; handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, - in_flight_updates, idx, _internal_outer, + counterparty_node_id, in_flight_updates, idx, _internal_outer, { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { @@ -3939,16 +3947,13 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } - /// Ensures any saved latest ID in [`PeerState::closed_channel_monitor_update_ids`] is updated, - /// then applies the provided [`ChannelMonitorUpdate`]. - #[must_use] + /// Applies a [`ChannelMonitorUpdate`] which may or may not be for a channel which is closed. fn apply_post_close_monitor_update( &self, counterparty_node_id: PublicKey, channel_id: ChannelId, funding_txo: OutPoint, - mut monitor_update: ChannelMonitorUpdate, - ) -> ChannelMonitorUpdateStatus { + monitor_update: ChannelMonitorUpdate, + ) { // Note that there may be some post-close updates which need to be well-ordered with - // respect to the `update_id`, so we hold the `closed_channel_monitor_update_ids` lock - // here (and also make sure the `monitor_update` we're applying has the right id. + // respect to the `update_id`, so we hold the `peer_state` lock here. 
let per_peer_state = self.per_peer_state.read().unwrap(); let mut peer_state_lock = per_peer_state.get(&counterparty_node_id) .expect("We must always have a peer entry for a peer with which we have channels that have ChannelMonitors") @@ -3957,44 +3962,20 @@ where match peer_state.channel_by_id.entry(channel_id) { hash_map::Entry::Occupied(mut chan_phase) => { if let ChannelPhase::Funded(chan) = chan_phase.get_mut() { - let completed = handle_new_monitor_update!(self, funding_txo, + handle_new_monitor_update!(self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, chan); - return if completed { ChannelMonitorUpdateStatus::Completed } else { ChannelMonitorUpdateStatus::InProgress }; + return; } else { debug_assert!(false, "We shouldn't have an update for a non-funded channel"); } }, hash_map::Entry::Vacant(_) => {}, } - match peer_state.closed_channel_monitor_update_ids.entry(channel_id) { - btree_map::Entry::Vacant(entry) => { - let is_closing_unupdated_monitor = monitor_update.update_id == 1 - && monitor_update.updates.len() == 1 - && matches!(&monitor_update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. }); - // If the ChannelMonitorUpdate is closing a channel that never got past initial - // funding (to have any commitment updates), we'll skip inserting in - // `locked_close_channel`, allowing us to avoid keeping around the PeerState for - // that peer. In that specific case we expect no entry in the map here. In any - // other cases, this is a bug, but in production we go ahead and recover by - // inserting the update_id and hoping its right. - debug_assert!(is_closing_unupdated_monitor, "Expected closing monitor against an unused channel, got {:?}", monitor_update); - if !is_closing_unupdated_monitor { - entry.insert(monitor_update.update_id); - } - }, - btree_map::Entry::Occupied(entry) => { - // If we're running in a threaded environment its possible we generate updates for - // a channel that is closing, then apply some preimage update, then go back and - // apply the close monitor update here. In order to ensure the updates are still - // well-ordered, we have to use the `closed_channel_monitor_update_ids` map to - // override the `update_id`, taking care to handle old monitors where the - // `latest_update_id` is already `u64::MAX`. - let latest_update_id = entry.into_mut(); - *latest_update_id = latest_update_id.saturating_add(1); - monitor_update.update_id = *latest_update_id; - } - } - self.chain_monitor.update_channel(funding_txo, &monitor_update) + + handle_new_monitor_update!( + self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, + counterparty_node_id, channel_id, POST_CHANNEL_CLOSE + ); } /// When a channel is removed, two things need to happen: @@ -4023,7 +4004,7 @@ where } if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update { debug_assert!(false, "This should have been handled in `locked_close_channel`"); - let _ = self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update); + self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update); } if self.background_events_processed_since_startup.load(Ordering::Acquire) { // If a `ChannelMonitorUpdate` was applied (i.e. 
any time we have a funding txo and are @@ -6318,9 +6299,7 @@ where let _ = self.chain_monitor.update_channel(funding_txo, &update); }, BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => { - // The monitor update will be replayed on startup if it doesnt complete, so no - // use bothering to care about the monitor update completing. - let _ = self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update); + self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update); }, BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -7085,129 +7064,107 @@ where debug_assert_ne!(self.pending_events.held_by_thread(), LockHeldState::HeldByThread); debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread); - { - let per_peer_state = self.per_peer_state.read().unwrap(); - let chan_id = prev_hop.channel_id; + let per_peer_state = self.per_peer_state.read().unwrap(); + let chan_id = prev_hop.channel_id; - let peer_state_opt = prev_hop.counterparty_node_id.as_ref().map( - |counterparty_node_id| per_peer_state.get(counterparty_node_id) - .map(|peer_mutex| peer_mutex.lock().unwrap()) - ).unwrap_or(None); + const MISSING_MON_ERROR: &'static str = + "If we're going to claim an HTLC against a channel, we should always have *some* state for the channel, even if just the latest ChannelMonitor update_id. This failure indicates we need to claim an HTLC from a channel for which we did not have a ChannelMonitor at startup and didn't create one while running."; - if peer_state_opt.is_some() { - let mut peer_state_lock = peer_state_opt.unwrap(); - let peer_state = &mut *peer_state_lock; - if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) { - if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - let counterparty_node_id = chan.context.get_counterparty_node_id(); - let logger = WithChannelContext::from(&self.logger, &chan.context, None); - let fulfill_res = - chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger); - - match fulfill_res { - UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => { - let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false); - if let Some(action) = action_opt { - log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}", - chan_id, action); - peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); - } - if let Some(raa_blocker) = raa_blocker_opt { - peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker); - } - if !during_init { - handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_lock, - peer_state, per_peer_state, chan); - } else { - // If we're running during init we cannot update a monitor directly - - // they probably haven't actually been loaded yet. Instead, push the - // monitor update as a background event. 
- self.pending_background_events.lock().unwrap().push( - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo: prev_hop.funding_txo, - channel_id: prev_hop.channel_id, - update: monitor_update.clone(), - }); - } - } - UpdateFulfillCommitFetch::DuplicateClaim {} => { - let (action_opt, raa_blocker_opt) = completion_action(None, true); - if let Some(raa_blocker) = raa_blocker_opt { - // If we're making a claim during startup, its a replay of a - // payment claim from a `ChannelMonitor`. In some cases (MPP or - // if the HTLC was only recently removed) we make such claims - // after an HTLC has been removed from a channel entirely, and - // thus the RAA blocker has long since completed. - // - // In any other case, the RAA blocker must still be present and - // blocking RAAs. - debug_assert!(during_init || - peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker)); - } - let action = if let Some(action) = action_opt { - action - } else { - return; - }; + // Note here that `peer_state_opt` is always `Some` if `prev_hop.counterparty_node_id` is + // `Some`. This is relied on in the closed-channel case below. + let mut peer_state_opt = prev_hop.counterparty_node_id.as_ref().map( + |counterparty_node_id| per_peer_state.get(counterparty_node_id) + .map(|peer_mutex| peer_mutex.lock().unwrap()) + .expect(MISSING_MON_ERROR) + ); - mem::drop(peer_state_lock); + if let Some(peer_state_lock) = peer_state_opt.as_mut() { + let peer_state = &mut **peer_state_lock; + if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(chan_id) { + if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + let fulfill_res = chan.get_update_fulfill_htlc_and_commit(prev_hop.htlc_id, payment_preimage, payment_info, &&logger); - log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}", + match fulfill_res { + UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => { + let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false); + if let Some(action) = action_opt { + log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}", chan_id, action); - if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { - downstream_counterparty_node_id: node_id, - downstream_funding_outpoint: _, - blocking_action: blocker, downstream_channel_id: channel_id, - } = action { - if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { - let mut peer_state = peer_state_mtx.lock().unwrap(); - if let Some(blockers) = peer_state - .actions_blocking_raa_monitor_updates - .get_mut(&channel_id) - { - let mut found_blocker = false; - blockers.retain(|iter| { - // Note that we could actually be blocked, in - // which case we need to only remove the one - // blocker which was added duplicatively. 
- let first_blocker = !found_blocker; - if *iter == blocker { found_blocker = true; } - *iter != blocker || !first_blocker - }); - debug_assert!(found_blocker); - } - } else { - debug_assert!(false); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); + } + if let Some(raa_blocker) = raa_blocker_opt { + peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker); + } + handle_new_monitor_update!(self, prev_hop.funding_txo, monitor_update, peer_state_opt, + peer_state, per_peer_state, chan); + } + UpdateFulfillCommitFetch::DuplicateClaim {} => { + let (action_opt, raa_blocker_opt) = completion_action(None, true); + if let Some(raa_blocker) = raa_blocker_opt { + // If we're making a claim during startup, its a replay of a + // payment claim from a `ChannelMonitor`. In some cases (MPP or + // if the HTLC was only recently removed) we make such claims + // after an HTLC has been removed from a channel entirely, and + // thus the RAA blocker has long since completed. + // + // In any other case, the RAA blocker must still be present and + // blocking RAAs. + debug_assert!(during_init || + peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker)); + } + let action = if let Some(action) = action_opt { + action + } else { + return; + }; + + mem::drop(peer_state_opt); + + log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}", + chan_id, action); + if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately { + downstream_counterparty_node_id: node_id, + downstream_funding_outpoint: _, + blocking_action: blocker, downstream_channel_id: channel_id, + } = action { + if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { + let mut peer_state = peer_state_mtx.lock().unwrap(); + if let Some(blockers) = peer_state + .actions_blocking_raa_monitor_updates + .get_mut(&channel_id) + { + let mut found_blocker = false; + blockers.retain(|iter| { + // Note that we could actually be blocked, in + // which case we need to only remove the one + // blocker which was added duplicatively. + let first_blocker = !found_blocker; + if *iter == blocker { found_blocker = true; } + *iter != blocker || !first_blocker + }); + debug_assert!(found_blocker); } - } else if matches!(action, MonitorUpdateCompletionAction::PaymentClaimed { .. }) { - debug_assert!(during_init, - "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); - mem::drop(per_peer_state); - self.handle_monitor_update_completion_actions([action]); } else { - debug_assert!(false, - "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); - return; - }; - } + debug_assert!(false); + } + } else if matches!(action, MonitorUpdateCompletionAction::PaymentClaimed { .. 
}) { + debug_assert!(during_init, + "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); + mem::drop(per_peer_state); + self.handle_monitor_update_completion_actions([action]); + } else { + debug_assert!(false, + "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); + return; + }; } } - return; } + return; } } - let preimage_update = ChannelMonitorUpdate { - update_id: 0, // apply_post_close_monitor_update will set the right value - counterparty_node_id: prev_hop.counterparty_node_id, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage, - payment_info, - }], - channel_id: Some(prev_hop.channel_id), - }; - if prev_hop.counterparty_node_id.is_none() { let payment_hash: PaymentHash = payment_preimage.into(); panic!( @@ -7217,34 +7174,28 @@ where ); } let counterparty_node_id = prev_hop.counterparty_node_id.expect("Checked immediately above"); + let mut peer_state = peer_state_opt.expect("peer_state_opt is always Some when the counterparty_node_id is Some"); - if !during_init { - // We update the ChannelMonitor on the backward link, after - // receiving an `update_fulfill_htlc` from the forward link. - let update_res = self.apply_post_close_monitor_update(counterparty_node_id, prev_hop.channel_id, prev_hop.funding_txo, preimage_update); - if update_res != ChannelMonitorUpdateStatus::Completed { - // TODO: This needs to be handled somehow - if we receive a monitor update - // with a preimage we *must* somehow manage to propagate it to the upstream - // channel, or we must have an ability to receive the same event and try - // again on restart. - log_error!(WithContext::from(&self.logger, None, Some(prev_hop.channel_id), None), - "Critical error: failed to update channel monitor with preimage {:?}: {:?}", - payment_preimage, update_res); - } + let update_id = if let Some(latest_update_id) = peer_state.closed_channel_monitor_update_ids.get_mut(&chan_id) { + *latest_update_id = latest_update_id.saturating_add(1); + *latest_update_id } else { - // If we're running during init we cannot update a monitor directly - they probably - // haven't actually been loaded yet. Instead, push the monitor update as a background - // event. - // TODO: Track this update as pending and only complete the completion action when it - // finishes. - let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup { - counterparty_node_id, - funding_txo: prev_hop.funding_txo, - channel_id: prev_hop.channel_id, - update: preimage_update, - }; - self.pending_background_events.lock().unwrap().push(event); - } + let err = "We need the latest ChannelMonitorUpdate ID to build a new update. +This should have been checked for availability on startup but somehow it is no longer available. +This indicates a bug inside LDK. Please report this error at https://github.com/lightningdevkit/rust-lightning/issues/new"; + log_error!(self.logger, "{}", err); + panic!("{}", err); + }; + + let preimage_update = ChannelMonitorUpdate { + update_id, + counterparty_node_id: Some(counterparty_node_id), + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage, + payment_info, + }], + channel_id: Some(prev_hop.channel_id), + }; // Note that we do process the completion action here. 
This totally could be a // duplicate claim, but we have no way of knowing without interrogating the @@ -7254,29 +7205,27 @@ where let (action_opt, raa_blocker_opt) = completion_action(None, false); if let Some(raa_blocker) = raa_blocker_opt { - // TODO: Avoid always blocking the world for the write lock here. - let mut per_peer_state = self.per_peer_state.write().unwrap(); - let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(|| - Mutex::new(PeerState { - channel_by_id: new_hash_map(), - inbound_channel_request_by_id: new_hash_map(), - latest_features: InitFeatures::empty(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - closed_channel_monitor_update_ids: BTreeMap::new(), - is_connected: false, - })); - let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.actions_blocking_raa_monitor_updates .entry(prev_hop.channel_id) .or_default() .push(raa_blocker); } - self.handle_monitor_update_completion_actions(action_opt); + // Given the fact that we're in a bit of a weird edge case, its worth hashing the preimage + // to include the `payment_hash` in the log metadata here. + let payment_hash = payment_preimage.into(); + let logger = WithContext::from(&self.logger, Some(counterparty_node_id), Some(chan_id), Some(payment_hash)); + + if let Some(action) = action_opt { + log_trace!(logger, "Tracking monitor update completion action for closed channel {}: {:?}", + chan_id, action); + peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action); + } + + handle_new_monitor_update!( + self, prev_hop.funding_txo, preimage_update, peer_state, peer_state, per_peer_state, + counterparty_node_id, chan_id, POST_CHANNEL_CLOSE + ); } fn finalize_claims(&self, sources: Vec) { @@ -13237,8 +13186,8 @@ where // Our channel information is out of sync with the `ChannelMonitor`, so // force the update to use the `ChannelMonitor`'s update_id for the close // update. - let latest_update_id = monitor.get_latest_update_id(); - update.update_id = latest_update_id.saturating_add(1); + let latest_update_id = monitor.get_latest_update_id().saturating_add(1); + update.update_id = latest_update_id; per_peer_state.entry(counterparty_node_id) .or_insert_with(|| Mutex::new(empty_peer_state())) .lock().unwrap() @@ -13322,6 +13271,7 @@ where for (funding_txo, monitor) in args.channel_monitors.iter() { if !funding_txo_set.contains(funding_txo) { + let mut should_queue_fc_update = false; if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() { // If the ChannelMonitor had any updates, we may need to update it further and // thus track it in `closed_channel_monitor_update_ids`. If the channel never @@ -13330,17 +13280,21 @@ where // Note that a `ChannelMonitor` is created with `update_id` 0 and after we // provide it with a closure update its `update_id` will be at 1. 
if !monitor.offchain_closed() || monitor.get_latest_update_id() > 1 { + should_queue_fc_update = !monitor.offchain_closed(); + let mut latest_update_id = monitor.get_latest_update_id(); + if should_queue_fc_update { + latest_update_id += 1; + } per_peer_state.entry(counterparty_node_id) .or_insert_with(|| Mutex::new(empty_peer_state())) .lock().unwrap() .closed_channel_monitor_update_ids.entry(monitor.channel_id()) - .and_modify(|v| *v = cmp::max(monitor.get_latest_update_id(), *v)) - .or_insert(monitor.get_latest_update_id()); + .and_modify(|v| *v = cmp::max(latest_update_id, *v)) + .or_insert(latest_update_id); } } - if monitor.offchain_closed() { - // We already appled a ChannelForceClosed update. + if !should_queue_fc_update { continue; } @@ -13560,6 +13514,10 @@ where counterparty_node_id: $counterparty_node_id, channel_id: $monitor.channel_id(), }); + } else { + $peer_state.closed_channel_monitor_update_ids.entry($monitor.channel_id()) + .and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v)) + .or_insert(max_in_flight_update_id); } if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() { log_error!($logger, "Duplicate in-flight monitor update set for the same channel!"); @@ -13647,6 +13605,7 @@ where } = &mut new_event { debug_assert_eq!(update.updates.len(), 1); debug_assert!(matches!(update.updates[0], ChannelMonitorUpdateStep::ChannelForceClosed { .. })); + let mut updated_id = false; for pending_event in pending_background_events.iter() { if let BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id: pending_cp, funding_txo: pending_funding, @@ -13656,15 +13615,32 @@ where && funding_txo == pending_funding && channel_id == pending_chan_id; if for_same_channel { + debug_assert!(update.update_id >= pending_update.update_id); if pending_update.updates.iter().any(|upd| matches!(upd, ChannelMonitorUpdateStep::ChannelForceClosed { .. })) { // If the background event we're looking at is just // force-closing the channel which already has a pending // force-close update, no need to duplicate it. continue 'each_bg_event; } + update.update_id = pending_update.update_id.saturating_add(1); + updated_id = true; } } } + let mut per_peer_state = per_peer_state.get(counterparty_node_id) + .expect("If we have pending updates for a channel it must have an entry") + .lock().unwrap(); + if updated_id { + per_peer_state + .closed_channel_monitor_update_ids.entry(*channel_id) + .and_modify(|v| *v = cmp::max(update.update_id, *v)) + .or_insert(update.update_id); + } + let in_flight_updates = per_peer_state.in_flight_monitor_updates + .entry(*funding_txo) + .or_insert_with(Vec::new); + debug_assert!(!in_flight_updates.iter().any(|upd| upd == update)); + in_flight_updates.push(update.clone()); } pending_background_events.push(new_event); } @@ -14156,10 +14132,18 @@ where testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()), }; + let mut processed_claims: HashSet<Vec<MPPClaimHTLCSource>> = new_hash_set(); for (_, monitor) in args.channel_monitors.iter() { for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() { if !payment_claims.is_empty() { for payment_claim in payment_claims { + if processed_claims.contains(&payment_claim.mpp_parts) { + // We might get the same payment a few times from different channels + // that the MPP payment was received using. There's no point in trying + // to claim the same payment again and again, so we check if the HTLCs + // are the same and skip the payment here. 
+ continue; + } if payment_claim.mpp_parts.is_empty() { return Err(DecodeError::InvalidValue); } @@ -14214,6 +14198,7 @@ where (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr) ); } + processed_claims.insert(payment_claim.mpp_parts); } } else { let per_peer_state = channel_manager.per_peer_state.read().unwrap(); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index e1f6116cf6f..28465a09660 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -900,7 +900,7 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) { if persist_both_monitors { if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); } if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[3] { } else { panic!(); } - check_added_monitors(&nodes[3], 6); + check_added_monitors(&nodes[3], 4); } else { if let Event::PaymentClaimed { amount_msat: 15_000_000, .. } = events[2] { } else { panic!(); } check_added_monitors(&nodes[3], 3);