Skip to content

Commit

Permalink
Merge pull request #103 from TheBlueMatt/main
Browse files Browse the repository at this point in the history
Make SpendableOutput claims more robust
  • Loading branch information
tnull committed May 19, 2023
2 parents 7aceee9 + b0987a4 commit 30a9cb7
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 30 deletions.
84 changes: 54 additions & 30 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ mod cli;
mod convert;
mod disk;
mod hex_utils;
mod sweep;

use crate::bitcoind_client::BitcoindClient;
use crate::disk::FilesystemLogger;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode;
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::Secp256k1;
use bitcoin::BlockHash;
use bitcoin_bech32::WitnessProgram;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{EntropySource, InMemorySigner, KeysManager};
use lightning::chain::keysinterface::{
EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor,
};
use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
use lightning::chain::{Filter, Watch};
use lightning::events::{Event, PaymentFailureReason, PaymentPurpose};
Expand All @@ -30,6 +31,7 @@ use lightning::routing::gossip;
use lightning::routing::gossip::{NodeId, P2PGossipSync};
use lightning::routing::router::DefaultRouter;
use lightning::util::config::UserConfig;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::ReadableArgs;
use lightning_background_processor::{process_events_async, GossipSync};
use lightning_block_sync::init;
Expand All @@ -52,6 +54,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

pub(crate) const PENDING_SPENDABLE_OUTPUT_DIR: &'static str = "pending_spendable_outputs";

