Skip to content

Commit

Permalink
Add ChainMonitor::archive_fully_resolved_monitor_channels
Browse files Browse the repository at this point in the history
  Archives fully resolved channel monitors by adding them to a backup
  location and removing them from the primary storage & the monitor set.

  This is useful for pruning fully resolved monitors from the monitor
  set and primary storage so they are not reloaded on every new new
  block connection.

  We also add a new function, `archive_persisted_channel` to the
  `Persist` trait that writes the monitor to an archive storage and
  removes it from the primary storage.
  • Loading branch information
jbesraa committed Apr 9, 2024
1 parent 0f728b8 commit 9962650
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 5 deletions.
7 changes: 5 additions & 2 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let out = SearchingOutput::new(underlying_out);
let broadcast = Arc::new(TestBroadcaster{});
let router = FuzzRouter {};
use std::collections::HashSet;

macro_rules! make_node {
($node_id: expr, $fee_estimator: expr) => { {
Expand All @@ -467,7 +468,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let keys_manager = Arc::new(KeyProvider { node_secret, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()) });
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
archived_channels: Mutex::new(HashSet::new()),
}), Arc::clone(&keys_manager)));

let mut config = UserConfig::default();
Expand All @@ -494,7 +496,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
archived_channels: Mutex::new(HashSet::new()),
}), Arc::clone(& $keys_manager)));

let mut config = UserConfig::default();
Expand Down
5 changes: 4 additions & 1 deletion fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,10 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {

let broadcast = Arc::new(TestBroadcaster{ txn_broadcasted: Mutex::new(Vec::new()) });
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(),
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) })));
Arc::new(TestPersister {
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) ,
archived_channels: Mutex::new(std::collections::HashSet::new()),
})));

let keys_manager = Arc::new(KeyProvider {
node_secret: our_network_key.clone(),
Expand Down
6 changes: 6 additions & 0 deletions fuzz/src/utils/test_persister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use lightning::chain::chainmonitor::MonitorUpdateId;
use lightning::chain::transaction::OutPoint;
use lightning::util::test_channel_signer::TestChannelSigner;

use std::collections::HashSet;
use std::sync::Mutex;

pub struct TestPersister {
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
pub archived_channels: Mutex<HashSet<OutPoint>>,
}
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
Expand All @@ -17,4 +19,8 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
self.archived_channels.lock().unwrap().insert(funding_txo);
}
}
31 changes: 31 additions & 0 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Archive a channel's monitor data.
///
/// It is reccommended to validate the channel is fully resolved before archiving the data.
///
/// A fully resolved channel is a channel that has been closed and settled on-chain, and no
/// funds it can claim no funds.
///
/// Archiving the data in a backup location is useful for hedging against data loss in case
/// of an unexpected failure/bug.
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
}

struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
Expand Down Expand Up @@ -656,6 +666,27 @@ where C::Target: chain::Filter,
}
}
}

/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
///
/// This is useful for pruning fully resolved monitors from the monitor set and primary
/// storage so they are not reloaded on every new new block connection.
///
/// Depending on the `[Persist::archive_persisted_channel]` the monitor data could be moved
/// to an archive location or/and removed entirely.
pub fn archive_fully_resolved_channel_monitors(&self, to_archive: Vec<OutPoint>) {
let mut monitors = self.monitors.write().unwrap();
for funding_txo in to_archive {
let channel_monitor = monitors.get(&funding_txo);
if let Some(channel_monitor) = channel_monitor {
if channel_monitor.monitor.is_fully_resolved() {
self.persister.archive_persisted_channel(funding_txo);
monitors.remove(&funding_txo);
};
};
}
}

}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
Expand Down
1 change: 0 additions & 1 deletion lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,6 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
let is_all_funds_claimed = self.get_claimable_balances().is_empty();
let current_height = self.current_best_block().height;
let mut inner = self.inner.lock().unwrap();

