Skip to content

simln-lib/refactor: fully deterministic produce events #277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sim-cli/src/parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use simln_lib::{
Simulation, SimulationCfg, WriteResults,
};
use simln_lib::{ShortChannelID, SimulationError};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -357,12 +357,12 @@ async fn get_clients(
nodes: Vec<NodeConnection>,
) -> Result<
(
HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
HashMap<PublicKey, NodeInfo>,
),
LightningError,
> {
let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
let mut clients: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();
let mut clients_info: HashMap<PublicKey, NodeInfo> = HashMap::new();

for connection in nodes {
Expand Down Expand Up @@ -574,7 +574,7 @@ pub async fn parse_sim_params(cli: &Cli) -> anyhow::Result<SimParams> {
}

pub async fn get_validated_activities(
clients: &HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
clients: &BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
nodes_info: HashMap<PublicKey, NodeInfo>,
activity: Vec<ActivityParser>,
) -> Result<Vec<ActivityDefinition>, LightningError> {
Expand Down
195 changes: 106 additions & 89 deletions simln-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use random_activity::RandomActivityError;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::marker::Send;
Expand Down Expand Up @@ -591,7 +592,7 @@ pub struct Simulation<C: Clock + 'static> {
/// Config for the simulation itself.
cfg: SimulationCfg,
/// The lightning nodes that are being simulated, keyed by their public key.
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
/// Results logger that holds the simulation statistics.
results: Arc<Mutex<PaymentResultLogger>>,
/// Track all tasks spawned for use in the simulation. When used in the `run` method, it will wait for
Expand Down Expand Up @@ -619,14 +620,14 @@ struct ExecutorKit {
source_info: NodeInfo,
/// We use an arc mutex here because some implementations of the trait will be very expensive to clone.
/// See [NetworkGraphView] for details.
network_generator: Arc<Mutex<dyn DestinationGenerator>>,
network_generator: Arc<dyn DestinationGenerator>,
payment_generator: Box<dyn PaymentGenerator>,
}

impl<C: Clock + 'static> Simulation<C> {
pub fn new(
cfg: SimulationCfg,
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
tasks: TaskTracker,
clock: Arc<C>,
shutdown_trigger: Trigger,
Expand Down Expand Up @@ -954,7 +955,7 @@ impl<C: Clock + 'static> Simulation<C> {
source_info: description.source.clone(),
// Defined activities have very simple generators, so the traits required are implemented on
// a single struct which we just cheaply clone.
network_generator: Arc::new(Mutex::new(activity_generator.clone())),
network_generator: Arc::new(activity_generator.clone()),
payment_generator: Box::new(activity_generator),
});
}
Expand All @@ -971,7 +972,7 @@ impl<C: Clock + 'static> Simulation<C> {
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
// avoid double counting capacity (as each node has a counterparty in the channel).
let mut generators = Vec::new();
let mut active_nodes = HashMap::new();
let mut active_nodes = BTreeMap::new();

// Do a first pass to get the capacity of each node which we need to be able to create a network generator.
// While we're at it, we get the node info and store it with capacity to create activity generators in our
Expand All @@ -994,18 +995,12 @@ impl<C: Clock + 'static> Simulation<C> {
active_nodes.insert(node_info.pubkey, (node_info, capacity));
}

// Create a network generator with a shared RNG for all nodes.
let network_generator = Arc::new(Mutex::new(
let network_generator = Arc::new(
NetworkGraphView::new(
active_nodes.values().cloned().collect(),
MutRng::new(self.cfg.seed.map(|seed| (seed, None))),
)
.map_err(SimulationError::RandomActivityError)?,
));

log::info!(
"Created network generator: {}.",
network_generator.lock().await
);

for (node_info, capacity) in active_nodes.values() {
Expand Down Expand Up @@ -1076,6 +1071,32 @@ impl<C: Clock + 'static> Simulation<C> {
channels
}

fn generate_payments<A: PaymentGenerator + ?Sized>(
&self,
heap: &mut BinaryHeap<Reverse<(Duration, usize)>>,
source: &NodeInfo,
node_generator: &A,
index: usize,
) -> Result<(), SimulationError> {
let mut current_count = 0;
loop {
let wait = get_payment_delay(current_count, source, node_generator)?;
heap.push(Reverse((wait, index)));
if node_generator.payment_count().is_none() {
return Ok(());
}
if let Some(c) = node_generator.payment_count() {
if c == current_count {
log::info!(
"Payment count has been met for {source}: {c} payments. Stopping the activity."
);
return Ok(());
}
}
current_count += 1;
}
}

/// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
/// for every source node in the set of executors.
async fn dispatch_producers(
Expand All @@ -1084,36 +1105,64 @@ impl<C: Clock + 'static> Simulation<C> {
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
tasks: &TaskTracker,
) -> Result<(), SimulationError> {
for executor in executors {
let mut heap: BinaryHeap<Reverse<(Duration, usize)>> = BinaryHeap::new();
for (index, executor) in executors.iter().enumerate() {
self.generate_payments(
&mut heap,
&executor.source_info,
executor.payment_generator.as_ref(),
index,
)?;
}
while let Some(Reverse((wait, index))) = heap.pop() {
let executor = executors
.get(index)
.ok_or(SimulationError::MissingNodeError(
"Missing executor".to_string(),
))?;
let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
"Activity producer for: {} not found.",
executor.source_info.pubkey,
))),
)?;
let (destination, capacity) = executor
.network_generator
.choose_destination(executor.source_info.pubkey)
.map_err(SimulationError::DestinationGenerationError)?;

