From 8d5df479c625613047b24053eb60136b5b60f96e Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Fri, 21 Mar 2025 12:14:19 -0500 Subject: [PATCH 1/8] Refactoring the shutdown process to fix a payment count bug --- simln-lib/src/lib.rs | 460 +++++++++++++++++++++---------------------- 1 file changed, 227 insertions(+), 233 deletions(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 47dba7b0..9c15ac66 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -21,7 +21,7 @@ use std::{collections::HashMap, sync::Arc, time::SystemTime}; use thiserror::Error; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; -use tokio::{select, time, time::Duration}; +use tokio::{select, time, time::Duration, time::Instant}; use tokio_util::task::TaskTracker; use triggered::{Listener, Trigger}; @@ -713,73 +713,75 @@ impl Simulation { self.nodes.len() ); - // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. - // The event channels are shared across our functionality: - // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the - // final result of the event that it has taken. - // - Event Receiver: used by data reporting to receive events that have been simulated that need to be - // tracked and recorded. - let (event_sender, event_receiver) = channel(1); - self.run_data_collection(event_receiver, &self.tasks); - - // Get an execution kit per activity that we need to generate and spin up consumers for each source node. - let activities = match self.activity_executors().await { - Ok(a) => a, - Err(e) => { - // If we encounter an error while setting up the activity_executors, - // we need to shutdown and return. - self.shutdown(); - return Err(e); - }, - }; - let consumer_channels = self.dispatch_consumers( - activities - .iter() - .map(|generator| generator.source_info.pubkey) - .collect(), - event_sender.clone(), - &self.tasks, - ); - - // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. - // The producers will use their own TaskTracker so that the simulation can be shutdown if they all finish. - let producer_tasks = TaskTracker::new(); - match self - .dispatch_producers(activities, consumer_channels, &producer_tasks) - .await { - Ok(_) => {}, - Err(e) => { - // If we encounter an error in dispatch_producers, we need to shutdown and return. - self.shutdown(); - return Err(e); - }, - } - - // Start a task that waits for the producers to finish. - // If all producers finish, then there is nothing left to do and the simulation can be shutdown. - let producer_trigger = self.shutdown_trigger.clone(); - self.tasks.spawn(async move { - producer_tasks.close(); - producer_tasks.wait().await; - log::info!("All producers finished. Shutting down."); - producer_trigger.trigger() - }); + // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. + // The event channels are shared across our functionality: + // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the + // final result of the event that it has taken. + // - Event Receiver: used by data reporting to receive events that have been simulated that need to be + // tracked and recorded. + let (event_sender, event_receiver) = channel(1); + self.run_data_collection(event_receiver, &self.tasks); + + // Get an execution kit per activity that we need to generate and spin up consumers for each source node. + let activities = match self.activity_executors().await { + Ok(a) => a, + Err(e) => { + // If we encounter an error while setting up the activity_executors, + // we need to shutdown and return. + self.shutdown(); + return Err(e); + }, + }; + let consumer_channels = self.dispatch_consumers( + activities + .iter() + .map(|generator| generator.source_info.pubkey) + .collect(), + event_sender.clone(), + &self.tasks, + ); - // Start a task that will shutdown the simulation if the total_time is met. - if let Some(total_time) = self.cfg.total_time { - let t = self.shutdown_trigger.clone(); - let l = self.shutdown_listener.clone(); + // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. + // The producers will use their own TaskTracker so that the simulation can be shutdown if they all finish. + let producer_tasks = TaskTracker::new(); + match self + .dispatch_producers(activities, consumer_channels, &producer_tasks) + .await + { + Ok(_) => {}, + Err(e) => { + // If we encounter an error in dispatch_producers, we need to shutdown and return. + self.shutdown(); + return Err(e); + }, + } + // Start a task that waits for the producers to finish. + // If all producers finish, then there is nothing left to do and the simulation can be shutdown. + let producer_trigger = self.shutdown_trigger.clone(); self.tasks.spawn(async move { - if time::timeout(total_time, l).await.is_err() { - log::info!( - "Simulation run for {}s. Shutting down.", - total_time.as_secs() - ); - t.trigger() - } + producer_tasks.close(); + producer_tasks.wait().await; + log::info!("All producers finished. Shutting down."); + producer_trigger.trigger() }); + + // Start a task that will shutdown the simulation if the total_time is met. + if let Some(total_time) = self.cfg.total_time { + let t = self.shutdown_trigger.clone(); + let l = self.shutdown_listener.clone(); + + self.tasks.spawn(async move { + if time::timeout(total_time, l).await.is_err() { + log::info!( + "Simulation run for {}s. Shutting down.", + total_time.as_secs() + ); + t.trigger() + } + }); + } } Ok(()) @@ -853,13 +855,8 @@ impl Simulation { let csr_write_results = self.cfg.write_results.clone(); tasks.spawn(async move { log::debug!("Starting simulation results consumer."); - if let Err(e) = consume_simulation_results( - result_logger, - results_receiver, - listener, - csr_write_results, - ) - .await + if let Err(e) = + consume_simulation_results(result_logger, results_receiver, csr_write_results).await { shutdown.trigger(); log::error!("Consume simulation results exited with error: {e:?}."); @@ -988,16 +985,13 @@ impl Simulation { // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull // events from and the results sender to report the events it has triggered for further monitoring. // ce: consume event - let ce_listener = self.shutdown_listener.clone(); let ce_shutdown = self.shutdown_trigger.clone(); let ce_output_sender = output_sender.clone(); let ce_node = node.clone(); tasks.spawn(async move { let node_info = ce_node.lock().await.get_info().clone(); log::debug!("Starting events consumer for {}.", node_info); - if let Err(e) = - consume_events(ce_node, receiver, ce_output_sender, ce_listener).await - { + if let Err(e) = consume_events(ce_node, receiver, ce_output_sender).await { ce_shutdown.trigger(); log::error!("Event consumer for node {node_info} exited with error: {e:?}."); } else { @@ -1065,77 +1059,65 @@ async fn consume_events( node: Arc>, mut receiver: Receiver, sender: Sender, - listener: Listener, ) -> Result<(), SimulationError> { loop { - select! { - biased; - _ = listener.clone() => { - return Ok(()); - }, - simulation_event = receiver.recv() => { - if let Some(event) = simulation_event { - match event { - SimulationEvent::SendPayment(dest, amt_msat) => { - let mut node = node.lock().await; - - let mut payment = Payment { - source: node.get_info().pubkey, - hash: None, - amount_msat: amt_msat, - destination: dest.pubkey, - dispatch_time: SystemTime::now(), - }; - - let outcome = match node.send_payment(dest.pubkey, amt_msat).await { - Ok(payment_hash) => { - log::debug!( - "Send payment: {} -> {}: ({}).", - node.get_info(), - dest, - hex::encode(payment_hash.0) - ); - // We need to track the payment outcome using the payment hash that we have received. - payment.hash = Some(payment_hash); - SimulationOutput::SendPaymentSuccess(payment) - } - Err(e) => { - log::error!( - "Error while sending payment {} -> {}.", - node.get_info(), - dest - ); - - match e { - LightningError::PermanentError(s) => { - return Err(SimulationError::LightningError(LightningError::PermanentError(s))); - } - _ => SimulationOutput::SendPaymentFailure( - payment, - PaymentResult::not_dispatched(), - ), - } - } - }; - - select!{ - biased; - _ = listener.clone() => { - return Ok(()) - } - send_result = sender.send(outcome.clone()) => { - if send_result.is_err() { - return Err(SimulationError::MpscChannelError( - format!("Error sending simulation output {outcome:?}."))); - } - } + let simulation_event = receiver.recv().await; + if let Some(event) = simulation_event { + match event { + SimulationEvent::SendPayment(dest, amt_msat) => { + let mut node = node.lock().await; + + let mut payment = Payment { + source: node.get_info().pubkey, + hash: None, + amount_msat: amt_msat, + destination: dest.pubkey, + dispatch_time: SystemTime::now(), + }; + + let outcome = match node.send_payment(dest.pubkey, amt_msat).await { + Ok(payment_hash) => { + log::debug!( + "Send payment: {} -> {}: ({}).", + node.get_info(), + dest, + hex::encode(payment_hash.0) + ); + // We need to track the payment outcome using the payment hash that we have received. + payment.hash = Some(payment_hash); + SimulationOutput::SendPaymentSuccess(payment) + }, + Err(e) => { + log::error!( + "Error while sending payment {} -> {}.", + node.get_info(), + dest + ); + + match e { + LightningError::PermanentError(s) => { + return Err(SimulationError::LightningError( + LightningError::PermanentError(s), + )); + }, + _ => SimulationOutput::SendPaymentFailure( + payment, + PaymentResult::not_dispatched(), + ), } - } + }, + }; + + let send_result = sender.send(outcome.clone()).await; + if send_result.is_err() { + return Err(SimulationError::MpscChannelError(format!( + "Error sending simulation output {outcome:?}." + ))); } - } else { - return Ok(()) - } + }, } + } else { + return Ok(()); } } } @@ -1246,7 +1228,6 @@ fn get_payment_delay( async fn consume_simulation_results( logger: Arc>, mut receiver: Receiver<(Payment, PaymentResult)>, - listener: Listener, write_results: Option, ) -> Result<(), SimulationError> { let mut writer = match write_results { @@ -1264,36 +1245,28 @@ async fn consume_simulation_results( let mut counter = 1; loop { - select! { - biased; - _ = listener.clone() => { - writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| { - SimulationError::FileError - }))?; - return Ok(()); - }, - payment_result = receiver.recv() => { - match payment_result { - Some((details, result)) => { - logger.lock().await.report_result(&details, &result); - log::trace!("Resolved dispatched payment: {} with: {}.", details, result); - - if let Some((ref mut w, batch_size)) = writer { - w.serialize((details, result)).map_err(|e| { - let _ = w.flush(); - SimulationError::CsvError(e) - })?; - counter = counter % batch_size + 1; - if batch_size == counter { - w.flush().map_err(|_| { - SimulationError::FileError - })?; - } - } - }, - None => return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)), + let payment_result = receiver.recv().await; + match payment_result { + Some((details, result)) => { + logger.lock().await.report_result(&details, &result); + log::trace!("Resolved dispatched payment: {} with: {}.", details, result); + + if let Some((ref mut w, batch_size)) = writer { + w.serialize((details, result)).map_err(|e| { + let _ = w.flush(); + SimulationError::CsvError(e) + })?; + counter = counter % batch_size + 1; + if batch_size == counter { + w.flush().map_err(|_| SimulationError::FileError)?; + } } - } + }, + None => { + return writer.map_or(Ok(()), |(ref mut w, _)| { + w.flush().map_err(|_| SimulationError::FileError) + }) + }, } } } @@ -1384,44 +1357,37 @@ async fn produce_simulation_results( tasks: &TaskTracker, ) -> Result<(), SimulationError> { let result = loop { - tokio::select! { - biased; - _ = listener.clone() => { - break Ok(()) - }, - output = output_receiver.recv() => { - match output { - Some(simulation_output) => { - match simulation_output{ - SimulationOutput::SendPaymentSuccess(payment) => { - if let Some(source_node) = nodes.get(&payment.source) { - tasks.spawn(track_payment_result( - source_node.clone(), results.clone(), payment, listener.clone() - )); - } else { - break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source))); - } - }, - SimulationOutput::SendPaymentFailure(payment, result) => { - select!{ - _ = listener.clone() => { - return Ok(()); - }, - send_result = results.send((payment, result.clone())) => { - if send_result.is_err(){ - break Err(SimulationError::MpscChannelError( - format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", - payment.hash, payment.dispatch_time), - )); - } - }, - } - } - }; + let output = output_receiver.recv().await; + match output { + Some(simulation_output) => { + match simulation_output { + SimulationOutput::SendPaymentSuccess(payment) => { + if let Some(source_node) = nodes.get(&payment.source) { + tasks.spawn(track_payment_result( + source_node.clone(), + results.clone(), + payment, + listener.clone(), + )); + } else { + break Err(SimulationError::MissingNodeError(format!( + "Source node with public key: {} unavailable.", + payment.source + ))); + } }, - None => break Ok(()) - } - } + SimulationOutput::SendPaymentFailure(payment, result) => { + let send_result = results.send((payment, result.clone())).await; + if send_result.is_err() { + break Err(SimulationError::MpscChannelError( + format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", + payment.hash, payment.dispatch_time), + )); + } + }, + }; + }, + None => break Ok(()), } }; @@ -1443,21 +1409,45 @@ async fn track_payment_result( let res = match payment.hash { Some(hash) => { log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0)); - let track_payment = node.track_payment(&hash, listener.clone()); - - match track_payment.await { - Ok(res) => { - log::debug!( - "Track payment {} result: {:?}.", - hex::encode(hash.0), - res.payment_outcome - ); - res - }, - Err(e) => { - log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0)); - PaymentResult::track_payment_failed() - }, + + // Trigger and listener to stop the implementation specific track payment functions (node.track_payment()) + let (stop, listen) = triggered::trigger(); + + // Timer for waiting after getting the shutdown signal in order for current tracking to complete + let mut timer: Option = None; + + loop { + tokio::select! { + biased; + // The shutdown listener is triggered and we have not started a timer yet + _ = async {}, if listener.clone().is_triggered() && timer.is_none() => { + log::debug!("Shutdown received by track_payment_result, starting timer..."); + timer = Some(time::sleep_until(Instant::now() + Duration::from_secs(3))); + }, + // The timer has been started and it expires + Some(_) = conditional_sleeper(timer) => { + log::error!("Track payment failed for {}. The shutdown timer expired.", hex::encode(hash.0)); + stop.trigger(); + timer = None; + } + // The payment tracking completes + res = node.track_payment(&hash, listen.clone()) => { + match res { + Ok(res) => { + log::info!( + "Track payment {} result: {:?}.", + hex::encode(hash.0), + res.payment_outcome + ); + break res; + }, + Err(e) => { + log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0)); + break PaymentResult::track_payment_failed(); + }, + } + } + } } }, // None means that the payment was not dispatched, so we cannot track it. @@ -1469,17 +1459,11 @@ async fn track_payment_result( }, }; - select! { - biased; - _ = listener.clone() => { - log::debug!("Track payment result received a shutdown signal."); - }, - send_payment_result = results.send((payment, res.clone())) => { - if send_payment_result.is_err() { - return Err(SimulationError::MpscChannelError( - format!("Failed to send payment result {res} for payment {payment}."))) - } - } + let send_payment_result = results.send((payment, res.clone())).await; + if send_payment_result.is_err() { + return Err(SimulationError::MpscChannelError(format!( + "Failed to send payment result {res} for payment {payment}." + ))); } log::trace!("Result tracking complete. Payment result tracker exiting."); @@ -1487,6 +1471,16 @@ async fn track_payment_result( Ok(()) } +async fn conditional_sleeper(t: Option) -> Option<()> { + match t { + Some(timer) => { + timer.await; + Some(()) + }, + None => None, + } +} + #[cfg(test)] mod tests { use crate::{ From 58cd75b593a4484742c01c730436c8662c156632 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Mon, 31 Mar 2025 15:42:22 -0500 Subject: [PATCH 2/8] Removing unnecessary clone so that the event sender will be dropped --- simln-lib/src/lib.rs | 126 +++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 64 deletions(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 9c15ac66..36662126 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -713,75 +713,73 @@ impl Simulation { self.nodes.len() ); + // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. + // The event channels are shared across our functionality: + // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the + // final result of the event that it has taken. + // - Event Receiver: used by data reporting to receive events that have been simulated that need to be + // tracked and recorded. + let (event_sender, event_receiver) = channel(1); + self.run_data_collection(event_receiver, &self.tasks); + + // Get an execution kit per activity that we need to generate and spin up consumers for each source node. + let activities = match self.activity_executors().await { + Ok(a) => a, + Err(e) => { + // If we encounter an error while setting up the activity_executors, + // we need to shutdown and return. + self.shutdown(); + return Err(e); + }, + }; + let consumer_channels = self.dispatch_consumers( + activities + .iter() + .map(|generator| generator.source_info.pubkey) + .collect(), + event_sender, + &self.tasks, + ); + + // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. + // The producers will use their own TaskTracker so that the simulation can be shutdown if they all finish. + let producer_tasks = TaskTracker::new(); + match self + .dispatch_producers(activities, consumer_channels, &producer_tasks) + .await { - // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. - // The event channels are shared across our functionality: - // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the - // final result of the event that it has taken. - // - Event Receiver: used by data reporting to receive events that have been simulated that need to be - // tracked and recorded. - let (event_sender, event_receiver) = channel(1); - self.run_data_collection(event_receiver, &self.tasks); - - // Get an execution kit per activity that we need to generate and spin up consumers for each source node. - let activities = match self.activity_executors().await { - Ok(a) => a, - Err(e) => { - // If we encounter an error while setting up the activity_executors, - // we need to shutdown and return. - self.shutdown(); - return Err(e); - }, - }; - let consumer_channels = self.dispatch_consumers( - activities - .iter() - .map(|generator| generator.source_info.pubkey) - .collect(), - event_sender.clone(), - &self.tasks, - ); + Ok(_) => {}, + Err(e) => { + // If we encounter an error in dispatch_producers, we need to shutdown and return. + self.shutdown(); + return Err(e); + }, + } - // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. - // The producers will use their own TaskTracker so that the simulation can be shutdown if they all finish. - let producer_tasks = TaskTracker::new(); - match self - .dispatch_producers(activities, consumer_channels, &producer_tasks) - .await - { - Ok(_) => {}, - Err(e) => { - // If we encounter an error in dispatch_producers, we need to shutdown and return. - self.shutdown(); - return Err(e); - }, - } + // Start a task that waits for the producers to finish. + // If all producers finish, then there is nothing left to do and the simulation can be shutdown. + let producer_trigger = self.shutdown_trigger.clone(); + self.tasks.spawn(async move { + producer_tasks.close(); + producer_tasks.wait().await; + log::info!("All producers finished. Shutting down."); + producer_trigger.trigger() + }); + + // Start a task that will shutdown the simulation if the total_time is met. + if let Some(total_time) = self.cfg.total_time { + let t = self.shutdown_trigger.clone(); + let l = self.shutdown_listener.clone(); - // Start a task that waits for the producers to finish. - // If all producers finish, then there is nothing left to do and the simulation can be shutdown. - let producer_trigger = self.shutdown_trigger.clone(); self.tasks.spawn(async move { - producer_tasks.close(); - producer_tasks.wait().await; - log::info!("All producers finished. Shutting down."); - producer_trigger.trigger() + if time::timeout(total_time, l).await.is_err() { + log::info!( + "Simulation run for {}s. Shutting down.", + total_time.as_secs() + ); + t.trigger() + } }); - - // Start a task that will shutdown the simulation if the total_time is met. - if let Some(total_time) = self.cfg.total_time { - let t = self.shutdown_trigger.clone(); - let l = self.shutdown_listener.clone(); - - self.tasks.spawn(async move { - if time::timeout(total_time, l).await.is_err() { - log::info!( - "Simulation run for {}s. Shutting down.", - total_time.as_secs() - ); - t.trigger() - } - }); - } } Ok(()) From da2c0fb5594c97e53d4e9505c9c6d34258729d4a Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Tue, 8 Apr 2025 16:22:59 -0500 Subject: [PATCH 3/8] Adding two tests to verify the shutdown process --- simln-lib/src/lib.rs | 224 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 223 insertions(+), 1 deletion(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 36662126..84253d82 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -1481,9 +1481,11 @@ async fn conditional_sleeper(t: Option) -> Option<()> { #[cfg(test)] mod tests { + use crate::test_utils::MockLightningNode; + use crate::TaskTracker; use crate::{ get_payment_delay, test_utils, test_utils::LightningTestNodeBuilder, LightningError, - LightningNode, MutRng, PaymentGenerationError, PaymentGenerator, + LightningNode, MutRng, PaymentGenerationError, PaymentGenerator, Simulation, SimulationCfg, }; use bitcoin::secp256k1::PublicKey; use bitcoin::Network; @@ -1764,4 +1766,224 @@ mod tests { assert!(result.is_ok()); } + + #[tokio::test] + async fn test_shutdown_timeout() { + // Create test nodes + let nodes = test_utils::create_nodes(2, 100_000); + let mut node_1 = nodes.first().unwrap().0.clone(); + let mut node_2 = nodes.get(1).unwrap().0.clone(); + node_1.features.set_keysend_optional(); + node_2.features.set_keysend_optional(); + let node_1_for_get_info = node_1.clone(); + let node_2_for_get_info = node_2.clone(); + + // Create mock nodes with all necessary expectations + let mut mock_node_1 = MockLightningNode::new(); + let mut mock_node_2 = MockLightningNode::new(); + + // Set up node 1 expectations + mock_node_1.expect_get_info().return_const(node_1.clone()); + mock_node_1 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_1 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_1 + .expect_get_node_info() + .returning(move |_| Ok(node_1_for_get_info.clone())); + mock_node_1 + .expect_send_payment() + .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); + mock_node_1.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + + // Set up node 2 expectations + mock_node_2.expect_get_info().return_const(node_2.clone()); + mock_node_2 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_2 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_2 + .expect_get_node_info() + .returning(move |_| Ok(node_2_for_get_info.clone())); + mock_node_2 + .expect_send_payment() + .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); + mock_node_2.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + + // Create a hashmap of nodes + let mut clients: HashMap>> = HashMap::new(); + clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); + clients.insert(node_2.pubkey, Arc::new(Mutex::new(mock_node_2))); + + // Define payment activity: 2000 msats every 1 second + let activity_definition = crate::ActivityDefinition { + source: node_1, + destination: node_2, + start_secs: None, // Start immediately + count: None, // No limit + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(2000), // 2000 msats + }; + + // Create simulation with a 3 second timeout + let timeout_secs = 3; + let simulation = Simulation::new( + SimulationCfg::new( + Some(timeout_secs), // Set 3 second timeout + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_definition], // Use defined activity with 2000 msats every second + TaskTracker::new(), + ); + + // Run the simulation and measure how long it takes + let start = std::time::Instant::now(); + let result = simulation.run().await; + let elapsed = start.elapsed(); + + // Verify the simulation shut down correctly + assert!(result.is_ok(), "Simulation should end without error"); + + // Check that simulation ran for approximately the timeout duration (with some margin) + // We allow some extra time for shutdown procedures to complete + let margin = Duration::from_secs(1); + assert!( + elapsed >= Duration::from_secs(timeout_secs.into()) + && elapsed <= Duration::from_secs(timeout_secs.into()) + margin, + "Simulation should have run for approximately {timeout_secs} seconds, but took {:?}", + elapsed + ); + + // Check the payment stats + let total_payments = simulation.get_total_payments().await; + + // We expect at least 2 payments to be attempted (about one per second) + // The exact number may vary due to timing, but should be at least 2 + assert!( + total_payments >= 2, + "Expected at least 2 payments to be attempted, got {}", + total_payments + ); + } + + #[tokio::test] + async fn test_shutdown_completion() { + // Create test nodes + let nodes = test_utils::create_nodes(2, 100_000); + let mut node_1 = nodes.first().unwrap().0.clone(); + let mut node_2 = nodes.get(1).unwrap().0.clone(); + node_1.features.set_keysend_optional(); + node_2.features.set_keysend_optional(); + let node_1_for_get_info = node_1.clone(); + let node_2_for_get_info = node_2.clone(); + + // Create mock nodes with all necessary expectations + let mut mock_node_1 = MockLightningNode::new(); + let mut mock_node_2 = MockLightningNode::new(); + + // Set up node 1 expectations + mock_node_1.expect_get_info().return_const(node_1.clone()); + mock_node_1 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_1 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_1 + .expect_get_node_info() + .returning(move |_| Ok(node_1_for_get_info.clone())); + mock_node_1 + .expect_send_payment() + .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); + mock_node_1.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + + // Set up node 2 expectations + mock_node_2.expect_get_info().return_const(node_2.clone()); + mock_node_2 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_2 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_2 + .expect_get_node_info() + .returning(move |_| Ok(node_2_for_get_info.clone())); + mock_node_2 + .expect_send_payment() + .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); + mock_node_2.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + + // Create a hashmap of nodes + let mut clients: HashMap>> = HashMap::new(); + clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); + clients.insert(node_2.pubkey, Arc::new(Mutex::new(mock_node_2))); + + // Define an activity that will make 3 payments of 2000 msats each + let activity_definition = crate::ActivityDefinition { + source: node_1, + destination: node_2, + start_secs: None, // Start immediately + count: Some(3), // 3 payments + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(2000), // 2000 msats + }; + + // Create simulation that will run until activities complete + let simulation = Simulation::new( + SimulationCfg::new( + None, // No timeout + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_definition], // Use defined activity with 3 payments + TaskTracker::new(), + ); + + // Run the simulation + let result = simulation.run().await; + + // Verify the simulation shut down correctly + assert!(result.is_ok(), "Simulation should end without error"); + + // Check the payment stats + let total_payments = simulation.get_total_payments().await; + + // We expect exactly 3 payments to be attempted + assert_eq!( + total_payments, 3, + "Expected exactly 3 payments to be attempted, got {}", + total_payments + ); + } } From e59417bde6f5151669718375e2cde2aa21b81ee4 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Tue, 8 Apr 2025 16:34:27 -0500 Subject: [PATCH 4/8] Cleaning up the shutdown tests so that they share setup code --- simln-lib/src/lib.rs | 85 ++++++++++---------------------------------- 1 file changed, 18 insertions(+), 67 deletions(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 84253d82..8bfc3641 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -1767,8 +1767,12 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_shutdown_timeout() { + #[allow(clippy::type_complexity)] + /// Helper to create and configure mock nodes for testing + fn setup_test_nodes() -> ( + (crate::NodeInfo, crate::NodeInfo), + HashMap>>, + ) { // Create test nodes let nodes = test_utils::create_nodes(2, 100_000); let mut node_1 = nodes.first().unwrap().0.clone(); @@ -1829,6 +1833,14 @@ mod tests { clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); clients.insert(node_2.pubkey, Arc::new(Mutex::new(mock_node_2))); + ((node_1, node_2), clients) + } + + #[tokio::test] + async fn test_shutdown_timeout() { + // Set up test nodes + let ((node_1, node_2), clients) = setup_test_nodes(); + // Define payment activity: 2000 msats every 1 second let activity_definition = crate::ActivityDefinition { source: node_1, @@ -1872,11 +1884,9 @@ mod tests { elapsed ); - // Check the payment stats - let total_payments = simulation.get_total_payments().await; - // We expect at least 2 payments to be attempted (about one per second) // The exact number may vary due to timing, but should be at least 2 + let total_payments = simulation.get_total_payments().await; assert!( total_payments >= 2, "Expected at least 2 payments to be attempted, got {}", @@ -1886,65 +1896,8 @@ mod tests { #[tokio::test] async fn test_shutdown_completion() { - // Create test nodes - let nodes = test_utils::create_nodes(2, 100_000); - let mut node_1 = nodes.first().unwrap().0.clone(); - let mut node_2 = nodes.get(1).unwrap().0.clone(); - node_1.features.set_keysend_optional(); - node_2.features.set_keysend_optional(); - let node_1_for_get_info = node_1.clone(); - let node_2_for_get_info = node_2.clone(); - - // Create mock nodes with all necessary expectations - let mut mock_node_1 = MockLightningNode::new(); - let mut mock_node_2 = MockLightningNode::new(); - - // Set up node 1 expectations - mock_node_1.expect_get_info().return_const(node_1.clone()); - mock_node_1 - .expect_get_network() - .returning(|| Ok(Network::Regtest)); - mock_node_1 - .expect_list_channels() - .returning(|| Ok(vec![100_000])); - mock_node_1 - .expect_get_node_info() - .returning(move |_| Ok(node_1_for_get_info.clone())); - mock_node_1 - .expect_send_payment() - .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); - mock_node_1.expect_track_payment().returning(|_, _| { - Ok(crate::PaymentResult { - htlc_count: 1, - payment_outcome: crate::PaymentOutcome::Success, - }) - }); - - // Set up node 2 expectations - mock_node_2.expect_get_info().return_const(node_2.clone()); - mock_node_2 - .expect_get_network() - .returning(|| Ok(Network::Regtest)); - mock_node_2 - .expect_list_channels() - .returning(|| Ok(vec![100_000])); - mock_node_2 - .expect_get_node_info() - .returning(move |_| Ok(node_2_for_get_info.clone())); - mock_node_2 - .expect_send_payment() - .returning(|_, _| Ok(lightning::ln::PaymentHash([0; 32]))); - mock_node_2.expect_track_payment().returning(|_, _| { - Ok(crate::PaymentResult { - htlc_count: 1, - payment_outcome: crate::PaymentOutcome::Success, - }) - }); - - // Create a hashmap of nodes - let mut clients: HashMap>> = HashMap::new(); - clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); - clients.insert(node_2.pubkey, Arc::new(Mutex::new(mock_node_2))); + // Set up test nodes + let ((node_1, node_2), clients) = setup_test_nodes(); // Define an activity that will make 3 payments of 2000 msats each let activity_definition = crate::ActivityDefinition { @@ -1976,10 +1929,8 @@ mod tests { // Verify the simulation shut down correctly assert!(result.is_ok(), "Simulation should end without error"); - // Check the payment stats - let total_payments = simulation.get_total_payments().await; - // We expect exactly 3 payments to be attempted + let total_payments = simulation.get_total_payments().await; assert_eq!( total_payments, 3, "Expected exactly 3 payments to be attempted, got {}", From f38f2e11114618bc6901b506ebaf47e2fa0d3758 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Mon, 14 Apr 2025 11:25:45 -0500 Subject: [PATCH 5/8] Adding a shutdown test for the manual shutdown case --- simln-lib/src/lib.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 8bfc3641..e6d53b16 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -1937,4 +1937,72 @@ mod tests { total_payments ); } + + #[tokio::test] + async fn test_manual_shutdown() { + // Set up test nodes + let ((node_1, node_2), clients) = setup_test_nodes(); + + // Define an activity that would make many payments (but we will shut down early) + let activity_definition = crate::ActivityDefinition { + source: node_1, + destination: node_2, + start_secs: None, + count: Some(10), + interval_secs: crate::ValueOrRange::Value(1), + amount_msat: crate::ValueOrRange::Value(2000), + }; + + // Create simulation with no timeout + let simulation = Simulation::new( + SimulationCfg::new( + None, // No timeout + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_definition], // Use defined activity with many payments + TaskTracker::new(), + ); + + // Create a cloned reference for the shutdown task + let sim_clone = simulation.clone(); + + // Start a task that will manually shut down the simulation after a short delay + tokio::spawn(async move { + // Wait a little bit to allow some payments to complete + tokio::time::sleep(Duration::from_secs(3)).await; + + // Manually trigger shutdown + log::info!("Manually triggering simulation shutdown"); + sim_clone.shutdown(); + }); + + // Run the simulation (should be interrupted by our manual shutdown) + let start = std::time::Instant::now(); + let result = simulation.run().await; + let elapsed = start.elapsed(); + + // Verify the simulation shut down correctly + assert!(result.is_ok(), "Simulation should end without error"); + + // Check that simulation ran for approximately the time until manual shutdown (with some margin) + let expected_runtime = Duration::from_secs(3); + let margin = Duration::from_secs(1); + assert!( + elapsed >= expected_runtime && elapsed <= expected_runtime + margin, + "Simulation should have run for approximately 3 seconds, but took {:?}", + elapsed + ); + + // We expect fewer than the total 100 payments to be attempted + let total_payments = simulation.get_total_payments().await; + assert!( + total_payments > 0 && total_payments < 10, + "Expected between 1 and 99 payments to be attempted, got {}", + total_payments + ); + } } From 56badbd2cebbc713ebdc29ae178a3fffa849d336 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Mon, 14 Apr 2025 12:26:10 -0500 Subject: [PATCH 6/8] Adding a test for the error shutdown case --- simln-lib/src/lib.rs | 83 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index e6d53b16..fb58e006 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -1943,7 +1943,7 @@ mod tests { // Set up test nodes let ((node_1, node_2), clients) = setup_test_nodes(); - // Define an activity that would make many payments (but we will shut down early) + // Define an activity that would make payments (but we will shut down early) let activity_definition = crate::ActivityDefinition { source: node_1, destination: node_2, @@ -1997,11 +1997,88 @@ mod tests { elapsed ); - // We expect fewer than the total 100 payments to be attempted + // We expect fewer than the total 10 payments to be attempted let total_payments = simulation.get_total_payments().await; assert!( total_payments > 0 && total_payments < 10, - "Expected between 1 and 99 payments to be attempted, got {}", + "Expected between 1 and 9 payments to be attempted, got {}", + total_payments + ); + } + + #[tokio::test] + async fn test_shutdown_on_error() { + // Set up test nodes with one that will produce a permanent error + let ((node_1, node_2), mut clients) = setup_test_nodes(); + let mut mock_node_1 = MockLightningNode::new(); + + // Set up node 1 expectations + let node_1_clone = node_1.clone(); + mock_node_1.expect_get_info().return_const(node_1.clone()); + mock_node_1 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_1 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_1 + .expect_get_node_info() + .returning(move |_| Ok(node_1_clone.clone())); + mock_node_1.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + mock_node_1.expect_send_payment().returning(|_, _| { + Err(LightningError::PermanentError( + "Simulated permanent error".to_string(), + )) + }); + + clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); + + // Define an activity that attempts to send a payment from node_1 to node_2 + let activity_definition = crate::ActivityDefinition { + source: node_1, + destination: node_2, + start_secs: None, // Start immediately + count: Some(5), // Would try multiple payments if no error occurred + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(2000), // 2000 msats + }; + + // Create simulation with a long timeout that we don't expect to be reached + let simulation = Simulation::new( + SimulationCfg::new( + Some(30), // 30 second timeout (shouldn't matter) + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_definition], + TaskTracker::new(), + ); + + // Run the simulation (should be interrupted by the error) + let start = std::time::Instant::now(); + let _ = simulation.run().await; + let elapsed = start.elapsed(); + + // Check that simulation ran for a short time (less than our expected 5 seconds) + assert!( + elapsed < Duration::from_secs(5), + "Simulation should have shut down quickly after encountering the error, took {:?}", + elapsed + ); + + // We expect no successful payments to be recorded + let total_payments = simulation.get_total_payments().await; + assert_eq!( + total_payments, 0, + "Expected no payments to be recorded, got {}", total_payments ); } From adab9352bd1001726e7016528deb0eb27ae20e36 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Wed, 23 Apr 2025 13:06:25 -0500 Subject: [PATCH 7/8] Cleaning up comments, variable names, and imports --- simln-lib/src/lib.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index fb58e006..bdbf538c 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -21,7 +21,7 @@ use std::{collections::HashMap, sync::Arc, time::SystemTime}; use thiserror::Error; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; -use tokio::{select, time, time::Duration, time::Instant}; +use tokio::{select, time, time::Duration, time::Instant, time::Sleep}; use tokio_util::task::TaskTracker; use triggered::{Listener, Trigger}; @@ -1345,8 +1345,7 @@ async fn run_results_logger( /// /// Note: this producer does not accept a shutdown trigger because it only expects to be dispatched once. In the single /// producer case exit will drop the only sending channel and the receiving channel provided to the consumer will error -/// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the -/// consumer will not exit and a trigger is required. +/// out. async fn produce_simulation_results( nodes: HashMap>>, mut output_receiver: Receiver, @@ -1409,27 +1408,28 @@ async fn track_payment_result( log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0)); // Trigger and listener to stop the implementation specific track payment functions (node.track_payment()) - let (stop, listen) = triggered::trigger(); + let (track_payment_trigger, track_payment_listener) = triggered::trigger(); // Timer for waiting after getting the shutdown signal in order for current tracking to complete - let mut timer: Option = None; + let mut timer: Option = None; + let mut timer_started = false; loop { tokio::select! { - biased; // The shutdown listener is triggered and we have not started a timer yet - _ = async {}, if listener.clone().is_triggered() && timer.is_none() => { + _ = async {}, if listener.clone().is_triggered() && !timer_started => { log::debug!("Shutdown received by track_payment_result, starting timer..."); timer = Some(time::sleep_until(Instant::now() + Duration::from_secs(3))); + timer_started = true; }, // The timer has been started and it expires Some(_) = conditional_sleeper(timer) => { log::error!("Track payment failed for {}. The shutdown timer expired.", hex::encode(hash.0)); - stop.trigger(); + track_payment_trigger.trigger(); timer = None; } // The payment tracking completes - res = node.track_payment(&hash, listen.clone()) => { + res = node.track_payment(&hash, track_payment_listener.clone()) => { match res { Ok(res) => { log::info!( @@ -1469,7 +1469,7 @@ async fn track_payment_result( Ok(()) } -async fn conditional_sleeper(t: Option) -> Option<()> { +async fn conditional_sleeper(t: Option) -> Option<()> { match t { Some(timer) => { timer.await; From cf68cb0c62caae7f13a15b4ed75e14eafeb0d254 Mon Sep 17 00:00:00 2001 From: Blake Johnson Date: Wed, 23 Apr 2025 13:57:36 -0500 Subject: [PATCH 8/8] Adding shutdown test cases for multiple activities --- simln-lib/src/lib.rs | 166 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index bdbf538c..c6d3f1fe 100755 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -1939,7 +1939,7 @@ mod tests { } #[tokio::test] - async fn test_manual_shutdown() { + async fn test_shutdown_manual() { // Set up test nodes let ((node_1, node_2), clients) = setup_test_nodes(); @@ -2082,4 +2082,168 @@ mod tests { total_payments ); } + + #[tokio::test] + async fn test_shutdown_multiple_activities() { + let ((node_1, node_2), clients) = setup_test_nodes(); + + // Define activity 1: From node 0 to node 1 with a count limit of 3 payments + let activity_1 = crate::ActivityDefinition { + source: node_1.clone(), + destination: node_2.clone(), + start_secs: None, // Start immediately + count: Some(3), // Limited to 3 payments + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(2000), // 2000 msats + }; + + // Define activity 2: From node 2 to node 1 with no count limit + let activity_2 = crate::ActivityDefinition { + source: node_2.clone(), + destination: node_1.clone(), + start_secs: None, // Start immediately + count: None, // No limit (will run forever) + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(3000), // 3000 msats + }; + + // Create simulation with a timeout of 6 seconds + // This gives enough time for first activity to complete (3 payments at 1 second each) + // and for the second activity to continue making payments after that + let timeout_secs = 6; + let simulation = Simulation::new( + SimulationCfg::new( + Some(timeout_secs), // Run for 6 seconds + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_1, activity_2], // Use both activities + TaskTracker::new(), + ); + + // Run the simulation for the full timeout duration + let start = std::time::Instant::now(); + let result = simulation.run().await; + let elapsed = start.elapsed(); + + // Verify the simulation ran correctly + assert!(result.is_ok(), "Simulation should end without error"); + + // Verify it ran for approximately the timeout duration + let margin = Duration::from_secs(1); + assert!( + elapsed >= Duration::from_secs(6) && elapsed <= Duration::from_secs(6) + margin, + "Simulation should have run for approximately {} seconds, but took {:?}", + timeout_secs, + elapsed + ); + + // We expect at least 8 payments in total: + // - 3 from activity 1 (which completes its count) + // - At least 5 from activity 2 (1 per second for 6 seconds) + // The exact number may vary slightly due to timing, but should be at least 8 + let total_payments = simulation.get_total_payments().await; + assert!( + total_payments >= 8, + "Expected at least 8 payments to be attempted, got {}", + total_payments + ); + } + + #[tokio::test] + async fn test_shutdown_multiple_activities_with_error() { + // Set up test nodes with node_1 that will produce a permanent error + let ((node_1, node_2), mut clients) = setup_test_nodes(); + + // Create mock for node_1 that will return a permanent error on send_payment + let mut mock_node_1 = MockLightningNode::new(); + let node_1_clone = node_1.clone(); + + // Set up basic expectations for node_1 + mock_node_1.expect_get_info().return_const(node_1.clone()); + mock_node_1 + .expect_get_network() + .returning(|| Ok(Network::Regtest)); + mock_node_1 + .expect_list_channels() + .returning(|| Ok(vec![100_000])); + mock_node_1 + .expect_get_node_info() + .returning(move |_| Ok(node_1_clone.clone())); + mock_node_1.expect_track_payment().returning(|_, _| { + Ok(crate::PaymentResult { + htlc_count: 1, + payment_outcome: crate::PaymentOutcome::Success, + }) + }); + + // Set up node_1 to return a permanent error on send_payment + mock_node_1.expect_send_payment().returning(|_, _| { + Err(LightningError::PermanentError( + "Simulated permanent error".to_string(), + )) + }); + + // Replace node_1 with our new mock + clients.insert(node_1.pubkey, Arc::new(Mutex::new(mock_node_1))); + + // Define two activities + // Activity 1: From node_1 to node_2 - This will encounter the permanent error + let activity_1 = crate::ActivityDefinition { + source: node_1.clone(), + destination: node_2.clone(), + start_secs: None, + count: None, // No limit + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(2000), // 2000 msats + }; + + // Activity 2: From node_2 to node_1 - This would normally succeed + let activity_2 = crate::ActivityDefinition { + source: node_2.clone(), + destination: node_1.clone(), + start_secs: Some(2), // Start 2 seconds after the first activity + count: Some(10), // 10 payments if no error + interval_secs: crate::ValueOrRange::Value(1), // 1 second interval + amount_msat: crate::ValueOrRange::Value(3000), // 3000 msats + }; + + // Create simulation with a long timeout that we don't expect to be reached + let simulation = Simulation::new( + SimulationCfg::new( + Some(30), // 30 second timeout (shouldn't matter) + 1000, // Expected payment size + 0.1, // Activity multiplier + None, // No result writing + Some(42), // Seed for determinism + ), + clients, + vec![activity_1, activity_2], // Use both activities + TaskTracker::new(), + ); + + // Run the simulation (should be interrupted by the error) + let start = std::time::Instant::now(); + let _ = simulation.run().await; + let elapsed = start.elapsed(); + + // Check that simulation ran for a short time (less than 3 seconds) + assert!( + elapsed < Duration::from_secs(3), + "Simulation should have shut down quickly after encountering the error, took {:?}", + elapsed + ); + + // We expect no successful payments to be recorded since the first activity errors immediately + // and it should shut down the entire simulation + let total_payments = simulation.get_total_payments().await; + assert_eq!( + total_payments, 0, + "Expected no payments to be recorded, got {}", + total_payments + ); + } }