diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 64fe84a09a..0322f10c9d 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -10,28 +10,31 @@ //! //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +use bitcoin::{BlockHash, Txid}; use core::cmp; use core::ops::Deref; use core::str::FromStr; -use bitcoin::{BlockHash, Txid}; -use crate::{io, log_error}; use crate::prelude::*; +use crate::{io, log_error}; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; -use crate::sign::{EntropySource, ecdsa::EcdsaChannelSigner, SignerProvider}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID, +}; use crate::chain::transaction::OutPoint; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID}; use crate::ln::channelmanager::AChannelManager; use crate::routing::gossip::NetworkGraph; use crate::routing::scoring::WriteableScore; +use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; /// The alphabet of characters allowed for namespaces and keys. -pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"; +pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"; /// The maximum number of characters namespaces and keys may have. pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120; @@ -124,12 +127,16 @@ pub trait KVStore { /// `primary_namespace` and `secondary_namespace`. /// /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound - fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result, io::Error>; + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, io::Error>; /// Persists the given data under the given `key`. /// /// Will create the given `primary_namespace` and `secondary_namespace` if not already present /// in the store. - fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error>; + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> Result<(), io::Error>; /// Removes any data that had previously been persisted under the given `key`. /// /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily @@ -145,13 +152,17 @@ pub trait KVStore { /// Returns successfully if no data will be stored for the given `primary_namespace`, /// `secondary_namespace`, and `key`, independently of whether it was present before its /// invokation or not. - fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), io::Error>; + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), io::Error>; /// Returns a list of keys that are stored under the given `secondary_namespace` in /// `primary_namespace`. /// /// Returns the keys in arbitrary order, so users requiring a particular order need to sort the /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown. - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result, io::Error>; + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, io::Error>; } /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. @@ -175,7 +186,6 @@ where fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; } - impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A where CM::Target: 'static + AChannelManager, @@ -183,24 +193,30 @@ where S::Target: WriteableScore<'a>, { fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { - self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + self.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode()) + &channel_manager.get_cm().encode(), + ) } fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { - self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + self.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode()) + &network_graph.encode(), + ) } fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { - self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + self.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - &scorer.encode()) + &scorer.encode(), + ) } } @@ -210,27 +226,34 @@ impl Persist) -> chain::ChannelMonitorUpdateStatus { + fn persist_new_channel( + &self, funding_txo: OutPoint, monitor: &ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index); match self.write( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &key, &monitor.encode()) - { + &key, + &monitor.encode(), + ) { Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError + Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError, } } - fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor) -> chain::ChannelMonitorUpdateStatus { + fn update_persisted_channel( + &self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, + monitor: &ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index); match self.write( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &key, &monitor.encode()) - { + &key, + &monitor.encode(), + ) { Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError + Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError, } } @@ -242,7 +265,7 @@ impl Persist monitor, - Err(_) => return + Err(_) => return, }; match self.write( ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -250,8 +273,8 @@ impl Persist {} - Err(_e) => return + Ok(()) => {}, + Err(_e) => return, }; let _ = self.remove( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -274,12 +297,14 @@ where let mut res = Vec::new(); for stored_key in kv_store.list( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)? - { + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + )? { if stored_key.len() < 66 { return Err(io::Error::new( io::ErrorKind::InvalidData, - "Stored key has invalid length")); + "Stored key has invalid length", + )); } let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| { @@ -291,8 +316,11 @@ where })?; match <(BlockHash, ChannelMonitor<::EcdsaSigner>)>::read( - &mut io::Cursor::new( - kv_store.read(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key)?), + &mut io::Cursor::new(kv_store.read( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + )?), (&*entropy_source, &*signer_provider), ) { Ok((block_hash, channel_monitor)) => { @@ -305,13 +333,13 @@ where )); } res.push((block_hash, channel_monitor)); - } + }, Err(_) => { return Err(io::Error::new( io::ErrorKind::InvalidData, - "Failed to read ChannelMonitor" + "Failed to read ChannelMonitor", )) - } + }, } } Ok(res) @@ -407,7 +435,7 @@ where ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { kv_store: K, logger: L, @@ -415,7 +443,7 @@ where entropy_source: ES, signer_provider: SP, broadcaster: BI, - fee_estimator: FE + fee_estimator: FE, } #[allow(dead_code)] @@ -427,7 +455,7 @@ where ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { /// Constructs a new [`MonitorUpdatingPersister`]. /// @@ -447,7 +475,7 @@ where /// [`MonitorUpdatingPersister::cleanup_stale_updates`]. pub fn new( kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, - signer_provider: SP, broadcaster: BI, fee_estimator: FE + signer_provider: SP, broadcaster: BI, fee_estimator: FE, ) -> Self { MonitorUpdatingPersister { kv_store, @@ -456,7 +484,7 @@ where entropy_source, signer_provider, broadcaster, - fee_estimator + fee_estimator, } } @@ -465,7 +493,12 @@ where /// It is extremely important that your [`KVStore::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. - pub fn read_all_channel_monitors_with_updates(&self) -> Result::EcdsaSigner>)>, io::Error> { + pub fn read_all_channel_monitors_with_updates( + &self, + ) -> Result< + Vec<(BlockHash, ChannelMonitor<::EcdsaSigner>)>, + io::Error, + > { let monitor_list = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -496,7 +529,8 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub fn read_channel_monitor_with_updates( &self, monitor_key: String, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { + ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + { let monitor_name = MonitorName::new(monitor_key)?; let (block_hash, monitor) = self.read_monitor(&monitor_name)?; let mut current_update_id = monitor.get_latest_update_id(); @@ -511,21 +545,22 @@ where Err(err) if err.kind() == io::ErrorKind::NotFound => { // We can't find any more updates, so we are done. break; - } + }, Err(err) => return Err(err), }; - monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger) + monitor + .update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger) .map_err(|e| { - log_error!( - self.logger, - "Monitor update failed. monitor: {} update: {} reason: {:?}", - monitor_name.as_str(), - update_name.as_str(), - e - ); - io::Error::new(io::ErrorKind::Other, "Monitor update failed") - })?; + log_error!( + self.logger, + "Monitor update failed. monitor: {} update: {} reason: {:?}", + monitor_name.as_str(), + update_name.as_str(), + e + ); + io::Error::new(io::ErrorKind::Other, "Monitor update failed") + })?; } Ok((block_hash, monitor)) } @@ -533,7 +568,8 @@ where /// Read a channel monitor. fn read_monitor( &self, monitor_name: &MonitorName, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { + ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> + { let outpoint: OutPoint = monitor_name.try_into()?; let mut monitor_cursor = io::Cursor::new(self.kv_store.read( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -564,7 +600,7 @@ where } else { Ok((blockhash, channel_monitor)) } - } + }, Err(e) => { log_error!( self.logger, @@ -573,7 +609,7 @@ where e, ); Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor")) - } + }, } } @@ -613,9 +649,10 @@ where for monitor_key in monitor_keys { let monitor_name = MonitorName::new(monitor_key)?; let (_, current_monitor) = self.read_monitor(&monitor_name)?; - let updates = self - .kv_store - .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())?; + let updates = self.kv_store.list( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str(), + )?; for update in updates { let update_name = UpdateName::new(update)?; // if the update_id is lower than the stored monitor, delete @@ -633,20 +670,27 @@ where } } -impl - Persist for MonitorUpdatingPersister +impl< + ChannelSigner: EcdsaChannelSigner, + K: Deref, + L: Deref, + ES: Deref, + SP: Deref, + BI: Deref, + FE: Deref, + > Persist for MonitorUpdatingPersister where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { /// Persists a new channel. This means writing the entire monitor to the /// parametrized [`KVStore`]. fn persist_new_channel( - &self, funding_txo: OutPoint, monitor: &ChannelMonitor + &self, funding_txo: OutPoint, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { // Determine the proper key for this monitor let monitor_name = MonitorName::from(funding_txo); @@ -662,9 +706,7 @@ where monitor_name.as_str(), &monitor_bytes, ) { - Ok(_) => { - chain::ChannelMonitorUpdateStatus::Completed - } + Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, Err(e) => { log_error!( self.logger, @@ -675,7 +717,7 @@ where e ); chain::ChannelMonitorUpdateStatus::UnrecoverableError - } + }, } } @@ -690,7 +732,7 @@ where /// - The update is at [`CLOSED_CHANNEL_UPDATE_ID`] fn update_persisted_channel( &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>, - monitor: &ChannelMonitor + monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { if let Some(update) = update { if update.update_id != CLOSED_CHANNEL_UPDATE_ID @@ -715,7 +757,7 @@ where e ); chain::ChannelMonitorUpdateStatus::UnrecoverableError - } + }, } } else { let monitor_name = MonitorName::from(funding_txo); @@ -723,29 +765,30 @@ where // the new one in order to determine the cleanup range. let maybe_old_monitor = match monitor.get_latest_update_id() { CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(), - _ => None + _ => None, }; // We could write this update, but it meets criteria of our design that calls for a full monitor write. let monitor_update_status = self.persist_new_channel(funding_txo, monitor); if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status { - let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { - // If there is an error while reading old monitor, we skip clean up. - maybe_old_monitor.map(|(_, ref old_monitor)| { - let start = old_monitor.get_latest_update_id(); - // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID - let end = cmp::min( - start.saturating_add(self.maximum_pending_updates), - CLOSED_CHANNEL_UPDATE_ID - 1, - ); - (start, end) - }) - } else { - let end = monitor.get_latest_update_id(); - let start = end.saturating_sub(self.maximum_pending_updates); - Some((start, end)) - }; + let cleanup_range = + if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { + // If there is an error while reading old monitor, we skip clean up. + maybe_old_monitor.map(|(_, ref old_monitor)| { + let start = old_monitor.get_latest_update_id(); + // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID + let end = cmp::min( + start.saturating_add(self.maximum_pending_updates), + CLOSED_CHANNEL_UPDATE_ID - 1, + ); + (start, end) + }) + } else { + let end = monitor.get_latest_update_id(); + let start = end.saturating_sub(self.maximum_pending_updates); + Some((start, end)) + }; if let Some((start, end)) = cleanup_range { self.cleanup_in_range(monitor_name, start, end); @@ -765,7 +808,7 @@ where let monitor_key = monitor_name.as_str().to_string(); let monitor = match self.read_channel_monitor_with_updates(monitor_key) { Ok((_block_hash, monitor)) => monitor, - Err(_) => return + Err(_) => return, }; match self.kv_store.write( ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -773,7 +816,7 @@ where monitor_name.as_str(), &monitor.encode(), ) { - Ok(()) => {} + Ok(()) => {}, Err(_e) => return, }; let _ = self.kv_store.remove( @@ -785,14 +828,15 @@ where } } -impl MonitorUpdatingPersister +impl + MonitorUpdatingPersister where ES::Target: EntropySource + Sized, K::Target: KVStore, L::Target: Logger, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, - FE::Target: FeeEstimator + FE::Target: FeeEstimator, { // Cleans up monitor updates for given monitor in range `start..=end`. fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { @@ -883,7 +927,7 @@ impl UpdateName { Ok(u) => Ok(u.into()), Err(_) => { Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name")) - } + }, } } @@ -905,10 +949,10 @@ mod tests { use crate::chain::ChannelMonitorUpdateStatus; use crate::events::{ClosureReason, MessageSendEventsProvider}; use crate::ln::functional_test_utils::*; - use crate::util::test_utils::{self, TestLogger, TestStore}; - use crate::{check_added_monitors, check_closed_broadcast}; use crate::sync::Arc; use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{self, TestLogger, TestStore}; + use crate::{check_added_monitors, check_closed_broadcast}; const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5; @@ -928,23 +972,44 @@ mod tests { #[test] fn monitor_from_outpoint_works() { let monitor_name1 = MonitorName::from(OutPoint { - txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(), + txid: Txid::from_str( + "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", + ) + .unwrap(), index: 1, }); - assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"); + assert_eq!( + monitor_name1.as_str(), + "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1" + ); let monitor_name2 = MonitorName::from(OutPoint { - txid: Txid::from_str("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(), + txid: Txid::from_str( + "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef", + ) + .unwrap(), index: u16::MAX, }); - assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"); + assert_eq!( + monitor_name2.as_str(), + "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535" + ); } #[test] fn bad_monitor_string_fails() { - assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err()); - assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err()); - assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err()); + assert!(MonitorName::new( + "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string() + ) + .is_err()); + assert!(MonitorName::new( + "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string() + ) + .is_err()); + assert!(MonitorName::new( + "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string() + ) + .is_err()); } // Exercise the `MonitorUpdatingPersister` with real channels and payments. @@ -997,15 +1062,18 @@ mod tests { // Check that the persisted channel data is empty before any channels are // open. - let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); + let mut persisted_chan_data_0 = + persister_0.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_0.len(), 0); - let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); + let mut persisted_chan_data_1 = + persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 0); // Helper to make sure the channel is on the expected update ID. macro_rules! check_persisted_data { ($expected_update_id: expr) => { - persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); + persisted_chan_data_0 = + persister_0.read_all_channel_monitors_with_updates().unwrap(); // check that we stored only one monitor assert_eq!(persisted_chan_data_0.len(), 1); for (_, mon) in persisted_chan_data_0.iter() { @@ -1015,26 +1083,41 @@ mod tests { // if the CM is at consolidation threshold, ensure no updates are stored. let monitor_name = MonitorName::from(mon.get_funding_txo().0); if mon.get_latest_update_id() % persister_0_max_pending_updates == 0 - || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { + || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID + { assert_eq!( - persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_name.as_str()).unwrap().len(), + persister_0 + .kv_store + .list( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str() + ) + .unwrap() + .len(), 0, "updates stored when they shouldn't be in persister 0" ); } } - persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); + persisted_chan_data_1 = + persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 1); for (_, mon) in persisted_chan_data_1.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); let monitor_name = MonitorName::from(mon.get_funding_txo().0); // if the CM is at consolidation threshold, ensure no updates are stored. if mon.get_latest_update_id() % persister_1_max_pending_updates == 0 - || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID { + || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID + { assert_eq!( - persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - monitor_name.as_str()).unwrap().len(), + persister_1 + .kv_store + .list( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str() + ) + .unwrap() + .len(), 0, "updates stored when they shouldn't be in persister 1" ); @@ -1073,19 +1156,47 @@ mod tests { // updates. let error_message = "Channel force-closed"; - nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap(); + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &nodes[0].node.list_channels()[0].channel_id, + &nodes[1].node.get_our_node_id(), + error_message.to_string(), + ) + .unwrap(); - check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false, &[nodes[1].node.get_our_node_id()], 100000); + check_closed_event( + &nodes[0], + 1, + ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, + false, + &[nodes[1].node.get_our_node_id()], + 100000, + ); check_closed_broadcast!(nodes[0], true); check_added_monitors!(nodes[0], 1); let node_txn = nodes[0].tx_broadcaster.txn_broadcast(); assert_eq!(node_txn.len(), 1); - connect_block(&nodes[1], &create_dummy_block(nodes[0].best_block_hash(), 42, vec![node_txn[0].clone(), node_txn[0].clone()])); + connect_block( + &nodes[1], + &create_dummy_block( + nodes[0].best_block_hash(), + 42, + vec![node_txn[0].clone(), node_txn[0].clone()], + ), + ); check_closed_broadcast!(nodes[1], true); - check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000); + check_closed_event( + &nodes[1], + 1, + ClosureReason::CommitmentTxConfirmed, + false, + &[nodes[0].node.get_our_node_id()], + 100000, + ); check_added_monitors!(nodes[1], 1); // Make sure everything is persisted as expected after close. @@ -1096,8 +1207,22 @@ mod tests { let (_, monitor) = &persisted_chan_data[0]; let monitor_name = MonitorName::from(monitor.get_funding_txo().0); // The channel should have 0 updates, as it wrote a full monitor and consolidated. - assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0); - assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0); + assert_eq!( + persister_0 + .kv_store + .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()) + .unwrap() + .len(), + 0 + ); + assert_eq!( + persister_1 + .kv_store + .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()) + .unwrap() + .len(), + 0 + ); } // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a @@ -1112,13 +1237,33 @@ mod tests { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let chan = create_announced_chan_between_nodes(&nodes, 0, 1); let error_message = "Channel force-closed"; - nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id(), error_message.to_string()).unwrap(); - check_closed_event(&nodes[1], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false, &[nodes[0].node.get_our_node_id()], 100000); + nodes[1] + .node + .force_close_broadcasting_latest_txn( + &chan.2, + &nodes[0].node.get_our_node_id(), + error_message.to_string(), + ) + .unwrap(); + check_closed_event( + &nodes[1], + 1, + ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, + false, + &[nodes[0].node.get_our_node_id()], + 100000, + ); { let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap(); let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0]; - let test_txo = OutPoint { txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 }; + let test_txo = OutPoint { + txid: Txid::from_str( + "8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be", + ) + .unwrap(), + index: 0, + }; let ro_persister = MonitorUpdatingPersister { kv_store: &TestStore::new(true), @@ -1132,24 +1277,24 @@ mod tests { match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) { ChannelMonitorUpdateStatus::UnrecoverableError => { // correct result - } + }, ChannelMonitorUpdateStatus::Completed => { panic!("Completed persisting new channel when shouldn't have") - } + }, ChannelMonitorUpdateStatus::InProgress => { panic!("Returned InProgress when shouldn't have") - } + }, } match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) { ChannelMonitorUpdateStatus::UnrecoverableError => { // correct result - } + }, ChannelMonitorUpdateStatus::Completed => { panic!("Completed persisting new channel when shouldn't have") - } + }, ChannelMonitorUpdateStatus::InProgress => { panic!("Returned InProgress when shouldn't have") - } + }, } added_monitors.clear(); } @@ -1219,7 +1364,12 @@ mod tests { let monitor_name = MonitorName::from(monitor.get_funding_txo().0); persister_0 .kv_store - .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1]) + .write( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str(), + UpdateName::from(1).as_str(), + &[0u8; 1], + ) .unwrap(); // Do the stale update cleanup @@ -1228,20 +1378,43 @@ mod tests { // Confirm the stale update is unreadable/gone assert!(persister_0 .kv_store - .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str()) + .read( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str(), + UpdateName::from(1).as_str() + ) .is_err()); // Force close. let error_message = "Channel force-closed"; - nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id(), error_message.to_string()).unwrap(); - check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false, &[nodes[1].node.get_our_node_id()], 100000); + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &nodes[0].node.list_channels()[0].channel_id, + &nodes[1].node.get_our_node_id(), + error_message.to_string(), + ) + .unwrap(); + check_closed_event( + &nodes[0], + 1, + ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, + false, + &[nodes[1].node.get_our_node_id()], + 100000, + ); check_closed_broadcast!(nodes[0], true); check_added_monitors!(nodes[0], 1); // Write an update near u64::MAX persister_0 .kv_store - .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1]) + .write( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str(), + UpdateName::from(u64::MAX - 1).as_str(), + &[0u8; 1], + ) .unwrap(); // Do the stale update cleanup @@ -1250,11 +1423,18 @@ mod tests { // Confirm the stale update is unreadable/gone assert!(persister_0 .kv_store - .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str()) + .read( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_name.as_str(), + UpdateName::from(u64::MAX - 1).as_str() + ) .is_err()); } - fn persist_fn(_persist: P) -> bool where P::Target: Persist { + fn persist_fn(_persist: P) -> bool + where + P::Target: Persist, + { true }