// pe: produce events
let pe_shutdown = self.shutdown_trigger.clone();
let pe_listener = self.shutdown_listener.clone();
let pe_sender = sender.clone();
let clock = self.clock.clone();

let source = executor.source_info.clone();
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
// a payment amount something has gone wrong (because we should have validated that we can always
// generate amounts), so we exit.
let amount = match executor.payment_generator.payment_amount(capacity) {
Ok(amt) => {
if amt == 0 {
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
continue;
}
amt
},
Err(e) => {
return Err(SimulationError::PaymentGenerationError(e));
},
};
tasks.spawn(async move {
let source = executor.source_info.clone();

log::info!(
"Starting activity producer for {}: {}.",
source,
executor.payment_generator
);
log::info!("Starting activity producer for {}.", source);

if let Err(e) = produce_events(
executor.source_info,
executor.network_generator,
executor.payment_generator,
source.clone(),
clock,
pe_sender,
pe_listener,
wait,
destination,
amount,
)
.await
{
Expand All @@ -1124,7 +1173,6 @@ impl<C: Clock + 'static> Simulation<C> {
}
});
}

Ok(())
}
}
Expand Down Expand Up @@ -1213,74 +1261,43 @@ async fn consume_events(

/// produce events generates events for the activity description provided. It accepts a shutdown listener so it can
/// exit if other threads signal that they have errored out.
async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
async fn produce_events(
source: NodeInfo,
network_generator: Arc<Mutex<N>>,
node_generator: Box<A>,
clock: Arc<dyn Clock>,
sender: Sender<SimulationEvent>,
listener: Listener,
wait: Duration,
destination: NodeInfo,
amount: u64,
) -> Result<(), SimulationError> {
let mut current_count = 0;
loop {
if let Some(c) = node_generator.payment_count() {
if c == current_count {
log::info!(
"Payment count has been met for {source}: {c} payments. Stopping the activity."
);
return Ok(());
select! {
biased;
_ = listener.clone() => {
return Ok(());
},
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
// destination.
_ = clock.sleep(wait) => {
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
select!{
biased;
_ = listener.clone() => {
return Ok(());
},
send_result = sender.send(event.clone()) => {
if send_result.is_err(){
return Err(SimulationError::MpscChannelError(
format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
}
},
}
}

let wait = get_payment_delay(current_count, &source, node_generator.as_ref())?;

select! {
biased;
_ = listener.clone() => {
return Ok(());
},
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
// destination.
_ = clock.sleep(wait) => {
let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey).map_err(SimulationError::DestinationGenerationError)?;

// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
// a payment amount something has gone wrong (because we should have validated that we can always
// generate amounts), so we exit.
let amount = match node_generator.payment_amount(capacity) {
Ok(amt) => {
if amt == 0 {
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
continue;
}
amt
},
Err(e) => {
return Err(SimulationError::PaymentGenerationError(e));
},
};

log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
select!{
biased;
_ = listener.clone() => {
return Ok(());
},
send_result = sender.send(event.clone()) => {
if send_result.is_err(){
return Err(SimulationError::MpscChannelError(
format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
}
},
}

current_count += 1;
},
}
}
};
Ok(())
}

/// Gets the wait time for the next payment. If this is the first payment being generated, and a specific start delay
Expand Down Expand Up @@ -1451,7 +1468,7 @@ async fn run_results_logger(
/// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the
/// consumer will not exit and a trigger is required.
async fn produce_simulation_results(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
mut output_receiver: Receiver<SimulationOutput>,
results: Sender<(Payment, PaymentResult)>,
listener: Listener,
Expand Down Expand Up @@ -1570,7 +1587,7 @@ mod tests {
use bitcoin::secp256k1::PublicKey;
use bitcoin::Network;
use mockall::mock;
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -1817,7 +1834,7 @@ mod tests {
/// "we don't control any nodes".
#[tokio::test]
async fn test_validate_node_network_empty_nodes() {
let empty_nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
let empty_nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();

let simulation = test_utils::create_simulation(empty_nodes);
let result = simulation.validate_node_network().await;
Expand Down
5 changes: 3 additions & 2 deletions simln-lib/src/sim_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bitcoin::secp256k1::PublicKey;
use bitcoin::{Network, ScriptBuf, TxOut};
use lightning::ln::chan_utils::make_funding_redeemscript;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::{hash_map::Entry, HashMap};
use std::fmt::Display;
use std::sync::Arc;
Expand Down Expand Up @@ -1010,8 +1011,8 @@ impl SimGraph {
pub async fn ln_node_from_graph(
graph: Arc<Mutex<SimGraph>>,
routing_graph: Arc<NetworkGraph<&'_ WrappedLog>>,
) -> HashMap<PublicKey, Arc<Mutex<dyn LightningNode + '_>>> {
let mut nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
) -> BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode + '_>>> {
let mut nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();

for pk in graph.lock().await.nodes.keys() {
nodes.insert(
Expand Down
Loading