Skip to content

Commit

Permalink
Use new OutputSweeper to sweep spendable outputs.
Browse files Browse the repository at this point in the history
  • Loading branch information
valentinewallace committed May 8, 2024
1 parent f2b3c97 commit fcc899a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 34 deletions.
9 changes: 9 additions & 0 deletions src/bitcoind_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use bitcoin::{Network, OutPoint, TxOut, WPubkeyHash};
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::events::bump_transaction::{Utxo, WalletSource};
use lightning::log_error;
use lightning::sign::ChangeDestinationSource;
use lightning::util::logger::Logger;
use lightning_block_sync::http::HttpEndpoint;
use lightning_block_sync::rpc::RpcClient;
Expand Down Expand Up @@ -317,6 +318,14 @@ impl BroadcasterInterface for BitcoindClient {
}
}

impl ChangeDestinationSource for BitcoindClient {
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
tokio::task::block_in_place(move || {
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
})
}
}

impl WalletSource for BitcoindClient {
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
let utxos = tokio::task::block_in_place(move || {
Expand Down
114 changes: 80 additions & 34 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bitcoin::BlockHash;
use bitcoin_bech32::WitnessProgram;
use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME};
use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
use lightning::chain::{Filter, Watch};
use lightning::chain::{BestBlock, Filter, Watch};
use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
use lightning::events::{Event, PaymentFailureReason, PaymentPurpose};
use lightning::ln::channelmanager::{self, RecentPaymentDetails};
Expand All @@ -30,10 +30,14 @@ use lightning::routing::gossip;
use lightning::routing::gossip::{NodeId, P2PGossipSync};
use lightning::routing::router::DefaultRouter;
use lightning::routing::scoring::ProbabilisticScoringFeeParameters;
use lightning::sign::{EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor};
use lightning::sign::{EntropySource, InMemorySigner, KeysManager};
use lightning::util::config::UserConfig;
use lightning::util::persist::{self, KVStore, MonitorUpdatingPersister};
use lightning::util::persist::{
self, KVStore, MonitorUpdatingPersister, OUTPUT_SWEEPER_PERSISTENCE_KEY,
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use lightning::util::sweep as ldk_sweep;
use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
use lightning_background_processor::{process_events_async, GossipSync};
use lightning_block_sync::init;
Expand Down Expand Up @@ -172,13 +176,26 @@ pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler<
Arc<FilesystemLogger>,
>;

pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper<
Arc<BitcoindClient>,
Arc<BitcoindClient>,
Arc<BitcoindClient>,
Arc<dyn Filter + Send + Sync>,
Arc<FilesystemStore>,
Arc<FilesystemLogger>,
Arc<KeysManager>,
>;

// Needed due to rust-lang/rust#63033.
struct OutputSweeperWrapper(Arc<OutputSweeper>);

async fn handle_ldk_events(
channel_manager: Arc<ChannelManager>, bitcoind_client: &BitcoindClient,
network_graph: &NetworkGraph, keys_manager: &KeysManager,
bump_tx_event_handler: &BumpTxEventHandler, peer_manager: Arc<PeerManager>,
inbound_payments: Arc<Mutex<InboundPaymentInfoStorage>>,
outbound_payments: Arc<Mutex<OutboundPaymentInfoStorage>>, fs_store: Arc<FilesystemStore>,
network: Network, event: Event,
output_sweeper: OutputSweeperWrapper, network: Network, event: Event,
) {
match event {
Event::FundingGenerationReady {
Expand Down Expand Up @@ -461,22 +478,8 @@ async fn handle_ldk_events(
forwarding_channel_manager.process_pending_htlc_forwards();
});
},
Event::SpendableOutputs { outputs, channel_id: _ } => {
// SpendableOutputDescriptors, of which outputs is a vec of, are critical to keep track
// of! While a `StaticOutput` descriptor is just an output to a static, well-known key,
// other descriptors are not currently ever regenerated for you by LDK. Once we return
// from this method, the descriptor will be gone, and you may lose track of some funds.
//
// Here we simply persist them to disk, with a background task running which will try
// to spend them regularly (possibly duplicatively/RBF'ing them). These can just be
// treated as normal funds where possible - they are only spendable by us and there is
// no rush to claim them.
for output in outputs {
let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
// Note that if the type here changes our read code needs to change as well.
let output: SpendableOutputDescriptor = output;
fs_store.write(PENDING_SPENDABLE_OUTPUT_DIR, "", &key, &output.encode()).unwrap();
}
Event::SpendableOutputs { outputs, channel_id } => {
output_sweeper.0.track_spendable_outputs(outputs, channel_id, false, None).unwrap();
},
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
println!(
Expand Down Expand Up @@ -743,14 +746,50 @@ async fn start_ldk() {
}
};

// Step 12: Sync ChannelMonitors and ChannelManager to chain tip
// Step 12: Initialize the OutputSweeper.
let (sweeper_best_block, output_sweeper) = match fs_store.read(
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY,
) {
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let sweeper = OutputSweeper::new(
channel_manager.current_best_block(),
broadcaster.clone(),
fee_estimator.clone(),
None,
keys_manager.clone(),
bitcoind_client.clone(),
fs_store.clone(),
logger.clone(),
);
(channel_manager.current_best_block(), sweeper)
},
Ok(mut bytes) => {
let read_args = (
broadcaster.clone(),
fee_estimator.clone(),
None,
keys_manager.clone(),
bitcoind_client.clone(),
fs_store.clone(),
logger.clone(),
);
let mut reader = io::Cursor::new(&mut bytes);
<(BestBlock, OutputSweeper)>::read(&mut reader, read_args)
.expect("Failed to deserialize OutputSweeper")
},
Err(e) => panic!("Failed to read OutputSweeper with {}", e),
};

// Step 13: Sync ChannelMonitors, ChannelManager and OutputSweeper to chain tip
let mut chain_listener_channel_monitors = Vec::new();
let mut cache = UnboundedCache::new();
let chain_tip = if restarting_node {
let mut chain_listeners = vec![(
channel_manager_blockhash,
&channel_manager as &(dyn chain::Listen + Send + Sync),
)];
let mut chain_listeners = vec![
(channel_manager_blockhash, &channel_manager as &(dyn chain::Listen + Send + Sync)),
(sweeper_best_block.block_hash, &output_sweeper as &(dyn chain::Listen + Send + Sync)),
];

for (blockhash, channel_monitor) in channelmonitors.drain(..) {
let outpoint = channel_monitor.get_funding_txo().0;
Expand Down Expand Up @@ -780,7 +819,7 @@ async fn start_ldk() {
polled_chain_tip
};

// Step 13: Give ChannelMonitors to ChainMonitor
// Step 14: Give ChannelMonitors to ChainMonitor
for item in chain_listener_channel_monitors.drain(..) {
let channel_monitor = item.1 .0;
let funding_outpoint = item.2;
Expand All @@ -790,11 +829,11 @@ async fn start_ldk() {
);
}

// Step 14: Optional: Initialize the P2PGossipSync
// Step 15: Optional: Initialize the P2PGossipSync
let gossip_sync =
Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger)));

// Step 15: Initialize the PeerManager
// Step 16: Initialize the PeerManager
let channel_manager: Arc<ChannelManager> = Arc::new(channel_manager);
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
Arc::clone(&keys_manager),
Expand Down Expand Up @@ -832,7 +871,7 @@ async fn start_ldk() {
gossip_sync.add_utxo_lookup(Some(utxo_lookup));

// ## Running LDK
// Step 16: Initialize networking
// Step 17: Initialize networking

let peer_manager_connection_handler = peer_manager.clone();
let listening_port = args.ldk_peer_listening_port;
Expand All @@ -858,14 +897,17 @@ async fn start_ldk() {
}
});

// Step 17: Connect and Disconnect Blocks
// Step 18: Connect and Disconnect Blocks
let output_sweeper: Arc<OutputSweeper> = Arc::new(output_sweeper);
let channel_manager_listener = channel_manager.clone();
let chain_monitor_listener = chain_monitor.clone();
let output_sweeper_listener = output_sweeper.clone();
let bitcoind_block_source = bitcoind_client.clone();
let network = args.network;
tokio::spawn(async move {
let chain_poller = poll::ChainPoller::new(bitcoind_block_source.as_ref(), network);
let chain_listener = (chain_monitor_listener, channel_manager_listener);
let chain_listener =
(chain_monitor_listener, &(channel_manager_listener, output_sweeper_listener));
let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
loop {
spv_client.poll_best_tip().await.unwrap();
Expand Down Expand Up @@ -904,7 +946,7 @@ async fn start_ldk() {
.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.lock().unwrap().encode())
.unwrap();

// Step 18: Handle LDK Events
// Step 19: Handle LDK Events
let channel_manager_event_listener = Arc::clone(&channel_manager);
let bitcoind_client_event_listener = Arc::clone(&bitcoind_client);
let network_graph_event_listener = Arc::clone(&network_graph);
Expand All @@ -913,6 +955,7 @@ async fn start_ldk() {
let outbound_payments_event_listener = Arc::clone(&outbound_payments);
let fs_store_event_listener = Arc::clone(&fs_store);
let peer_manager_event_listener = Arc::clone(&peer_manager);
let output_sweeper_event_listener = Arc::clone(&output_sweeper);
let network = args.network;
let event_handler = move |event: Event| {
let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener);
Expand All @@ -924,6 +967,7 @@ async fn start_ldk() {
let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener);
let fs_store_event_listener = Arc::clone(&fs_store_event_listener);
let peer_manager_event_listener = Arc::clone(&peer_manager_event_listener);
let output_sweeper_event_listener = Arc::clone(&output_sweeper_event_listener);
async move {
handle_ldk_events(
channel_manager_event_listener,
Expand All @@ -935,17 +979,18 @@ async fn start_ldk() {
inbound_payments_event_listener,
outbound_payments_event_listener,
fs_store_event_listener,
OutputSweeperWrapper(output_sweeper_event_listener),
network,
event,
)
.await;
}
};

// Step 19: Persist ChannelManager and NetworkGraph
// Step 20: Persist ChannelManager and NetworkGraph
let persister = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into()));

// Step 20: Background Processing
// Step 21: Background Processing
let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(());
let mut background_processor = tokio::spawn(process_events_async(
Arc::clone(&persister),
Expand Down Expand Up @@ -1034,6 +1079,7 @@ async fn start_ldk() {
}
});

// TODO: remove this, since the new `OutputSweeper` was added in LDK v0.0.123.
tokio::spawn(sweep::periodic_sweep(
ldk_data_dir.clone(),
Arc::clone(&keys_manager),
Expand Down

0 comments on commit fcc899a

Please sign in to comment.