pub(crate) enum HTLCStatus {
Pending,
Succeeded,
Expand Down Expand Up @@ -107,7 +111,7 @@ async fn handle_ldk_events(
channel_manager: &Arc<ChannelManager>, bitcoind_client: &BitcoindClient,
network_graph: &NetworkGraph, keys_manager: &KeysManager,
inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage,
network: Network, event: Event,
persister: &Arc<FilesystemPersister>, network: Network, event: Event,
) {
match event {
Event::FundingGenerationReady {
Expand Down Expand Up @@ -331,20 +335,23 @@ async fn handle_ldk_events(
});
}
Event::SpendableOutputs { outputs } => {
let destination_address = bitcoind_client.get_new_address().await;
let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
let tx_feerate =
bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
let spending_tx = keys_manager
.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
&Secp256k1::new(),
)
.unwrap();
bitcoind_client.broadcast_transaction(&spending_tx);
// SpendableOutputDescriptors, of which outputs is a vec of, are critical to keep track
// of! While a `StaticOutput` descriptor is just an output to a static, well-known key,
// other descriptors are not currently ever regenerated for you by LDK. Once we return
// from this method, the descriptor will be gone, and you may lose track of some funds.
//
// Here we simply persist them to disk, with a background task running which will try
// to spend them regularly (possibly duplicatively/RBF'ing them). These can just be
// treated as normal funds where possible - they are only spendable by us and there is
// no rush to claim them.
for output in outputs {
let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
// Note that if the type here changes our read code needs to change as well.
let output: SpendableOutputDescriptor = output;
persister
.persist(&format!("{}/{}", PENDING_SPENDABLE_OUTPUT_DIR, key), &output)
.unwrap();
}
}
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
println!(
Expand Down Expand Up @@ -693,6 +700,7 @@ async fn start_ldk() {
let keys_manager_event_listener = Arc::clone(&keys_manager);
let inbound_payments_event_listener = Arc::clone(&inbound_payments);
let outbound_payments_event_listener = Arc::clone(&outbound_payments);
let persister_event_listener = Arc::clone(&persister);
let network = args.network;
let event_handler = move |event: Event| {
let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener);
Expand All @@ -701,6 +709,7 @@ async fn start_ldk() {
let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener);
let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener);
let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener);
let persister_event_listener = Arc::clone(&persister_event_listener);
async move {
handle_ldk_events(
&channel_manager_event_listener,
Expand All @@ -709,6 +718,7 @@ async fn start_ldk() {
&keys_manager_event_listener,
&inbound_payments_event_listener,
&outbound_payments_event_listener,
&persister_event_listener,
network,
event,
)
Expand All @@ -722,7 +732,7 @@ async fn start_ldk() {
// Step 20: Background Processing
let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(());
let background_processor = tokio::spawn(process_events_async(
persister,
Arc::clone(&persister),
event_handler,
chain_monitor.clone(),
channel_manager.clone(),
Expand Down Expand Up @@ -781,24 +791,38 @@ async fn start_ldk() {
});

// Regularly broadcast our node_announcement. This is only required (or possible) if we have
// some public channels, and is only useful if we have public listen address(es) to announce.
// In a production environment, this should occur only after the announcement of new channels
// to avoid churn in the global network graph.
// some public channels.
let peer_man = Arc::clone(&peer_manager);
let chan_man = Arc::clone(&channel_manager);
let network = args.network;
if !args.ldk_announced_listen_addr.is_empty() {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
tokio::spawn(async move {
// First wait a minute until we have some peers and maybe have opened a channel.
tokio::time::sleep(Duration::from_secs(60)).await;
// Then, update our announcement once an hour to keep it fresh but avoid unnecessary churn
// in the global gossip network.
let mut interval = tokio::time::interval(Duration::from_secs(3600));
loop {
interval.tick().await;
// Don't bother trying to announce if we don't have any public channls, though our
// peers should drop such an announcement anyway. Note that announcement may not
// propagate until we have a channel with 6+ confirmations.
if chan_man.list_channels().iter().any(|chan| chan.is_public) {
peer_man.broadcast_node_announcement(
[0; 3],
args.ldk_announced_node_name,
args.ldk_announced_listen_addr.clone(),
);
}
});
}
}
});

tokio::spawn(sweep::periodic_sweep(
ldk_data_dir.clone(),
Arc::clone(&keys_manager),
Arc::clone(&logger),
Arc::clone(&persister),
Arc::clone(&bitcoind_client),
));

// Start the CLI.
cli::poll_for_user_input(
Expand All @@ -809,7 +833,7 @@ async fn start_ldk() {
Arc::clone(&onion_messenger),
inbound_payments,
outbound_payments,
ldk_data_dir.clone(),
ldk_data_dir,
network,
Arc::clone(&logger),
)
Expand Down
127 changes: 127 additions & 0 deletions src/sweep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, io};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{EntropySource, KeysManager, SpendableOutputDescriptor};
use lightning::util::logger::Logger;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::{Readable, WithoutLength};

use bitcoin::secp256k1::Secp256k1;

use crate::hex_utils;
use crate::BitcoindClient;
use crate::FilesystemLogger;
use crate::FilesystemPersister;

/// If we have any pending claimable outputs, we should slowly sweep them to our Bitcoin Core
/// wallet. We technically don't need to do this - they're ours to spend when we want and can just
/// use them to build new transactions instead, but we cannot feed them direclty into Bitcoin
/// Core's wallet so we have to sweep.
///
/// Note that this is unececssary for [`SpendableOutputDescriptor::StaticOutput`]s, which *do* have
/// an associated secret key we could simply import into Bitcoin Core's wallet, but for consistency
/// we don't do that here either.
pub(crate) async fn periodic_sweep(
ldk_data_dir: String, keys_manager: Arc<KeysManager>, logger: Arc<FilesystemLogger>,
persister: Arc<FilesystemPersister>, bitcoind_client: Arc<BitcoindClient>,
) {
// Regularly claim outputs which are exclusively spendable by us and send them to Bitcoin Core.
// Note that if you more tightly integrate your wallet with LDK you may not need to do this -
// these outputs can just be treated as normal outputs during coin selection.
let pending_spendables_dir =
format!("{}/{}", crate::PENDING_SPENDABLE_OUTPUT_DIR, ldk_data_dir);
let processing_spendables_dir = format!("{}/processing_spendable_outputs", ldk_data_dir);
let spendables_dir = format!("{}/spendable_outputs", ldk_data_dir);

// We batch together claims of all spendable outputs generated each day, however only after
// batching any claims of spendable outputs which were generated prior to restart. On a mobile
// device we likely won't ever be online for more than a minute, so we have to ensure we sweep
// any pending claims on startup, but for an always-online node you may wish to sweep even less
// frequently than this (or move the interval await to the top of the loop)!
//
// There is no particular rush here, we just have to ensure funds are availably by the time we
// need to send funds.
let mut interval = tokio::time::interval(Duration::from_secs(60 * 60 * 24));

loop {
interval.tick().await; // Note that the first tick completes immediately
if let Ok(dir_iter) = fs::read_dir(&pending_spendables_dir) {
// Move any spendable descriptors from pending folder so that we don't have any
// races with new files being added.
for file_res in dir_iter {
let file = file_res.unwrap();
// Only move a file if its a 32-byte-hex'd filename, otherwise it might be a
// temporary file.
if file.file_name().len() == 64 {
fs::create_dir_all(&processing_spendables_dir).unwrap();
let mut holding_path = PathBuf::new();
holding_path.push(&processing_spendables_dir);
holding_path.push(&file.file_name());
fs::rename(file.path(), holding_path).unwrap();
}
}
// Now concatenate all the pending files we moved into one file in the
// `spendable_outputs` directory and drop the processing directory.
let mut outputs = Vec::new();
if let Ok(processing_iter) = fs::read_dir(&processing_spendables_dir) {
for file_res in processing_iter {
outputs.append(&mut fs::read(file_res.unwrap().path()).unwrap());
}
}
if !outputs.is_empty() {
let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
persister
.persist(&format!("spendable_outputs/{}", key), &WithoutLength(&outputs))
.unwrap();
fs::remove_dir_all(&processing_spendables_dir).unwrap();
}
}
// Iterate over all the sets of spendable outputs in `spendables_dir` and try to claim
// them.
// Note that here we try to claim each set of spendable outputs over and over again
// forever, even long after its been claimed. While this isn't an issue per se, in practice
// you may wish to track when the claiming transaction has confirmed and remove the
// spendable outputs set. You may also wish to merge groups of unspent spendable outputs to
// combine batches.
if let Ok(dir_iter) = fs::read_dir(&spendables_dir) {
for file_res in dir_iter {
let mut outputs: Vec<SpendableOutputDescriptor> = Vec::new();
let mut file = fs::File::open(file_res.unwrap().path()).unwrap();
loop {
// Check if there are any bytes left to read, and if so read a descriptor.
match file.read_exact(&mut [0; 1]) {
Ok(_) => {
file.seek(SeekFrom::Current(-1)).unwrap();
}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
Err(e) => Err(e).unwrap(),
}
outputs.push(Readable::read(&mut file).unwrap());
}
let destination_address = bitcoind_client.get_new_address().await;
let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
let tx_feerate =
bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Background);
if let Ok(spending_tx) = keys_manager.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
&Secp256k1::new(),
) {
// Note that, most likely, we've already sweeped this set of outputs
// and they're already confirmed on-chain, so this broadcast will fail.
bitcoind_client.broadcast_transaction(&spending_tx);
} else {
lightning::log_error!(
logger,
"Failed to sweep spendable outputs! This may indicate the outputs are dust. Will try again in a day.");
}
}
}
}
}

0 comments on commit 30a9cb7

Please sign in to comment.