From fcc899adc5f5aeb3d2ea2b8d56b655879aa1f16b Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 7 May 2024 15:33:42 -0400 Subject: [PATCH] Use new OutputSweeper to sweep spendable outputs. --- src/bitcoind_client.rs | 9 ++++ src/main.rs | 114 +++++++++++++++++++++++++++++------------ 2 files changed, 89 insertions(+), 34 deletions(-) diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index c00f52f..57c1960 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -18,6 +18,7 @@ use bitcoin::{Network, OutPoint, TxOut, WPubkeyHash}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::events::bump_transaction::{Utxo, WalletSource}; use lightning::log_error; +use lightning::sign::ChangeDestinationSource; use lightning::util::logger::Logger; use lightning_block_sync::http::HttpEndpoint; use lightning_block_sync::rpc::RpcClient; @@ -317,6 +318,14 @@ impl BroadcasterInterface for BitcoindClient { } } +impl ChangeDestinationSource for BitcoindClient { + fn get_change_destination_script(&self) -> Result { + tokio::task::block_in_place(move || { + Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() })) + }) + } +} + impl WalletSource for BitcoindClient { fn list_confirmed_utxos(&self) -> Result, ()> { let utxos = tokio::task::block_in_place(move || { diff --git a/src/main.rs b/src/main.rs index d78b522..bf3e597 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use bitcoin::BlockHash; use bitcoin_bech32::WitnessProgram; use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; -use lightning::chain::{Filter, Watch}; +use lightning::chain::{BestBlock, Filter, Watch}; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; use lightning::events::{Event, PaymentFailureReason, PaymentPurpose}; use lightning::ln::channelmanager::{self, RecentPaymentDetails}; @@ -30,10 +30,14 @@ use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::ProbabilisticScoringFeeParameters; -use lightning::sign::{EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor}; +use lightning::sign::{EntropySource, InMemorySigner, KeysManager}; use lightning::util::config::UserConfig; -use lightning::util::persist::{self, KVStore, MonitorUpdatingPersister}; +use lightning::util::persist::{ + self, KVStore, MonitorUpdatingPersister, OUTPUT_SWEEPER_PERSISTENCE_KEY, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; +use lightning::util::sweep as ldk_sweep; use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use lightning_background_processor::{process_events_async, GossipSync}; use lightning_block_sync::init; @@ -172,13 +176,26 @@ pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler< Arc, >; +pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper< + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + +// Needed due to rust-lang/rust#63033. +struct OutputSweeperWrapper(Arc); + async fn handle_ldk_events( channel_manager: Arc, bitcoind_client: &BitcoindClient, network_graph: &NetworkGraph, keys_manager: &KeysManager, bump_tx_event_handler: &BumpTxEventHandler, peer_manager: Arc, inbound_payments: Arc>, outbound_payments: Arc>, fs_store: Arc, - network: Network, event: Event, + output_sweeper: OutputSweeperWrapper, network: Network, event: Event, ) { match event { Event::FundingGenerationReady { @@ -461,22 +478,8 @@ async fn handle_ldk_events( forwarding_channel_manager.process_pending_htlc_forwards(); }); }, - Event::SpendableOutputs { outputs, channel_id: _ } => { - // SpendableOutputDescriptors, of which outputs is a vec of, are critical to keep track - // of! While a `StaticOutput` descriptor is just an output to a static, well-known key, - // other descriptors are not currently ever regenerated for you by LDK. Once we return - // from this method, the descriptor will be gone, and you may lose track of some funds. - // - // Here we simply persist them to disk, with a background task running which will try - // to spend them regularly (possibly duplicatively/RBF'ing them). These can just be - // treated as normal funds where possible - they are only spendable by us and there is - // no rush to claim them. - for output in outputs { - let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes()); - // Note that if the type here changes our read code needs to change as well. - let output: SpendableOutputDescriptor = output; - fs_store.write(PENDING_SPENDABLE_OUTPUT_DIR, "", &key, &output.encode()).unwrap(); - } + Event::SpendableOutputs { outputs, channel_id } => { + output_sweeper.0.track_spendable_outputs(outputs, channel_id, false, None).unwrap(); }, Event::ChannelPending { channel_id, counterparty_node_id, .. } => { println!( @@ -743,14 +746,50 @@ async fn start_ldk() { } }; - // Step 12: Sync ChannelMonitors and ChannelManager to chain tip + // Step 12: Initialize the OutputSweeper. + let (sweeper_best_block, output_sweeper) = match fs_store.read( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ) { + Err(e) if e.kind() == io::ErrorKind::NotFound => { + let sweeper = OutputSweeper::new( + channel_manager.current_best_block(), + broadcaster.clone(), + fee_estimator.clone(), + None, + keys_manager.clone(), + bitcoind_client.clone(), + fs_store.clone(), + logger.clone(), + ); + (channel_manager.current_best_block(), sweeper) + }, + Ok(mut bytes) => { + let read_args = ( + broadcaster.clone(), + fee_estimator.clone(), + None, + keys_manager.clone(), + bitcoind_client.clone(), + fs_store.clone(), + logger.clone(), + ); + let mut reader = io::Cursor::new(&mut bytes); + <(BestBlock, OutputSweeper)>::read(&mut reader, read_args) + .expect("Failed to deserialize OutputSweeper") + }, + Err(e) => panic!("Failed to read OutputSweeper with {}", e), + }; + + // Step 13: Sync ChannelMonitors, ChannelManager and OutputSweeper to chain tip let mut chain_listener_channel_monitors = Vec::new(); let mut cache = UnboundedCache::new(); let chain_tip = if restarting_node { - let mut chain_listeners = vec![( - channel_manager_blockhash, - &channel_manager as &(dyn chain::Listen + Send + Sync), - )]; + let mut chain_listeners = vec![ + (channel_manager_blockhash, &channel_manager as &(dyn chain::Listen + Send + Sync)), + (sweeper_best_block.block_hash, &output_sweeper as &(dyn chain::Listen + Send + Sync)), + ]; for (blockhash, channel_monitor) in channelmonitors.drain(..) { let outpoint = channel_monitor.get_funding_txo().0; @@ -780,7 +819,7 @@ async fn start_ldk() { polled_chain_tip }; - // Step 13: Give ChannelMonitors to ChainMonitor + // Step 14: Give ChannelMonitors to ChainMonitor for item in chain_listener_channel_monitors.drain(..) { let channel_monitor = item.1 .0; let funding_outpoint = item.2; @@ -790,11 +829,11 @@ async fn start_ldk() { ); } - // Step 14: Optional: Initialize the P2PGossipSync + // Step 15: Optional: Initialize the P2PGossipSync let gossip_sync = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger))); - // Step 15: Initialize the PeerManager + // Step 16: Initialize the PeerManager let channel_manager: Arc = Arc::new(channel_manager); let onion_messenger: Arc = Arc::new(OnionMessenger::new( Arc::clone(&keys_manager), @@ -832,7 +871,7 @@ async fn start_ldk() { gossip_sync.add_utxo_lookup(Some(utxo_lookup)); // ## Running LDK - // Step 16: Initialize networking + // Step 17: Initialize networking let peer_manager_connection_handler = peer_manager.clone(); let listening_port = args.ldk_peer_listening_port; @@ -858,14 +897,17 @@ async fn start_ldk() { } }); - // Step 17: Connect and Disconnect Blocks + // Step 18: Connect and Disconnect Blocks + let output_sweeper: Arc = Arc::new(output_sweeper); let channel_manager_listener = channel_manager.clone(); let chain_monitor_listener = chain_monitor.clone(); + let output_sweeper_listener = output_sweeper.clone(); let bitcoind_block_source = bitcoind_client.clone(); let network = args.network; tokio::spawn(async move { let chain_poller = poll::ChainPoller::new(bitcoind_block_source.as_ref(), network); - let chain_listener = (chain_monitor_listener, channel_manager_listener); + let chain_listener = + (chain_monitor_listener, &(channel_manager_listener, output_sweeper_listener)); let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); loop { spv_client.poll_best_tip().await.unwrap(); @@ -904,7 +946,7 @@ async fn start_ldk() { .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.lock().unwrap().encode()) .unwrap(); - // Step 18: Handle LDK Events + // Step 19: Handle LDK Events let channel_manager_event_listener = Arc::clone(&channel_manager); let bitcoind_client_event_listener = Arc::clone(&bitcoind_client); let network_graph_event_listener = Arc::clone(&network_graph); @@ -913,6 +955,7 @@ async fn start_ldk() { let outbound_payments_event_listener = Arc::clone(&outbound_payments); let fs_store_event_listener = Arc::clone(&fs_store); let peer_manager_event_listener = Arc::clone(&peer_manager); + let output_sweeper_event_listener = Arc::clone(&output_sweeper); let network = args.network; let event_handler = move |event: Event| { let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener); @@ -924,6 +967,7 @@ async fn start_ldk() { let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener); let fs_store_event_listener = Arc::clone(&fs_store_event_listener); let peer_manager_event_listener = Arc::clone(&peer_manager_event_listener); + let output_sweeper_event_listener = Arc::clone(&output_sweeper_event_listener); async move { handle_ldk_events( channel_manager_event_listener, @@ -935,6 +979,7 @@ async fn start_ldk() { inbound_payments_event_listener, outbound_payments_event_listener, fs_store_event_listener, + OutputSweeperWrapper(output_sweeper_event_listener), network, event, ) @@ -942,10 +987,10 @@ async fn start_ldk() { } }; - // Step 19: Persist ChannelManager and NetworkGraph + // Step 20: Persist ChannelManager and NetworkGraph let persister = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); - // Step 20: Background Processing + // Step 21: Background Processing let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(()); let mut background_processor = tokio::spawn(process_events_async( Arc::clone(&persister), @@ -1034,6 +1079,7 @@ async fn start_ldk() { } }); + // TODO: remove this, since the new `OutputSweeper` was added in LDK v0.0.123. tokio::spawn(sweep::periodic_sweep( ldk_data_dir.clone(), Arc::clone(&keys_manager),