match (inner.balances_empty_height, is_all_funds_claimed) {
(Some(balances_empty_height), true) => {
// Claimed all funds, check if reached the blocks threshold.
Expand Down
14 changes: 13 additions & 1 deletion lightning/src/ln/monitor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {
user_config.manually_accept_inbound_channels = true;
}
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let (_, _, chan_id, funding_tx) =
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);
Expand Down Expand Up @@ -262,6 +262,18 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {

assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());

// Test we can archived fully resolved channel monitor.
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1); // one monitor
// first archive should set balances_empty_height to current block height
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(vec![funding_outpoint]);
connect_blocks(&nodes[0], 2016);
// Second call after 2016 blocks, should archive the monitor
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(vec![funding_outpoint]);
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0); // no monitor
// cleanup
nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(funding_outpoint,
funding_tx.txid(), funding_tx.output[0].script_pubkey.clone());
}

#[test]
Expand Down
58 changes: 58 additions & 0 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the [`NetworkGraph`] will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
Expand Down Expand Up @@ -212,6 +217,34 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
) {
Ok(monitor) => monitor,
Err(_) => return
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor
) {
Ok(()) => {}
Err(_e) => return
};
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
false
);
}
}

/// Read previously persisted [`ChannelMonitor`]s from the store.
Expand Down Expand Up @@ -718,6 +751,31 @@ where
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read_monitor(&monitor_name) {
Ok((_block_hash, monitor)) => monitor,
Err(_) => return
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode()
) {
Ok(()) => {},
Err(_e) => {
return
}
};
let _ = self.kv_store.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
false,
);
}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
Expand Down
29 changes: 29 additions & 0 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use crate::blinded_path::BlindedPath;
use crate::blinded_path::payment::ReceiveTlvs;
use crate::chain;
use crate::chain::chainmonitor::Persist;
use crate::chain::WatchedOutput;
use crate::chain::chaininterface;
use crate::chain::chaininterface::ConfirmationTarget;
Expand Down Expand Up @@ -504,6 +505,10 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
res
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
<TestPersister as Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
}
}

pub struct TestPersister {
Expand All @@ -516,20 +521,27 @@ pub struct TestPersister {
/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
/// MonitorUpdateId here.
pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
/// When we get an archive_persisted_channel call, we insert the OutPoint here.
pub archived_channels: Mutex<HashSet<OutPoint>>,
}
impl TestPersister {
pub fn new() -> Self {
Self {
update_rets: Mutex::new(VecDeque::new()),
chain_sync_monitor_persistences: Mutex::new(new_hash_map()),
offchain_monitor_updates: Mutex::new(new_hash_map()),
archived_channels: Mutex::new(new_hash_set()),
}
}

/// Queue an update status to return.
pub fn set_update_ret(&self, next_ret: chain::ChannelMonitorUpdateStatus) {
self.update_rets.lock().unwrap().push_back(next_ret);
}
// Check if the given OutPoint has been archived.
pub fn is_archived(&self, funding_txo: OutPoint) -> bool {
self.archived_channels.lock().unwrap().contains(&funding_txo)
}
}
impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> for TestPersister {
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
Expand All @@ -552,6 +564,19 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
ret
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
self.archived_channels.lock().unwrap().insert(funding_txo);
// remove the channel from the offchain_monitor_updates map
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
Some(_) => {},
None => {
// If the channel was not in the offchain_monitor_updates map, it should be in the
// chain_sync_monitor_persistences map.
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
}
};
}
}

pub struct TestStore {
Expand Down Expand Up @@ -1363,6 +1388,10 @@ impl TestChainSource {
watched_outputs: Mutex::new(new_hash_set()),
}
}
pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, txid: Txid, script_pubkey: ScriptBuf) {
self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
self.watched_txn.lock().unwrap().remove(&(txid, script_pubkey));
}
}

impl UtxoLookup for TestChainSource {
Expand Down

0 comments on commit 9962650

Please sign in to comment.