diff --git a/sim-cli/src/parsing.rs b/sim-cli/src/parsing.rs index 65ffdd1f..6c876fe1 100755 --- a/sim-cli/src/parsing.rs +++ b/sim-cli/src/parsing.rs @@ -14,7 +14,7 @@ use simln_lib::{ Simulation, SimulationCfg, WriteResults, }; use simln_lib::{ShortChannelID, SimulationError}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::fs; use std::path::PathBuf; use std::sync::Arc; @@ -357,12 +357,12 @@ async fn get_clients( nodes: Vec, ) -> Result< ( - HashMap>>, + BTreeMap>>, HashMap, ), LightningError, > { - let mut clients: HashMap>> = HashMap::new(); + let mut clients: BTreeMap>> = BTreeMap::new(); let mut clients_info: HashMap = HashMap::new(); for connection in nodes { @@ -574,7 +574,7 @@ pub async fn parse_sim_params(cli: &Cli) -> anyhow::Result { } pub async fn get_validated_activities( - clients: &HashMap>>, + clients: &BTreeMap>>, nodes_info: HashMap, activity: Vec, ) -> Result, LightningError> { diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 8570c023..f12c7392 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -11,7 +11,8 @@ use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use random_activity::RandomActivityError; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap, HashSet}; use std::fmt::{Display, Formatter}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::marker::Send; @@ -591,7 +592,7 @@ pub struct Simulation { /// Config for the simulation itself. cfg: SimulationCfg, /// The lightning node that is being simulated. - nodes: HashMap>>, + nodes: BTreeMap>>, /// Results logger that holds the simulation statistics. results: Arc>, /// Track all tasks spawned for use in the simulation. When used in the `run` method, it will wait for @@ -619,14 +620,14 @@ struct ExecutorKit { source_info: NodeInfo, /// We use an arc mutex here because some implementations of the trait will be very expensive to clone. /// See [NetworkGraphView] for details. - network_generator: Arc>, + network_generator: Arc, payment_generator: Box, } impl Simulation { pub fn new( cfg: SimulationCfg, - nodes: HashMap>>, + nodes: BTreeMap>>, tasks: TaskTracker, clock: Arc, shutdown_trigger: Trigger, @@ -954,7 +955,7 @@ impl Simulation { source_info: description.source.clone(), // Defined activities have very simple generators, so the traits required are implemented on // a single struct which we just cheaply clone. - network_generator: Arc::new(Mutex::new(activity_generator.clone())), + network_generator: Arc::new(activity_generator.clone()), payment_generator: Box::new(activity_generator), }); } @@ -971,7 +972,7 @@ impl Simulation { // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to // avoid double counting capacity (as each node has a counterparty in the channel). let mut generators = Vec::new(); - let mut active_nodes = HashMap::new(); + let mut active_nodes = BTreeMap::new(); // Do a first pass to get the capacity of each node which we need to be able to create a network generator. // While we're at it, we get the node info and store it with capacity to create activity generators in our @@ -994,18 +995,12 @@ impl Simulation { active_nodes.insert(node_info.pubkey, (node_info, capacity)); } - // Create a network generator with a shared RNG for all nodes. - let network_generator = Arc::new(Mutex::new( + let network_generator = Arc::new( NetworkGraphView::new( active_nodes.values().cloned().collect(), MutRng::new(self.cfg.seed.map(|seed| (seed, None))), ) .map_err(SimulationError::RandomActivityError)?, - )); - - log::info!( - "Created network generator: {}.", - network_generator.lock().await ); for (node_info, capacity) in active_nodes.values() { @@ -1076,6 +1071,32 @@ impl Simulation { channels } + fn generate_payments( + &self, + heap: &mut BinaryHeap>, + source: &NodeInfo, + node_generator: &A, + index: usize, + ) -> Result<(), SimulationError> { + let mut current_count = 0; + loop { + let wait = get_payment_delay(current_count, source, node_generator)?; + heap.push(Reverse((wait, index))); + if node_generator.payment_count().is_none() { + return Ok(()); + } + if let Some(c) = node_generator.payment_count() { + if c == current_count { + log::info!( + "Payment count has been met for {source}: {c} payments. Stopping the activity." + ); + return Ok(()); + } + } + current_count += 1; + } + } + /// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present /// for every source node in the set of executors. async fn dispatch_producers( @@ -1084,36 +1105,64 @@ impl Simulation { producer_channels: HashMap>, tasks: &TaskTracker, ) -> Result<(), SimulationError> { - for executor in executors { + let mut heap: BinaryHeap> = BinaryHeap::new(); + for (index, executor) in executors.iter().enumerate() { + self.generate_payments( + &mut heap, + &executor.source_info, + executor.payment_generator.as_ref(), + index, + )?; + } + while let Some(Reverse((wait, index))) = heap.pop() { + let executor = executors + .get(index) + .ok_or(SimulationError::MissingNodeError( + "Missing executor".to_string(), + ))?; let sender = producer_channels.get(&executor.source_info.pubkey).ok_or( SimulationError::RandomActivityError(RandomActivityError::ValueError(format!( "Activity producer for: {} not found.", executor.source_info.pubkey, ))), )?; + let (destination, capacity) = executor + .network_generator + .choose_destination(executor.source_info.pubkey) + .map_err(SimulationError::DestinationGenerationError)?; // pe: produce events let pe_shutdown = self.shutdown_trigger.clone(); let pe_listener = self.shutdown_listener.clone(); let pe_sender = sender.clone(); let clock = self.clock.clone(); - + let source = executor.source_info.clone(); + // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get + // a payment amount something has gone wrong (because we should have validated that we can always + // generate amounts), so we exit. + let amount = match executor.payment_generator.payment_amount(capacity) { + Ok(amt) => { + if amt == 0 { + log::debug!("Skipping zero amount payment for {source} -> {destination}."); + continue; + } + amt + }, + Err(e) => { + return Err(SimulationError::PaymentGenerationError(e)); + }, + }; tasks.spawn(async move { - let source = executor.source_info.clone(); - - log::info!( - "Starting activity producer for {}: {}.", - source, - executor.payment_generator - ); + log::info!("Starting activity producer for {}.", source); if let Err(e) = produce_events( - executor.source_info, - executor.network_generator, - executor.payment_generator, + source.clone(), clock, pe_sender, pe_listener, + wait, + destination, + amount, ) .await { @@ -1124,7 +1173,6 @@ impl Simulation { } }); } - Ok(()) } } @@ -1213,74 +1261,43 @@ async fn consume_events( /// produce events generates events for the activity description provided. It accepts a shutdown listener so it can /// exit if other threads signal that they have errored out. -async fn produce_events( +async fn produce_events( source: NodeInfo, - network_generator: Arc>, - node_generator: Box, clock: Arc, sender: Sender, listener: Listener, + wait: Duration, + destination: NodeInfo, + amount: u64, ) -> Result<(), SimulationError> { - let mut current_count = 0; - loop { - if let Some(c) = node_generator.payment_count() { - if c == current_count { - log::info!( - "Payment count has been met for {source}: {c} payments. Stopping the activity." - ); - return Ok(()); + select! { + biased; + _ = listener.clone() => { + return Ok(()); + }, + // Wait until our time to next payment has elapsed then execute a random amount payment to a random + // destination. + _ = clock.sleep(wait) => { + log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination); + + // Send the payment, exiting if we can no longer send to the consumer. + let event = SimulationEvent::SendPayment(destination.clone(), amount); + select!{ + biased; + _ = listener.clone() => { + return Ok(()); + }, + send_result = sender.send(event.clone()) => { + if send_result.is_err(){ + return Err(SimulationError::MpscChannelError( + format!("Stopped activity producer for {amount}: {source} -> {destination}."))); + } + }, } - } - - let wait = get_payment_delay(current_count, &source, node_generator.as_ref())?; - select! { - biased; - _ = listener.clone() => { - return Ok(()); - }, - // Wait until our time to next payment has elapsed then execute a random amount payment to a random - // destination. - _ = clock.sleep(wait) => { - let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey).map_err(SimulationError::DestinationGenerationError)?; - - // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get - // a payment amount something has gone wrong (because we should have validated that we can always - // generate amounts), so we exit. - let amount = match node_generator.payment_amount(capacity) { - Ok(amt) => { - if amt == 0 { - log::debug!("Skipping zero amount payment for {source} -> {destination}."); - continue; - } - amt - }, - Err(e) => { - return Err(SimulationError::PaymentGenerationError(e)); - }, - }; - - log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination); - - // Send the payment, exiting if we can no longer send to the consumer. - let event = SimulationEvent::SendPayment(destination.clone(), amount); - select!{ - biased; - _ = listener.clone() => { - return Ok(()); - }, - send_result = sender.send(event.clone()) => { - if send_result.is_err(){ - return Err(SimulationError::MpscChannelError( - format!("Stopped activity producer for {amount}: {source} -> {destination}."))); - } - }, - } - - current_count += 1; - }, - } } + }; + Ok(()) } /// Gets the wait time for the next payment. If this is the first payment being generated, and a specific start delay @@ -1451,7 +1468,7 @@ async fn run_results_logger( /// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the /// consumer will not exit and a trigger is required. async fn produce_simulation_results( - nodes: HashMap>>, + nodes: BTreeMap>>, mut output_receiver: Receiver, results: Sender<(Payment, PaymentResult)>, listener: Listener, @@ -1570,7 +1587,7 @@ mod tests { use bitcoin::secp256k1::PublicKey; use bitcoin::Network; use mockall::mock; - use std::collections::HashMap; + use std::collections::BTreeMap; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -1817,7 +1834,7 @@ mod tests { /// "we don't control any nodes". #[tokio::test] async fn test_validate_node_network_empty_nodes() { - let empty_nodes: HashMap>> = HashMap::new(); + let empty_nodes: BTreeMap>> = BTreeMap::new(); let simulation = test_utils::create_simulation(empty_nodes); let result = simulation.validate_node_network().await; diff --git a/simln-lib/src/sim_node.rs b/simln-lib/src/sim_node.rs index 8a504d53..09b51fce 100755 --- a/simln-lib/src/sim_node.rs +++ b/simln-lib/src/sim_node.rs @@ -8,6 +8,7 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::{Network, ScriptBuf, TxOut}; use lightning::ln::chan_utils::make_funding_redeemscript; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::collections::{hash_map::Entry, HashMap}; use std::fmt::Display; use std::sync::Arc; @@ -1010,8 +1011,8 @@ impl SimGraph { pub async fn ln_node_from_graph( graph: Arc>, routing_graph: Arc>, -) -> HashMap>> { - let mut nodes: HashMap>> = HashMap::new(); +) -> BTreeMap>> { + let mut nodes: BTreeMap>> = BTreeMap::new(); for pk in graph.lock().await.nodes.keys() { nodes.insert( diff --git a/simln-lib/src/test_utils.rs b/simln-lib/src/test_utils.rs index c0aa8c37..d02c6563 100644 --- a/simln-lib/src/test_utils.rs +++ b/simln-lib/src/test_utils.rs @@ -6,7 +6,8 @@ use lightning::ln::features::Features; use mockall::mock; use rand::distributions::Uniform; use rand::Rng; -use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; +use std::collections::BTreeMap; +use std::{fmt, sync::Arc, time::Duration}; use tokio::sync::Mutex; use tokio_util::task::TaskTracker; @@ -96,7 +97,7 @@ mock! { /// Type alias for the result of setup_test_nodes. type TestNodesResult = ( Vec, - HashMap>>, + BTreeMap>>, ); /// A builder for creating mock Lightning nodes for testing purposes. @@ -159,7 +160,7 @@ impl LightningTestNodeBuilder { /// Builds only the client map, omitting node info. /// Useful for network-specific testing. Returns a map of public keys to mocked /// Lightning node clients. - pub fn build_clients_only(self) -> HashMap>> { + pub fn build_clients_only(self) -> BTreeMap>> { let (_, clients) = self.build_full(); clients } @@ -170,7 +171,7 @@ impl LightningTestNodeBuilder { pub fn build_full(self) -> TestNodesResult { let nodes = create_nodes(self.node_count, self.initial_balance); let mut node_infos = Vec::new(); - let mut clients: HashMap>> = HashMap::new(); + let mut clients: BTreeMap>> = BTreeMap::new(); for (idx, (mut node_info, _)) in nodes.into_iter().enumerate() { if self.keysend_indices.contains(&idx) { @@ -198,7 +199,7 @@ impl LightningTestNodeBuilder { /// Creates a new simulation with the given clients and activity definitions. /// Note: This sets a runtime for the simulation of 0, so run() will exit immediately. pub fn create_simulation( - clients: HashMap>>, + clients: BTreeMap>>, ) -> Simulation { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); Simulation::new(